Flink代码编写中的环境配置

在使用Env时可以配置调优的参数

OptimizerConfigOptions(优化配置)

BATCH_STREAMING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 聚合阶段策略
TABLE_OPTIMIZER_AGG_PHASE_STRATEGY 默认值AUTO
table.optimizer.agg-phase-strategy
AUTO: 聚合阶段没有特殊的执行器
TWO_PHASE: 强制使用具有localAggregate和globalAggregate的两阶段聚合;如果聚合调用不支持将优化分为两个阶段,则我们仍将使用一个阶段的聚合
ONE_PHASE: 强制使用只有CompleteGlobalAggregate的一个阶段聚合

# 重用sub-plans
TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED 默认值true
table.optimizer.reuse-sub-plan-enabled
如果为true,优化器将尝试找出重复的sub-plans并重用它们

# 重用源表
TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED 默认值true
table.optimizer.reuse-source-enabled
如果为true,优化器将尝试找出重复的表源并重用它们,只有当TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED为true工作

# 谓词下推
TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED 默认值true
table.optimizer.source.predicate-pushdown-enabled
如果为true,优化器将把谓词下推到FilterableTableSource中

# 联接排序
TABLE_OPTIMIZER_JOIN_REORDER_ENABLED 默认值false
table.optimizer.join-reorder-enabled
在优化器中启用联接重新排序

BATCH

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 联接广播字节大小
TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD 默认值1048576L
table.optimizer.join.broadcast-threshold
配置执行联接时将广播给所有工作节点的表的最大字节大小.通过将此值设置为-1来禁用广播

# 聚合下推
TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED 默认值true
table.optimizer.source.aggregate-pushdown-enabled
如果为true,优化器将把聚合下推到SupportsAggregatePushDown中

# 多输入流
TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED 默认值true
table.optimizer.multiple-input-enabled
如果为true,优化器会将带有流水线shuffle的操作符合并为多输入操作符,以减少shuffle并提高性能

STREAMING

1
2
3
4
5
6
7
8
9
10
11
12
13
# 拆分聚合
TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED 默认值false
table.optimizer.distinct-agg.split.enabled
告诉优化器是否将不同聚合(例如COUNT(distinct col),SUM(distinct col))拆分为两个级别.
第一个聚合由一个额外的key进行洗牌,该key使用distinct_key的hashcode和bucket的数量进行计算.
当不同的聚合中存在数据倾斜时,这种优化非常有用,并提供了扩展作业的能力

# 拆分聚合桶数
TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_BUCKET_NUM 默认值1024
table.optimizer.distinct-agg.split.bucket-num
在拆分不同聚合时配置存储桶数
该数字在第一级聚合中用于计算一个bucket key 'hash_code(distinct_key)%bucket_NUM'
它在拆分后用作另一个组key

ExecutionConfigOptions(执行配置)

BATCH_STREAMING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 插入NULL值到NOT NULL表
TABLE_EXEC_SINK_NOT_NULL_ENFORCER 默认值NotNullEnforcer.ERROR
table.exec.sink.not-null-enforcer
表上的NOT NULL列约束强制不能将NULL值插入表中

# 默认并行度
TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM 默认值-1
table.exec.resource.default-parallelism
为所有要与并行实例一起运行的运算符(如聚合,联接,筛选器)设置默认并行度
此配置的优先级高于StreamExecutionEnvironment的并行性
实际上,此配置会覆盖StreamExecutionEnvironment的并行性
值-1表示未设置默认并行性,然后它将回退以使用StreamExecutionEnvironment的并行性

# 最大异步查找JOIN数
TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY 默认值100
table.exec.async-lookup.buffer-capacity
异步查找联接可以触发的最大异步i/o操作数

# 异步操作超时时间
TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT 默认值3 min
table.exec.async-lookup.timeout
异步操作完成的异步超时

