ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

十六、kafka消费者之SyncGroup(一)

2022-03-20 22:59:27  阅读:193  来源: 互联网

标签:group generation 十六 kafka member members joinResponse leader SyncGroup


这部分主要来说明消费者对协议的处理。

各个消费者都可设置partition.assignment.strategy(分区分配策略),服务端是如何处理的呢?

这块的代码要追溯到joinGroup请求结束,通过前面的源码分析我们知道joinGroup主要是判断是否发起rebalance以及等待其他组成员加入组,而在所有成员加入或者RebalanceTimeout
之后会调用onCompleteJoin方法,代码如下。

def onCompleteJoin(group: GroupMetadata): Unit = {
    //1.1 针对动态成员的处理
    group.inLock {
      group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember =>
        removeHeartbeatForLeavingMember(group, failedMember)
        group.remove(failedMember.memberId)
        group.removeStaticMember(failedMember.groupInstanceId)
      }

      if (group.is(Dead)) {
        info(s"Group ${group.groupId} is dead, skipping rebalance stage")
        //leader没有rejoin且没有member能选,则group.maybeElectNewJoinedLeader返回false,我们需要再次延时。通过maybeElectNewJoinedLeader选出leader
      } else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) {
        // If all members are not rejoining, we will postpone the completion
        // of rebalance preparing stage, and send out another delayed operation
        // until session timeout removes all the non-responsive members.
        error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
        joinPurgatory.tryCompleteElseWatch(
          new DelayedJoin(this, group, group.rebalanceTimeoutMs),
          Seq(GroupKey(group.groupId)))
      } else {
        //1.2 状态转为CompletingRebalance,投票选择协议,选择票数最多的那个
        group.initNextGeneration()
        if (group.is(Empty)) {
          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

          groupManager.storeGroup(group, Map.empty, error => {
            if (error != Errors.NONE) {
              // we failed to write the empty group metadata. If the broker fails before another rebalance,
              // the previous generation written to the log will become active again (and most likely timeout).
              // This should be safe since there are no active members in an empty generation, so we just warn.
              warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
            }
          })
        } else {
          //1.3 选举完成后就返回
          // trigger the awaiting join group response callback for all the members after rebalancing
          for (member <- group.allMemberMetadata) {
            val joinResult = JoinGroupResult(
              members = if (group.isLeader(member.memberId)) {
                group.currentMemberMetadata
              } else {
                List.empty
              },
              memberId = member.memberId,
              generationId = group.generationId,
              protocolType = group.protocolType,
              protocolName = group.protocolName,
              leaderId = group.leaderOrNull,
              error = Errors.NONE)

            group.maybeInvokeJoinCallback(member, joinResult)
            completeAndScheduleNextHeartbeatExpiration(group, member)
            member.isNew = false
          }
        }
      }
    }
  }

针对动态成员的处理

这是一个与前面关联的点,在前面一篇的案例中我们知道如果是静态成员会拥有更长的session时间,而动态成员则是在断连之后第一次Rebalance时就剔除掉下线的member,处理就是在1.1的
代码中。在这里会过滤掉静态成员,将没有加入组的成员移除掉。

group状态转为CompletingRebalance,投票选举协议 kafka.coordinator.group.GroupMetadata#initNextGeneration

在initNextGeneration方法中可以看到会对generationId自增加一,generationId相当于group的纪元,每次发生Rebalance都会自增。接着是设置protocolName,针对选举协议的部分就是在Some(selectProtocol)中。

  def initNextGeneration() = {
    if (members.nonEmpty) {
      generationId += 1
      protocolName = Some(selectProtocol)
      subscribedTopics = computeSubscribedTopics()
      transitionTo(CompletingRebalance)
    } else {
      generationId += 1
      protocolName = None
      subscribedTopics = computeSubscribedTopics()
      transitionTo(Empty)
    }
    receivedConsumerOffsetCommits = false
    receivedTransactionalOffsetCommits = false
  }

对于selectProtocol这段代码还挺好理解的,就是对所有的member设置的协议投票,取票数最多的协议,这个代码逻辑也与很多资料说的相符,但针对协议的处理只是简单这样吗?
大家可以想象一个case:如果有四个消费者,其中三个都设置的StickyAssignor,而剩的这个设置的CooperativeStickyAssignor,正好剩的这个被选为leader会发生什么呢?
(前面分析过,服务端选择消费者leader也很随意,就是取member的第一个)。

  def selectProtocol: String = {
    if (members.isEmpty)
      throw new IllegalStateException("Cannot select protocol for empty group")

    // select the protocol for this group which is supported by all members
    val candidates = candidateProtocols

    // let each member vote for one of the protocols and choose the one with the most votes
    val votes: List[(String, Int)] = allMemberMetadata
      .map(_.vote(candidates))
      .groupBy(identity)
      .mapValues(_.size)
      .toList

    votes.maxBy(_._2)._1
  }

