Flink的Runtime

介绍Flink Runtime的作业执行的核心机制

架构

首先Flink是可以运行在多种环境中的,如Standalone,Yarn,K8S之类;Flink Runtime层采用了标准的Master-Slave架构.

  • Client(不属于Runtime)
  • Master
    • JobManager
    • Dispatcher
    • ResourceManager
  • Slave
    • TaskManager
  • Akka(角色通信)
  • Netty(数据传输)

Dispatcher负责负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的JobManager组件。
ResourceManager负责资源的管理,在整个Flink集群中只有一个ResourceManager
JobManager负责管理作业的执行,在一个Flink集群中可能有多个作业同时执行,每个作业都有自己的JobManager组件


执行流程

用户提交作业,提交脚本会首先启动一个Client进程负责作业的编译与提交
首先将用户编写的代码编译为一个JobGraph(会进行一些检查或优化等工作)
Client将产生的JobGraph提交到集群中执行
两种情况

1
2
Standalone这种Session模式,AM会预先启动,此时Client直接与Dispatcher建立连接并提交作业即可
Per-Job模式,AM不会预先启动,此时Client将首先向资源管理系统<如Yarn,K8S>申请资源来启动AM,然后再向AM中的Dispatcher提交作业

作业到Dispatcher后,Dispatcher会首先启动一个JobManager组件
JobManager会向ResourceManager申请资源来启动作业中具体的任务
两种情况

1
2
3
根据Session和Per-Job模式的区别,TaskExecutor可能已经启动或者尚未启动
Session模式,ResourceManager中已有记录了TaskExecutor注册的资源,可以直接选取空闲资源进行分配
Per-Job模式,ResourceManager也需要首先向外部资源管理系统申请资源来启动TaskExecutor,然后等待TaskExecutor注册相应资源后再继续选择空闲资源进程分配

TaskExecutor的资源是通过Slot来描述的,一个Slot一般可以执行一个具体的Task
ResourceManager选择到空闲的Slot之后,就会通知相应的TM将该Slot分配分JobManager
TaskExecutor进行相应的记录后,会向JobManager进行注册
JobManager收到TaskExecutor注册上来的Slot后,就可以实际提交Task
TaskExecutor收到JobManager提交的Task之后,会启动一个新的线程来执行该Task
Task启动后就会开始进行预先指定的计算,并通过数据Shuffle模块互相交换数据

注意

1
2
3
4
5
6
7
8
Flink支持两种不同的模式,Per-job模式与Session模式
Per-job模式下整个Flink集群只执行单个作业,即每个作业会独享Dispatcher和ResourceManager组件
Per-job模式下AppMaster和TaskExecutor都是按需申请的
Per-job模式更适合运行执行时间较长的大作业,这些作业对稳定性要求较高,并且对申请资源的时间不敏感

Session模式下,Flink预先启动AppMaster以及一组TaskExecutor
然后在整个集群的生命周期中会执行多个作业
Session模式更适合规模小,执行时间短的作业。

作业调度

Flink中,资源是由TaskExecutor上的Slot来表示的,每个Slot可以用来执行不同的Task
任务即Job中实际的Task,它包含了待执行的用户逻辑
调度的主要目的就是为了给Task找到匹配的Slot


ResourceManager中有一个子组件叫做SlotManager,它维护了当前集群中所有TaskExecutor上的Slot的信息与状态
如该Slot在哪个TaskExecutor中,该Slot当前是否空闲等
JobManger来为特定Task申请资源的时候,根据当前是Per-job还是Session模式,ResourceManager可能会去申请资源来启动新的TaskExecutor


TaskExecutor启动之后,它会通过服务发现找到当前活跃的ResourceManager并进行注册
注册信息中,会包含该TaskExecutor中所有Slot的信息
ResourceManager收到注册信息后,其中的SlotManager就会记录下相应的Slot信息
JobManager为某个Task来申请资源时,SlotManager就会从当前空闲的Slot中按一定规则选择一个空闲的Slot进行分配
当分配完成后RM会首先向TaskManager发送RPC要求将选定的Slot分配给特定的JobManager
TaskManager如果还没有执行过该JobManagerTask的话,它需要首先向相应的JobManager建立连接,然后发送提供SlotRPC请求
JobManager中,所有Task的请求会缓存到SlotPool
当有Slot被提供之后,SlotPool会从缓存的请求中选择相应的请求并结束相应的请求过程


Task结束之后,无论是正常结束还是异常结束,都会通知JobManager相应的结束状态
TaskManager端将Slot标记为已占用但未执行任务的状态
JobManager会首先将相应的Slot缓存到SlotPool中,但不会立即释放
这种方式避免了如果将Slot直接还给ResourceManager,在任务异常结束之后需要重启时,需要立刻重新申请Slot的问题
通过延时释放,FailoverTask可以尽快调度回原来的TaskManager,从而加快Failover的速度


SlotPool中缓存的Slot超过指定的时间仍未使用时,SlotPool就会发起释放该Slot的过程
与申请Slot的过程对应,SlotPool会首先通知TaskManager来释放该Slot
TaskExecutor通知ResourceManagerSlot已经被释放,从而最终完成释放的逻辑
注意

1
除了正常的通信逻辑外,在ResourceManager和TaskExecutor之间还存在定时的心跳消息来同步Slot的状态

调度方式

1
2
3
4
5
6
7
8
9
10
11
12
Eager调度(适用于流作业)
Eager调度如其名子所示,它会在作业启动时申请资源将所有的Task调度起来
这种调度算法主要用来调度可能没有终止的流作业

Lazy From Source(适用于批作业)
Lazy From Source是从Source开始,按拓扑顺序来进行调度
简单来说,Lazy From Source会先调度没有上游任务的Source任务
当这些任务执行完成时,它会将输出数据缓存到内存或者写入到磁盘中
然后,对于后续的任务,当它的前驱任务全部执行完成后
Flink就会将这些任务调度起来
这些任务会从读取上游缓存的输出数据进行自己的计算
这一过程继续进行直到所有的任务完成计算

错误恢复

整体上来说,错误可能分为两大类:Task执行出现错误或Flink集群的Master出现错误

第一类错误恢复策略

1
2
3
4
Restart-all,重启所有的Task
对于Flink的流任务,由于Flink提供Checkpoint机制
因此当任务重启后可以直接从上次的Checkpoint开始继续执行
因此这种方式更适合于流作业

第二类错误恢复策略

1
2
3
Restart-individual
只适用于Task之间没有数据传输的情况
这种情况下,我们可以直接重启出错的任务