Flink1.11新特性

整理下Flink1.11有什么新的功能,以及坑,只有自己用到的

Orc格式支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
OrcBulkWriterFactory
使用方式与ParquetAvroWriters
但是,目前并没有一个完善的Demo
本人在使用时出现有文件生成却没有数据的情况
无法判断是使用问题,还是Flink本身问题

val schema: String = "struct<platform:string,event:string,dt:string>"
val writerProperties: Properties = new Properties()
writerProperties.setProperty("orc.compress", "LZ4")

val writerFactory = new OrcBulkWriterFactory(new DemoVectorizer(schema), writerProperties, new Configuration())
import org.apache.flink.core.fs.Path
val sink = StreamingFileSink.forBulkFormat(new Path("./resources"),
writerFactory
).build()

student.addSink(sink).setParallelism(1)

execute的使用

1
2
3
4
5
6
7
8
9
1.11之前TableEnvironmentImpl与StreamExecutionEnvironment的execute方法实现一致
无论用哪一个都可以
1.11修改了TableEnvironmentImpl中execute的实现逻辑
如果代码中涉及了DataStream的操作,则需要使用StreamExecutionEnvironment的execute方法

简单概述为:
StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业
Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业
新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业 (异步提交作业),不需要再调用 StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()