ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka:(2) 生产者

2021-10-18 09:34:24  阅读:119  来源: 互联网

标签:重试 生产者 分区 kafka 发送 消息 ProducerRecord


一、向Kafka发送消息的主要步骤

  • 我们从创建一个 ProducerRecord 对象开始,ProducerRecord对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输 。
  • 接下来,数据被传给分区器。如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord对象的键来选择一个分区 。选好分区以后 ,生产者就知道该往哪个主题和分区发送这条记录了。
  • 紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
  • 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

  

二、整体架构

  消息在通过 send()方法发往 broker 的过程中,有可能需要经历拦截器(lnterceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker。

  • 拦截器:生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
  • 序列化器:把对象转换成字节数组才能通过网络发送给Kafka。在对侧,消费者需要用反序列化器把从Kafka中收到的字节数组转换成相应的对象。
  • 分区器:拦截器 一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。在默认分区器DefaultPartitioner的实现中,如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用 MurmurHash2 算法 ,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区。如果 key 为 null,那么消息将会以轮询的方式发往主题内的各个可用分区。如果key不为null,那么计算得到的分区号会是所有分区中的任意一个,如果key为null并且有可用分区时,那么计算得到的分区号仅为可用分区中的任意一个,两者是有区别的。

  

  那么在此之后又会发生什么呢?

  整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

三、发送消息的3种方式

  • 发送并忘记(fire-and-forget):我们把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发。不过这种方式有时候也会丢失一些消息。
  • 同步发送:我们使用send()方法发送消息,它会返回一个Future对象。调用get()方法进行等待,就可以知道消息是否发送成功。
ProducerRecord<String,String> record = new ProducerRecord<String, String>("CustomerCountry","West","France");
try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

  调用 Future对象的get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常,如果没有发生错误,我们会得到 RecordMetadata 对象,可以用它获取消息的偏移量。

  • 异步发送:我们调用send()方法,并指定一个回调函数,服务器在返回响应时调用该函数。
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
producer.send(producerRecord,new DemoProducerCallBack());

class DemoProducerCallBack implements Callback {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

  为了使用回调,需要一个实现了org.apache.kafka.clients.producer.Callback接口的类,这个接口只有一个 onCompletion方法。如果 kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常。

  生产者是可以使用多线程来发送消息的,刚开始的时候可以使用单个消费者和单个线程。如果需要更高的吞吐量,可以在生产者数量不变的前提下增加线程数量。如果这样做还不够,可以增加生产者数量。

  KafkaProducer一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,"无主"错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试来解决,比如消息过大异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。

四、生产者的配置

  • acks:指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。

  如果 acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。如果当中出现了问题,导致服务器没有收到消息,生产者就无从得知,消息也就丢失了。可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  如果 acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没选举出来)生产者收到一个错误响应,为了避免数据丢失,生产者会重发消息。
  如果 ack=all 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1 时更高,因为我们要等待不止一个服务器节点接收消息。

  • buffer.memory:用来设置生产者内存缓冲区的大小,用来缓冲发送到服务器的消息。
  • retries:生产者从服务器收到的错误有可能是临时性的错误(例如分区找不到首领),retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,可以通过 retry.backoff.ms 参数来改变这个时间间隔。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误,只需要处理那些不可重试的错误或重试次数超出上限的情况。
  • batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,当批次被填满,批次里的所有消息会被发送出去。
  • linger.ms:指定生产者在发送批次之前等待更多消息加入批次的时间。kafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。
  • max.in.flight.requests.per.connection:指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提高吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发送了重试。
  • timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms:request.timeout.ms 指定了生产者发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据时等待服务器返回响应的时间。如果等待超时,那么生产者要么重试发送,要么返回一个错误。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 acks 的配置相匹配,如果在指定时间内没有收到同步副本的确认,那么broker 就会返回一个错误。
  • max.request.size:用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,或者生产者可以在单个请求里发一个批次,该批次有100个消息,每个消息大小为10KB,另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes)所有两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒接。
  • receive.buffer.bytes 和 send.buffer.bytes:分别指 TCP socket 接收和发送数据包的缓冲区大小。

五、顺序保证

  Kafka可以保证同一个分区里的消息是有序的。即如果生产者按照一定的顺序发送消息,broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序去读取它们。
  如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retires设为0 。可以把max.in.flight.request.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

六、分区

  ProducerRecord 对象包含了目标主题、键和值。Kafka 的消息是一个个键值对,ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数的应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取。
  如果键为null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。
  如果键不为空,并且使用了默认分区器,那么 Kafka 会对键进行散列(使用Kafka 自己的散列算法)然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。意味着,如果写入数据的分区时不可用的,那么就会发生错误。
  只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,这些就无法保证了,旧数据仍然留在旧分区,但新记录可能被写到其他分区上。如果要使用键来映射分区,最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。

  自定义分区: 

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
  public void configure(Map<String, ?> configs) {} 
    
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String))) ➋
    throw new InvalidRecordException("We expect all messages to have customer name as key")

    if (((String) key).equals("Banana"))
    return numPartitions; // Banana总是被分配到最后一个分区
    
   // 其他记录被散列到其他分区
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }    

  public void close() {}
}
  • Partitioner接口包含了configure、partition和close这3个方法,不应该直接在partition方法里硬编码客户的名字,应该通过configure方法传进来。
  • 这里只接受字符串作为键,如果不是字符串,就抛出异常。

 

标签:重试,生产者,分区,kafka,发送,消息,ProducerRecord
来源: https://www.cnblogs.com/zjxiang/p/15383949.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有