BATCH

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 排序后限制大小
TABLE_EXEC_SORT_DEFAULT_LIMIT 默认值-1
table.exec.sort.default-limit
当用户未在order by之后设置限制时的默认限制
-1表示忽略此配置

# 外部合并排序的最大扇入
TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES 默认值128
table.exec.sort.max-num-file-handles
外部合并排序的最大扇入
它限制每个操作员的文件句柄数
如果太小,可能会导致中间合并
但如果太大,会造成同时打开的文件太多,消耗内存,导致随机读取

# 合并已排序的溢出文件
TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED 默认值true
table.exec.sort.async-merge-enabled
是否异步合并已排序的溢出文件

# 压缩溢出数据
TABLE_EXEC_SPILL_COMPRESSION_ENABLED 默认值true
table.exec.spill-compression.enabled
是否压缩溢出的数据
目前我们只支持sort和hash agg以及hash join运算符的压缩溢出数据

# 压缩内存
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE 默认值64 kb
table.exec.spill-compression.block-size
溢出数据时用于压缩的内存大小
内存越大,压缩比越高,但作业将消耗更多的内存资源

# GroupWindow聚合元素限制
TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT 默认值100000
table.exec.window-agg.buffer-size-limit
设置组窗口agg运算符中使用的窗口元素缓冲区大小限制

# 禁用运算符
TABLE_EXEC_DISABLED_OPERATORS 无默认值
table.exec.disabled-operators
主要用于测试,运算符名称的逗号分隔列表,每个名称表示一种禁用的运算符
可以禁用的运算符包括
NestedLoopJoin
ShuffleHashJoin
BroadcastHashJoin
SortMergeJoin
HashAgg
SortAgg
默认情况下,不禁用任何运算符

# shuffle执行模式
TABLE_EXEC_SHUFFLE_MODE 默认值batch
table.exec.shuffle-mode
设置执行shuffle模式,只能设置batch或pipeline
batch: 作业将逐步运行
pipeline: 作业将以流模式运行,但当发送方持有资源等待向接收方发送数据时,接收方等待资源启动可能会导致资源死锁

STREAMING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 状态清除时间
IDLE_STATE_RETENTION 默认值0ms
table.exec.state.ttl
指定空闲状态的最短时间间隔,默认值为0,这意味着它永远不会清除状态

# Source空闲超时时间
TABLE_EXEC_SOURCE_IDLE_TIMEOUT 默认值-1 ms
table.exec.source.idle-timeout
当一个源在超时时间内没有接收到任何元素时,它将被标记为临时空闲
这允许下游任务在水印空闲时无需等待来自此源的水印就可以推进其水印

# CDC事件合并
TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE 默认值false
table.exec.source.cdc-events-duplicate
指示作业中的CDC(更改数据捕获)源是否会产生重复的更改事件
这些事件需要框架进行重复数据消除并获得一致的结果

# upsert-materialize操作
TABLE_EXEC_SINK_UPSERT_MATERIALIZE 默认值UpsertMaterialize.AUTO
table.exec.sink.upsert-materialize
在分布式系统中,由于混洗造成的ChangeLog数据的无序,Sink接收到的数据可能不是全局upsert的顺序
因此,在upsert sink之前添加upsert-materialize操作符
它接收上游变更日志记录,并为下游生成upsert视图
默认情况下,当唯一键上出现分布式无序时,将添加materialize操作符
您还可以选择无物化(NONE)或强制物化(FORCE)

# 小批量优化
TABLE_EXEC_MINIBATCH_ENABLED 默认值false
table.exec.mini-batch.enabled
指定是否启用小批量优化
MiniBatch是一种缓冲输入记录以减少状态访问的优化
这在默认情况下是禁用的
要启用此功能,用户应将此配置设置为true
注意: 如果启用了小批量,则必须设置'table.exec.mini batch.allow latency''table.exec.mini batch.size'

