标签:val addedPartitionReplicaAssignment 创建 topics kafka topic controllerContext
执行 windows 脚本
kafka-topics.bat --create --zookeeper localhost:2181/kafka-zhang --replication-factor 1 --partitions 1 --topic zhang
命令行客户端直接把 topic 元数据写入 zk
// TopicCommand adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
KafkaController 监听 zk 节点的变化,并产生 TopicChange 事件
case object TopicChange extends ControllerEvent { override def state: ControllerState = ControllerState.TopicChange override def process(): Unit = { if (!isActive) return val topics = zkClient.getAllTopicsInCluster.toSet val newTopics = topics -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- topics controllerContext.allTopics = topics registerPartitionModificationsHandlers(newTopics.toSeq) val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics) controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1.topic)) controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " + s"[$addedPartitionReplicaAssignment]") if (addedPartitionReplicaAssignment.nonEmpty) onNewPartitionCreation(addedPartitionReplicaAssignment.keySet) } }
在 KafkaController.TopicChange#process 中触发 KafkaController#onNewPartitionCreation
创建 partition,并改变 partition 状态,选出 leader
选择 leader 的策略也很简单,取 isr 的第一个。
选 leader 的策略可以参考单测:
// kafka.controller.PartitionLeaderElectionAlgorithmsTest
标签:val,addedPartitionReplicaAssignment,创建,topics,kafka,topic,controllerContext 来源: https://www.cnblogs.com/allenwas3/p/12864532.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。