ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka 笔记

2022-01-23 20:33:49  阅读:139  来源: 互联网

标签:消费 -- 分区 笔记 kafka 消息 Kafka consumer


仅仅记录最近学习Kafka笔记

视频地址:https://www.bilibili.com/video/BV1Xy4y1G7zA?p=25

kafka视频笔记
命令:
创建生产者:kafka-console-producer.bat --broker-list localhost:9092 --topic yi
创建消费者(带消费组):kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=testgroup --topic yi
创建消费者(不带消费组):kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic yi
查询所有消费组:kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
查询消费组详细信息:kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group testgroup
查询主题详细信息:kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
创建2分区3副本的主题:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
集群创建生产者:kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-replicated-topic
集群创建消费者(带消费组):kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

 

创建2分区3副本的主题 这个命令可以通过看zk的节点或者看对应的log目录是否多出同名多分区的文件夹。


重点关注:
current-offset:最后被消费的消息的偏移量
log-end-offset:消息总量(最后一条消息的偏移量)
lag:积压了多少条消息

稀疏存储
详细的解释:https://blog.csdn.net/shudaqi2010/article/details/90815675
其实就是根据24小时根据数据条数 默认将id放到index 和 timeindex.log文件内 方便快速查找

分区:
分区的概念就是为了将数据给分开存储 否则几个T的数据都在一个topic下会很大
默认的主题__consumer_offsets(整个kafka集群里只有一份)
kafka内部创建了__consumer_offsets主题包含了50个分区。这个主题用于存放消费者某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费者的主题的偏移量自助上报给kafka的默认的主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区
提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets主题的分区数
提交到该主题的内容是:key是consumerGroupid+topid+分区号,value是当前offset值
文件中保存的消息,默认保存7天,七天到后消息会被删除
一个分区最多只能被一个消费组里的一个消费者所消费


副本:
可以理解为备份

集群中有多个broker,创建主题时可以指明主题有多个分区(把分区拆分到不同的分区中存储)当ISR 是指当leader挂了的时候会从leader里面集合中读取,可以为分区创建多个副本,不同的副本存放在不同的broker里

创建带有多分区、多副本 查看详细信息里 有如下几个信息
leader:
kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生新的leader
isr:
可以同步和已同步的节点会被存入isr集合中。这里有一个细节:如果isr中节点性能较差,会被剔除ist集合,当leader挂了的时候会从leader里面集合中读取

多播和单播区别
多播(多个地方消费):是指多个消费组里 最新的消费者可以接收到消息 多消费者接收
单播(只有一个地方消费):是指一个消费组里 最新的消费组可以接收到消息 一个消费者接收

在企业中为了保证数据的安全性和一致性 它数据存储在队列上,实际是放到内存上rabbitmq
如果是java 协议支持很多,使用其他的程序,除了java的 它数据存储在队列上,实际是放到内存上 avtivemq
如果你的数据很庞大(大数据) 日志分析,用户行为分析,支持动态扩容,它数据存储在队列上,实际是放到磁盘上,(某个文件)坏处就是不能保证消息的可靠性,kafka

生产者中的ack配置:
在同步发送的前提下,生产者在获得集群返回的ack之前会一直堵塞。那么集群什么时候返回ack呢?此时ack有三个配置
ack = 0: kafka-cluster 不需要任何broker收到消息,就立即返回ack给生产者,最容易丢消息,效率是最高的
ack = 1(默认): 多副本之间的leader已经收到消息,并把消息写入到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的。
ack = 1/all 。 里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2,如果这个走默认配置,那么和ack = 1 无任何区别),此时就需要leader和一个follower同步完后,才会返回ack给生产者(此时集群中有2个broker已经完成了数据接收),这种方式最安全,但是性能最差

java客户端配置的
生产者配置:缓冲区
kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32m
ProducerConfig.BUGGER_MEMORY_CONFIG,33554432
kafka本地线程会去缓冲区中拉一次16k的数据,发送到broker
ProducerConfig.BATCH_SIZE_CONFIG,16384
如果线程拉不到16k的数据,间隔10毫秒,也会将已拉到的数据发到broker
ProducerConfig.LINGER_MS_CONFIG,10

消费者配置:自动/手动 提交
关于消费者自动提交和手动提交offset
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息的提交到_consumer_offsets主题里面
自动提交
消费者poll消息下来以后就会自动提交offset(这玩意吧。按照我理解 他同步的topic内,这个客户端把消息都拿来了 那么offset就将下标回馈到broker的是下拿到的数据lengh+1的offset)
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"
注意:自动提交会丢消息,因为消费者在消费前提交的offset,有可能提交完后,消息者挂了

手动提交
需要把自动提交的配置改成false
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"
手动提交分2种(一般用的是同步,懒得解释同步异步啥区别)
手动同步提交
手动异步提交

消费者配置:长轮询poll消息
默认的情况,消费者会一次性poll500条消息
CosumerConfig.MAX_POLL_RECORED_CONFIG,"500"
代码中设置了长轮询的时间是1000毫秒
consumer.poll(Duration.ofMillis(1000));
意味着
如果一次poll到500条数据,就直接执行for循环
如果这一次没有poll到500条数据。且时间在一秒内,那么长轮训继续poll,要么到500条,要么到1秒
如果多次poll都没达到500条,且一秒时间到了,那么直接执行for循环
如果两次poll的间隔超过30秒,集群会认为该消费者的能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息少一点
CosumerConfig.MAX_POLL_RECORED_CONFIG,"500"
CosumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000

消费者配置:指定分区和偏移量、时间消费
指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)))
从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)))
consumer.seek(new TopicPartition(TOPIC_NAME,0),10)
指定时间消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费

消费者配置:新消费者的消费offset的规则
新消费组的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息。)可以通过以下设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
Latest:默认,消费新消息
earliest:第一次从头开始消费。之后开始消费新的消息
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"


消息的细节
生产者将消息发送给broker,broker会将消息保存在本地的日志文件中?
C:\Kafka\kafka_2.12-2.8.1\kafka-logs\主题-分区\0000000000000.log
消息的保存是有序的,通过offset偏移量来描述消息的有效性
消费者消费消息时也是通过offset来描述当前消费的那条消息的位置

kafka为了保证消费的顺序,所以一个分区只能被同一个消费组里的一个消费组所消费
如果消息发给多个分区,那么顺序是不可以保证的

kafka: 3.0开始不需要zk也可以了


stream 是干嘛的? 消息二次加工!


磁盘带索引的读取 要比内存随机读取要快??

zookeeper 主要功能就是做分布式协调的

标签:消费,--,分区,笔记,kafka,消息,Kafka,consumer
来源: https://www.cnblogs.com/yi1036943655/p/15837183.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有