ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka学习(十三) api讲解篇(转载)

2022-07-23 20:05:30  阅读:196  来源: 互联网

标签:my value Kafka topic api key 讲解 kafka consumer


1. kafka-python的安装

  pip3 install kafka-python

2.kafka-python的基本使用

  • 最简单使用实例

1.消费端

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'))
for msg in consumer:
    print(msg)
  • 第1个参数为 topic的名称
  • group_id : 指定此消费者实例属于的组名,可以不指定
  • bootstrap_servers : 指定kafka服务器

2.生产端

复制代码
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic' , key= b'my_key', value= b'my_value', partition= 0)
result = future.get(timeout= 10)
print(result)
复制代码

producer.send函数为发送消息

  • 第1个参数为 topic名称,必须指定
  • key : 键,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None
  • value : 值,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None
  • partition : 指定发送的partition,由于kafka默认配置1个partition,固为0

future.get函数等待单条消息发送完成或超时,经测试,必须有这个函数,不然发送不出去,或用time.sleep代替

 

3.发送或接收消息解析

消费者端接收消息如下:

ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)
  • topic
  • partition
  • offset : 这条消息的偏移量
  • timestamp : 时间戳
  • timestamp_type : 时间戳类型
  • key : key值,字节类型
  • value : value值,字节类型
  • checksum : 消息的校验和
  • serialized_key_size : 序列化key的大小
  • serialized_value_size : 序列化value的大小,可以看到value=None时,大小为-1

 

KafkaConsumer

    • 手动分配partition
复制代码
from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
consumer.assign([TopicPartition(topic= 'my_topic', partition= 0)])
for msg in consumer:
    print(msg)
复制代码
    • 超时处理

 

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)

若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待

consumer_timeout_ms : 毫秒数

 

    • 订阅多个topic

 

复制代码
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
consumer.subscribe(topics= ['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)
复制代码

可同时接收多个topic消息

也可用正则订阅一类topic

复制代码
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
    print(msg)
复制代码
  • 解码json数据

编码(生产者):value_serializer

解码(消费者):value_deserializer

1.先看producer发送的json数据

复制代码
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
future = producer.send('my_topic' ,  value= {'value_1' : 'value_2'}, partition= 0)
future.get(timeout= 10)
复制代码

2.consumer没有解码收到的数据

ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)

可以看到value为原始的json字节数据,接下来可以再做一步解码操作

3.consumer自动解码

复制代码
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics= ['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)
复制代码

接收结果:

ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)
  • 可以看到接收结果中,value已经自动解码,并为字符串类型
  • 不仅value可以json,key也可以,只需指定 key_deserializer

 

KafkaProducer

    • 发送字符串类型的key和value

 

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer= str.encode, value_serializer= str.encode)
future = producer.send('my_topic' ,  key= 'key_3', value= 'value_3', partition= 0)
future.get(timeout= 10)

指定 key_serializer 和 value_serializer 为 str.encode,但消费者收到的还是字节字符串

若想要消费者收到的为字符串类型,就需要解码操作,key_deserializer= bytes.decode

  复制代码
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], key_deserializer= bytes.decode, value_deserializer= bytes.decode)
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
    print(msg)
复制代码
  • 可压缩消息发送

compression_type='gzip'

若消息过大,还可压缩消息发送,可选值为 ‘gzip’, ‘snappy’, ‘lz4’, or None

 
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic' ,  key= b'key_3', value= b'value_3', partition= 0)
future.get(timeout= 10)
  • 发送msgpack消息

msgpack为MessagePack的简称,是高效二进制序列化类库,比json高效

producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

 

end

标签:my,value,Kafka,topic,api,key,讲解,kafka,consumer
来源: https://www.cnblogs.com/yinging/p/16512867.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有