ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Kafka

2022-03-27 13:04:53  阅读:176  来源: 互联网

标签:-- kafka arg0 kafkaProps put Kafka public


1、命令:

 启动zookeeper:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

   启动kafka:

bin\windows\kafka-server-start.bat config\server.properties

   创建topic:

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic test

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

   创建生产者:

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

   创建消费者:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

   展示主题信息(副本,ISR等)

bin\windows\kafka-topics.bat  --zookeeper localhost:2181 --describe --topic test

 

  查看所有主题

bin\windows\kafka-topics.bat  --zookeeper localhost:2181 --list

 

2、生产者操作

重要参数:

buffer.memory  //RecordAccumulator消息收集器的缓存大小,默认32MB。超过会抛出异常或阻塞,取决于max.block.ms参数

max.block.ms  //缓存溢出后,阻塞时间。默认60秒。

retries     //重发次数,

vetry.backoff.ms  //重发间隔时间

batch.size   //缓存参数,决定创建ProducterBatch的大小,关于ProducterBatch在P38。

acks      //1:leader收到消息即为成功,0:生产者发送消息后不等待任何相应 ,-1/all:leader和follwer全部都写入消息才为成功

 

 

   (1)、生产者添加消息:

复制代码
public class Kafka {
    private static KafkaProducer<String,String> producer;
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("acks", "all");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new  KafkaProducer<String,String>(kafkaProps);
//        consumer=new KafkaConsumer<>(kafkaProps);
        ProducerRecord<String, String> record = new ProducerRecord<>("test", "444");
        try {
            System.out.println(producer.send(record).get());
            List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ;
            partitions = producer.partitionsFor("test");
            for(PartitionInfo p:partitions)
            {
                System.out.println(p);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

 

 

  (2)、生产者回调函数

复制代码
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata arg0, Exception arg1) {
                        System.out.println("回调函数!" + arg0.topic() + "   " + arg1);
                    }
                });
复制代码

 

 注:当执行成功时回调函数的Exception是null.

 

 (3)、生产者拦截器

 第一步、创建拦截器类

复制代码
public class MyInterceptor implements ProducerInterceptor<String, String>{
    @Override
    public void configure(Map<String, ?> arg0) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void onAcknowledgement(RecordMetadata arg0, Exception arg1) {
        // TODO Auto-generated method stub
        System.out.println("");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> arg0) {
        System.out.println("现在是拦截器"+arg0.topic());
        if(arg0.value().contains("wgy")) {
            return new ProducerRecord<String,String>(arg0.topic(),arg0.partition(),arg0.timestamp(),arg0.key(),"we is good");
        }
        return arg0;
    }
}
复制代码

 

 第二步、在生产者配置中配置

kafkaProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyInterceptor.class.getName());

 

可以配置多个拦截器,对个拦截器之间使用逗号隔开,例如:

        kafkaProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                MyInterceptor.class.getName()+","+MyInterceptor.class.getName()
                );

 

 

 消费者操作

重要参数

 

 

   (1)、消费者读取消息:

复制代码
public class KafkaRead {
    private static KafkaConsumer<String,String> consumer;
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer=new KafkaConsumer<>(kafkaProps);
        consumer.subscribe(java.util.Collections.singletonList("test"));
        try {
            while(true) {
                ConsumerRecords<String, String> records=consumer.poll(100);

                for(ConsumerRecord<String, String> record:records) {
                    System.out.println(record.toString());
                }
            }
        }catch (Exception e) {
            // TODO: handle exception
        }
    }
}
复制代码

 

(5)、

标签:--,kafka,arg0,kafkaProps,put,Kafka,public
来源: https://www.cnblogs.com/wanglala/p/16062480.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有