当前位置 : 主页 > 编程语言 > java >

Kafka消费客户端协调器GroupCoordinator详解

来源:互联网 收集:自由互联 发布时间:2023-01-30
目录 协调器的生命周期 GroupCoordinator的创建 offsetConfig相关配置 groupConfig相关配置 groupMetadataManager heartbeatPurgatory GroupCoordinator的启动 GroupCoordinator OnElection GroupCoordinator onResignation 协调器的
目录
  • 协调器的生命周期
    • GroupCoordinator的创建
    • offsetConfig相关配置
  • groupConfig相关配置
    • groupMetadataManager
    • heartbeatPurgatory
    • GroupCoordinator的启动
    • GroupCoordinator OnElection
    • GroupCoordinator onResignation

协调器的生命周期

  • 什么是协调器
  • 协调器工作原理
  • 协调器的Rebalance机制

GroupCoordinator的创建

在Kafka启动的时候, 会自动创建并启动GroupCoordinator

这个GroupCoordinator对象创建的时候传入的几个属性需要介绍一下

    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)

offsetConfig相关配置

  private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
    maxMetadataSize = config.offsetMetadataMaxSize,
    loadBufferSize = config.offsetsLoadBufferSize,
    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
  )
属性介绍默认值offset.metadata.max.bytes  offsets.load.buffer.size  offsets.retention.minutes  offsets.retention.check.interval.ms  offsets.topic.num.partitions  offsets.commit.timeout.ms  offsets.topic.segment.bytes  offsets.topic.replication.factor  offsets.topic.compression.codec  offsets.commit.timeout.ms  offsets.commit.required.acks  

groupConfig相关配置

属性介绍默认值group.min.session.timeout.ms  group.max.session.timeout.ms  group.initial.rebalance.delay.ms  group.max.size  group.initial.rebalance.delay.ms  

groupMetadataManager

组元信息管理类

heartbeatPurgatory

心跳监测操作,每一秒执行一次

joinPurgatory

GroupCoordinator的启动

  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    groupManager.startup(enableMetadataExpiration)
    isActive.set(true)
    info("Startup complete.")
  }

这个启动对于GroupCoordinator来说只是给属性isActive标记为了true, 但是同时呢也调用了GroupMetadataManager.startup

定时清理Group元信息

这个Group元信息管理类呢启动了一个定时任务, 名字为:delete-expired-group-metadata

每隔600000ms的时候就执行一下 清理过期组元信息的操作, 这个600000ms时间是代码写死的。

TODO:GroupMetadataManager#cleanupGroupMetadata

GroupCoordinator OnElection

当内部topic __consumer_offsets 有分区的Leader变更的时候,比如触发了 LeaderAndIsr的请求, 发现分区Leader进行了切换。

那么就会执行 GroupCoordinator#OnElection 的接口, 这个接口会把任务丢个一个单线程的调度程序, 专门处理offset元数据缓存加载和卸载的。线程名称前缀为group-metadata-manager- ,一个分区一个任务

最终执行的任务内容是:GroupMetadataManager#doLoadGroupsAndOffsets

__consumer_offsets 的key有两种消息类型

key version 0: 消费组消费偏移量信息 -> value version 0: [offset, metadata, timestamp]

key version 1: 消费组消费偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]

key version 2: 消费组的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

例如 version:3 的schemaForGroupValue

Version-0

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-1

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-2

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-3

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		group_instance_id: NULLABLE_STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Value每个版本的 Scheme如下

  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))

GroupCoordinator onResignation

以上就是Kafka消费客户端协调器GroupCoordinator详解的详细内容,更多关于Kafka GroupCoordinator的资料请关注自由互联其它相关文章!

上一篇:Java中泛型的示例详解
下一篇:没有了
网友评论