标签:educoder org put kafka 入门篇 头歌 props apache import
头歌educoder-kafka入门篇
第一关kafka-初体验
#!/bin/bash
#1.创建一个名为demo的Topic
kafka-topics.sh -create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic demo
#2.查看所有Topic
kafka-topics.sh --list --zookeeper 127.0.0.1:2181
#3.查看名为demo的Topic的详情信息
kafka-topics.sh -topic demo --describe --zookeeper 127.0.0.1:2181
第二关:生产者-简单模式
package net.educoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* kafka producer 简单模式
*/
public class App {
public static void main(String[] args) {
/**
* 1.创建配置文件对象,一般采用 Properties
*/
/**----------------begin-----------------------*/
Properties props = new Properties();
/**-----------------end-------------------------*/
/**
* 2.设置kafka的一些参数
* bootstrap.servers --> kafka的连接地址 127.0.0.1:9092
* key、value的序列化类 -->org.apache.kafka.common.serialization.StringSerializer
* acks:1,-1,0
*/
/**-----------------begin-----------------------*/
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/**-----------------end-------------------------*/
/**
* 3.构建kafkaProducer对象
*/
/**-----------------begin-----------------------*/
Producer<String, String> producer = new KafkaProducer<>(props);
/**-----------------end-------------------------*/
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("demo", i + "", i + "");
/**
* 4.发送消息
*/
/**-----------------begin-----------------------*/
producer.send(record);
/**-----------------end-------------------------*/
}
producer.close();
}
}
第三关:消费者-自动提交偏移量
package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class App {
public static void main(String[] args) {
Properties props = new Properties();
/**--------------begin----------------*/
//1.设置kafka集群的地址
props.put("bootstrap.servers", "127.0.0.1:9092");
//2.设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "g1");
//3.开启offset自动提交
props.put("enable.auto.commit", "true");
//4.自动提交时间间隔
props.put("auto.commit.interval.ms", "1000");
//5.序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/**---------------end---------------*/
/**--------------begin----------------*/
//6.创建kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//7.订阅kafka的topic
consumer.subscribe(Arrays.asList("demo"));
/**---------------end---------------*/
int i = 1;
while (true) {
/**----------------------begin--------------------------------*/
//8.poll消息数据,返回的变量为crs
ConsumerRecords<String, String> crs = consumer.poll(100);
for (ConsumerRecord<String, String> cr : crs) {
System.out.println("consume data:" + i);
i++;
}
/**----------------------end--------------------------------*/
if (i > 10) {
return;
}
}
}
}
第四关:消费者-手动提交偏移量
package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class App {
public static void main(String[] args){
Properties props = new Properties();
/**-----------------begin------------------------*/
//1.设置kafka集群的地址
props.put("bootstrap.servers", "127.0.0.1:9092");
//2.设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "g1");
//3.关闭offset自动提交
props.put("enable.auto.commit", "false");
//4.序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/**-----------------end------------------------*/
/**-----------------begin------------------------*/
//5.实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//6.消费者订阅主题,订阅名为demo的主题
consumer.subscribe(Arrays.asList("demo"));
/**-----------------end------------------------*/
final int minBatchSize = 10;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
for (ConsumerRecord bf : buffer) {
System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
}
/**-----------------begin------------------------*/
//7.手动提交偏移量
consumer.commitSync();
/**-----------------end------------------------*/
buffer.clear();
return;
}
}
}
}
标签:educoder,org,put,kafka,入门篇,头歌,props,apache,import 来源: https://blog.csdn.net/qq_52313093/article/details/122099805
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。