ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flume+Kafka+Storm实战:二、Flume与Kafka整合

2021-06-11 10:02:18  阅读:196  来源: 互联网

标签:Flume agent1 -- kafka Storm conf Kafka sinks


文章目录

0x00 文章内容
  1. Flume准备
  2. Kafka准备
  3. 校验结果

PS:请自行准备好Flume、Kafka的环境。由于本教程是属于整合教程,所以,我们可以直接在原来的基础上进行升级即可。过程是将教程:Flume入门案例之NetCat-Souces里的Sink修改为Kafka,而这里的Kafka用的其实是教程:Flume+Kafka+Storm实战:一、Kakfa与Storm整合里面的topic。

0x01 Flume准备
1. 编写Flume配置文件

a. 拷贝一份example
cd ~/bigdata/apache-flume-1.8.0-bin
cp conf/example.conf conf/kafka.conf
b. 然后修改Sink(点击跳转官网参考
vi conf/kafka.conf
在这里插入图片描述
在这里插入图片描述
c. 完整配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = word-count-input
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.flumeBatchSize = 5

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
0x02 Kafka准备
1. 创建topic(如已操作过可跳过)

a. 启动Kafka与ZK(三台都执行)
启动ZK:
zkServer.sh start
启动Kafka:
nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &

目前的进程情况(call_all.sh脚本请参考:大数据常用管理集群脚本集合):
在这里插入图片描述
b. 创建Topic:word-count-input(如果已创建则忽略)

查看Topic(在上一教程已经创建):
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --list --zookeeper master:2181
在这里插入图片描述
如果没创建则创建:
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-input

0x03 校验结果
1. 启动Flume

a. 在master启动
bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/kafka.conf --name a1
在这里插入图片描述

2. 启动Kafka消费者

a. 首先应确保先将Kafka与ZK启动起来(已启动则忽略,三台都执行)
启动ZK:
zkServer.sh start
启动Kafka:
nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &
b. 在master启动Kafka消费者
kafka-console-consumer.sh --bootstrap-server master:9092 --topic word-count-input

3. 测试结果

a. 测试过程与教程:Flume入门案例之NetCat-Souces 一样,可以看到我们的内容可以在Kafka的消费者端接收到。

在这里插入图片描述
在这里插入图片描述

0xFF 总结
  1. 其实内容也比较简单,重点是要结合官网来进行配置,其余的都是学习过的内容。
  2. 实战的上一篇教程:Flume+Kafka+Storm实战:一、Kakfa与Storm整合,学习没有先后顺序,但是应该具备Flume、Kafka等基础。
  3. 不同的版本,配置与结果都有可能不同,关于flumeBatchSize的配置项问题,请参考文章:Flume1.7及以上版本的Kafka Sink batchsize(flumeBatchSize) 配置问题,flumeBatchSize为分批次进行发送配置。
  4. Flume1.6.0版本参考:
先测试能否输出到控制台再对接Spark Streaming
技术选型一:avro-memory-logger
技术选型二:avro-memory-kafka

参考:
技术选型一:
#streaming_logger.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=localhost
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_logger.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

参考:
技术选型二:
#streaming_kafka.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streaming_topic
agent1.sinks.kafka-sink.brokerList = localhost:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

执行脚本:
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_kafka.conf \
--name agent1 \

作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |

福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。


标签:Flume,agent1,--,kafka,Storm,conf,Kafka,sinks
来源: https://blog.51cto.com/u_12564104/2895595

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有