ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka生产者Producer、消费者Consumer的拦截器interceptor

2021-06-04 14:54:14  阅读:147  来源: 互联网

标签:拦截器 Producer kafka org apache import consumer properties


1、Producer的拦截器interceptor,和consumer端的拦截器interceptor是在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑,生产者拦截器可以用在消息发送前做一些准备工作,使用场景,如下所示:

  1)、按照某个规则过滤掉不符合要求的消息。
  2)、修改消息的内容。
  3)、统计类需求。

 1 package com.demo.kafka.listener;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.kafka.clients.producer.ProducerInterceptor;
 6 import org.apache.kafka.clients.producer.ProducerRecord;
 7 import org.apache.kafka.clients.producer.RecordMetadata;
 8 
 9 /**
10  * 生产者拦截器
11  * 
12  * @author 生产者拦截器
13  *
14  */
15 
16 public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
17 
18     // 发送成功计数
19     private volatile long sendSuccess = 0;
20 
21     // 发送失败计数
22     private volatile long sendFailure = 0;
23 
24     /**
25      * 
26      */
27     @Override
28     public void configure(Map<String, ?> configs) {
29 
30     }
31 
32     /**
33      * 发送消息已经操作消息的方法
34      */
35     @Override
36     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
37         String modifiedValue = "前缀prefix : " + record.value();
38         ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
39                 record.topic(), // 主题
40                 record.partition(), // 分区
41                 record.timestamp(), // 时间戳
42                 record.key(), // key值
43                 modifiedValue,  // value值
44                 record.headers()); // 消息头
45         return producerRecord;
46     }
47 
48     /**
49      * ack确认的方法
50      */
51     @Override
52     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
53         if(exception == null) {
54             sendSuccess++;
55         }else {
56             sendFailure++;
57         }
58     }
59 
60     /**
61      * 关闭的方法,发送成功之后会将拦截器关闭,调用此方法
62      */
63     @Override
64     public void close() {
65         double succe***ation = (double)sendSuccess / (sendSuccess + sendFailure);
66         System.out.println("【INFO 】 发送成功率: " + String.format("%f", succe***ation * 100) + "%");
67     }
68 
69 }

生产者客户端要配置一下Producer的拦截器interceptor,如下所示:

 1 package com.demo.kafka.producer;
 2 
 3 import java.util.Properties;
 4 import java.util.concurrent.ExecutionException;
 5 
 6 import org.apache.kafka.clients.producer.KafkaProducer;
 7 import org.apache.kafka.clients.producer.ProducerConfig;
 8 import org.apache.kafka.clients.producer.ProducerRecord;
 9 import org.apache.kafka.clients.producer.RecordMetadata;
10 import org.apache.kafka.common.serialization.StringSerializer;
11 
12 import com.demo.kafka.listener.ProducerInterceptorPrefix;
13 
14 public class KafkaProducerSimple {
15 
16     // 设置服务器地址
17     private static final String brokerList = "192.168.110.142:9092";
18 
19     // 设置主题
20     private static final String topic = "topic-demo";
21 
22     public static void main(String[] args) {
23         Properties properties = new Properties();
24         // 设置key的序列化器
25         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26 
27         // 设置重试次数
28         properties.put(ProducerConfig.RETRIES_CONFIG, 10);
29 
30         // 设置值的序列化器
31         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
32 
33         // 打印输出序列化器的路径信息
34         System.err.println(StringSerializer.class.getName());
35 
36         // 设置集群地址
37         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
38 
39         // 自定义拦截器使用,可以计算发送成功率或者失败率,进行消息的拼接或者过滤操作
40         properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
41 
42         // 将参数配置到生产者对象中
43         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
44 
45         for (int i = 0; i < 100000; i++) {
46             // 生产者消息记录
47             ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello world!!!" + i);
48             // 同步获取消息
49 //            RecordMetadata recordMetadata = producer.send(record).get();
50             producer.send(record);
51         }
52 
53         // 关闭
54         producer.close();
55     }
56 
57 }

