ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka-之Producer生产者(含拦截器、分区器、序列化器及异步消息发送模式)

2020-12-04 15:59:09  阅读:133  来源: 互联网

标签:拦截器 Producer send Kafka 发送 record 序列化


Kafka-之Producer生产者(含拦截器、分区器、序列化器及异步消息发送模式)

Kafka生产者是整个Kafka架构中的一个角色,可以是不同集成了Kafka的组件,KafkaProducer是线程安全的,可以同时给多个线程使用。

1 如何构建一个KafkaProducer

构建一个KafkaProducer的构造方法有2种:

//首先配置Producer必要配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"fast practice producer");

//根据配置创建生产者实例,这是常用方式
KafkaProducer producer = new KafkaProducer<K,V>(properties)
  
//假如在properties中没有配置序列化器,也可以在构造器中指定,实际底层原理都一样
KafkaProducer producer = new KafkaProducer<K,V>(properties,new StringSerializer(),new StringSerializer())

2 创建生产者消息ProducerRecord并发送消息

ProducerRecord的构造方法有很多种,根据不同的需求可以选择不同的构造器。

//创建一个生产者消息对象
ProducerRecord<String, String> message = new ProducerRecord<String, String>(
  "topic1",
  "hello_" + System.currentTimeMillis());

//通过生产者发送消息
try{
    //Future代表一个任务的生命周期,从Future中可以获取该消息的源数据信息,如partition,offset等
    Future<RecordMetadata> future = producer.send(message);
} catch (Exception e) {
  	e.printStackTrace();
} finally {
  	//关闭生产者
  	producer.close();
}

3 发送消息的3种模式

Kafka生产者发送消息有3种模式

  • fire-and-forget 发送即忘

    • 只管往Broker中发送消息,不管消息是否正确到达,这种模式在大多数场景下是没有问题;
    • 在遇到不可重试异常时可能会造成数据丢失;
    • 这是模式性能最高,可靠性最差。

    实现方式

    try{
    //Future代表一个任务的生命周期
    Future<RecordMetadata> future = producer.send(message);
    }catch(ExecutionException | InterruptedException e)){
      e.printStackTrace();
    }
    
  • sync 同步

    • 可以配合get()方式阻塞kafka的响应,实现消息的同步发送,可靠性很高,但是性能较低,因为需要阻塞等待上一条消息发送完;
    • 该方法的调用要么消息发送成功,要么发送异常,当发送异常时需要使用外部逻辑进行处理。

    实现方式

    try{
    //通过get()阻塞来等待kafka的响应,要么发送成功,要么异常,异常的话交给外部逻辑处理
    producer.send(message).get();
      
    }catch(ExecutionException | InterruptedException e)){
      e.printStackTrace();
    }
    
    try{
    //这种方式实际上与上面的方式是一样的,如果你需要用到消息的元数据信息,就可以选择这种方式,否则使用上面这个方式更省事。
    Future<RecordMetadata> future = producer.send(message);
    RecordMetaData metaData = future.get()
    metaData.offset() //获取元数据信息
    metaData.partition()
    }catch(ExecutionException | InterruptedException e)){
      e.printStackTrace();
    }
    
  • async 异步

    • send()方法本身就是异步的,该方法返回的Future对象代表发送一个消息的生命周期,它可以使调用方在消息发送之后获取发送结果;
    • Future.get()方法可以获取被成功发送消息的元数据信息,如offset、partition
    //实现消息的异步发送一般使用send(message,Callback cb)这种回调函数的重载方式,其中Callback是在kafka有响应之后调用,假如没有响应,那么不会掉用。
    
    try{
      	//使用回调函数
        producer.send(message, new Callback() {
          @Override
          public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
              //处理
              e.printStackTrace();
            } else {
              System.out.println(recordMetadata.topic()
                                 + ":" + recordMetadata.partition()
                                 + ":" + recordMetadata.offset());
            }
    
          }
        });
    }catch{
      //handle this
    }
    

    注意:回调函数Callback{}中onComplete()方法中的2个参数是互斥的,如果Exception为Null,那么RecordMetadata就不为Null,反之亦然

4 KafkaProducer中的异常

4.1异常分类

