ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka拦截器

2021-02-13 03:01:04  阅读:154  来源: 互联网

标签:拦截器 队列 Kafka headers 消息 延时 超时


一、消费者拦截器

  消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。

  自定义消费者拦截器需实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,源码如下:

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

    /**
     * 在 KafkaConsumer#poll 方法返回之前调用此方法
     * 可以对消息做一些定制化操作,如修改消息内容、按照某种规则过滤消息、生成新的消息记录等
     * 如果此方法抛出异常,会被捕获并记录在日志中,但是不会再向上传递
     * 若配置了多个拦截器,调用顺序按照 ConsumerConfig#INTERCEPTOR_CLASSES_CONFIG 配置的先后
顺序。后面的拦截器得到的可能是前面拦截器修改后的记录 * * @param records * @return 处理之后的记录,将会传递给消费者或下一个拦截器 */ public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records); /** * 提交offset之后调用此方法 * 可以用来记录跟踪所提交的位移信息 * * @param offsets */ public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); /** * 在拦截器关闭后调用 * 可用来执行一些资源清理工作 */ public void close(); }

 

  使用场景:

  在某些业务中会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就会被视为无效,它也就不需要再被继续处理了。

  消费者拦截器可以用来实现消息的TTL(Time To Live)功能,根据消息的timestamp,将过期的消息过滤掉(放入死信队列中),不再投递给具体的消费者

 

  配置消费者拦截器:

  props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerTtlInterceptor.class.getName());

 

二、过期时间

  通过消息拦截器的方式可以过滤处理超时的消息,那超时多久可以认为是超时呢?是固定的时间,还是从消息中去获取超时时间?显然,消息自带超时时间是最好的。

  在消息发送时可以将超时时间以键值对的方式保存在消息的headers字段中。这样消费者消费到这条消息的时候可以在拦截器中根据headers字段设定的超时时间来判断此条消息是否超时。

 1、发送消息时设置headers

  headers字段涉及Headers和Header两个接口,Headers是对多个Header的封装,Header接口表示的是一个键值对。

  我们可以直接使用Kafka提供的 org.apache.kafka.common.header.internals.RecordHeaders 和 org.apache.kafka.common.header.internals.RecordHeader。

   如发送消息:

 // 构造方法:ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord(
                "test", // topic
                null,         // 不指定partition
                System.currentTimeMillis(), // 发送消息的事件戳
                null,               // key不指定
                "这是一条带headers的消息",  // msg
                new RecordHeaders().add(new RecordHeader("ttl", "20".getBytes(StandardCharsets.UTF_8)))
        );
        kafkaTemplate.send(producerRecord);

  接收消息:

       consumerRecord.timestamp();

            Headers headers = consumerRecord.headers();
            for (Header header : headers) {
                header.key();
                header.value();
            }

 

 

 

三、延时队列

1、什么是延时队列

  队列是存储消息的载体,延时队列存储的对象是延时消息。

  所谓的延时消息,是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。

 

2、延时队列使用场景

  1)在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将被关闭,这时就可以使用延时队列来处理这些订单。

  2)订单完成1小时后通知用户进行评价

  3)用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时就可以将用户指令发送到延时队列,当指令的时间到了之后再将它推送到智能设备

 

3、Kafka实现延时队列

  原生的Kafka并不具备延时队列的功能,不过可以改造来实现延时队列(不建议使用Kafka来实现的延时队列)。

  方案1:(延时精度较低)

  1)在发送延时消息时,先将消息投递到延时队列(delay_topic)中(headers中设置延时时间,timestamp存消息发送初始发送时间戳)

  2)定义一个服务去消费延时队列中的消息,将满足条件的消息再投递到目标队列(target_topic)中。

  

标签:拦截器,队列,Kafka,headers,消息,延时,超时
来源: https://www.cnblogs.com/yangyongjie/p/14398369.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有