ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spring Boot中并发批量处理Kafka消息

2021-03-19 18:01:22  阅读:353  来源: 互联网

标签:ConsumerConfig propsMap Spring Boot factory Kafka topic records CONFIG


在项目开发中,kakfa是我们经常使用的消息中间件,用于上下游解耦合,或者对流量“削峰填谷”。

kafka的写性能非常高,但是消息的消费速度依赖于消费者的处理速度。因此,经常会碰到kafka消息队列拥堵的情况。这时,我们不能直接清理整个topic,因为还有别的服务正在使用该topic,只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,实际上再启动一个新的消费者,没有作用)。

官方文档 https://spring.io/projects/spring-kafka

第一步:设置并发消费

我们使用的是ConcurrentKafkaListenerContainerFactory,并且设置了factory.setConcurrency(4) (我的topic有4个分区,为了加快消费,将并发设置为4,也就是有4个KafkaMessageListenerContainer)。

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
}

注意:也可以直接在application.properties中添加spring.kafka.listener.concurrency=4,然后使用@KafkaListener并发消费。

第二步:设置批量消费

1.在ConcurrentKafkaListenerContainerFactory类型的bean对象中,设置factory.setBatchListener(true),用以启用批量消费。

2.在kafka consumer消费者的配置属性中,设置propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50),用以设置每批次最多消费记录数。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说“如果没有达到50条消息,我们就一直等待”。官方的解释是"The maximum number of records returned in a single call to poll()", 也就是50表示的是一次poll最多返回的记录数。

从启动日志中,可以看到还有个 max.poll.interval.ms = 300000,每间隔max.poll.interval.ms,我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. "。

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
}

public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

第三步:分区消费

对于只有一个分区的topic,不需要分区消费。如果有多个分区,为了提高消费速度,建议直接针对分区进行消费。

下面的例子是针对有2个分区的情况(如果代码中有4个listenPartitionX方法,可以将topic设置了4个分区),读者可以根据自己的情况进行调整。

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}
关于分区和消费者关系,官网原文为:if, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

总结:如果我们的topic有多个分区,经过以上步骤后,可以很好地加快消息消费。如果只有一个分区,由于已经有一个同名group id在消费了,新启动一个client基本上没有作用。

 

 

标签:ConsumerConfig,propsMap,Spring,Boot,factory,Kafka,topic,records,CONFIG
来源: https://blog.csdn.net/chinawangfei/article/details/115009718

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有