ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka

2022-08-16 15:01:30  阅读:161  来源: 互联网

标签:-- offset kafka topic props put


 

另外找一个zk,有客户端命令的
.\zkCli.cmd -server 127.0.0.1:2181
ls /brokers 查看注册信息

1,kafka不支持分布式事务消息 不支持消费失败重试
2,kafka的单机TPS能跑到每秒上百万,是因为Producer端将多个小消息合并,批量发向broker
3,RocketMQ写入性能上不如kafka, 主要因为kafka主要应用于日志场景,而RocketMQ应用于业务场景,为了保证消息必达牺牲了性能,且基于线上真实场景没有在RocketMQ层做消息合并,推荐在业务层自己做。
4,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除
https://blog.csdn.net/pengweismile/article/details/117636252

https://kafka.apache.org/quickstart
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
.\bin\windows\kafka-topics.bat --create --topic topic-xmh --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --describe --topic topic-xmh --bootstrap-server localhost:9092

.\bin\windows\kafka-console-consumer.bat --topic topic-xmh --from-beginning --bootstrap-server localhost:9092 
.\bin\windows\kafka-console-producer.bat --topic topic-xmh --bootstrap-server localhost:9092


https://blog.csdn.net/syc0616/article/details/118156641
producer配置
bootstrap.servers: kafka的地址。
acks:消息的确认机制,默认值是0。
acks=0:如果设置为0,生产者不会等待kafka的响应。
acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送。
batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
value.serializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。

public class ProMy {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 1);//16384
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer producer = new KafkaProducer(props);
        producer.send(new ProducerRecord<String, String>("topic-xmh","xingkey2","xingvalues5"));
        producer.close();
        System.out.println("*************end-procuder");
    }
}

consumer配置
bootstrap.servers: kafka的地址。
group.id:组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了。
enable.auto.commit:是否自动提交,默认为true。
auto.commit.interval.ms: 从poll(拉)的回话处理时长。
session.timeout.ms:超时时间。
max.poll.records:一次最大拉取的条数。
auto.offset.reset:消费规则,默认earliest 。
earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
        props.put("group.id", "group_x3");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
//        props.put("max.poll.records", 1);
        props.put("auto.offset.reset", "earliest");
//        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("topic-xmh"));
        //for(;;) {
            ConsumerRecords<String, String> msgList = consumer.poll(1000);
//            System.out.println("consumer*******:" + msgList);
            for (ConsumerRecord<String, String> record:msgList){
                System.out.println("**************consumer*******record1:"+record+","+record.key()+","+record.value());
            }
        //}

//        consumer.close();
    }

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>

  

 

标签:--,offset,kafka,topic,props,put
来源: https://www.cnblogs.com/xingminghui111/p/16591577.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有