
Multi-threaded Kafka Consumption




This article gives a brief introduction to consuming Kafka messages with multiple threads.

Note: the examples below are based on Kafka version 2.2.

Consumer Configuration

When reading messages from Kafka, a consumer needs to take the following configuration parameters into account.

max.poll.records (default=500): The maximum number of records returned in a single call to poll().

fetch.max.bytes (default=52428800): The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the batch will still be returned to ensure that the consumer can make progress, so this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.

session.timeout.ms (default=10000): The consumer periodically sends heartbeats to the broker. If the broker does not receive a heartbeat before this session timeout expires, it removes the consumer from the group and starts a rebalance.

max.poll.interval.ms (default=300000): The maximum delay between two invocations of poll(). If this interval is exceeded, the broker removes the consumer from the group and starts a rebalance.

heartbeat.interval.ms (default=3000): How frequently the consumer sends heartbeats.
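
For reference, here is a minimal sketch of how these settings might be wired into a consumer; the broker address, group id, and serializer choices below are placeholder assumptions, not values from the article:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");    // placeholder broker address
    props.put("group.id", "demo-group");                  // placeholder consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "false");              // offsets will be committed manually
    props.put("max.poll.records", "500");
    props.put("fetch.max.bytes", "52428800");
    props.put("session.timeout.ms", "10000");
    props.put("max.poll.interval.ms", "300000");
    props.put("heartbeat.interval.ms", "3000");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);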

Rebalance

What triggers a rebalance?

When Kafka manages a consumer group, any change in group membership or in the group's subscription triggers a partition rebalance. For example:

  • A new partition is added to the topic

  • A consumer joins the group: for example, a new instance of the application is deployed with the same groupId

  • A consumer leaves the group:

    • max.poll.interval.ms expires because the polled records were not processed in time
    • session.timeout.ms expires because no heartbeat was sent, e.g. the application crashed or there was a network error
    • the consumer is closed or the service is shut down

How should offsets be handled during a rebalance?

If automatic offset commits are enabled, you do not have to worry about group rebalances; everything is handled by the poll method. However, if you disable auto commit and commit offsets manually, you need to commit them before the group rejoin request is sent. You can do this in two ways:

  1. Call commitSync() after processing a batch of messages

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);   // buffer, minBatchSize and insertIntoDb are application-specific
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            // If the process fails after the DB insert but before the offset commit,
            // the batch will be consumed again (duplicate consumption).
            consumer.commitSync();
            buffer.clear();
        }
    }
    
  2. Implement ConsumerRebalanceListener to be notified when partitions are about to be revoked, and commit (or save) the corresponding offsets at that point.

    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        private final Consumer<?, ?> consumer;

        public SaveOffsetsOnRebalance(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Save the offsets before the partitions are reassigned
            // (saveOffsetInExternalStore is an application-specific placeholder)
            for (TopicPartition partition : partitions) {
                saveOffsetInExternalStore(consumer.position(partition));
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Seek to the previously saved offsets for the newly assigned partitions
            // (readOffsetFromExternalStore is an application-specific placeholder)
            for (TopicPartition partition : partitions) {
                consumer.seek(partition, readOffsetFromExternalStore(partition));
            }
        }
    }
    

The first approach is simpler, but when processing is very fast it may lead to overly frequent offset commits. The second approach is more efficient, and it is necessary when consumption and processing are fully decoupled.

Example Code

The idea: the main thread polls a batch of Kafka messages and creates tasks at partition granularity, handing them to a thread pool. Each task processes the records of one partition, while the main thread keeps polling, tracks task progress, and commits offsets.

Creating the task

package com.mirai.boot.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Processes the records of a single partition on a worker thread.
 *
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyConsumerTask<K, V> implements Runnable {

    private final List<ConsumerRecord<K, V>> records;

    private volatile boolean stopped = false;

    private volatile boolean started = false;

    private volatile boolean finished = false;

    private final CompletableFuture<Long> completion = new CompletableFuture<>();

    private final ReentrantLock startStopLock = new ReentrantLock();

    private final AtomicLong currentOffset = new AtomicLong();

    public MyConsumerTask(List<ConsumerRecord<K, V>> records) {
        this.records = records;
    }

    @Override
    public void run() {
        startStopLock.lock();
        try {
            if (stopped) {
                return;
            }
            started = true;
        } finally {
            startStopLock.unlock();
        }

        for (ConsumerRecord<K, V> record : records) {
            if (stopped) {
                break;
            }
            // Process the record here and make sure you catch all exceptions.
            // The offset to commit is always "last processed offset + 1".
            currentOffset.set(record.offset() + 1);
        }
        finished = true;
        completion.complete(currentOffset.get());
    }

    /** Offset of the next record to be consumed (last processed offset + 1). */
    public long getCurrentOffset() {
        return currentOffset.get();
    }

    /** Asks the task to stop; if it has not started yet, it is marked finished immediately. */
    public void stop() {
        startStopLock.lock();
        try {
            this.stopped = true;
            if (!started) {
                finished = true;
                completion.complete(currentOffset.get());
            }
        } finally {
            startStopLock.unlock();
        }
    }

    /** Blocks until the task completes and returns the offset to commit, or -1 on failure. */
    public long waitForCompletion() {
        try {
            return completion.get();
        } catch (InterruptedException | ExecutionException e) {
            return -1;
        }
    }

    public boolean isFinished() {
        return finished;
    }
}

Implementing ConsumerRebalanceListener

package com.mirai.boot.kafka.demo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * @author mirai
 */
@Slf4j
@AllArgsConstructor
public class MultiThreadedRebalancedListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks;
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        // 1. Stop all tasks handling records from revoked partitions
        Map<TopicPartition, MyConsumerTask<String, String>> stoppedTask = new HashMap<>();
        for (TopicPartition partition : partitions) {
            MyConsumerTask<String, String> task = activeTasks.remove(partition);
            if (task != null) {
                task.stop();
                stoppedTask.put(partition, task);
            }
        }

        // 2. Wait for stopped tasks to finish processing their current record
        stoppedTask.forEach((partition, task) -> {
            long offset = task.waitForCompletion();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });

        // 3. Collect offsets for revoked partitions
        Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();
        partitions.forEach(partition -> {
            OffsetAndMetadata offset = offsetsToCommit.remove(partition);
            if (offset != null) {
                revokedPartitionOffsets.put(partition, offset);
            }
        });

        // 4. Commit offsets for revoked partitions
        try {
            consumer.commitSync(revokedPartitionOffsets);
        } catch (Exception e) {
            log.warn("Failed to commit offsets for revoked partitions!", e);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Assigned partitions may still be marked paused from before the rebalance; resume them
        consumer.resume(partitions);
    }
}

The multi-threaded consumer

package com.mirai.boot.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyMultiThreadConsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private long lastCommitTime = System.currentTimeMillis();

    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            },
            // Use CallerRunsPolicy rather than DiscardPolicy: a silently discarded task would
            // leave its (paused) partition blocked forever, since it would never be marked finished.
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public MyMultiThreadConsumer(Properties properties, String topic) {
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic)
                , new MultiThreadedRebalancedListener(consumer, activeTasks, offsetsToCommit));
    }

    @Override
    public void run() {
        try {
            while (!stopped.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                handleFetchedRecords(records);
                checkActiveTasks();
                commitOffsets();
            }
        } catch (WakeupException we) {
            if (!stopped.get()) {
                throw we;
            }
        } finally {
            consumer.close();
        }
    }

    private void handleFetchedRecords(ConsumerRecords<String, String> records) {
        if (records.count() > 0) {
            List<TopicPartition> partitionsToPause = new ArrayList<>();
            records.partitions().forEach(partition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                MyConsumerTask<String, String> task = new MyConsumerTask<>(partitionRecords);
                partitionsToPause.add(partition);
                EXECUTOR.submit(task);
                activeTasks.put(partition, task);
            });
            consumer.pause(partitionsToPause);
        }
    }

    private void checkActiveTasks() {
        List<TopicPartition> finishedTasksPartitions = new ArrayList<>();
        activeTasks.forEach((partition, task) -> {
            if (task.isFinished()) {
                finishedTasksPartitions.add(partition);
            }
            long offset = task.getCurrentOffset();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });
        finishedTasksPartitions.forEach(activeTasks::remove);
        consumer.resume(finishedTasksPartitions);
    }

    private void commitOffsets() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - lastCommitTime > 5000) {
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                    offsetsToCommit.clear();
                }
                lastCommitTime = currentTimeMillis;
            }
        } catch (Exception e) {
            log.error("Failed to commit offsets!", e);
        }
    }
}
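
Finally, a minimal sketch of how the pieces above might be wired together and started. The class name MultiThreadConsumerDemo, the broker address, group id, and topic are hypothetical placeholders; the important points are that enable.auto.commit is disabled (offsets are committed manually by the consumer loop) and that the KafkaConsumer is never shared between threads.

    package com.mirai.boot.kafka.demo;

    import java.util.Properties;

    public class MultiThreadConsumerDemo {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            props.put("group.id", "multi-thread-demo");          // placeholder consumer group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Offsets are committed manually by MyMultiThreadConsumer, so auto commit must be off.
            props.put("enable.auto.commit", "false");

            // Run the polling loop on a dedicated thread; the KafkaConsumer created inside
            // MyMultiThreadConsumer is never accessed concurrently from other threads.
            Thread consumerThread = new Thread(new MyMultiThreadConsumer(props, "demo-topic")); // placeholder topic
            consumerThread.start();
        }
    }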

References

  • Original post: https://www.cnblogs.com/zhangxijun/p/15325611.html
