1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| # 记录一下坑
1. Table API 目前不支持 HiveStreamTableSink,所以流式作业无法写入 Hive 表,只能读取。
2. 在 CDH 集群上,一定要注意 pom 中引用的依赖必须是 CDH 版本的。
// Test code
/**
 * Demo job: registers a Hive catalog on a streaming table environment, queries
 * `user_test_orc`, and tries to insert the query result back into that same table.
 *
 * NOTE(review): the notes accompanying this snippet state that the streaming Table API
 * has no Hive stream sink, so the insert is expected to fail while reads work —
 * confirm against the Flink version in use.
 */
object HiveDemoOnTable {

  /** Record with the same five field names as the columns selected in `main`; `main` does not use it. */
  case class Order(topic: String, partition: Integer, offset: Integer, msg: String, c_date: String)

  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(streamEnv)

    // Socket text stream on hadoop01:9999; nothing below references it.
    val socketLines = streamEnv.socketTextStream("hadoop01", 9999, '\n')

    // Arguments are presumably (catalog name, default database, Hive conf dir, Hive version)
    // — TODO confirm against the HiveCatalog constructor of the Flink version in use.
    val catalogName = "test"
    tableEnv.registerCatalog(catalogName, new HiveCatalog(catalogName, "default", "hive_conf", "1.2.1"))
    tableEnv.useCatalog(catalogName)

    // Source and target of the insert are the same table.
    tableEnv
      .sqlQuery("select `topic`,`partition`,`offset`,msg,`c_date` from user_test_orc")
      .insertInto("user_test_orc")

    streamEnv.execute("test")
  }
}
|