ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka手动拉取消费Topic

2021-11-03 21:33:43  阅读:198  来源: 互联网

标签:kafkaConsumer partition kafka Topic item recordMap offset put 拉取


一般在SpringBoot使用kafka,通常用@KafkaListener注解来进行监听消费。然而某些时候,我们不需要监听而要以定时拉取的方式进行消费,本文主要就是简单记录此方式的实现方法。


//批次大小
private static Integer batchSize = 3;
//批次时间
private static Integer batchTime = 5;

@Resource
private KafkaProperties kafkaProperties;

@Test
void kafkaTest() {

    //配置消费者
    Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");//指定消费组
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize); //指定批次消费条数
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //禁用自动提交
    //建立消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    //获取所有partition信息
    List<PartitionInfo> partitionList = kafkaConsumer.partitionsFor("test-consumer");
    Map<TopicPartition, Integer> topicPartitionMap = MapUtil.newHashMap();
    partitionList.forEach(item
            -> topicPartitionMap.put(new TopicPartition(item.topic(), item.partition()), item.partition()));
    //订阅topic并设置起始offset
    kafkaConsumer.assign(topicPartitionMap.keySet());
    topicPartitionMap.forEach(kafkaConsumer::seek);

    //启动消费线程(仅用作示例)
    ((Runnable) () -> {
        Duration duration = Duration.ofSeconds(batchTime);
        long batchTimeMs = batchTime * 1000L;
        Map<Integer, ConsumerRecord<String, String>> recordMap = MapUtil.newHashMap();
        while (true) {
            try {
                TimeInterval interval = DateUtil.timer();
                ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);

                int count = records.count();
                log.info("测试消费获取到数据 => {} 条", count);
                if (count > 0) {
                    //处理数据
                    List<String> values = CollUtil.newArrayList();
                    records.forEach(item -> values.add(item.value()));
                    //记录当前批次每个Partition最小offset
                    for (ConsumerRecord<String, String> item : records) {
                        values.add(item.value());
                        if (recordMap.containsKey(item.partition())) {
                            ConsumerRecord<String, String> original = recordMap.get(item.partition());
                            if (item.offset() < original.offset()) {
                                recordMap.put(item.partition(), item);
                            }
                        } else {
                            recordMap.put(item.partition(), item);
                        }
                    }
                    //执行业务,抛出异常
                    throw new RuntimeException("测试错误");
                    //同步提交offset
                    kafkaConsumer.commitSync();
                    //正常提交后清除记录
                    recordMap.clear();
                }

                //批次消费达到上限,不休眠直接进行下一次消费
                if (batchSize == count) {
                    continue;
                }
                //计算消费耗时并休眠
                long used = interval.intervalMs();
                if (used < batchTimeMs) {
                    ThreadUtil.safeSleep(batchTimeMs - used);
                }

            } catch (Exception e) {
                log.error("消费出错 => {}", e.getMessage());
                recordMap.forEach((k, v) -> kafkaConsumer.seek(new TopicPartition(v.topic(), v.partition()), v.offset()));
                log.error(ExceptionUtil.stacktraceToString(e));
                ThreadUtil.safeSleep(batchTimeMs);
            }
        }
    }).run();
}

(备注:主要涉及依赖:spring-kafkahutool)


文章转载自我的个人博客:https://blog.fordes.top,欢迎访问交流,文章如有谬误请务必指出~

标签:kafkaConsumer,partition,kafka,Topic,item,recordMap,offset,put,拉取
来源: https://www.cnblogs.com/fordes/p/15505818.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有