Kafka: Low-Level API vs. High-Level API vs. New API, and Choosing auto.offset.reset (CodingPark)





Introduction
This article explains the differences between Kafka's low-level API, high-level API, and new consumer API, and how to use the auto.offset.reset parameter.
The diagram shown at the start of the article is there to let readers see clearly how brokers, partitions, and replicas relate to one another.


Differences between the low-level API, the high-level API, and the new API

Characteristics of the low-level API

Advantages
● The developer controls the offset and can start reading from any position.
● The developer controls which partitions to connect to and can do custom load balancing across partitions.
● Reduced dependency on ZooKeeper (for example, offsets do not have to be stored in ZooKeeper; you can store them yourself in a file or in memory).

Disadvantages
● It is complex: you have to manage offsets yourself, decide which partition to connect to, find each partition's leader, and so on.

package com.csdn.kafka.consumer;

import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import util.ZkUtil; // project-local helper used to read/write offsets stored in ZooKeeper

import java.nio.ByteBuffer;
import java.util.*;

/**
 * Created by ag on 2020/5/9.
 */
public class SimpleConsumerAPI {


    public static void main(String[] args)throws Exception {

        String zkString = "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181";
        String broker = "192.168.1.115";

        int port = 9092;
        int buffersize = 64*1024;
        String clientId = "clientId";
        String topic = "test";

        // earliest available offset; intended for getReadOffset() below (not called in this example)
        long whichTime = kafka.api.OffsetRequest.EarliestTime();

        int timeout = 6000;
        ZooKeeper zk = new ZooKeeper(zkString, timeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent);
            }
        });

        // list the topic's partition ids registered in ZooKeeper
        List<String> partitions = zk.getChildren("/brokers/topics/test/partitions", true);
        System.out.println(partitions);

        for(String p:partitions){
            int partition = Integer.valueOf(p);
            // look up the leader broker for this partition via a TopicMetadataRequest
            String leader = getLeader(timeout, broker, port, partition,
                    buffersize, clientId, topic);

            // read the last saved offset for this partition from a custom ZooKeeper path
            byte[] data = ZkUtil.getData("/consumers/test/testgroup/" + partition);
            long readOffset = Long.valueOf(new String(data).trim());
            System.out.println(readOffset);

            // start one fetch thread per partition, reading from readOffset up to offset 14110
            new Thread(new ReadDataTask(timeout, port, partition, buffersize,
                    clientId, topic, leader, readOffset,14110)).start();

        }

    }

    // A single fetch from the partition leader starting at readOffset; defined here but not
    // called from main() (ReadDataTask below does the same thing in a loop).
    private static void fetchData(int timeout, int port, int partition, int buffersize,
                                  String clientId, String topic, String leader, long readOffset) {
        SimpleConsumer simpleConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
        kafka.api.FetchRequest request = new FetchRequestBuilder()
                .addFetch(topic,partition,readOffset,100000)
                .clientId(clientId)
                .build();
        FetchResponse fetch = simpleConsumer.fetch(request);
        ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic, partition);
        Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
        while(iterator.hasNext()){
            MessageAndOffset next = iterator.next();
            long offset = next.offset();
            long nextoffset = next.nextOffset();
            Message message = next.message();
            ByteBuffer payload = message.payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(partition
                    +"\t"+new String (bytes)+"\t"+offset+"\t"+nextoffset);
        }
    }


    // Asks the partition leader for the earliest or latest offset, depending on whichTime;
    // defined here but not called from main() in this example.
    private static long getReadOffset(long whichTime,int timeout, int port,
                                      int partition, int buffersize, String clientId, String topic, String leader) {
        SimpleConsumer offsetConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo
                = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition);
        PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(whichTime,1);

        requestInfo.put(topicAndPartition,partitionOffsetRequestInfo);

        OffsetRequest offsetRequest =
                new OffsetRequest(requestInfo,kafka.api.OffsetRequest.CurrentVersion(),clientId);
        OffsetResponse offsetsBefore = offsetConsumer.getOffsetsBefore(offsetRequest);
        long[] offsets = offsetsBefore.offsets(topic, partition);
        return offsets[0];
    }

    // Queries topic metadata from the given broker and returns the leader host for the partition.
    private static String getLeader(int timeout, String broker, int port,
                                    int partition, int buffersize, String clientId, String topic) {
        String leader = "";
        SimpleConsumer leaderConsumer = new SimpleConsumer(broker,port,timeout,buffersize,clientId);
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Collections.singletonList(topic));
        TopicMetadataResponse topicMetadataResponse = leaderConsumer.send(topicMetadataRequest);
        List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
        for(TopicMetadata topicMetadata:topicMetadatas){
            List<PartitionMetadata> partitionMetadatas = topicMetadata.partitionsMetadata();
            for(PartitionMetadata partitionMetadata:partitionMetadatas){
                if(partitionMetadata.partitionId() == partition){
                    leader =  partitionMetadata.leader().host();
                }
            }
        }
        return leader;
    }
}


class ReadDataTask implements Runnable{

    int timeout;
    int port;
    int partition;
    int buffersize;
    String clientId;
    String topic;
    String leader;
    long readOffset;
    long stopOffset;

    public ReadDataTask(int timeout, int port, int partition,
                        int buffersize, String clientId,
                        String topic, String leader, long readOffset,long stopOffset) {
        this.timeout = timeout;
        this.port = port;
        this.partition = partition;
        this.buffersize = buffersize;
        this.clientId = clientId;
        this.topic = topic;
        this.leader = leader;
        this.readOffset = readOffset;
        this.stopOffset= stopOffset;
    }

    public void run() {
        boolean flag =  true;
        int count = 0;
        while(flag){
            SimpleConsumer simpleConsumer = new SimpleConsumer(leader,port,timeout,buffersize,clientId);
            kafka.api.FetchRequest request = new FetchRequestBuilder()
                    .addFetch(topic,partition,readOffset,100000)
                    .clientId(clientId)
                    .build();
            FetchResponse fetch = simpleConsumer.fetch(request);
            ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic, partition);
            Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
            while(iterator.hasNext()){
                count ++;
                MessageAndOffset next = iterator.next();
                long offset = next.offset();
                if(offset>stopOffset){
                    flag = false;
                    break;
                }
                long nextoffset = next.nextOffset();
                readOffset = nextoffset;
                Message message = next.message();
                ByteBuffer payload = message.payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(Thread.currentThread().getName()+"\t"+partition
                        +"\t"+new String (bytes)+"\t"+offset+"\t"+nextoffset);
            }
            simpleConsumer.close(); // release the connection opened for this fetch

            try {
                // persist the next offset to read back to the custom ZooKeeper path
                ZkUtil.setData("/consumers/test/testgroup/"+partition,readOffset+"");
            } catch (Exception e) {
                e.printStackTrace();
            }


            if(count ==0){
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName()+":sleep 1000ms");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            count = 0;
        }

    }
}


 

The relationship between threads and partitions
3 partitions, 2 threads → one thread consumes two partitions and the other consumes one (once a thread starts consuming a partition, the assignment does not change mid-run); see the sketch below
3 partitions, 3 threads → each thread consumes one partition
3 partitions, 5 threads → all 5 threads start, but only 3 of them do any work
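
A minimal sketch of the 3-partition / 2-thread case, using the old high-level consumer API shown in the next section and the same ZooKeeper addresses and topic "test" as the rest of this article; the class name and the group id "thread-demo" are made up for illustration. Requesting 2 streams in topicCountMap yields two KafkaStream objects, and the partition column in the output shows that one of the two threads is assigned two partitions.

package com.csdn.kafka.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TwoThreadsThreePartitions {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181");
        props.put("group.id", "thread-demo"); // hypothetical group id for this sketch
        props.put("auto.offset.reset", "smallest");

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // request 2 streams for the 3-partition topic "test":
        // one stream is assigned two partitions, the other gets one
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 2);
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreams(topicCountMap).get("test");

        // one consumer thread per stream
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            new Thread(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> next = it.next();
                        // thread name + partition id shows which partitions this thread owns
                        System.out.println(Thread.currentThread().getName()
                                + "\t" + next.partition() + "\t" + new String(next.message()));
                    }
                }
            }).start();
        }
    }
}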

Characteristics of the high-level API

Advantages
● The high-level API is simple to write.
● You do not need to manage offsets yourself; they are managed automatically through ZooKeeper.
● You do not need to manage partitions, replicas, and so on; the system handles them automatically.
● If a consumer disconnects, it automatically resumes from the offset last recorded in ZooKeeper (by default the stored offset is updated every 5 s); this describes version 0.10.2.
● Consumer groups can be used to keep different programs that read the same topic separate (each group records its own offsets, so programs reading the same topic do not interfere with one another's offsets).

Disadvantages
● You cannot control offsets yourself (a problem for some special requirements).
● You cannot fine-tune details such as partitions, replicas, or ZooKeeper usage.

package com.csdn.kafka.consumer;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * ZooKeeper shell commands to manually reset a group's offsets for this topic
 * (the path contains the group id; the consumer below uses group.id "last"):
 *  set /consumers/testgroup2/offsets/test/0 14000
 *  set /consumers/testgroup2/offsets/test/1 14000
 *  set /consumers/testgroup2/offsets/test/2 14000
 */
public class HighLevelConsumer {


    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.115:2181,192.168.1.116:2181,192.168.1.117:2181");
        props.put("group.id", "last");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "5000");
        props.put("auto.offset.reset", "largest");//largest,smallest
        props.put("auto.commit.enable", "true");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector javaConsumerConnector =
                Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        String topic = "test";
        topicCountMap.put(topic,3); // request 3 streams: one consumer thread per partition of "test"
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                javaConsumerConnector.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> kafkaStreams = messageStreams.get(topic);
        for(KafkaStream<byte[], byte[]> kafkaStream:kafkaStreams){
            new Thread(new ReadDataHigh(kafkaStream)).start();
        }

//        Thread.sleep(12000);
//        javaConsumerConnector.shutdown();
//        System.out.println("------------------------------------------------------------");


    }
}


class ReadDataHigh implements Runnable{

    KafkaStream<byte[], byte[]> kafkaStream;

    public ReadDataHigh(KafkaStream<byte[], byte[]> kafkaStream) {
        this.kafkaStream = kafkaStream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        System.out.println(Thread.currentThread().getName()+"==============================");
        while(iterator.hasNext()){
            try {
                Thread.sleep(1000); // slow down consumption so the printed output is easy to follow
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            MessageAndMetadata<byte[], byte[]> next = iterator.next();
            int partition = next.partition();
            long offset = next.offset();
            byte[] message = next.message();
            System.out.println(Thread.currentThread().getName()+"\t"
                    +partition+"\t"+offset+"\t"+new String(message));
        }
    }
}
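
Characteristics of the new consumer API

The example below uses the new org.apache.kafka.clients.consumer.KafkaConsumer, which connects to the brokers directly via bootstrap.servers instead of going through ZooKeeper. autoOffset() subscribes to the topic and lets the consumer commit offsets automatically (enable.auto.commit=true); manualOffset() assigns the three partitions explicitly, seeks each one to a chosen starting offset, and commits offsets itself with commitSync().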


 

package com.csdn.kafka.consumer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class NewConsumer {

    public static void main(String[] args) throws InterruptedException {

//        autoOffset();
        manualOffset();

//        int partition = Math.abs("testasdf1".hashCode()) % 50 ;
//        System.out.println(partition);


    }

    /** Manual offset management: assign the partitions, seek() to the desired start offset, and commit with commitSync(). */
    private static void manualOffset() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
        props.put("group.id", "testasdf1");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");//默认是lastest; latest, earliest
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        String topic = "test";
//        kafkaConsumer.subscribe(Collections.singletonList(topic));

        // assign the three partitions of "test" explicitly and start each one at offset 21000
        TopicPartition topicPartiton0 = new TopicPartition(topic,0);
        TopicPartition topicPartiton1 = new TopicPartition(topic,1);
        TopicPartition topicPartiton2 = new TopicPartition(topic,2);
        kafkaConsumer.assign(Arrays.asList(topicPartiton0,topicPartiton1,topicPartiton2));
        kafkaConsumer.seek(topicPartiton0,21000);
        kafkaConsumer.seek(topicPartiton1,21000);
        kafkaConsumer.seek(topicPartiton2,21000);

        int count = 100;
        List<String> values = new ArrayList<String>();
        while(true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> record:records){
                int partition = record.partition();
                long offset = record.offset();
                String value = record.value();
                values.add(value);
                System.out.println(partition+"\t"+offset+"\t"+value);
            }

            if(values.size()>count){
                kafkaConsumer.commitSync(); // commit manually once more than `count` records have been buffered
                values.clear();
                System.out.println("manual commit offset");
            }
        }
    }

    /** Automatic offset management: subscribe() to the topic with enable.auto.commit=true. */
    private static void autoOffset() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
        props.put("group.id", "testasdfg");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");//默认是lastest; latest, earliest
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        String topic = "test";
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while(true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> record:records){
                int partition = record.partition();
                long offset = record.offset();
                String value = record.value();
                System.out.println(partition+"\t"+offset+"\t"+value);
            }
        }
    }


}
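
Choosing auto.offset.reset

The setting only matters when the consumer group has no committed offset for a partition (or the committed offset is out of range): pick earliest (smallest for the old ZooKeeper-based consumers) to start from the oldest available record, or latest (largest, the old consumers' default) to read only records produced after the consumer starts. Once a group has committed offsets, it resumes from them and auto.offset.reset is ignored. As a quick recipe, the minimal sketch below re-reads the whole topic from the beginning; it is essentially autoOffset() above with a fresh group id and auto.offset.reset=earliest. The class name and the group id "reset-demo" are made up and assumed to have no committed offsets.

package com.csdn.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class OffsetResetDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
        props.put("group.id", "reset-demo"); // fresh group id -> no committed offsets yet
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // no committed offset -> start from the oldest record
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + "\t" + record.offset() + "\t" + record.value());
            }
        }
    }
}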


Source: https://blog.csdn.net/u011250186/article/details/115674070
