Flink的Runtime
介绍Flink Runtime的作业执行的核心机制
架构
首先Flink是可以运行在多种环境中的,如Standalone,Yarn,K8S之类;Flink Runtime层采用了标准的Master-Slave架构.
Client
(不属于Runtime)Master
- JobManager
- Dispatcher
- ResourceManager
Slave
- TaskManager
Akka
(角色通信)Netty
(数据传输)
Dispatcher
负责负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的JobManager
组件。ResourceManager
负责资源的管理,在整个Flink集群中只有一个ResourceManager
JobManager
负责管理作业的执行,在一个Flink集群中可能有多个作业同时执行,每个作业都有自己的JobManager
组件
执行流程
用户提交作业,提交脚本会首先启动一个Client
进程负责作业的编译与提交
首先将用户编写的代码编译为一个JobGraph
(会进行一些检查或优化等工作)Client
将产生的JobGraph
提交到集群中执行
两种情况
1 | Standalone这种Session模式,AM会预先启动,此时Client直接与Dispatcher建立连接并提交作业即可 |
作业到Dispatcher
后,Dispatcher
会首先启动一个JobManager
组件JobManager
会向ResourceManager
申请资源来启动作业中具体的任务
两种情况
1 | 根据Session和Per-Job模式的区别,TaskExecutor可能已经启动或者尚未启动 |
TaskExecutor
的资源是通过Slot
来描述的,一个Slot
一般可以执行一个具体的Task
ResourceManager
选择到空闲的Slot
之后,就会通知相应的TM
将该Slot
分配分JobManager
TaskExecutor
进行相应的记录后,会向JobManager
进行注册JobManager
收到TaskExecutor
注册上来的Slot
后,就可以实际提交Task
了TaskExecutor
收到JobManager
提交的Task
之后,会启动一个新的线程来执行该Task
Task
启动后就会开始进行预先指定的计算,并通过数据Shuffle
模块互相交换数据
注意
1 | Flink支持两种不同的模式,Per-job模式与Session模式 |
作业调度
Flink中,资源是由TaskExecutor
上的Slot
来表示的,每个Slot
可以用来执行不同的Task
任务即Job
中实际的Task
,它包含了待执行的用户逻辑
调度的主要目的就是为了给Task
找到匹配的Slot
在ResourceManager
中有一个子组件叫做SlotManager
,它维护了当前集群中所有TaskExecutor
上的Slot
的信息与状态
如该Slot
在哪个TaskExecutor
中,该Slot
当前是否空闲等
当JobManger
来为特定Task
申请资源的时候,根据当前是Per-job
还是Session
模式,ResourceManager
可能会去申请资源来启动新的TaskExecutor
当TaskExecutor
启动之后,它会通过服务发现找到当前活跃的ResourceManager
并进行注册
注册信息中,会包含该TaskExecutor
中所有Slot
的信息ResourceManager
收到注册信息后,其中的SlotManager
就会记录下相应的Slot
信息
当JobManager
为某个Task
来申请资源时,SlotManager
就会从当前空闲的Slot
中按一定规则选择一个空闲的Slot
进行分配
当分配完成后RM
会首先向TaskManager
发送RPC
要求将选定的Slot
分配给特定的JobManager
TaskManager
如果还没有执行过该JobManager
的Task
的话,它需要首先向相应的JobManager
建立连接,然后发送提供Slot
的RPC
请求
在JobManager
中,所有Task
的请求会缓存到SlotPool
中
当有Slot
被提供之后,SlotPool
会从缓存的请求中选择相应的请求并结束相应的请求过程
当Task
结束之后,无论是正常结束还是异常结束,都会通知JobManager
相应的结束状态TaskManager
端将Slot
标记为已占用但未执行任务的状态JobManager
会首先将相应的Slot
缓存到SlotPool
中,但不会立即释放
这种方式避免了如果将Slot
直接还给ResourceManager
,在任务异常结束之后需要重启时,需要立刻重新申请Slot
的问题
通过延时释放,Failover
的Task
可以尽快调度回原来的TaskManager
,从而加快Failover
的速度
当SlotPool
中缓存的Slot
超过指定的时间仍未使用时,SlotPool
就会发起释放该Slot
的过程
与申请Slot
的过程对应,SlotPool
会首先通知TaskManager
来释放该Slot
TaskExecutor
通知ResourceManager
该Slot
已经被释放,从而最终完成释放的逻辑
注意
1 | 除了正常的通信逻辑外,在ResourceManager和TaskExecutor之间还存在定时的心跳消息来同步Slot的状态 |
调度方式
1 | Eager调度(适用于流作业) |
错误恢复
整体上来说,错误可能分为两大类:Task
执行出现错误或Flink
集群的Master
出现错误
第一类错误恢复策略
1 | Restart-all,重启所有的Task |
第二类错误恢复策略
1 | Restart-individual |