1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| package com.test.flink.stream.dim.redis
import com.test.flink.redis.RedisAsyncLookupTableSource import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableSchema} import org.apache.flink.api.java.io.jdbc.{JDBCAppendTableSink, JDBCOptions, JDBCUpsertTableSink} import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} import org.apache.flink.types.Row
object DoubleStreamRedisDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tEnv = StreamTableEnvironment.create(env, settings) val ds = env.socketTextStream("hadoop01", 9999, '\n') val demo = ds.flatMap(_.split(" ")).map(x => { val arr = x.split(",") Demo(arr(0), arr(1), arr(2)) })
tEnv.registerDataStream("user_click_name", demo, 'id, 'user_click, 'time, 'proctime.proctime)
val redisSource = RedisAsyncLookupTableSource.Builder.newBuilder().withFieldNames(Array("id", "name", "age")) .withFieldTypes(Array(Types.STRING, Types.STRING, Types.STRING)) .build() tEnv.registerTableSource("info", redisSource)
val sql = "select * " + " from user_click_name as t1" + " join info FOR SYSTEM_TIME AS OF t1.proctime as t2" + " on t1.id = t2.id"
val table = tEnv.sqlQuery(sql) val tableName = table.toString tEnv.toAppendStream[Row](table).print()
val sinkA = JDBCAppendTableSink.builder() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/world?autoReconnect=true&failOverReadOnly=false&useSSL=false") .setUsername("root") .setPassword("123456") .setQuery("insert into test (uid) values (?)") .setBatchSize(1) .setParameterTypes(Types.STRING) .build() val sinkB = JDBCUpsertTableSink.builder() .setOptions(JDBCOptions.builder() .setDriverName("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/world?autoReconnect=true&failOverReadOnly=false&useSSL=false") .setUsername("root") .setPassword("123456") .setTableName("test") .build()) .setTableSchema(TableSchema.builder() .field("uid", DataTypes.STRING()) .build()) .setFlushIntervalMills(1) .build()
tEnv.execute("") }
case class Demo(id: String, user_click: String, time: String)
}
|