# 最大延迟
TABLE_EXEC_MINIBATCH_ALLOW_LATENCY 默认值-1 ms
table.exec.mini-batch.allow-latency
最大延迟可用于小批量缓冲输入记录
MiniBatch是一种缓冲输入记录以减少状态访问的优化
当达到最大缓冲记录数时,将以允许的延迟间隔触发MiniBatch
注意: 如果TABLE_EXEC_MINIBATCH_ENABLED设置为true,则其值必须大于零

# MiniBatch最大输入记录数
TABLE_EXEC_MINIBATCH_SIZE 默认值-1L
table.exec.mini-batch.size
可以为MiniBatch缓冲的最大输入记录数
MiniBatch是一种缓冲输入记录以减少状态访问的优化
当达到最大缓冲记录数时,将以允许的延迟间隔触发MiniBatch
注意: MiniBatch当前仅适用于非窗口聚合,如果TABLE_EXEC_MINIBATCH_ENABLED设置为true,则其值必须为正

不被包括的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 从Flink1.10开始,这被解释为一个权重提示,而不是绝对的内存需求
# 用户不需要更改这些经过仔细调整的权重提示

# 外部缓冲区大小
TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY 默认值10 mb
table.exec.resource.external-buffer-memory
设置"排序合并联接","嵌套联接""OVER"窗口中使用的外部缓冲区内存大小
注意: 内存大小只是一个权重提示,它会影响任务中单个操作员可以应用的内存权重
实际使用的内存取决于运行环境

# 哈希聚合managed内存大小
TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY 默认值128 mb
table.exec.resource.hash-agg.memory
设置哈希聚合运算符的managed内存大小
注意: 内存大小只是一个权重提示,它会影响任务中单个操作员可以应用的内存权重
实际使用的内存取决于运行环境

# 哈希联接managed内存大小
TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY 默认值128 mb
table.exec.resource.hash-join.memory
设置哈希联接运算符的managed内存,它定义了下限
注意: 内存大小只是一个权重提示,它会影响任务中单个操作员可以应用的内存权重
实际使用的内存取决于运行环境

# 排序缓冲区大小
TABLE_EXEC_RESOURCE_SORT_MEMORY 默认值128 mb
table.exec.resource.sort.memory
设置排序运算符的managed缓冲区内存大小
注意: 内存大小只是一个权重提示,它会影响任务中单个操作员可以应用的内存权重
实际使用的内存取决于运行环境

常用的配置

RelNodeBlock

1
2
3
4
5
# 在构造公共子图时禁用union all节点作为断点
table.optimizer.union-all-as-breakpoint-disabled 默认false

# 当为true时,优化器将尝试通过摘要找出重复的子计划来构建优化块(又称公共子图),每个优化块都将独立优化
table.optimizer.reuse-optimize-block-with-digest-enabled 默认false

WindowEmitStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# WaterMark到达窗口结束前的发射策略(提前)
table.exec.emit.early-fire.enabled 默认false

# WaterMark到达窗口结束前的发射间隔时间
table.exec.emit.late-fire.delay

# WaterMark到达窗口结束后的发射策略(延迟)
table.exec.emit.late-fire.enabled 默认false

# WaterMark到达窗口结束后的发射间隔时间
table.exec.emit.late-fire.delay

# 设置允许元素延迟的时间,到达水印后超过指定时间的元素将被删除
table.exec.emit.allow-lateness
注意:如果设置了该值,则使用该值,否则在表配置中使用"minIdleStateRetentionTime"
<0是非法配置
0表示禁用允许延迟
>0表示允许延迟

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// BATCH
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tEnv = TableEnvironment.create(settings)

// STREAMING
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
val tEnv = StreamTableEnvironment.create(env, settings)

tEnv.getConfig.getConfiguration.setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true)
tEnv.getConfig.getConfiguration.setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, false)
tEnv.getConfig.getConfiguration.setLong(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, 10485760L)
tEnv.getConfig.getConfiguration.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
tEnv.getConfig.getConfiguration.setInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, 200)
tEnv.getConfig.addConfiguration(GlobalConfiguration.loadConfiguration)