ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka Poll轮询机制与消费者组的重平衡分区策略剖析

2020-02-28 10:04:20  阅读:573  来源: 互联网

标签:val Consumer 轮询 kafka topic Poll consumer 分区 位移


注意本文采用最新版本进行Kafka的内核原理剖析,新版本每一个Consumer通过独立的线程,来管理多个Socket连接,即同时与多个broker通信实现消息的并行读取。这就是新版的技术革新。类似于Linux I/O模型或者Select NIO 模型。

Poll为什么要设置一个超时参数

  • 条件:
  • 1:获取足够多的可用数据
  • 2:等待时间超过指定的超时时间。
  • 目的在于让Consumer主线程定期的””苏醒”去做其他事情。比如:定期的执行常规任务,(比如写日志,写库等)。
  • 获取消息,然后执行业务逻辑。

位移精度

  • 最少一次 -> 消息会被重复处理
  • 最多一次 -> 消息会丢失,但不会被重复处理。
  • 精确一次 -> 一定会被处理,且也只会处理一次。

位移角色

  • 上次提交位移 :last committed offset
  • 当前位置 :current position
  • 水位 : High watermark
  • 日志终端位移: (Log End Offset)

位移管理

consumer的位移提交最终会向group coordinator来提交,不过这里重点需要重新说明一下:组协调者coordinator负责管理所有的Consumer实例。而且coordinator运行在broker上(通过选举出某个broker),不过请注意新版本coordinator只负责做组管理。

但是具体的reblance分区分配策略目前已经交由Consumer客户端。这样就解耦了组管理和分区分配。

权利下放的优势:

  • 如果需要分配就貌似需要重启整个kafka集群。
  • 在Consumer端可以定制分区分配策略。
  • 每一个consumer位移提交时,都会向_consumer_offsets对应的分区上追加写入一条消息。如果某一个consumer为同一个group的同一个topic同一个分区提交多次位移,很显然我们只关心最新一次提交的位移。

reblance的触发条件

  • 组订阅发生变更,比如基于正则表达式订阅,当匹配到新的topic创建时,组的订阅就会发生变更。
  • 组的topic分区数发生变更,通过命令行脚本增加了订阅topic的分区数。
  • 组成员发生变更:新加入组以及离开组。

reblance 分配策略

range分区分配策略

举例如下:一个拥有十个分区(0,1,2…..,9)的topic,相同group拥有三个consumerid为a,b,c的消费者:

  • consumer a分配对应的分区号为[0,4),即0,1,2,3前面四个分区

  • consumer b 分配对应分区4,5,6中间三个分区

  • consumer c 分配对应分区7,8,9最后三个分区。

    class RangeAssignor() extends PartitionAssignor with Logging {

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    def (ctx: AssignmentContext) = {
    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
    val partitionAssignment =
    new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
    for (topic <- ctx.myTopicThreadIds.keySet) {
    val curConsumers = ctx.consumersForTopic(topic)
    val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

    val nPartsPerConsumer = curPartitions.size / curConsumers.size
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

    info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
    " for topic " + topic + " with consumers: " + curConsumers)

    for (consumerThreadId <- curConsumers) {
    val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
    assert(myConsumerPosition >= 0)
    val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
    val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)


    * Range-partition the sorted partitions to consumers for better locality.
    * The first few consumers pick up an extra partition, if any.
    */
    if (nParts <= 0)
    warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
    else {
    for (i <- startPart until startPart + nParts) {
    val partition = curPartitions(i)
    大专栏

标签:val,Consumer,轮询,kafka,topic,Poll,consumer,分区,位移
来源: https://www.cnblogs.com/lijianming180/p/12375856.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有