在上一篇有大概的涉及到一点,但方向更多的是Coordinator上,所以这里补充一下
调用流程
1 2 3 4 5 6
| KafkaConsumer.poll() ->KafkaConsumer.pollForFetches() ->Fetcher.fetchedRecords() ->Fetcher.fetchRecords(PartitionRecords partitionRecords, int maxRecords) ->Fetcher.fetchRecords(int maxRecords) ->Fetcher.parseRecord()
|
描述
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| KafkaConsumer.poll() 检查这个consumer是否订阅相应的tp 调用pollForFetches()方法获取相应的records 如果在给定时间内获取不到可用的records返回空数据
pollForFetches() coordinator.timeToNextPoll(): 下一次调用poll的时间 fetchedRecords(): 实际获取数据方法 sendFetches(): 发送fetch请求 client.poll(): 调用底层NetworkClient发送相应的请求 coordinator.rejoinNeededOrPending(): 如果实例分配的tp列表发生变化,consumergroup需要rebalance fetchRecords(PartitionRecords partitionRecords, int maxRecords) 处理PartitionRecords对象,在这个里面会去验证fetchOffset是否能对得上 只有fetchOffset是一致的情况下才会去处理相应的数据,并更新the fetch offset信息 如果不一致,不会处理,the fetch offset就不会更新 下次fetch请求时会接着这个位置去请求相应的数据
|
更新offset的position
1 2 3 4 5 6 7
| 主要实现在KafkaConsumer.poll中 updateAssignmentMetadataIfNeeded()方法 updateFetchPositions() resetOffsetsIfNeeded() resetOffsetsAsync() resetOffsetIfNeeded() seek()
|