消费者代码,如下所示:

 1 package com.demo.kafka.consumer;
 2 
 3 import java.time.Duration;
 4 import java.util.Collections;
 5 import java.util.Properties;
 6 
 7 import org.apache.kafka.clients.consumer.ConsumerConfig;
 8 import org.apache.kafka.clients.consumer.ConsumerRecord;
 9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.ProducerConfig;
12 import org.apache.kafka.common.serialization.StringDeserializer;
13 
14 public class KafkaConsumerSimple {
15 
16     // 设置服务器地址
17     private static final String bootstrapServer = "192.168.110.142:9092";
18 
19     // 设置主题
20     private static final String topic = "topic-demo";
21 
22     // 设置消费者组
23     private static final String groupId = "group.demo";
24 
25     public static void main(String[] args) {
26         Properties properties = new Properties();
27         // 设置反序列化key参数信息
28         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
29         // 设置反序列化value参数信息
30         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
31 
32         // 设置服务器列表信息
33         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
34 
35         // 设置消费者组信息
36         properties.put("group.id", groupId);
37 
38         // 将参数设置到消费者参数中
39         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
40 
41         // 消息订阅
42         consumer.subscribe(Collections.singletonList(topic));
43 
44         while (true) {
45             // 每隔一秒监听一次
46             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
47             // 获取到消息信息
48             for (ConsumerRecord<String, String> record : records) {
49                 System.err.println(record.toString());
50             }
51         }
52 
53     }
54 
55 }

 

2、生产者的acks参数,这个参数用来指定分区中必须有多少副本来收到这条消息,之后生产者才会认为这条消息写入成功的。acks是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡。

  1)、ack等于0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的,消息就丢失了,不过因为生产者不需要等待服务器响应,所以他可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  2)、acks等于1,默认值为1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法达到首领节点,比如首领节点崩溃,新的首领节点还没有被选举出来,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没有来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。
  3)、acks等于-1,只有当所有参与复制的节点收到消息时候,生产者会收到一个来自服务器额成功响应,这种模式 最安全的,他可以保证不止一个服务器收到消息。

  注意,acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常信息。

 

3、kafka消费者订阅主题和分区,创建完消费者后我们便可以订阅主题了,只需要调用subscribe方法即可,这个方法会接受一个主题列表,如下所示:

  另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组立即对其进行消费。正则表达式在连接kafka与其他系统非常有用。比如订阅所有的测试主题。

 1 package com.demo.kafka.consumer;
 2 
 3 import java.time.Duration;
 4 import java.util.Arrays;
 5 import java.util.Collections;
 6 import java.util.Properties;
 7 import java.util.regex.Pattern;
 8 
 9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 
17 public class KafkaConsumerSimple {
18 
19     // 设置服务器地址
20     private static final String bootstrapServer = "192.168.110.142:9092";
21 
22     // 设置主题
23     private static final String topic = "topic-demo";
24 
25     // 设置主题
26     private static final String topic2 = "topic-demo2";
27 
28     // 设置消费者组
29     private static final String groupId = "group.demo";
30 
31     public static void main(String[] args) {
32         Properties properties = new Properties();
33         // 设置反序列化key参数信息
34         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
35         // 设置反序列化value参数信息
36         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37 
38         // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个
39         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
40 
41         // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
42         properties.put("group.id", groupId);
43 
44         // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。
45         properties.put("client.id", "consumer.client.id.demo");
46 
47         // 将参数设置到消费者参数中
48         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
49 
50         // 消息订阅
51         consumer.subscribe(Collections.singletonList(topic));
52         // 可以订阅多个主题
53         consumer.subscribe(Arrays.asList(topic, topic2));
54         // 可以使用正则表达式进行订阅
55         consumer.subscribe(Pattern.compile("topic-demo*"));
56 
57         // 指定订阅的分区
58         consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
59 
60         while (true) {
61             // 每隔一秒监听一次
62             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
63             // 获取到消息信息
64             for (ConsumerRecord<String, String> record : records) {
65                 System.err.println(record.toString());
66             }
67         }
68 
69     }
70 
71 }

 

标签:拦截器,Producer,kafka,org,apache,import,consumer,properties
来源: https://blog.51cto.com/u_12469213/2858250

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有