标签:hdfs sinks kafka a1 sources 埋点 电商 日志 channels
https://www.bilibili.com/video/BV1L4411K7hW?p=31&spm_id_from=pageDriver
架构:
数据流
生产Flume读取日志文件做简单ETL后写入到kafka,然后消费Flume从kafka中将数据读出写入到hdfs。项目中还应用了zookeeper来协调的分布式kafka和分布式Hadoop。
步骤
1)制作埋点日志dummy文件
1.1)编写Java程序生成埋点日志文件,并Maven打包(期望完成时间2021-7-2,实际完成时间2021-7-3)
1.2)将打好的jar包传到66服务器和88服务器。
2)搭建Hadoop环境
安装Hadoop,并进行验证和调优。
<Hadoop_Home>/etc/hadoop/hdfs-site.xml <Hadoop_Home>/etc/hadoop/core-site.xml <Hadoop_Home>/etc/hadoop/yarn-site.xml <Hadoop_Home>/etc/hadoop/mapred-site.xml <Hadoop_Home>/etc/hadoop/hadoop-env.sh <Hadoop_Home>/etc/hadoop/mapred-env.sh <Hadoop_Home>/etc/hadoop/yarn-env.sh <Hadoop_Home>/etc/hadoop/workers zkServer.sh start /home/user/hadoop-3.2.2/sbin/hadoop-daemon.sh start zkfc /home/user/hadoop-3.2.2/sbin/start-dfs.sh /home/user/hadoop-3.2.2/sbin/start-yarn.sh
3)安装配置Flume。
安装Flume,并进行验证和调优。
4)配置生产Flume,从日志文件中读取数据,写入到kafka。
4.1)在66服务器上配置文件file-flume-kafka.conf如下,然后分发到88和99服务器。
[user@NewBieMaster ~]$ vi /home/user/flume-1.9/conf/file-flume-kafka.conf
a1.source=r1 a1.channels= c1 c2 #configure source a1.sources.r1.type=TAILDIR a1.sources.r1.positionFile=/home/user/flume-1.9/test/log_position_json a1.sources.r1.filegroups=f1 a1.sources.r1.filegroups.f1=/tmp/debug.+ a1.sources.r1.fileHeader = true a1.sources.channels = c1 c2 #interceptor a1.sources.r1.interceptors= i1 i2 a1.sources.r1.interceptors.i1.type=com.example.flume.interceptor.LogETLInterceptor$Builder a1.sources.r1.interceptors.i2.type=com.example.flume.interceptor.LogTypeInterceptor$Builder a1.sources.r1.selector.type=multiplexing a1.sources.r1.selector.header=topic a1.sources.r1.selector.mapping.topic_start=c1 a1.sources.r1.selector.mapping.topic_event=c2 #configure channel a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers=192.168.1.66:9092,192.168.1.88:9092,192.168.1.99:9092 a1.channels.c1.kafka.topic=topic_start a1.channels.c1.parseAsFlumeEvent = false a1.channels.c1.kafka.consumer.groupd.id=flume-consumer a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers=192.168.1.66:9092,192.168.1.88:9092,192.168.1.99:9092 a1.channels.c2.kafka.topic=topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.groupd.id=flume-consumer
4.2)创建Java Maven工程,制作拦截器com.example.flume.interceptor.LogETLInterceptor$Builder和com.example.flume.interceptor.LogTypeInterceptor$Builder。
5)安装kafka
安装Kafka,并进行验证和调优
6)在66服务器上启动flume agent,查看kafka队列
~/flume-1.9/bin/flume-ng agent --name a1 -con-file ./conf/file-flume-kafka.conf -Dlume.root.logger=FINEST & kafka-topics.sh --zookeeper 192.168.1.66:2181,192.168.1.88:2181,192.168.1.99:2181 --list kafka-topics.sh --zookeeper 192.168.1.66:2181,192.168.1.88:2181,192.168.1.99:2181 --describe topic_start kafka-topics.sh --zookeeper 192.168.1.66:2181,192.168.1.88:2181,192.168.1.99:2181 --describe topic_event [user@NewBieMaster tmp]$ kafka-topics.sh --zookeeper 192.168.1.66:2181,192.168.1.88:2181,192.168.1.99:2181 --list __consumer_offsets test topic_event topic_start [user@NewBieMaster tmp]$
7)安装配置kafka manager,通过kafkamanager查看kafka
安装配置的步骤
8)配置消费Flume,从kafka中读取数据,写入到hdfs。
8.1)在99服务器上配置文件kafka-flume-hdfs.conf如下,然后分发到88和66服务器。
##组件 a1.sources= r1 r2 a1.channels = c1 c2 a1.sinks = k1 k2 ##source1 a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 500 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers=192.168.1.66:9092,192.168.1.88:9092,192.168.1.99:9092 a1.sources.r1.kafka.topics=topic_start ##source2 a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 500 a1.sources.r2.batchDurationMillis = 200 a1.sources.r2.kafka.bootstrap.servers=192.168.1.66:9092,192.168.1.88:9092,192.168.1.99:9092 a1.sources.r2.kafka.topics=topic_event ##channel1 a1.channels.c1.type= file a1.channels.c1.checkpointDir=/tmp/flumecheckpoint/behavior1 a1.channels.c1.dataDirs=/tmp/flumedata/behavior1 a1.channels.c1.maxFileSize=2146435071 a1.channels.c1.capacity=1000000 a1.channels.c1.keep-alive=6 ##channel2 a1.channels.c2.type= file a1.channels.c2.checkpointDir=/tmp/flumecheckpoint/behavior2 a1.channels.c2.dataDirs=/tmp/flumedata/behavior2 a1.channels.c2.maxFileSize=2146435071 a1.channels.c2.capacity=1000000 a1.channels.c2.keep-alive=6 ##sink1 a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.path=/orginal/gmail/log/topic_start/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix=logstart- a1.sinks.hdfs.round=true a1.sinks.hdfs.roundValue=10 a1.sinks.hdfs.roundUnit=second ##sink2 a1.sinks.k2.type=hdfs a1.sinks.k2.hdfs.path=/orginal/gmail/log/topic_event/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix=logstart- a1.sinks.hdfs.round=true a1.sinks.hdfs.roundValue=10 a1.sinks.hdfs.roundUnit=second ##不产生小文件 a1.sinks.k1.hdfs.rollInterval=10 a1.sinks.k1.hdfs.rollSize=134217728 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k2.hdfs.rollInterval=10 a1.sinks.k2.hdfs.rollSize=134217728 a1.sinks.k2.hdfs.rollCount=0 ##控制数出文件是原生文件 a1.sinks.k1.hdfs.fileType=CompressedStream a1.sinks.k2.hdfs.fileType=CompressedStream a1.sinks.k1.hdfs.codeC=lzop a1.sinks.k2.hdfs.codeC=lzop ##拼装 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 a1.sources.r2.channels=c2 a1.sinks.k2.channel=c2
标签:hdfs,sinks,kafka,a1,sources,埋点,电商,日志,channels 来源: https://www.cnblogs.com/hailunw/p/14969858.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。