1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 向GroupCoordinator发送join-group,sync-group请求,获取assign的TP-list ensureCoordinatorReady()->startHeartbeatThreadIfNeeded()->joinGroupIfNeeded() joinGroupIfNeeded()->initiateJoinGroup()->sendJoinGroupRequest()->JoinGroupResponseHandler.handle()->onJoinLeader/onJoinFollower->sendSyncGroupRequest()->SyncGroupResponseHandler onJoinComplete 1.如果Group是新的GroupId,那么此时group初始化状态为Empty 2.当GroupCoordinator接收到consumer的join-group请求后,group的member列表为空,第一个被加入的member被选为leader 3.如果GroupCoordinator接收到leader发送join-group请求,将会触发rebalance,group状态变为PreparingRebalance 4.此时GroupCoordinator将会等待,在一定时间内,接收到join-group请求的consumer将被认为是存活的,此时group变为AwaitSync状态,并且GroupCoordinator会向这个group的所有member返回其response 5.consumer在接收到GroupCoordinator的response后,如果这个consumer是group的leader,那么这个consumer将会负责为整个group assign partition订阅安排,然后leader将分配后的信息以sendSyncGroupResult()请求的方式发给GroupCoordinator,而作为follower的consumer实例会发送一个空列表 6.GroupCoordinator在接收到leader发来的请求后,将assign的结果返回给所有已经发送sync-group请求的consumer实例,并且group的状态变为Stable,如果后续再收到sync-group请求,将会直接返回其分配结果
当一个consumer实例加入group成功后,触发onJoinComplete() 更新订阅的tp列表,更新其对应的metadata以及触发注册的listener
|