Flink SavePoint and CheckPoint

The original motivation was to work out how to recover data when a Flink streaming job exits abnormally.

1.Configuration file

vi /usr/local/flink-1.7.2/conf/flink-conf.yaml

# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: hdfs:///flink/checkpoints

# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs:///flink/savepoints

# If the two directories above are not set, they must be passed explicitly when running the commands

2.CheckPoint

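Purpose

  • A CheckPoint is mainly used for automatic failure recovery.
  • It is created, owned, and released by Flink automatically, with no user interaction required.
  • When the job is cancelled, its CheckPoints are deleted unless externalized checkpoint retention is configured.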

Configuration

vi /usr/local/flink-1.7.2/conf/flink-conf.yaml
# Number of completed CheckPoints to retain
state.checkpoints.num-retained: 20

Code

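import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

// senv is the StreamExecutionEnvironment (predefined in the Flink Scala shell).
// Trigger a checkpoint every 500 ms.
senv.enableCheckpointing(500)
// Directory where checkpoints are stored.
senv.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))
val conf = senv.getCheckpointConfig
// Choose ONE cleanup mode; a second call overrides the first.
// Delete checkpoints when the job is cancelled:
// conf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
// Retain checkpoints when the job is cancelled:
conf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)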

Start the job from a specific checkpoint in a script:
flink1.8 run -m yarn-cluster -ynm FlinkBehaviorTrace1.8 -yn 1 -ys 1 -ytm 1024 -s hdfs:///flink/checkpoints/data/FlinkBehaviorTrace1.8/check_id/1e87f3f4092026ef36f115f073147c39/chk-2064658/_metadata /home/etiantian/zsd/flink-project/flink1.8-behavior-trace-graphic/flink1.8-behavior-trace/flink1.8-behavior-trace-graphic-full.jar /home/etiantian/zsd/config/flink-config.properties
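
The command above hard-codes one chk-N directory. As a rough sketch of how the newest one could be picked automatically, the helper below reads a file listing on stdin and prints the `_metadata` path with the highest checkpoint number. The function name `latest_checkpoint` is hypothetical, and it assumes that paths contain no spaces and that the path is the last field on each line, as in `hdfs dfs -ls -R` output.

```shell
#!/bin/sh
# latest_checkpoint (hypothetical helper): read a file listing on stdin and
# print the chk-<N>/_metadata path with the largest N.
# Assumption: the path is the last whitespace-separated field on each line.
latest_checkpoint() {
  grep -E '/chk-[0-9]+/_metadata$' \
    | awk '{ n = $NF; sub(/.*\/chk-/, "", n); sub(/\/_metadata$/, "", n); print n "\t" $NF }' \
    | sort -n -k1,1 \
    | tail -n 1 \
    | cut -f2
}

# Possible usage (not run here; requires the hdfs client):
# hdfs dfs -ls -R hdfs:///flink/checkpoints | latest_checkpoint
```

Sorting numerically on the extracted number matters because a plain string sort would place chk-9 after chk-10.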

3.SavePoint

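Purpose

  • A SavePoint is a consistent snapshot of a streaming job, created through the CheckPoint mechanism.
  • It must be triggered manually, which is where it differs from a CheckPoint.
  • A SavePoint is owned, created, and deleted by the user, and it persists after the job stops.
  • It is typically used for Flink version upgrades, business migration, cluster migration, and other cases where data must not be lost.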

Commands

# Trigger a savepoint
## Flink not on YARN
flink savepoint 1a32cab47537102d70e3a1a885fc431c hdfs:///flink/savepoints
## Flink on YARN
flink savepoint 1a32cab47537102d70e3a1a885fc431c hdfs:///flink/savepoints -yid application_1562025913394_0001

# List jobs
## Flink not on YARN
flink list
## Flink on YARN
flink list -m yarn-cluster -yid application_1562025913394_0001

# Cancel a job and trigger a savepoint
## Flink not on YARN
flink cancel -s hdfs:///flink/savepoints 1a32cab47537102d70e3a1a885fc431c
## Flink on YARN
flink cancel -s hdfs:///flink/savepoints 1a32cab47537102d70e3a1a885fc431c -m yarn-cluster -yid application_1562025913394_0001

# Resume from a savepoint
# (-s takes the path of one concrete savepoint, as printed when it was triggered; <savepoint-dir> is a placeholder)
## Flink not on YARN
flink run -s hdfs:///flink/savepoints/<savepoint-dir> valid1.jar
## Flink on YARN
flink run -s hdfs:///flink/savepoints/<savepoint-dir> -m yarn-cluster valid1.jar

# Dispose of a savepoint
## Flink not on YARN
flink savepoint -d hdfs:///flink/savepoints/<savepoint-dir>
## Flink on YARN
flink savepoint -d hdfs:///flink/savepoints/<savepoint-dir> -m yarn-cluster -yid application_1562025913394_0001
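
Resuming from or disposing of a savepoint requires the path of the savepoint that was actually written. As a minimal sketch, the helper below pulls that path out of the trigger command's output. The function name `savepoint_path` is hypothetical, and the sketch assumes the CLI reports success with a line of the form `Savepoint completed. Path: <path>`; check the output of your Flink version before relying on it.

```shell
#!/bin/sh
# savepoint_path (hypothetical helper): read `flink savepoint` output on stdin
# and print the reported savepoint path.
# Assumption: success is reported as "Savepoint completed. Path: <path>".
savepoint_path() {
  sed -n 's/^Savepoint completed\. Path: //p' | tail -n 1
}

# Possible usage (not run here; requires a running job):
# sp=$(flink savepoint 1a32cab47537102d70e3a1a885fc431c hdfs:///flink/savepoints | savepoint_path)
# flink run -s "$sp" valid1.jar
```

If no matching line is found the function prints nothing, so a script should check that the result is non-empty before passing it to `flink run -s`.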

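4.Open question

If a Flink job holds a user-defined variable, what is its value after restoring from a checkpoint: the initial value, or the value the program had at that point?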