Kafka High-Level API and Low-Level API


Analysis of the Kafka Consumption Process

Kafka provides two consumer APIs: the high-level consumer API and the low-level API.

High-Level API

1) Advantages of the high-level API

The high-level API is simple to write against.

There is no need to manage offsets yourself; the system manages them automatically through ZooKeeper.

There is no need to manage partitions, replicas, and so on; the system handles this automatically.

When a consumer reconnects, it resumes fetching data from the offset last recorded in ZooKeeper (by default the offset stored in ZooKeeper is updated once per minute).

Consumer groups can be used to keep different programs that read the same topic separate from one another (each group records its own offset, so different programs reading the same topic do not interfere with each other through offsets; see the sketch below).
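As a minimal sketch of how consumer groups keep programs separate (this class is not part of the original article; it reuses the same legacy kafka.consumer classes as the consumer example below, and the group names are hypothetical):

import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class GroupIsolationSketch {

    // Build a high-level consumer connector for a given group id.
    // Each group tracks its own offsets in ZooKeeper, so two groups reading
    // the same topic never interfere with each other.
    private static ConsumerConnector connectorFor(String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "nnn1:2181,nnn2:2181,nslave1:2181"); // ZK address from the examples below
        props.put("group.id", groupId);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    public static void main(String[] args) {
        // Hypothetical group names: each one receives the full stream of the topic independently.
        ConsumerConnector reporting = connectorFor("reporting-app");
        ConsumerConnector archiving = connectorFor("archiving-app");
        // ... create message streams from each connector exactly as in the high-level consumer example below ...
        reporting.shutdown();
        archiving.shutdown();
    }
}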

2) Disadvantages of the high-level API

Offsets cannot be controlled manually (a limitation for certain special requirements).

Finer-grained details such as partitions, replicas, and ZooKeeper cannot be controlled.

Low-Level API

1) Advantages of the low-level API

Developers control the offset themselves and can read from wherever they choose.

You control which partitions to connect to and can load-balance across partitions in any custom way.

The dependency on ZooKeeper is reduced (for example, offsets do not have to be stored in ZooKeeper; you can store them yourself, say in a file or in memory; see the sketch below).
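As a minimal sketch of self-managed offset storage (a hypothetical helper, not part of the original article), an offset can be persisted to a local file between runs:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileOffsetStore {
    private final Path path;

    public FileOffsetStore(String topic, int partition) {
        // One file per topic-partition, e.g. "test1-0.offset" (the naming scheme is arbitrary).
        this.path = Paths.get(topic + "-" + partition + ".offset");
    }

    // Return the saved offset, or the supplied default if nothing has been saved yet.
    public long load(long defaultOffset) throws IOException {
        if (!Files.exists(path)) {
            return defaultOffset;
        }
        return Long.parseLong(new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim());
    }

    // Overwrite the file with the latest consumed offset.
    public void save(long offset) throws IOException {
        Files.write(path, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
    }
}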

2) Disadvantages of the low-level API

It is rather complex: you have to manage offsets yourself, decide which partition to connect to, find the partition leader, and so on.

High-level producer


package com.sinoiov.kafka.test;

 

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import kafka.serializer.StringEncoder;

 

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Properties;

 

/**

 * Created by caoyu on 16/4/21.

 * By 中交兴路 Big Data Center, Infrastructure Platform Department

 */

public class Kafka_produce extends Thread{

    private String topic;

    private SimpleDateFormat sdf = new SimpleDateFormat("MM-dd hh:mm:ss");

 

    public Kafka_produce(String topic){

        super();

        this.topic = topic;

    }

 

    @Override

    public void run() {

        Producer<String, String> producer = createProducer();

        long i = 0;

        while(true){

            i++;

            long now = System.currentTimeMillis();

            KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic,sdf.format(new Date(now))+"_"+i+"");

            producer.send(message);

            try {

                Thread.sleep(1000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

 

    private Producer<String,String> createProducer(){

        Properties properties = new Properties();

        properties.put("metadata.broker.list","192.168.110.81:9092,192.168.110.82:9092,192.168.110.83:9092");

        properties.put("serializer.class", StringEncoder.class.getName());

        properties.put("zookeeper.connect""nnn1:2181,nnn2:2181,nslave1:2181");

        return new Producer<String, String>(new ProducerConfig(properties));

    }

}
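A minimal driver for this producer thread might look like the following sketch (the topic name "test1" is only an assumption, matching the topic used in the low-level example further down):

public class ProducerMain {
    public static void main(String[] args) {
        // Start the producer thread; it sends one timestamped message per second until the process is stopped.
        new Kafka_produce("test1").start();
    }
}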

High-level consumer


package com.sinoiov.kafka.test;

 

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

 

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

 

/**

 * Created by caoyu on 16/4/21.

 * By 中交兴路 Big Data Center, Infrastructure Platform Department

 */

public class Kafka_consumer extends Thread {

    private String topic;

    private ConsumerConnector consumer;

 

    public Kafka_consumer(String topic){

        super();

        this.topic = topic;

        consumer = createConsumer();

    }

 

    public void shutDown(){

        if(consumer != null)

            consumer.shutdown();

    }

 

    @Override

    public void run() {

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> messageSteam = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> steam = messageSteam.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> iterator = steam.iterator();

        while(iterator.hasNext()){

            String message = new String(iterator.next().message());

            System.out.println(message);

        }

    }

 

    private ConsumerConnector createConsumer(){

        Properties properties = new Properties();

        properties.put("zookeeper.connect","nnn1:2181,nnn2:2181,nslave1:2181");

        properties.put("group.id""testsecond");

        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

    }

}

Likewise, this code is not complicated, and it should be easy to follow by reading through it.
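A minimal driver for this consumer thread, showing one way shutDown() might be used, could look like the following sketch (the topic name and run duration are assumptions):

public class ConsumerMain {
    public static void main(String[] args) throws InterruptedException {
        Kafka_consumer consumer = new Kafka_consumer("test1"); // topic name is an assumption
        consumer.start();                 // prints messages as they arrive
        Thread.sleep(60 * 1000L);         // let it run for a minute (arbitrary duration)
        consumer.shutDown();              // close the connector; the blocking iterator in run() should then stop
    }
}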


 

Low-level API example code

 


package com.sinoiov.kafka.test;

 

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.Collections;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

 

import kafka.api.FetchRequest;

import kafka.api.FetchRequestBuilder;

import kafka.api.PartitionOffsetRequestInfo;

import kafka.cluster.BrokerEndPoint;

import kafka.common.ErrorMapping;

import kafka.common.TopicAndPartition;

import kafka.javaapi.FetchResponse;

import kafka.javaapi.OffsetResponse;

import kafka.javaapi.PartitionMetadata;

import kafka.javaapi.TopicMetadata;

import kafka.javaapi.TopicMetadataRequest;

import kafka.javaapi.consumer.SimpleConsumer;

import kafka.message.MessageAndOffset;

 

/**

 * Created by caoyu on 16/4/26.

 * By 中交兴路 Big Data Center, Infrastructure Platform Department

 */

public class SimpleExample {

    private List<String> m_replicaBrokers = new ArrayList<String>();

 

    public SimpleExample() {

        m_replicaBrokers = new ArrayList<String>();

    }

 

    public static void main(String args[]) {

        SimpleExample example = new SimpleExample();

        // Maximum number of messages to read

        long maxReads = Long.parseLong("3");

        // Topic to subscribe to

        String topic = "test1";

        // Partition to read from

        int partition = Integer.parseInt("0");

        // Broker node IPs (seed brokers)

        List<String> seeds = new ArrayList<String>();

        seeds.add("192.168.110.81");

        seeds.add("192.168.110.82");

        seeds.add("192.168.110.83");

        // Broker port

        int port = Integer.parseInt("9092");

        try {

            example.run(maxReads, topic, partition, seeds, port);

        } catch (Exception e) {

            System.out.println("Oops:" + e);

            e.printStackTrace();

        }

    }

 

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {

        // Fetch metadata for the given topic partition

        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);

        if (metadata == null) {

            System.out.println("Can't find metadata for Topic and Partition. Exiting");

            return;

        }

        if (metadata.leader() == null) {

            System.out.println("Can't find Leader for Topic and Partition. Exiting");

            return;

        }

        String leadBroker = metadata.leader().host();

        String clientName = "Client_" + a_topic + "_" + a_partition;

 

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);

        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;

        while (a_maxReads > 0) {

            if (consumer == null) {

                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);

            }

            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();

            FetchResponse fetchResponse = consumer.fetch(req);

 

            if (fetchResponse.hasError()) {

                numErrors++;

                // Something went wrong!

                short code = fetchResponse.errorCode(a_topic, a_partition);

                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);

                if (numErrors > 5)

                    break;

                if (code == ErrorMapping.OffsetOutOfRangeCode()) {

                    // We asked for an invalid offset. For simple case ask for

                    // the last element to reset

                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);

                    continue;

                }

                consumer.close();

                consumer = null;

                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);

                continue;

            }

            numErrors = 0;

 

            long numRead = 0;

            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {

                long currentOffset = messageAndOffset.offset();

                if (currentOffset < readOffset) {

                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);

                    continue;

                }

 

                readOffset = messageAndOffset.nextOffset();

                ByteBuffer payload = messageAndOffset.message().payload();

 

                byte[] bytes = new byte[payload.limit()];

                payload.get(bytes);

                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));

                numRead++;

                a_maxReads--;

            }

 

            if (numRead == 0) {

                try {

                    Thread.sleep(1000);

                } catch (InterruptedException ie) {

                }

            }

        }

        if (consumer != null)

            consumer.close();

    }

 

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {

        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);

        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

        OffsetResponse response = consumer.getOffsetsBefore(request);

 

        if (response.hasError()) {

            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));

            return 0;

        }

        long[] offsets = response.offsets(topic, partition);

        return offsets[0];

    }

 

    /**

     * @param a_oldLeader

     * @param a_topic

     * @param a_partition

     * @param a_port

     * @return String

     * @throws Exception

     *             Find a new leader broker

     */

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {

        for (int i = 0; i < 3; i++) {

            boolean goToSleep = false;

            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);

            if (metadata == null) {

                goToSleep = true;

            } else if (metadata.leader() == null) {

                goToSleep = true;

            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {

                // first time through if the leader hasn't changed give

                // ZooKeeper a second to recover

                // second time, assume the broker did recover before failover,

                // or it was a non-Broker issue

                //

                goToSleep = true;

            } else {

                return metadata.leader().host();

            }

            if (goToSleep) {

                try {

                    Thread.sleep(1000);

                } catch (InterruptedException ie) {

                }

            }

        }

        System.out.println("Unable to find new leader after Broker failure. Exiting");

        throw new Exception("Unable to find new leader after Broker failure. Exiting");

    }

 

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {

        PartitionMetadata returnMetaData = null;

        loop: for (String seed : a_seedBrokers) {

            SimpleConsumer consumer = null;

            try {

                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");

                List<String> topics = Collections.singletonList(a_topic);

                TopicMetadataRequest req = new TopicMetadataRequest(topics);

                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

 

                List<TopicMetadata> metaData = resp.topicsMetadata();

                for (TopicMetadata item : metaData) {

                    for (PartitionMetadata part : item.partitionsMetadata()) {

                        if (part.partitionId() == a_partition) {

                            returnMetaData = part;

                            break loop;

                        }

                    }

                }

            } catch (Exception e) {

                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);

            } finally {

                if (consumer != null)

                    consumer.close();

            }

        }

        if (returnMetaData != null) {

            m_replicaBrokers.clear();

            for (BrokerEndPoint replica : returnMetaData.replicas()) {

                m_replicaBrokers.add(replica.host());

            }

        }

        return returnMetaData;

    }

}
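As a usage sketch (not part of the original article), the public getLastOffset helper can also be called on its own to inspect a partition's earliest and latest offsets. The broker address, topic, and partition below simply mirror the hypothetical values in main, and the class is assumed to live in the same package as SimpleExample; note that if the chosen broker is not the leader for the partition, getLastOffset prints an error and returns 0.

import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetLookup {
    public static void main(String[] args) {
        // Connect to a single (assumed reachable) broker; values mirror those in SimpleExample.main.
        SimpleConsumer consumer = new SimpleConsumer("192.168.110.81", 9092, 100000, 64 * 1024, "offsetLookup");
        try {
            long earliest = SimpleExample.getLastOffset(consumer, "test1", 0,
                    kafka.api.OffsetRequest.EarliestTime(), "offsetLookup");
            long latest = SimpleExample.getLastOffset(consumer, "test1", 0,
                    kafka.api.OffsetRequest.LatestTime(), "offsetLookup");
            System.out.println("earliest=" + earliest + ", latest=" + latest);
        } finally {
            consumer.close();
        }
    }
}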

Source: https://blog.csdn.net/u011250186/article/details/115674130
