1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
// Build the coordinator from config, zkClient, replicaManager and the system
// clock (Time.SYSTEM), then start it. startup() is called without arguments,
// so any parameters it declares take their default values.
groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
groupCoordinator.startup()
/**
 * Starts this component.
 *
 * Logs a start message, starts `groupManager` with the supplied flag, marks
 * the component active via `isActive`, and finally logs completion.
 *
 * @param enableMetadataExpiration passed unchanged to `groupManager.startup`;
 *                                 defaults to `true`. Its precise effect is
 *                                 defined by `groupManager`, not by this method.
 */
def startup(enableMetadataExpiration: Boolean = true): Unit = {
  info("Starting up.")
  groupManager.startup(enableMetadataExpiration)
  // The active flag is flipped only after groupManager.startup has returned.
  isActive.set(true)
  info("Startup complete.")
}
/**
 * Mutable, in-memory record for a single group.
 *
 * A new instance starts in `initialState` with generation 0, no leader, no
 * selected protocol, and empty member / offset collections.
 *
 * @param groupId      identifier of the group; exposed as a public `val`
 * @param initialState state the group starts in
 * @param time         clock handle; not referenced by the members visible in
 *                     this excerpt (NOTE(review): presumably used by methods
 *                     defined elsewhere in the class -- confirm)
 */
private[group] class GroupMetadata(val groupId: String,
                                   initialState: GroupState,
                                   time: Time) extends Logging {

  // Current state; seeded from the constructor argument.
  private var state: GroupState = initialState

  // Publicly readable and writable generation counter, starting at 0.
  var generationId = 0

  // Both are unset until assigned.
  private var leaderId = Option.empty[String]
  private var protocol = Option.empty[String]

  // Members keyed by a String (presumably the member id -- confirm against callers).
  private val members = mutable.HashMap.empty[String, MemberMetadata]

  private var numMembersAwaitingJoin = 0

  // Per-protocol counter map; looking up a key that is absent yields 0.
  private val supportedProtocols = mutable.HashMap.empty[String, Integer].withDefaultValue(0)

  // Offset bookkeeping per topic-partition: one map holding
  // CommitRecordMetadataAndOffset values, one holding OffsetAndMetadata values.
  private val offsets = mutable.HashMap.empty[TopicPartition, CommitRecordMetadataAndOffset]
  private val pendingOffsetCommits = mutable.HashMap.empty[TopicPartition, OffsetAndMetadata]
}
/**
 * Per-member record; every constructor argument is exposed as a public field.
 *
 * @param memberId           identifier of the member
 * @param groupId            identifier of the group the member belongs to
 * @param clientId           client identifier string
 * @param clientHost         client host string
 * @param rebalanceTimeoutMs rebalance timeout (the `Ms` suffix suggests
 *                           milliseconds -- confirm)
 * @param sessionTimeoutMs   session timeout (the `Ms` suffix suggests
 *                           milliseconds -- confirm)
 * @param protocolType       protocol type string
 * @param supportedProtocols list of (name, bytes) pairs; declared as a `var`,
 *                           so it can be reassigned after construction
 */
private[group] class MemberMetadata(
  val memberId: String,
  val groupId: String,
  val clientId: String,
  val clientHost: String,
  val rebalanceTimeoutMs: Int,
  val sessionTimeoutMs: Int,
  val protocolType: String,
  var supportedProtocols: List[(String, Array[Byte])]
)
|