[Kafka] Producer API: plain, synchronous, and callback sends



Plain send


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        /**
         * Producer configuration (the ProducerConfig class documents every parameter Kafka accepts):
         * 1. The Kafka cluster to connect to
         * 2. The ack acknowledgement level
         * 3. Number of retries on send failure
         * 4. Batch size (how much data is sent per batch)
         * 5. Linger time before a batch is sent
         * 6. RecordAccumulator buffer size
         * 7. Key and value serializer classes
         */
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer from the configuration
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            // Create a record: ProducerRecord(topic, partition, key, value)
            ProducerRecord<String, String> message =
                    new ProducerRecord<String, String>("test", 0, "MyProducer", "hello" + i);
            // send() is asynchronous: it returns immediately, which does not mean
            // the record has actually been delivered yet
            kafkaProducer.send(message);
        }
        // Close the producer: flushes any records still buffered in memory
        kafkaProducer.close();
    }
}
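The raw string keys above are easy to mistype; the ProducerConfig class mentioned in the comment defines a constant for each of them. A minimal sketch of the same configuration written with those constants (the ProducerConfigExample class name is just for illustration):

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerConfigExample {
    // Builds the same producer configuration using ProducerConfig constants
    // instead of raw string keys, so a misspelled key fails at compile time.
    static Properties producerProperties() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }
}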

Synchronous send

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducerFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "hello" + i);
            /**
             * Synchronous send: send() returns a Future, and calling get() blocks
             * until the broker responds. The returned RecordMetadata records the
             * topic, partition and offset the message was written to.
             */
            RecordMetadata metadata = kafkaProducer.send(message).get();
            String topic = metadata.topic();
            int partition = metadata.partition();
            long offset = metadata.offset();
            System.out.println("Topic=>" + topic + " partition=>" + partition + " offset=>" + offset);
        }
        // Close the producer: flushes any records still buffered in memory
        kafkaProducer.close();
    }
}
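With a synchronous send, get() surfaces a failed delivery as an ExecutionException wrapping the real cause. A minimal sketch of unwrapping it instead of declaring the checked exceptions on main (the SyncSendHelper class and sendSync method names are just for illustration):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;

public class SyncSendHelper {
    // Sends one record and blocks until it is acknowledged, turning the
    // checked Future exceptions into a RuntimeException carrying the real cause.
    static RecordMetadata sendSync(KafkaProducer<String, String> producer,
                                   ProducerRecord<String, String> record) {
        try {
            return producer.send(record).get();
        } catch (ExecutionException e) {
            // The broker/client error is the cause of the ExecutionException
            throw new RuntimeException("send failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("send interrupted", e);
        }
    }
}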

Callback send

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducerCallback {
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "messuesein--" + i);
            /**
             * Send with a callback: pass a Callback whose onCompletion receives
             * 1. RecordMetadata: the metadata of the record on success
             * 2. Exception: the error on failure
             */
            kafkaProducer.send(message, (metadata, exception) -> {
                // exception == null means the send succeeded
                if (exception == null) {
                    // metadata carries the topic, partition and offset of the record
                    String topic = metadata.topic();
                    int partition = metadata.partition();
                    long offset = metadata.offset();
                    System.out.println("Topic=>" + topic + " partition=>" + partition + " offset=>" + offset);
                } else {
                    exception.printStackTrace();
                }
            });
        }
        // Close the producer: flushes any records still buffered in memory,
        // so the callbacks still fire before the program exits
        kafkaProducer.close();
    }
}
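The lambda above implements the org.apache.kafka.clients.producer.Callback interface. A minimal sketch of the same call written against the interface explicitly, which makes the onCompletion signature visible (the CallbackExplicit class and sendWithCallback method names are just for illustration):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackExplicit {
    // Same send-with-callback call, with the Callback interface spelled out
    static void sendWithCallback(KafkaProducer<String, String> producer,
                                 ProducerRecord<String, String> record) {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Topic=>" + metadata.topic()
                            + " partition=>" + metadata.partition()
                            + " offset=>" + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            }
        });
    }
}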

Source: https://www.cnblogs.com/mussessein/p/12187626.html
