ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

kafka2.5.0生产者与消费者,java普通main方法示例

2020-06-22 22:58:08  阅读:332  来源: 互联网

标签:kafka2.5 java 示例 kafka record org apache import properties


生产者:

import cn.enjoyedu.config.BusiConst;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * @author King老师   
 */
public class HelloKafkaProducer {

    public static void main(String[] args) {
        //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.2.61:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        try {
            ProducerRecord<String,String> record;
            try {
                //  发送4条消息
                for(int i=0;i<4;i++){
// 这里的key值为null,所以kafka会根据分区总数把数据负载均衡到每个分区,如果有值,则根据值来判断存到哪个分区。 record = new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC, null,"lison"+i); producer.send(record); System.out.println(i+",message is sent"); } } catch (Exception e) { e.printStackTrace(); } } finally { producer.close(); } } }

 

消费者:

import cn.enjoyedu.config.BusiConst;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author King老师   
 */
public class HelloKafkaConsumer {

    public static void main(String[] args) {
        /* 消费者三个属性必须指定(broker地址清单、key和value的反序列化器) */
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.2.61:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        //  群组并非完全必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            //消费者订阅主题(可以多个)
            consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
            while(true){
                //TODO 拉取(新版本)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
                            record.offset(),record.key(),record.value()));
                    // TODO
                }
            }

            //通过另外一个线程 consumer. wakeup()
        } finally {
            consumer.close();
        }

    }




}

 

end.

标签:kafka2.5,java,示例,kafka,record,org,apache,import,properties
来源: https://www.cnblogs.com/zhuwenjoyce/p/13179609.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有