带着上面的疑问我们来测试一下

  • 准备三个消费者:
    消费者1:设置StickyAssignor
    消费者2:设置StickyAssignor
    消费者3:设置CooperativeStickyAssignor
  • 测试结果:
    消费者3会收到一个异常

    Exception in thread “main” org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member’s supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.

  • 抛出此异常的代码如下,memberProtocolType就是customer,memberProtocols即为消费者设置的protocols,memberProtocols.exists(supportedProtocols
    (_) == members.size)是scala的语法,supportedProtocols是Map结构,key为protocols,value为member中对应protocol
    的个数,这段代码意思是说如果memberProtocols中存在supportedProtocols中包含,且对应的value值等于member的个数,则为true,反之为false
    。也就是说如果第一个加入组的消费者设置了两个协议,则这两个协议的value值都为1,而第二个加入组的消费者就必须要包含其中一个协议,给其中一个协议的value加1,然后第三个消费者也必须包含前面加1
    的那个协议,否则抛异常。所以,实际上为协议投票并没有前面说的那么民主,在加入组时会针对协议做校验,防止最后选择的leader没有这个协议。刚开始我想的是按照这个设定的话,那每次加入组的消费者都必须包含与member
    大小相等的协议的话,那后面针对协议再投票是不是没有必要了?直接取跟memberSize相等的协议不行吗?并不是这样,经过测试同一个消费者还可以为同一个协议投两次票,毕竟protocols的类型是List,然而有一个case
    ,如果第一个消费者只为StickyAssignor投两次票,第二个消费者只投一次,则也会抛InconsistentGroupProtocolException
    ,这种设计看似高大上,实际有种自己跟自己玩然后没啥意思的感觉,我也不继续深究了。
  def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]) = {
    if (is(Empty))
      !memberProtocolType.isEmpty && memberProtocols.nonEmpty
    else
      protocolType.contains(memberProtocolType) && memberProtocols.exists(supportedProtocols(_) == members.size)
  }

joinGroup返回处理

对于成为leader的消费者,服务端会返回成员信息,其他的则返回空,返回参数样例如下

  • 成为leader的消费者

JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=3, protocolType=‘consumer’, protocolName=‘sticky’, leader=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, memberId=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, members=[JoinGroupResponseMember(memberId=‘mykafka-group_4_2-4c364a51-2d52-446f-b4d4-2b61ae3738c0’, groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, -1, -1, -1, -1, 0, 0, 0, 0]), JoinGroupResponseMember(memberId=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, -1, -1, -1, -1, 0, 0, 0, 0])])

  • 其他消费者

JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=3, protocolType=‘consumer’, protocolName=‘sticky’, leader=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, memberId=‘mykafka-group_4_2-4c364a51-2d52-446f-b4d4-2b61ae3738c0’, members=[])

收到策略之后消费者是如何处理的呢?

在发送joinGroup请求之后有给response设置处理类JoinGroupResponseHandler,最终返回的是JoinGroupResponseHandler
处理之后的处理结果,我们来看JoinGroupResponseHandler中是如何处理的。

if (error == Errors.NONE) {
    if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
        log.debug("JoinGroup failed due to inconsistent Protocol Type, received {} but expected {}",
            joinResponse.data().protocolType(), protocolType());
        future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
    } else {
        log.info("Received successful JoinGroup response: {}", joinResponse);
        sensors.joinSensor.record(response.requestLatencyMs());

        synchronized (AbstractCoordinator.this) {
            if (state != MemberState.REBALANCING) {
                // if the consumer was woken up before a rebalance completes, we may have already left
                // the group. In this case, we do not want to continue with the sync group.
                future.raise(new UnjoinedGroupException());
            } else {
                //根据返回参数的回调来做处理
                AbstractCoordinator.this.generation = new Generation(
                    joinResponse.data().generationId(),
                    joinResponse.data().memberId(), joinResponse.data().protocolName());
                //针对是否是leader分开处理
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            }
        }
    }
}

针对leader的处理

根据投票决定的分配规则分配分区,分配结束后发送SyncGroupRequest请求,在方法performAssignment中还会更新subscriptionState中的groupSubscription
及subscription,以及org.apache.kafka.clients.consumer.internals.ConsumerCoordinator中的 assignmentSnapshot以及 
metadataSnapshot
 private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {
            // perform the leader synchronization and send back the assignment for the group
            //根据分区分配策略来分配组成员处理的分区
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
                    joinResponse.data().members());

            List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
            for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
                groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                        .setMemberId(assignment.getKey())
                        .setAssignment(Utils.toArray(assignment.getValue()))
                );
            }

            SyncGroupRequest.Builder requestBuilder =
                    new SyncGroupRequest.Builder(
                            new SyncGroupRequestData()
                                    .setGroupId(rebalanceConfig.groupId)
                                    .setMemberId(generation.memberId)
                                    .setProtocolType(protocolType())
                                    .setProtocolName(generation.protocolName)
                                    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                    .setGenerationId(generation.generationId)
                                    .setAssignments(groupAssignmentList)
                    );
            log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
            return sendSyncGroupRequest(requestBuilder);
        } catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

针对follower的处理

针对follower就直接发送SyncGroupRequest

private RequestFuture<ByteBuffer> onJoinFollower() {
    // send follower's sync group with an empty assignment
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(
                    new SyncGroupRequestData()
                            .setGroupId(rebalanceConfig.groupId)
                            .setMemberId(generation.memberId)
                            .setProtocolType(protocolType())
                            .setProtocolName(generation.protocolName)
                            .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                            .setGenerationId(generation.generationId)
                            .setAssignments(Collections.emptyList())
            );
    log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}

总结

这部分到发送同步数据请求这里就结束了,下一篇会来继续分析发送同步请求之后做的事,以及针对四个分配规则来深入分析

标签:group,generation,十六,kafka,member,members,joinResponse,leader,SyncGroup
来源: https://blog.csdn.net/qq_34306010/article/details/123623887

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有