ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flume入门

2021-11-15 23:33:35  阅读:173  来源: 互联网

标签:flume channels 入门 a1 sources k1 c1 sinks


flume入门

1.安装与配置

下载地址:http://archive.apache.org/dist/flume/
安装:
(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下

tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

(3)修改apache-flume-1.9.0-bin的名称为flume

mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

(4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

rm /opt/module/flume/lib/guava-11.0.2.jar

配置环境变量:

vim /etc/profile.d/my_env.sh

将flume的环境变量导进去

export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin

测试:
输入flume-ng 若出现语法提示则成功
在这里插入图片描述

2.基础架构

3.事务

在这里插入图片描述

4.agent的内部原理

在这里插入图片描述

5.基础案例

1.实时监控单个文件,并上传到HDFS中 hdfs

A.案例需求:实时监控单个文件,并上传到HDFS中 hdfs sink 为常用sink

  1. 确定 sources sink channel 这三个组件要使用什么类型
    exec hdfs memory
  2. 到官方文档看需要填什么参数
#name
  a1.sources = s1
  a1.sinks = k1
  a1.channels = c1

  #sources 定义
  a1.sources.s1.type = exec
  a1.sources.s1.command = tail -f /opt/module/flume-1.9.0/jobs/tail.txt

  #channel 定义
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100


  #sinks 定义
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  #上传文件的前缀
  a1.sinks.k1.hdfs.filePrefix = logs-
  #是否按照时间滚动文件夹
  a1.sinks.k1.hdfs.round = true
  #多少时间单位创建一个新的文件夹
  a1.sinks.k1.hdfs.roundValue = 1
  #重新定义时间单位
  a1.sinks.k1.hdfs.roundUnit = hour
  #是否使用本地时间戳
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  #积攒多少个Event才flush到HDFS一次
  a1.sinks.k1.hdfs.batchSize = 100
  #设置文件类型,可支持压缩
  a1.sinks.k1.hdfs.fileType = DataStream
  #多久生成一个新的文件
  a1.sinks.k1.hdfs.rollInterval = 60
  #设置每个文件的滚动大小
  a1.sinks.k1.hdfs.rollSize = 134217700
  #文件的滚动与Event数量无关
  a1.sinks.k1.hdfs.rollCount = 0

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

2.使用Flume监听整个目录的文件,并上传至HDFS

B.案例需求:使用Flume监听整个目录的文件,并上传至HDFS (自有当有新文件时才会上传)

#name
  a1.sources = r1
  a1.channels = c1
  a1.sink1 = k1
  
  #source
  a1.sources.r1.type = spooldir
  a1.sources.r1.spoolDir = /opt/module/flume-1.9.0/jobs/spooldir
  a1.sources.r1.fileSuffix = .COMPLETED
  a1.sources.r1.fileHeader = true

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100

  #sinks
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  #上传文件的前缀
  a1.sinks.k1.hdfs.filePrefix = logs-
  #是否按照时间滚动文件夹
  a1.sinks.k1.hdfs.round = true
  #多少时间单位创建一个新的文件夹
  a1.sinks.k1.hdfs.roundValue = 1
  #重新定义时间单位
  a1.sinks.k1.hdfs.roundUnit = hour
  #是否使用本地时间戳
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  #积攒多少个Event才flush到HDFS一次
  a1.sinks.k1.hdfs.batchSize = 100
  #设置文件类型,可支持压缩
  a1.sinks.k1.hdfs.fileType = DataStream
  #多久生成一个新的文件
  a1.sinks.k1.hdfs.rollInterval = 60
  #设置每个文件的滚动大小
  a1.sinks.k1.hdfs.rollSize = 134217700
  #文件的滚动与Event数量无关
  a1.sinks.k1.hdfs.rollCount = 0

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

  启动: flume-ng agent -c  $FLUME_HOME/conf  -f  $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

3.使用Flume监听整个目录的实时追加文件,并上传至HDFS

C.使用Flume监听整个目录的实时追加文件,并上传至HDFS (目录需要分组,只有是组内的文件追加内容时才会上传)(分组一般用正则表达式识别)

#name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    #source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1 f2
    a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
    a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
    a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

    #channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100

    #sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
    #上传文件的前缀
    a1.sinks.k1.hdfs.filePrefix = logs-
    #是否按照时间滚动文件夹
    a1.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a1.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a1.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a1.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a1.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a1.sinks.k1.hdfs.rollInterval = 60
    #设置每个文件的滚动大小
    a1.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与Event数量无关
    a1.sinks.k1.hdfs.rollCount = 0

    #bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

4.复制 replicating

D.案例需求: 复制
使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem,flume2传递到hdfs。
sources channels sinks
分析: flume1 taildir memory avro
flume2 avro memory hdfs
flume3 avro memory file_roll

flume3.conf
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = localhost
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = file_roll
  a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  a2.sources.r1.type = avro
  a2.sources.r1.bind = localhost
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = hdfs
  a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  a2.sinks.k1.hdfs.filePrefix = logs-
  a2.sinks.k1.hdfs.round = true
  a2.sinks.k1.hdfs.roundValue = 1
  a2.sinks.k1.hdfs.roundUnit = hour
  a2.sinks.k1.hdfs.useLocalTimeStamp = true
  a2.sinks.k1.hdfs.batchSize = 100
  a2.sinks.k1.hdfs.fileType = DataStream
  a2.sinks.k1.hdfs.rollInterval = 60
  a2.sinks.k1.hdfs.rollSize = 134217700
  a2.sinks.k1.hdfs.rollCount = 0

  #bind
  a2.sources.r1.channels = c1 
  a2.sinks.k1.channel = c1 

  flume1
  #name
  a1.sources = r1
  a1.channels = c1 c2
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
  a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
  a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

  #ChannelSelector
  a1.sources.r1.selector.type = replicating

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100

  a1.channels.c2.type = memory
  a1.channels.c2.capacity = 10000
  a1.channels.c2.transactionCapacity = 100

  #sinks
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = localhost
  a1.sinks.k1.port = 7777

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = localhost
  a1.sinks.k2.port = 8888

  #bind
  a1.sources.r1.channels = c1 c2
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c2

  启动3:flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console 

  启动2:flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console

  启动1:flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console 
 

5.负载均衡 load_balance

案例需求:负载均衡案例
使用Flume-1监控文件变动,Flume-1将变动内容(轮询或者随机传递给)传递给Flume-2,flume-3 Flume-3负责输出到Local FileSystem,flume2传递到hdfs。
sources channels sinks
分析: flume1 taildir memroy avro
flume2 avro memroy hdfs
flume3 avro memroy file_roll

flume3.conf
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = localhost
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = file_roll
  a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  a2.sources.r1.type = avro
  a2.sources.r1.bind = localhost
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = hdfs
  a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  a2.sinks.k1.hdfs.filePrefix = logs-
  a2.sinks.k1.hdfs.round = true
  a2.sinks.k1.hdfs.roundValue = 1
  a2.sinks.k1.hdfs.roundUnit = hour
  a2.sinks.k1.hdfs.useLocalTimeStamp = true
  a2.sinks.k1.hdfs.batchSize = 100
  a2.sinks.k1.hdfs.fileType = DataStream
  a2.sinks.k1.hdfs.rollInterval = 60
  a2.sinks.k1.hdfs.rollSize = 134217700
  a2.sinks.k1.hdfs.rollCount = 0

  #bind
  a2.sources.r1.channels = c1 
  a2.sinks.k1.channel = c1 

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
  a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
  a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = localhost
  a1.sinks.k1.port = 7777

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = localhost
  a1.sinks.k2.port = 8888

  #Sink Processor
  a1.sinkgroups = g1
  a1.sinkgroups.g1.sinks = k1 k2
  a1.sinkgroups.g1.processor.type = load_balance
  a1.sinkgroups.g1.processor.selector = random

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c1

  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console

7.故障转移 failover

案例需求:故障转移
使用Flume-1监控端口,Flume-1将端口数据传递给Flume-2(为active状态),当flume2出现故障时,将数据传递给flume3
flume sources channels sinks
分析: flume1 netcat memory avro
flume2 avro memory logger
flume3 avro memory logger
flume3.conf

#name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = localhost
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = logger
  

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1
  
  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  a2.sources.r1.type = avro
  a2.sources.r1.bind = localhost
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = logger
  

  #bind
  a2.sources.r1.channels = c1 
  a2.sinks.k1.channel = c1 

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 5555

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = localhost
  a1.sinks.k1.port = 7777

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = localhost
  a1.sinks.k2.port = 8888

  #Sink Processor
  a1.sinkgroups = g1
  a1.sinkgroups.g1.sinks = k1 k2
  a1.sinkgroups.g1.processor.type = failover
  a1.sinkgroups.g1.processor.priority.k1 = 5
  a1.sinkgroups.g1.processor.priority.k2 = 10

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c1

  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console

8.聚合

案例需求:聚合
hadoop102上的Flume-1监控文件/opt/module/flume-1.9.0/jobs/taildir,
hadoop103上的Flume-2监控某一个端口的数据流,
Flume-1与Flume-2将数据发送给hadoop104上的Flume-3,Flume-3将最终数据打印到控制台。
flume sources channels sinks
分析: flume1 taildir memory avro
flume2 netcat memory avro
flume3 avro memory logger

flume3.conf        
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = hadoop104
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = logger

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  #sources
  a2.sources.r1.type = netcat
  a2.sources.r1.bind = hadoop103
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = avro
  a2.sinks.k1.hostname = hadoop104
  a2.sinks.k1.port = 8888

  #bind
  a2.sources.r1.channels = c1
  a2.sinks.k1.channel = c1

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 

  #sources
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
  a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
  a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = hadoop104
  a1.sinks.k1.port = 8888

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  
  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume1.conf -n a1 -Dflume.root.logger=INFO,console

8.多路 multiplexing

案例需求:多路
hadoop101 的flume1 监控 8888端口的数据 并将已字母开头的发给Hadoop104,数字开头的发送给Hadoop103
flume2,flume3打印到控制台
flume sources channels sinks
分析: flume1 netcat memory avro
flume2 avro memory logger
flume3 avro memory logger

flume3.conf        
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = hadoop104
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = logger

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  #sources
  a2.sources.r1.type = avro
  a2.sources.r1.bind = hadoop103
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = logger

  #bind
  a2.sources.r1.channels = c1
  a2.sinks.k1.channel = c1

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1 c2
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 6666

  # Interceptor
  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = myInterceptor.CustomInterceptor$Builder

  #channel selector
  a1.sources.r1.selector.type = multiplexing
  a1.sources.r1.selector.header = type
  a1.sources.r1.selector.mapping.letter = c1
  a1.sources.r1.selector.mapping.number = c2

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  a1.channels.c2.type = memory
  a1.channels.c2.capacity = 10000
  a1.channels.c2.transactionCapacity = 100

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = hadoop104
  a1.sinks.k1.port = 8888

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = hadoop103
  a1.sinks.k2.port = 7777

  #bind
  a1.sources.r1.channels = c1 c2
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c2      

  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume1.conf -n a1 -Dflume.root.logger=INFO,console

9.监听端口 netcat

#监听端口
要用到netcat

#name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 

  #sources
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 6666

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = logger
  
  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  
  启动:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/netcat-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

标签:flume,channels,入门,a1,sources,k1,c1,sinks
来源: https://blog.csdn.net/m0_51022702/article/details/121345691

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有