KafkaProducer中一般会发生2种异常

  • 可重试的异常
    • NetworkException
      • 网络瞬间故障导致的异常,重试可以恢复
    • LeaderNotAvailableException
      • 标示分区的leader不可用,通常是发生在旧的leader下线而新的leader被选举出来之前,重试可恢复。
    • UnknownTopicOrPartitionException
    • NotEnoughReplicasException
    • NotCoordinatorException
  • 不可重试的异常
    • RecordTooLargeException
      • 代表生产者发送的消息太大,超过了配置中设置的最大900+kb,kafka对此不会进行任何重试,直接抛出异常!

4.2 可重试异常解决

//设置重试次数,但是重试10次之后没有恢复还是会抛出异常,此时需要外部逻辑处理
props.put(ProducerConfig.RETRIES_CONFIG, 10);

同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常 ,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条信息。

5 Key-Value的序列化器

因为Producer的数据按照序列化的方式传输到Kafka,所以需要指定KV的序列化方式。源码如下:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 *  String类型的编码方式默认是UTF-8,编码方式可以通过在properties中指定:
 *  key.serializer.encoding,value.serializer.encoding 或者 serializer.encoding. 
 *  前2个编码方式比后者优先级高。
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8"; //默认编码UTF-8s

    @Override //configure方法确定key、value的编码方式
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override //将String类型按照编码方式转换成字节数组,如果为null,返回null
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

注意,假如Kafka自带的序列化方式都不能满足业务需求,那么可以Avro、JSON Thrift、 ProtoBuf、 Protostuff 等通用的序列化工具来实现,或者使用自定义序列化器的方式实现,在properties配置参数中指定就好。

6 分区器Partitioner

KafkaProducer的消息在send()之后,可能会经过序列化器、拦截器、分区器之后才会真正到达Kafka-Broker中的Partition中!~以下是send()之后数据的流向。

Interceptor => Serializer => Partitioner

6.1 具体源码如下

// step-1
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
  return send(record, null);
}

//step-2
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  // 此处拦截消息记录, 这个拦截之后将数据操作之后然后发送到下一层,onsend()方法就是将record修改之后生成拦截修改之后的interceptedRecord。
  ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
  return doSend(interceptedRecord, callback); //该方法doSend()也是kafkaProducer中的方法,下一次进入该方法
}

//step-3
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // 首先确认topic的元数据是可以用的
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey; //定义Key的序列化结果
            try {
              	//将key进行序列化
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
              //将value进行序列化
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
          	//该方法根据传的参数,按照分区器DefaultPartioner为每条Record指定所属分区。
            int partition = partition(record, serializedKey, serializedValue, cluster);
						........
         		//and so on 
              
//step-4
//我们点进去partition()方法,发现这个方法来自于一个接口Partitioner,Partitioner implements Configuable,是可以配置的
//Kafka源码中该Partitioner的实现类只有一个DefaultPartitioner,点进该类,查看重写的partition()方法,如下:
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            //为每个序列化之后的key的字节数组按照Murmur2算法进行hash,然后找到对应的Record的分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
            

7 生产者-拦截器Interceptor

Kafka的拦截器在Kafka-0.10.0.0被引入,Kafka有2种拦截器

  • 生产者拦截器
  • 消费者拦截器

我们自定义拦截器只需要implements ProducerInterceptor就行,ProducerInterceptor与Partitioner一样,都继承自同一个父接口Configurable。

ProducerInterceptor中有3种方法,在Kafka中拦截器没有默认的实现类。

onSend(ProducerRecord<K, V> record); //
onAcknowledgement(RecordMetadata metadata, Exception exception);
close()

7.1 简单的拦截器示例

public class ProducerinterceptorPrefix implements 
Producerinterceptor<String, String>{
  //to do something that you aim to 
}

7.2 拦截器部署加载

在自定义完拦截器之后,我们需要在Producer的配置参数中设置。

//这里可以指定多个类,类的全类名之间通过`,`进行分割
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.shufang.interceptor.ProducerinterceptorPrefix"+","+"com.shufang.interceptor.ProducerinterceptorPrefix2");

标签:拦截器,Producer,send,Kafka,发送,record,序列化
来源: https://blog.csdn.net/shufangreal/article/details/110650641

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有