1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| def createKafkaSinkTable(): String = { """ |CREATE TABLE demo1 ( | business VARCHAR COMMENT 'uid', | pv BIGINT COMMENT 'pv', | t_start TIMESTAMP(3) COMMENT '开始时间', | t_end TIMESTAMP(3) COMMENT '结束时间' |) |WITH ( | 'connector.type' = 'kafka', -- 使用 kafka connector | 'connector.version' = 'universal', -- kafka 版本 | 'connector.topic' = 'test01_sink', -- kafka topic | 'connector.properties.0.key' = 'zookeeper.connect', -- zk连接信息 | 'connector.properties.0.value' = 'cdh04:2181', -- zk连接信息 | 'connector.properties.1.key' = 'bootstrap.servers', -- broker连接信息 | 'connector.properties.1.value' = 'cdh04:9092', -- broker连接信息 | 'connector.sink-partitioner' = 'fixed', | 'update-mode' = 'append', | 'format.type' = 'json', -- 数据源格式为 json | 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则 |) """.stripMargin }
val query = """ |SELECT business,COUNT(1) as pv, | HOP_START(rowtime, INTERVAL '5' second, INTERVAL '10' second) as t_start, | HOP_END(rowtime, INTERVAL '5' second, INTERVAL '10' second) as t_end |FROM test |GROUP BY business,HOP(rowtime, INTERVAL '5' second, INTERVAL '10' second) """.stripMargin val res1 = tEnv.sqlQuery(query)
tEnv.sqlUpdate(createKafkaSinkTable)
tEnv.sqlUpdate( s""" |INSERT INTO demo1 |SELECT * |FROM $res1 |""".stripMargin )
|