ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka 生产者(二)

2022-03-30 07:31:07  阅读:166  来源: 互联网

标签:ProducerConfig 生产者 KafkaProducer put new kafka CONFIG properties


1、提高吞吐量

想要提高生产者的吞吐量可以通过调整一下4个参数来实现

  1. batch.size:批次大小,默认16k
  2. linger.ms:等待时间,修改为5-100ms
  3. compression.type:压缩snappy
  4. RecordAccumulator:缓冲区大小,修改为64m

代码实现

public class CustomProducerParameters {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //链接
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092");
        //KV 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//32M
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//16k
        //linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);//1ms
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello wdh01"));
        }
        //关闭资源
        kafkaProducer.close();
    }
}

 2、提高数据可靠性

发送流程

ACK 应答级别

 

 数据完全可靠条件=ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2

 可靠性总结:

  • acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
  • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
  • acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

  测试

public class CustomProducerAcks {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //配置 ACK 级别
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        //重试最大次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        //指定 kv 的序列化类型
        //1、创建 生产者
        KafkaProducer<String, String> KafkaProducer = new KafkaProducer<String, String>(properties);
        //2、发送数据 put异步发送
        for (int i = 0; i < 5; i++) {
            KafkaProducer.send(new ProducerRecord<>("first", i + "  hello wdh01"));
        }
        //3、关闭资源
        KafkaProducer.close();
    }
}

3、数据去重

3.1、数据传输语义

  • 至少一次(AtLeastOnce)=ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
  • 最多一次(AtMostOnce)=ACK级别设置为0•总结:AtLeastOnce可以保证数据不丢失,但是不能保证数据不重复;AtMostOnce可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次(ExactlyOnce):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka0.11版本以后,引入了一项重大特性:幂等性和事务

3.2、幂等性

幂等性指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(ExactlyOnce)=幂等性+至少一次(ack=-1+分区副本数>=2+ISR最小副本数量>=2)

重复数据的判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;SequenceNumber是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复

 如何使用幂等性:开启参数enable.idempotence 默认为true,false关闭

3.3、事务

说明:开启事务,必须开启幂等性。

 

Producer 在使用事务功能前,必须先自定义一个唯一的transactional.id。有了transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务

事务测试

public class CustomProducerTranaction {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop103:9092,");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //指定事务ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myshiwu");
        //指定 kv 的序列化类型
        //1、创建 生产者
        KafkaProducer<String, String> KafkaProducer = new KafkaProducer<String, String>(properties);
        //初始化事务
        KafkaProducer.initTransactions();
        //开启事务
        KafkaProducer.beginTransaction();

        //2、发送数据 put异步发送
        try {
            for (int i = 0; i < 5; i++) {
                KafkaProducer.send(new ProducerRecord<>("first", i + "  hello wdh01"));
            }
            //提交事务
            KafkaProducer.commitTransaction();
        } catch (Exception e) {
            //终止事务
            KafkaProducer.abortTransaction();
        } finally {
            //3、关闭资源
            KafkaProducer.close();
        }
    }
}

4、数据有序

单分区内,有序(有条件的,详见下节);多分区,分区与分区间无序;

5、数据无序

kafka在1.x版本之前保证数据单分区有序的条件 max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)

kafka在1.x及以后版本保证数据单分区有序分两者情况

  1. 未开启幂等性 max.in.flight.requests.per.connection需要设置为1
  2. 开启幂等性 max.in.flight.requests.per.connection需要设置小于等于5

原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

 

标签:ProducerConfig,生产者,KafkaProducer,put,new,kafka,CONFIG,properties
来源: https://www.cnblogs.com/wdh01/p/16072595.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有