1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
// Define a temporal table function.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Sample (currency, rate) rows. "Euro" appears three times with different
// rates, giving the temporal table function several versions of one key.
val ratesHistoryData: Seq[(String, Long)] = Seq(
  ("US Dollar", 102L),
  ("Euro", 114L),
  ("Yen", 1L),
  ("Euro", 116L),
  ("Euro", 119L)
)

// Convert the collection into a Table with fields r_currency and r_rate, and
// declare r_proctime as an additional processing-time attribute.
val ratesHistory = env
  .fromCollection(ratesHistoryData)
  .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)

tEnv.registerTable("RatesHistory", ratesHistory)

// Build the temporal table function from the history table: the first
// argument is the time attribute, the second the key column.
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency)

// Expose the function under the name "Rates" so queries can call it.
tEnv.registerFunction("Rates", rates)
// Define a temporal table (supported only by the Blink planner).
// This is a standalone alternative to the temporal-table-function snippet:
// it declares its own env, tEnv and rates.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// NOTE(review): the other snippet uses StreamTableEnvironment.create(env);
// confirm which factory the targeted Flink version provides.
val tEnv = TableEnvironment.getTableEnvironment(env)

// `conf` is not defined in this snippet; presumably an HBase Configuration
// supplied by the surrounding code (TODO confirm).
val rates = new HBaseTableSource(conf, "Rates")
// Scala has no `X.class` literal; classOf[X] is the equivalent. The boxed
// java.lang.Double mirrors what Java's `Double.class` denotes.
rates.setRowKey("currency", classOf[String])
rates.addColumn("fam1", "rate", classOf[java.lang.Double])

tEnv.registerTableSource("Rates", rates)
|