ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

kafka(五):消费组信息提取

2021-09-17 13:01:59  阅读:288  来源: 互联网

标签:消费 信息提取 kafka util org apache import append


文章目录


2021-09-17

说明

  • 本博客每周五更新一次。
  • 本文功能性博文,提取kafka所有消费组相关信息,整理后,提供为prometheus。

分享

环境

  • kafka2.3.0

实现

maven

<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-clients</artifactId>  
    <version>2.3.0</version>  
</dependency>  

代码

  • kerberos认证相关功能,此处不再附属,如有需要,可参照之前文章。
  • topic最大offset需要单独提取,输出数据结构为:时间|消费组|topic|最大offset|已消费offset|lag
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TimerTask;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kafka.customer.util.CollectionUtil;
import com.kafka.customer.util.ConfigUtil;
import com.kafka.customer.util.FileUtil;
import com.kafka.customer.util.TimeUtil;


public class KafkaLogTimer extends TimerTask {
	
	private final Logger log= LoggerFactory.getLogger(KafkaLogTimer.class);
	
	/**
	 * admin 客户端
	 */
	private AdminClient adminClient;
	
	/**
	 * consumer客户端
	 */
	private KafkaConsumer<String, String> consumer;
	
	/**
	 * 时间
	 */
	private long time;
	
	/**
	 * 输出文件名
	 */
	private String fileName;
	
	private String intervalChar="|";
	
	/**
	 * 执行状态 0  待执行  1 执行完毕
	 */
	private int state;
	
	
	@Override
	public void run() {
		String timeStr=TimeUtil.getFileTime();
		time=TimeUtil.getUtcMinuteTime();
		fileName=timeStr+ConfigUtil.txt;
		
		log.info("{} Start To Run",timeStr);
		state=0;
		
		KafkaLogThread kafkaLogThread=new KafkaLogThread();
		kafkaLogThread.start();
		long createTime=System.currentTimeMillis();
		
		while(true&&state==0) {
			//如果传入线程没有执行完
            if ((System.currentTimeMillis() - createTime) >= ConfigUtil.taskTime) {
                log.error("Time {} Deal Over Time Will To Stop",timeStr);
                kafkaLogThread.interrupt();
                break;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error("Wanting Thread Run Error");
            }
		}
		log.info("{}  End To Finish",timeStr);
	}
	
	/**
	 * 
	 * @author wangzonghui
	 * @date 2021-08-27 10:13:47
	 * @Description kafka信息获取线程
	 */
	class KafkaLogThread extends Thread{
		
		@Override
		public void run() {
			
			try {
				//连接
				init();
				//获取日志信息
				List<String> logdataList =createLog();
				//关闭连接
				close();
				
				//存储文件
				if(CollectionUtil.isNotEmptyCollection(logdataList)) {
					String outputFile=ConfigUtil.outputPath+fileName;
					FileUtil.createDataFile(logdataList, outputFile);
				}else {
					log.info("{} Get Log Is Null",TimeUtil.getTime());
				}

			} catch (Exception e) {
				log.info(fileName+" Create Error",e);
			}
			
			
			state=1;
		}
		
		/**
		 * 初始化环境变量
		 */
		public void init() {
			Properties props = new Properties();
		    props.put("bootstrap.servers", "localhost:9092");
		    
		    //kerbores安全认证
		    if(ConfigUtil.kerberos==0){
		    	props.put("security.protocol", "SASL_PLAINTEXT");
		    	props.put("sasl.mechanism", "GSSAPI");
		    	props.put("sasl.kerberos.service.name", "kafka");
		    }
		    
		    adminClient= AdminClient.create(props);

		    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		    consumer = new KafkaConsumer<>(props);
		}
		
		/**
		 * 生成日志主类
		 */
		public List<String> createLog() {
			//消费组信息获取
			ListConsumerGroupsResult list=adminClient.listConsumerGroups();
			KafkaFuture<Collection<ConsumerGroupListing>> data=list.all();
			List<String> logdataList=new ArrayList<>();
			
			try {
				if(data!=null&&data.get()!=null&&CollectionUtil.isNotEmptyCollection(data.get())) {
					Collection<ConsumerGroupListing>resultlist=data.get();
					Map<TopicPartition, OffsetAndMetadata> resultOffset;
					String context;
					StringBuffer buf;
					
					TopicPartition topicPartition;
					long endOffset,consumerOffset,lag;
					
					for(ConsumerGroupListing consumerGroupListing:resultlist) {
						
						resultOffset=adminClient.listConsumerGroupOffsets(consumerGroupListing.groupId()).partitionsToOffsetAndMetadata().get();
						if(resultOffset!=null&&CollectionUtil.isNotEmptyMap(resultOffset)) {
							for(Entry<TopicPartition, OffsetAndMetadata> mapItem :resultOffset.entrySet()) {
								context="";
								buf=new StringBuffer();
								buf.append(time).append(intervalChar);
								
								topicPartition=mapItem.getKey();
								endOffset=getLogEndOffset(topicPartition);
								consumerOffset=mapItem.getValue().offset();
								lag=endOffset-consumerOffset;
								
								buf.append(consumerGroupListing.groupId()).append(intervalChar).append(topicPartition.topic()).append(intervalChar).append(endOffset).append(intervalChar).append(consumerOffset).append(intervalChar).append(lag);
								context=buf.toString();

								logdataList.add(context);
							}
						}else {
							log.error("Get Consumer Group {} Of Null",consumerGroupListing.groupId());
						}
						
					}
				}else {
					log.info("No Found Consumer Gourp");
				}
			} catch (Exception e) {
				log.error("Get Kafka Consumer Error:"+e.toString(),e);
			}
			return logdataList;
		}
		
		/**
		 * 获取topic endoffset
		 * @param topicPartition
		 * @return
		 */
		private long getLogEndOffset(TopicPartition topicPartition) {
			
			consumer.assign(Arrays.asList(topicPartition));
			consumer.seekToEnd(Arrays.asList(topicPartition));
			return   consumer.position(topicPartition);
		}
		
		/**
		 * 关闭连接
		 */
		public void close() {
			if(adminClient!=null) {
				adminClient.close();
			}
			
			if(consumer!=null) {
				consumer.close();
			}
		}
	}
}

总结

  • 人生有岸,知识无涯,未来路上好好奋斗加油,少年。

标签:消费,信息提取,kafka,util,org,apache,import,append
来源: https://blog.csdn.net/qq_22973811/article/details/120346821

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有