Kafka Java consumer: source code for fetching messages

2021-10-09 01:04:35  Source: Internet

Tags: java, fetch, return, records, source code, timeout, props, new, poll


Consumer demo

 1 import org.apache.kafka.clients.consumer.ConsumerRecord;
 2 import org.apache.kafka.clients.consumer.ConsumerRecords;
 3 import org.apache.kafka.clients.consumer.KafkaConsumer;
 4 import org.apache.kafka.common.serialization.StringDeserializer;
 5 
 6 import java.util.Arrays;
 7 import java.util.Properties;
 8 
 9 public class ConsumerDemo {
10     private final KafkaConsumer<String, String> consumer;
11     private ConsumerRecords<String, String> msgList;
12     private final String topic;
13     private static final String GROUPID = "groupA";
14 
15     public ConsumerDemo(String topicName) {
16         Properties props = new Properties();
17         props.put("bootstrap.servers", "localhost:9092");
18         props.put("group.id", GROUPID);
19         props.put("enable.auto.commit", "true");
20         props.put("auto.commit.interval.ms", "1000");
21         props.put("session.timeout.ms", "30000");
22         props.put("auto.offset.reset", "earliest");
23         props.put("key.deserializer", StringDeserializer.class.getName());
24         props.put("value.deserializer", StringDeserializer.class.getName());
25         this.consumer = new KafkaConsumer<String, String>(props);
26         this.topic = topicName;
27         this.consumer.subscribe(Arrays.asList(topic));
28     }
29 
30 
31     public void receiveMsg() {
32         int messageNo = 1;
33         System.out.println("---------start consuming---------");
34         try {
35             for (;;) {
36                 msgList = consumer.poll(1000);
37                 if(null!=msgList&&msgList.count()>0){
38                     for (ConsumerRecord<String, String> record : msgList) {
39                         System.out.println((messageNo++) + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
40                     }
41                 }else{
42                     Thread.sleep(1000);
43                 }
44             }
45         } catch (InterruptedException e) {
46             e.printStackTrace();
47         } finally {
48             consumer.close();
49         }
50     }
51     public static void main(String args[]) {
52         ConsumerDemo consumerDemo = new ConsumerDemo("KAFKA_TEST");
53         consumerDemo.receiveMsg();
54     }
55 }

Line 36 of the demo starts pulling messages from the Kafka broker. Step into the poll method in KafkaConsumer.java:

 1     @Override
 2     public ConsumerRecords<K, V> poll(long timeout) {
 3         acquireAndEnsureOpen();
 4         try {
 5             if (timeout < 0)
 6                 throw new IllegalArgumentException("Timeout must not be negative");
 7 
 8             if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
 9                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
10 
11             // poll for new data until the timeout expires
12             long start = time.milliseconds();
13             long remaining = timeout;
14             do {
15                 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
16                 if (!records.isEmpty()) {
17                     // before returning the fetched records, we can send off the next round of fetches
18                     // and avoid block waiting for their responses to enable pipelining while the user
19                     // is handling the fetched records.
20                     //
21                     // NOTE: since the consumed position has already been updated, we must not allow
22                     // wakeups or any other errors to be triggered prior to returning the fetched records.
23                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
24                         client.pollNoWakeup();
25 
26                     if (this.interceptors == null)
27                         return new ConsumerRecords<>(records);
28                     else
29                         return this.interceptors.onConsume(new ConsumerRecords<>(records));
30                 }
31 
32                 long elapsed = time.milliseconds() - start;
33                 remaining = timeout - elapsed;
34             } while (remaining > 0);
35 
36             return ConsumerRecords.empty();
37         } finally {
38             release();
39         }
40     }
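
The do/while loop in lines 12 to 34 of poll() treats timeout as a total budget: each pollOnce call receives only what is left of it, and because the condition is checked at the bottom of the loop, pollOnce runs at least once even when timeout is 0. Below is a minimal standalone sketch of that pattern. BoundedPollSketch, the injected clock, and the pollOnce callback are hypothetical names used for illustration; they are not Kafka classes.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

final class BoundedPollSketch {

    /**
     * Calls pollOnce with the remaining time budget until it returns data
     * or the budget is used up, mirroring the do/while loop in poll(long).
     * The clock is injected (an assumption of this sketch) so runs are repeatable.
     */
    static <T> List<T> poll(long timeoutMs, LongSupplier clock, LongFunction<List<T>> pollOnce) {
        if (timeoutMs < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        long start = clock.getAsLong();
        long remaining = timeoutMs;
        do {
            List<T> records = pollOnce.apply(remaining);
            if (!records.isEmpty())
                return records;                       // data arrived: return immediately

            long elapsed = clock.getAsLong() - start;
            remaining = timeoutMs - elapsed;          // shrink the budget for the next attempt
        } while (remaining > 0);

        return Collections.emptyList();               // budget spent without any data
    }

    public static void main(String[] args) {
        // Fake clock that advances 400 ms on every reading, so the run is deterministic.
        long[] now = {0};
        LongSupplier clock = () -> now[0] += 400;
        int[] attempts = {0};

        // pollOnce stand-in that never finds data: the loop retries until the budget is spent.
        List<String> result = poll(1000, clock, remaining -> {
            attempts[0]++;
            return Collections.<String>emptyList();
        });
        System.out.println(result.isEmpty() + " after " + attempts[0] + " attempts");
        // prints "true after 3 attempts" (budgets passed to pollOnce: 1000, 600, 200)
    }
}
```

In the real method the early return also pipelines the next round of fetches before handing records back; the sketch leaves that out and keeps only the time accounting.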

Line 15 of poll() calls pollOnce() to fetch the messages. Step into pollOnce():

 1     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
 2         client.maybeTriggerWakeup();
 3         coordinator.poll(time.milliseconds(), timeout);
 4 
 5         // fetch positions if we have partitions we're subscribed to that we
 6         // don't know the offset for
 7         if (!subscriptions.hasAllFetchPositions())
 8             updateFetchPositions(this.subscriptions.missingFetchPositions());
 9 
10         // if data is available already, return it immediately
11         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
12         if (!records.isEmpty())
13             return records;
14 
15         // send any new fetches (won't resend pending fetches)
16         fetcher.sendFetches();
17 
18         long now = time.milliseconds();
19         long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
20 
21         client.poll(pollTimeout, now, new PollCondition() {
22             @Override
23             public boolean shouldBlock() {
24                 // since a fetch might be completed by the background thread, we need this poll condition
25                 // to ensure that we do not block unnecessarily in poll()
26                 return !fetcher.hasCompletedFetches();
27             }
28         });
29 
30         // after the long poll, we should check whether the group needs to rebalance
31         // prior to returning data so that the group can stabilize faster
32         if (coordinator.needRejoin())
33             return Collections.emptyMap();
34 
35         return fetcher.fetchedRecords();
36     }

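Line 11 of pollOnce() returns any records that are already buffered locally. If that buffer is empty, line 16 sends new fetch requests to the broker.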
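
The "buffer first, broker second" order can be illustrated with a small standalone simulation. This is only a sketch under stated assumptions: BufferedFetchSketch and all of its members are hypothetical, the broker is replaced by an in-memory queue of responses, the fetch completes synchronously (the real client sends it asynchronously and then waits in client.poll), and the maxRecords cap is assumed here to play the role of the max.poll.records setting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class BufferedFetchSketch {
    // Records that a fetch has already brought back but no poll has handed out yet.
    private final Queue<String> buffer = new ArrayDeque<>();
    // Stand-in for the broker: each element is the payload of one fetch response.
    private final Queue<List<String>> brokerResponses;
    private int fetchesSent = 0;

    BufferedFetchSketch(List<List<String>> responses) {
        this.brokerResponses = new ArrayDeque<>(responses);
    }

    /** Hands out at most maxRecords from the local buffer (possibly none). */
    List<String> fetchedRecords(int maxRecords) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && !buffer.isEmpty())
            out.add(buffer.poll());
        return out;
    }

    /** Simulates one fetch round trip; the response lands in the local buffer. */
    void sendFetch() {
        fetchesSent++;
        List<String> response = brokerResponses.poll();
        if (response != null)
            buffer.addAll(response);
    }

    /** Buffer first, broker second: the order followed by pollOnce in the listing. */
    List<String> pollOnce(int maxRecords) {
        List<String> records = fetchedRecords(maxRecords);
        if (!records.isEmpty())
            return records;              // served locally, no request sent
        sendFetch();                     // buffer was empty: go to the broker
        return fetchedRecords(maxRecords);
    }

    int fetchesSent() {
        return fetchesSent;
    }

    public static void main(String[] args) {
        BufferedFetchSketch consumer =
                new BufferedFetchSketch(Arrays.asList(Arrays.asList("a", "b", "c")));
        System.out.println(consumer.pollOnce(2) + " fetches=" + consumer.fetchesSent()); // [a, b] fetches=1
        System.out.println(consumer.pollOnce(2) + " fetches=" + consumer.fetchesSent()); // [c] fetches=1
        System.out.println(consumer.pollOnce(2) + " fetches=" + consumer.fetchesSent()); // [] fetches=2
    }
}
```

The second call is the interesting one: a leftover record is served from the buffer and the fetch counter does not move, which is the case line 11 handles before line 16 is ever reached.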

 

Source: https://www.cnblogs.com/luckygxf/p/15383633.html
