
Flume configuration file methods (with Maven)

Flume configuration overview

In short:

sources receive data
sinks deliver the data onward
channels buffer the data in between

The component types below will be used later.
1. Source component types (receive data sent from somewhere)

Netcat Source
Accepts request data from a client; commonly used for testing and development

Exec Source
Runs a given Unix command and uses the command's output as the data source

Spooling Directory Source
Watches a given directory for new files and parses events from the new files as they appear

Kafka Source
Reads data from a Kafka cluster

Sequence Generator Source
A sequence generator: a counter that starts at 0 and increments by 1 up to Long.MAX_VALUE

Avro Source
Accepts request data from an Avro client, similar to the Netcat Source
Typically used to build multi-agent Flume pipelines and to collect data over RPC

2. Channel component types (buffer data)
Memory Channel
Caches Event objects in memory
Pros: fast
Cons: risk of data loss

JDBC Channel
Stores Event objects in a database; currently only Derby is supported
Pros: durable
Cons: relatively slow

File Channel
Stores Event objects in files on the local file system
Pros: durable
Cons: relatively slow

Kafka Channel
Writes Events to a Kafka cluster
Pros: highly available, data is replicated

3. Sink components (write the collected data to files, to other hosts, or to other systems)
Logger Sink
Outputs the collected data as log messages

HDFS Sink
Writes the collected data to HDFS; two file formats are supported, text and sequence files
Note: with the DataStream file format the collected data is written without serialization
It can produce a new data directory every ten minutes (see the sketch after this list)

File Roll Sink
A rolling-file sink that writes the collected data to the local file system

Null Sink
Discards all collected data

HBase Sink
Writes the collected data to HBase
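The ten-minute directory behaviour mentioned under HDFS Sink comes from time escapes in the sink path plus the sink's rounding options. A minimal sketch, assuming a placeholder NameNode address and target path (adjust hdfs.path to your own cluster):

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode:9000/user/flume/logs/%Y%m%d/%H%M
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute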

Installing Flume (version 1.9 is used here)

This guide is intended for version 1.6 or later; some steps need manual adjustment, and older versions are more troublesome.

1. Install the tool used for simple tests of Flume:

yum install -y nc

Then download the installation package (Flume official download page).

Upload the Flume package to the server and unpack it; look these steps up if they are unfamiliar.
2. Configure the environment variables

export FLUME_HOME=<your Flume install path>
export PATH=$PATH:$FLUME_HOME/bin
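
Reload the shell environment (for example with source /etc/profile, assuming that is where the variables were added), then verify the installation:

flume-ng version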


3. Start Hadoop; a fully distributed cluster is recommended, since several of the examples involve multiple agents.
4. Start a Flume agent from a configuration file:

[root@master conf]# flume-ng agent -n a1 -c ./ -f ./example.conf -Dflume.root.logger=INFO,console

Or add the following to flume-env.sh:

export JAVA_OPTS="-Dflume.root.logger=INFO,console"

Configuration files

For the examples that have a file 1 and a file 2, put the two files on different machines; read the notes in each example.

1. Notes:

I am using two machines here, both with Hadoop 3.x and Flume 1.9:
192.168.120.129 is my primary machine
192.168.120.134 is my secondary machine

2. Configuration examples

The files below go in the conf directory of the Flume installation.

1. Non-persistent data (file name: example.conf)

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1
a1.channels = c1

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Configure the sink as a logger sink
a1.sinks.k1.type = logger

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text; each line appears in the agent's console.

2. Persistent data

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1
a1.channels = c1 c2
#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Configure the sink as a logger sink
a1.sinks.k1.type = logger

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/src/flume/checkpoint
a1.channels.c2.dataDirs = /usr/local/src/flume/data

#Bind the source and sink to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c2

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

3. Monitoring a single log file

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1
a1.channels = c1

#Configure the source as exec
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F app.log

#Configure the sink as a logger sink
a1.sinks.k1.type = logger

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window and append lines to the monitored file (app.log, relative to the directory the agent was started from), for example:
echo "test" >> app.log
The appended lines appear in the agent's console.

4. Monitoring multiple log files

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1
a1.channels = c1

#Configure the source as TAILDIR, tracking two file groups
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 =  /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 =  /usr/local/src/flume/conf/logs/.*log

a1.sinks.k1.type = logger

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window and append text to one of the monitored files (app.log, or any *.log file under the logs directory); the new lines appear in the agent's console.

5. Multiple agents

File 1 (for the secondary machine, 192.168.120.134; its avro sink forwards to the primary machine):

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Configure sink k1 as logger and k2 as avro, forwarding to the agent from file 2
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

#Configure the channels as memory channels: each queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

#Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

File 2 (for the primary machine, 192.168.120.129):

#Define the agent name as a1
#Name the three components
a1.sources =r1 r2
a1.sinks = k1
a1.channels = c1

#Configure source r1 as TAILDIR and r2 as avro, receiving from the agent in file 1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 =  /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 =  /usr/local/src/flume/conf/logs/.*log

a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555

#Configure the sink as a logger sink
a1.sinks.k1.type = logger

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1

Open a new session window on the machine running file 1, connect with nc localhost 44444 and type some text; it is logged locally and also forwarded over avro to the agent running file 2.
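
To run this example, start the agent for file 2 on 192.168.120.129 first, so that its avro source is already listening on port 55555, and then start the agent for file 1 on 192.168.120.134. A sketch, assuming the files are saved in the conf directory as multi1.conf and multi2.conf (the names are placeholders):

# on 192.168.120.129 (runs file 2)
flume-ng agent -n a1 -c ./ -f ./multi2.conf -Dflume.root.logger=INFO,console

# on 192.168.120.134 (runs file 1)
flume-ng agent -n a1 -c ./ -f ./multi1.conf -Dflume.root.logger=INFO,console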

6. Interceptors

File 1:

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Add a host interceptor (puts the sending host into the event headers)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

#Configure sink k1 as logger and k2 as avro
a1.sinks.k1.type = logger

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

#Configure the channels as memory channels: each queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

#Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

File 2:

#Define the agent name as a1
#Name the three components
a1.sources =r1 r2
a1.sinks = k1
a1.channels = c1

#Configure source r1 as TAILDIR and r2 as avro
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 =  /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 =  /usr/local/src/flume/conf/logs/.*log

a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555

#Configure the sink as a logger sink
a1.sinks.k1.type = logger

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1

Open a new session window on the machine running file 1, connect with nc localhost 44444 and type some text.

7. Using interceptors

Just get familiar with this one for now.

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Add interceptors: host and timestamp headers
a1.sources.r1.interceptors = i1 i2 i3 i4 i5
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp

#Static interceptor: add a fixed key/value header
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = beijing

#Add a UUID header to each event
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

#Mask text: replace any six-digit sequence with ******
a1.sources.r1.interceptors.i5.type = search_replace
a1.sources.r1.interceptors.i5.searchPattern = \\d{6}
a1.sources.r1.interceptors.i5.replaceString = ******

#Configure sink k1 as logger and k2 as avro
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

#Configure the channels as memory channels: each queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory

#Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Open a new session window, connect with nc localhost 44444 and type some text; a line containing a six-digit number such as 123456 shows up in the agent's log with the digits masked as ******.

8. Custom interceptor

Maven is needed here. The related files are under the "Code and test resources" section below; the class used is MyHostInterceptor.java, referenced by its fully qualified name (package + class name):

Mzj.Demo.MyHostInterceptor$Builder

Put the packaged jar into the flume/lib directory.

File 1:
#Define the agent name as a1
#Name the three components
a1.sources =r1 r2
a1.sinks = k1
a1.channels = c1

#Configure source r1 as TAILDIR and r2 as avro
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 =  /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 =  /usr/local/src/flume/conf/logs/.*log

a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555 
            
#Add the custom interceptor from the jar in flume/lib
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type =  Mzj.Demo.MyHostInterceptor$Builder
           
a1.sinks.k1.type = logger

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 
    
#Bind the sources and sink to the channel
a1.sources.r2.channels = c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

File 2:

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

a1.sinks.k1.type = logger

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

#Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Open a new session window on the machine running file 2, connect with nc localhost 44444 and type some text.

9. Channel selector

All of the following agents run on the same machine.
agent1:

#Define the agent name as a1
#Name the three components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2 c3 c4

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Configure sink k1 as logger
a1.sinks.k1.type = logger

#Configure sinks k2, k3 and k4 as avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042

#Configure the channels as memory channels: each queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100

a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100

#Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2 c3 c4
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4

#Channel selector: route events by the value of the 'state' header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4

#Interceptor: tag every event with a static state = US header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = US

agent2:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4040

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

agent3:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4041

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

agent4:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4042

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Open a new session window, connect with nc localhost 44444 and type some text. Because the static interceptor tags every event with state = US, each event is routed to c1 and c3, so it is logged by agent1 itself and by the agent listening on port 4041. To exercise the CZ route instead, change the interceptor value as sketched below.
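
A minimal tweak for testing the other mapping: change only the interceptor value in agent1's file and restart that agent.

a1.sources.r1.interceptors.i1.value = CZ

Events then carry state = CZ and are routed to c1 and c2, i.e. to agent1's own logger and to the agent listening on port 4040. Any other header value falls through to the default mapping, c1 and c4.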

10. Sink failover

agent1:
#Define the agent name as a1
#Name the three components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Configure a failover sink group: the sink with the highest priority (k4, priority 20) is used while it is healthy
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15
a1.sinkgroups.g1.processor.priority.k4 = 20
a1.sinkgroups.g1.processor.maxpenalty = 10000

#Configure sink k1 as logger
a1.sinks.k1.type = logger

#Configure sinks k2, k3 and k4 as avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129 
a1.sinks.k2.port = 4040
    
a1.sinks.k3.type = avro 
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041  
    
a1.sinks.k4.type = avro 
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1

The other agent files (agent2, agent3, agent4) are the same as the corresponding files in configuration 9; only agent1 differs.

Open a new session window, connect with nc localhost 44444 and type some text; if the agent behind the highest-priority sink is stopped, events fail over to the next-highest priority.

11. Sink processor load balancing

#Define the agent name as a1
#Name the three components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Define a load-balancing sink group with random sink selection
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

#Configure sink k1 as logger
a1.sinks.k1.type = logger

#Configure sinks k2, k3 and k4 as avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1

The other agent files (agent2, agent3, agent4) are the same as the corresponding files in configuration 9; only agent1 differs.
Open a new session window, connect with nc localhost 44444 and type some text; the events are spread across the sinks at random.

12. Exporting data to HDFS

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#The hdfs.path below is the NameNode address (port 9000) plus the target path; directories are created automatically as data is written
#Configure the sink as hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.120.129:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window, connect with nc localhost 44444 and type some text; the lines are written to HDFS under /user/flume/logs (see the verification sketch below).
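
A quick way to verify the result, assuming the Hadoop client is on the PATH (Flume generates the file names, so list the directory first and then cat one of the listed files):

hadoop fs -ls /user/flume/logs
hadoop fs -cat /user/flume/logs/<one of the listed files>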

13. Multiple agents writing to HDFS

File 1 (the collector: its avro source receives events and its HDFS sink writes them out):

#Define the agent name as a1
#Name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Configure the source as avro, listening on 192.168.120.129 port 4040
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.120.129
a1.sources.r1.port = 4040

#Configure the sink as hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.120.129:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

File 2 (the forwarder: its netcat source feeds an avro sink pointing at file 1):

#Define the agent name as a1
#Name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Configure the sink as avro, forwarding to the agent from file 1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.120.129
a1.sinks.k1.port = 4040

#Configure the channel as a memory channel: the queue holds at most 1000 events and a single transaction moves at most 100 events from the source or to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window on the machine running file 2, connect with nc localhost 44444 and type some text.

hadoop fs -cat <file path>   (run on the primary machine; it shows the data that was written)

14. Custom source

Maven is needed here. The related files are under the "Code and test resources" section below; the class used is s1.java, referenced by its fully qualified name (package + class name).
Put the packaged jar into the flume/lib directory.

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = Mzj.Demo.s1

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
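
To try it out, save the configuration (the name custom-source.conf is a placeholder) and start the agent as usual; the source in s1.java emits events with bodies data:0 through data:9, one every five seconds, and they appear on the logger sink:

flume-ng agent -n a1 -c ./ -f ./custom-source.conf -Dflume.root.logger=INFO,console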

Custom sink

Maven is needed here. The related files are under the "Code and test resources" section below; the class used is s2.java, referenced by its fully qualified name (package + class name).
Put the packaged jar into the flume/lib directory.

#Define the agent name as a1
#Name the three components
a1.sources =r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = Mzj.Demo.s1
a1.sinks.k1.type = Mzj.Demo.s2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Code and test resources

Download link
Extraction code: 6666
Packaging: double-click the item highlighted in the red box (screenshot omitted), or use the command line as sketched below.

Drag the generated jar into flume/lib.
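
If you prefer the command line to the IDE, the equivalent, run from the project root next to pom.xml, is:

mvn clean package

The jar is produced under target/ and can then be copied into flume/lib.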

Overview of the Maven project:
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Mzj_baby</groupId>
    <artifactId>Flume_Demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

MyHostInterceptor.java:

package Mzj.Demo;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hao
 * 2021/9/15
 */
public class MyHostInterceptor implements Interceptor{
    private  String name;


    private static final Logger logger = LoggerFactory
            .getLogger(MyHostInterceptor.class);
    @Override
    public void initialize() {
        this.name = "";
    }

    @Override
    public Event intercept(Event event) {
        //Process a single event; an event has a body and headers
        //If the event's host header is 192.168.120.134, drop the event
        //(constant-first equals avoids a NullPointerException when the header is missing)
        if("192.168.120.134".equals(event.getHeaders().get("host"))){
            logger.info("Event came from 134, dropping it");
            return null;
        }

        //Replace the event headers with a single state=CZ header (note: this discards the existing headers)
        Map<String,String> map = new HashMap<String,String>();
        map.put("state","CZ");
        event.setHeaders(map);

        return event;
    }

    @Override
    //Process a batch of events, keeping only those not dropped by intercept(Event)
    public List<Event> intercept(List<Event> events) {
        List<Event> eventList = new ArrayList<Event>();
        for (Event event: events){
            Event event1 = intercept(event);
            if (event1 != null){
                eventList.add(event1);
            }

        }
        return eventList;
    }

    @Override
    public void close() {

    }



    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return  new MyHostInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

s1.java

package Mzj.Demo;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
/**
 * Hao
 * 2021/9/20
 */
public class s1 extends AbstractSource implements Configurable, PollableSource {
    //Generate data and hand it to the channel processor
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        try {
            //Simulate a data feed: emit ten events, one every five seconds
            for (int i = 0; i< 10;i++){
                Event event = new SimpleEvent();
                event.setBody(("data:"+i).getBytes());
                getChannelProcessor().processEvent(event);
                //The data is ready to be consumed
                status = Status.READY;
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;
        }
        return status;
    }
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
    @Override
    public void configure(Context context) {
    }
}

s2.java

package Mzj.Demo;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Hao
 * 2021/9/21
 */
public class s2 extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory
            .getLogger(s2.class);
    //Take one event from the channel and process it
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        //Get the channel this sink is bound to
        Channel ch = getChannel();
        //Get a transaction from the channel
        Transaction transaction = ch.getTransaction();
        try {
            transaction.begin();
            //Take an event from the channel
            Event event = ch.take();
            //Here the data could be sent to external storage; this demo just logs it
            if(event == null){
                status = Status.BACKOFF;
            }else {
                logger.info(new String(event.getBody()));
                status = Status.READY;
            }
            transaction.commit();
        }catch (Exception e){
            logger.error(e.getMessage());
            //Roll back so the event is returned to the channel and close() is not called on an open transaction
            transaction.rollback();
            status = Status.BACKOFF;
        }finally {
            transaction.close();
        }
        return status;
    }
    @Override
    public void configure(Context context) {
    }
}

Source: https://blog.csdn.net/weixin_51214928/article/details/120401089