官网例子实现,主要还是语法与数据类型的使用,官网并没有详细的Demo
KafkaSQL代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package com.test.flink.stream.sqlimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.types.Row import org.apache.flink.table.api.scala._import org.apache.flink.api.scala._object KafkaSQLExample { def main (args: Array [String ]): Unit = { val bsEnv = StreamExecutionEnvironment .getExecutionEnvironment val bsSettings = EnvironmentSettings .newInstance().useBlinkPlanner().inStreamingMode().build val tableEnv = StreamTableEnvironment .create(bsEnv, bsSettings) val sql = "create table test (" + "`business` varchar," + "`ts` bigint" + ") with (" + " 'connector.type' = 'kafka', " + " 'connector.version' = '0.10', " + " 'connector.topic' = 'test', " + " 'update-mode' = 'append', " + " 'connector.properties.0.key' = 'zookeeper.connect', " + " 'connector.properties.0.value' = 'hadoop01:2181', " + " 'connector.properties.1.key' = 'bootstrap.servers', " + " 'connector.properties.1.value' = 'hadoop01:9092', " + " 'connector.properties.2.key' = 'group.id', " + " 'connector.properties.2.value' = 'kafkasql', " + " 'connector.startup-mode' = 'latest-offset', " + " 'format.type' = 'json', " + " 'format.derive-schema' = 'true' " + ")" tableEnv.sqlUpdate(sql) tableEnv.toAppendStream[Row ](tableEnv.sqlQuery("select * from test" )).print() tableEnv.execute("" ) } }
HBaseSQL代码 注意: 不知道是否是版本问题,HBase-1.2.0版本运行下列代码会报Can't get the location for replica 0
错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.test.flink.stream.sqlimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.api.scala._import org.apache.flink.api.scala._import org.apache.flink.types.Row object HBaseSQLExample { def main (args: Array [String ]): Unit = { System .setProperty("HADOOP_USER_NAME" , "hdfs" ) val bsEnv = StreamExecutionEnvironment .getExecutionEnvironment val bsSettings = EnvironmentSettings .newInstance().useBlinkPlanner().inStreamingMode().build val tableEnv = StreamTableEnvironment .create(bsEnv, bsSettings) val sql = "create table test (" + "`name` string," + "`info` ROW<name varchar, age varchar>" + ") with (" + " 'connector.type' = 'hbase', " + " 'connector.version' = '1.4.3', " + " 'connector.table-name' = 'user', " + " 'connector.zookeeper.quorum' = 'hadoop01:2181', " + " 'connector.zookeeper.znode.parent' = '/hbase', " + " 'connector.write.buffer-flush.max-size' = '1mb', " + " 'connector.write.buffer-flush.max-rows' = '1', " + " 'connector.write.buffer-flush.interval' = '2s' " + ")" tableEnv.sqlUpdate(sql) tableEnv.toAppendStream[Row ](tableEnv.sqlQuery("select * from test" )).print() tableEnv.execute("" ) } }