// Registers `fs_table`, a sink backed by Flink's `filesystem` connector that
// writes ORC files under hdfs:///tmp/test, partitioned by the `dt` and `h`
// columns. The 'success-file' commit policy is configured for partition commits.
// NOTE(review): in streaming mode the filesystem sink finalizes files and
// commits partitions on checkpoints -- confirm checkpointing is enabled on the
// environment that owns `tableEnv`.
//
// A plain raw string is used (no `s` interpolator) because the statement has no
// `$` placeholders; each SQL line carries its own `|` margin so `stripMargin`
// removes the indentation and leaves clean SQL.
val sql =
  """
    |CREATE TABLE fs_table (
    |  user_uid STRING,
    |  `ref` BIGINT,
    |  reply_attach STRING,
    |  dt STRING,
    |  h string
    |) PARTITIONED BY (dt,h) WITH (
    |  'connector'='filesystem',
    |  'path'='hdfs:///tmp/test',
    |  'sink.partition-commit.policy.kind' = 'success-file',
    |  'format'='orc'
    |)
    |""".stripMargin
tableEnv.executeSql(sql)

// Copies rows from `test` into `fs_table`. The select list is matched to the
// sink columns by position, so `before.user_id` is written to `user_uid`.
// The two partition values are the date ('yyyy-MM-dd') and the hour ('HH')
// formatted from LOCALTIMESTAMP.
// NOTE(review): `test` is assumed to be registered elsewhere, and `before` is
// presumably a nested row column of it -- verify against the table definition.
tableEnv.executeSql(
  """
    |insert into fs_table
    |select before.user_id,before.`ref`,before.reply_attach,
    |DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
    |DATE_FORMAT(LOCALTIMESTAMP, 'HH')
    |FROM test
    |""".stripMargin)