当前位置 : 主页 > 编程语言 > java >

寻找协调器FindCoordinatorRequest请求流程

来源:互联网 收集:自由互联 发布时间:2022-09-29
文章目录 ​​客户端发起请求​​ ​​Broker处理请求​​ ​​简单校验​​ ​​获取分区号和元信息​​ ​​构建返回数据 createResponse​​ ​​问题​​ 客户端发起请求 我们在分


文章目录

  • ​​客户端发起请求​​
  • ​​Broker处理请求​​
  • ​​简单校验​​
  • ​​获取分区号和元信息​​
  • ​​构建返回数据 createResponse​​
  • ​​问题​​

客户端发起请求

我们在分析消费者的时候, 有看到调用FindCoordinatorRequest的请求

private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// initiate the group metadata request
log.debug("Sending FindCoordinator request to broker {}", node);
FindCoordinatorRequest.Builder requestBuilder =
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(this.rebalanceConfig.groupId));
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}

Broker处理请求

def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
val findCoordinatorRequest = request.body[FindCoordinatorRequest]

// 根据协调器类型判断是否授权过
if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
!authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
!authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
else {
// get metadata (and create the topic if necessary)
val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
case CoordinatorType.GROUP =>
val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
(partition, metadata)

case CoordinatorType.TRANSACTION =>
val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
(partition, metadata)

case _ =>
throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
}

def createResponse(requestThrottleMs: Int): AbstractResponse = {
def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setErrorCode(error.code)
.setErrorMessage(error.message)
.setNodeId(node.id)
.setHost(node.host)
.setPort(node.port)
.setThrottleTimeMs(requestThrottleMs))
}
val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
} else {
val coordinatorEndpoint = topicMetadata.partitions.asScala
.find(_.partitionIndex == partition)
.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
.flatMap(_.getNode(request.context.listenerName))
.filterNot(_.isEmpty)

coordinatorEndpoint match {
case Some(endpoint) =>
createFindCoordinatorResponse(Errors.NONE, endpoint)
case _ =>
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
}
}
trace("Sending FindCoordinator response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
}

简单校验

根据协调器类型判断是否有被授权。协调器类型有​​ GROUP((byte) 0), TRANSACTION((byte) 1)​​两种

获取分区号和元信息

这里的接口分两种情况,一个是协调列席为GROUP 一个是 TRANSACTION
他们的处理逻辑都是一样的,只是处理的Topic不一样

GROUP 对应的Topic是 ​​__consumer_offsets​​

TRANSACTION 对应的Topic是​​__transaction_state​​

这里我们主要分析一下 GROUP的情况

  • 去zk获取​​/brokers/topic/__consumer_offsets ​​数据 找到消费者Topic的分区总数。默认是50. (由​​offsets.topic.num.partitions ​​ 控制)找到分区数之和后, 则计算​​ Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount​​(groupID按分区数取模运算)获取到了分区号​​partition​​;
  • 然后接着获取该Topic的元信息, 这里需要注意的是 去获取元信息应该走的是什么 监听协议(listenerName) 呢?这个主要是看当前处理请求的Broker是通过哪个入口来的。比如说该Broker有两个监听口,​​listeners = INTER://xxx.xx.xx.100:9091, OUTSIDE://xxx.xx.xx.101:9092​​ .如果客户端发起请求的时候是对​​xxx.xx.xx.101:9092​​发起的请求,那么这个对应的监听器就是​​ OUTSIDE​​ . 那么Broker去获取​​__consumer_offsets​​元信息发起请求的时候也是会用的​​ OUTSIDE​​ 协议。
  • 如果发现没有这个Topic的元信息,则需要去创建​​__consumer_offsets​​Topic 。
    注意:创建这个Topic的的几个特殊属性:
  • 属性


    描述

    cleanup.policy

    compact

    日志清理策略为 :紧缩

    segment.bytes

    10010241024

    一个日志段的大小

    compression.type

    producer

    压缩类型 为跟生产者保持一致

    构建返回数据 createResponse

    这里才是真正的找到协调器的主要逻辑, 这里的判断逻辑是

    上面我们获取到的分区号是​​partition​​​, 我们同样获取到了​​__consumer_offsets​​的元信息Metadata。

    那我们就可以获取到这个分区号, 并且就能够找到该分区的LeaderId所属在哪个Broker上。

    知道了哪个Broker, 那我们就能够获取到对应的EndPoint, 一个Broker可能同时有多个EndPoint(配置了多个监听器),那么我们应该使用哪个EndPoint呢?

    这个的判断逻辑与上面说过的一样,客户端发起请求时候的监听器是哪个,那么这里就应该用哪个监听器。

    寻找协调器FindCoordinatorRequest请求流程_消费者

    注意:如果找到的分区Leader不存在 那么这个协调器就不存在

    然后会返回异常:


    The coordinator is not available

    问题

  • 如果客户端走的外网监听器访问的集群,那么在客户端发起请求之后到集群内部,触发内部调用链的请求,那么内部这个调用链是用什么监听器访问的呢?
  • 从客户端 -> Broker -> 其他Broker. 这是一个调用链路,从最开始用的是什么监听器那么这条链路上都是用的这个监听器!具体请看:​​多网络情况下,Kafka客户端如何选择合适的网络发起请求​​

    寻找协调器FindCoordinatorRequest请求流程_servlet_02


    上一篇:Kafka 消费者之 findCoordinator源码解析
    下一篇:没有了
    网友评论