package com.test.flink.stream.sql

import java.util

import org.apache.flink.addons.hbase.HBaseTableFactory
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.DescriptorProperties
import org.apache.flink.table.descriptors.Schema.SCHEMA
import org.apache.flink.table.factories.TableFactoryService
object HBaseSinkExample {

  def main(args: Array[String]): Unit = {
    // Streaming environment with the Blink planner.
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
    // Sink schema: a STRING rowkey column plus the column family "info",
    // modelled as ROW<name STRING, age STRING>.
    val columnNames = Array("test", "info")
    val f1 = Types.ROW_NAMED(Array[String]("name", "age"), Types.STRING, Types.STRING)
    val columnTypes = Array[TypeInformation[_]](Types.STRING, f1)
    val tableSchema = new TableSchema(columnNames, columnTypes)
    // Connector properties for the HBase table sink.
    val tableProperties = new util.HashMap[String, String]
    tableProperties.put("connector.type", "hbase")
    tableProperties.put("connector.version", "1.4.3")
    tableProperties.put("connector.property-version", "1")
    tableProperties.put("connector.table-name", "user")
    tableProperties.put("connector.zookeeper.quorum", "hadoop01:2181")
    tableProperties.put("connector.zookeeper.znode.parent", "/hbase")
    tableProperties.put("connector.write.buffer-flush.max-size", "10mb")
    tableProperties.put("connector.write.buffer-flush.max-rows", "1000")
    tableProperties.put("connector.write.buffer-flush.interval", "10s")
    // Build the sink through the HBase table factory and register it.
    val descriptorProperties = new DescriptorProperties(true)
    descriptorProperties.putTableSchema(SCHEMA, tableSchema)
    descriptorProperties.putProperties(tableProperties)
    val sink = TableFactoryService
      .find(classOf[HBaseTableFactory], descriptorProperties.asMap)
      .createTableSink(descriptorProperties.asMap)
    tableEnv.registerTableSink("hbaseTable", sink)
    // Source: words read from a socket, each mapped into the Source case class
    // and registered as a table with a processing-time attribute.
    val ds = bsEnv.socketTextStream("hadoop01", 9999, '\n')
    val source = ds.flatMap(_.split(" ")).map(x => Source(x, "name", "age"))
    tableEnv.registerDataStream("demoTable", source, 'user, 'result, 'age, 'proctime.proctime)

    // `user` is a reserved keyword in Flink SQL, so it is quoted with backticks.
    val sql = "insert into hbaseTable select `user`, ROW(`result`, age) from demoTable"
    tableEnv.sqlUpdate(sql)
    tableEnv.execute("test")
  }
  case class Source(user: String, result: String, age: String)
}
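// Usage note (assumption, not part of the original snippet): the job writes to an
// HBase 1.4.x table named "user" with a column family "info"; that table has to
// exist up front, e.g. created in the hbase shell with: create 'user', 'info'.
// Test input can be fed by running `nc -lk 9999` on hadoop01 and typing
// whitespace-separated words, each of which becomes a row key.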