ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka——kafkaProducer 分析

2019-04-03 14:48:59  阅读:288  来源: 互联网

标签:分析 序列化 分区 Kafka topic 拦截器 key kafka kafkaProducer


由于本人最近在学习 kafka,看了kafka 的源码解析以及厮大的深入理解 kafka 之后决定自己在源码 debug 更加深入的学习 kafka。
先从 producer 看起:
在这里插入图片描述
运行 zookeeper,kafka server, producer 之后。在控制台随意输入一条消息进行 debug。
首先他会把消息封装成 ProducerRecord
在这里插入图片描述
主要的6个参数:
headers:可以是多个 header 组成也可以为 null 也可以是单个 header。这个 header 会和 kafka 的幂等,事务有所关联
timestamp:如果没有指定的话,会默认使用System.currentTimeMillis()当前时间戳。
key:如果指定 key 的话 会影响到 log compression 以及分区
value:具体的消息内容
topic:主题
partition:分区

之后通过 KafkaProducer 发送消息:
在这里插入图片描述
1.首先如果你经过的是拦截器。默认是没有拦截器的,可以自己实现拦截器实现ProducerInterceptor 接口即可。
在这里插入图片描述
其中有两个方法
在这里插入图片描述
在这里插入图片描述
在 onSend 中可以进行自己的业务逻辑,onAcknowledgement 可以对 ack 响应进行预处理,这个方法是最先获取从服务端获取的 response。
在这里插入图片描述
之后 producer 调用了 doSend 方法
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
该方法的整体流程:
2.首先根据 topic,partition,maxBlcokTimeMs(等待更新 kafka 元数据的最长时间默认60000ms)在 waitOnMetadata方法中获取该 topic 的元数据 如果集群中没有该topic 那就会添加,通过partitionCountForTopic(topic)获取该 topic 下的分区数。如果有如果有缓存的元数据,并且记录的分区没有定义,则返回缓存的元数据。
返回元数据以及等待时间:
元数据
元数据元数据
3.然后对key以及 value 进行序列化。这里也是可以自定义序列化 只要实现 Serializer 和 Deserializer 即可
在这里插入图片描述
4.下一步操作就是选择消息的分区,优先根据 ProducerRecord 指定的分区,如果没有则通过partitioner.partition 方法进行分区
在这里插入图片描述
在这里插入图片描述
具体的分区策略:
在这里插入图片描述
首先根据 topic 找出该 topic 下的所有 partitions 的数量
keyBytes 是 producerRecord 中的 key序列化之后的值,所以说如果指定 key 是会对分区造成影响
如果有 key 会对 key 进行 hash(murmur2这种hash 的方式) 然后跟分区数量进行取模
如果没有key且可用分区大于0的话:通过 counter 和可用分区进行取模 counter 不断自增
如果没有key且可用分区小于0的话:通过 counter 和分区进行取模 counter 不断自增

5.然后通过计算出来的 partition 和 topic 封装成 TopicPartition 形成 topic-partition 的映射关系
6.预估这个 record 处理大小的上限值,没有考虑到压缩算法的开销
7.生产回调讲调用拦截器的回调,这就是为什么之前说可以在onAcknowledgement做预处理
8.判断是否开启事务
9.把TopicPartition,时间戳,序列化 key,序列化值,请求头,拦截器回调,剩余等待时间 预估一下需要处理的值放入 buffer 存入RecordAccumulator 中
10.如果batch已经满了或者创建了新的batch 唤醒 sender 所以说 KafkaProducer 它不会对消息进行发送,而且将消息进行处理存入RecordAccumulator 中,有 sender 线程进行发送数据。

标签:分析,序列化,分区,Kafka,topic,拦截器,key,kafka,kafkaProducer
来源: https://blog.csdn.net/weixin_39829400/article/details/88994430

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有