ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

filebeat+kafka+logstash 详细配置

2021-03-01 18:03:36  阅读:244  来源: 互联网

标签:filebeat zookeeper 192.168 kafka topic logstash


环境 :centos 7

192.168.1.1 zookeeper+kafka+logstash+es+kiana
192.168.1.2 zookeeper+kafka+filebeat
192.168.1.3 zookeeper+kafka+filebeat

组件介绍:

1.Filebeat负责从web服务器上实时抓取数据,当log文件发生变化时,将文件内容吐给kafka。

2.Kafka是消息队列,主要作用是在filebeat和logstash之间做缓存,避免因写入logstash的数据量过大,导致数据丢失。

3.Zookeeper是kafka的分发系统,他负责维护整个kafka集群的负载均衡,在部署的时候,每个kafka节点上都要单独安装zookeeper,同时要保证zookeeper之间能够互相通信(2181端口)。

4.Logstash是日志处理器,也是整个elk系统的核心。负责来自kafka的日志处理,然后把处理过的日志吐给elasticsearch。需要注意的是,经logstash处理过的日志都是json格式的

架构图:

在这里插入图片描述

一、配置filebeat

1.安装filebeat

yum -y install filebeat......rpm 

可直接安装rpm包使用,看清linux版本

2.配置filebeat

cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: system_log
  enabled: true
  paths:
    - /var/log/message
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
  hosts: ["192.168.1.1:9092","192.168.1.2:9092","192.168.1.3:9092"]
  enabled: true
  topic: node01-system-message	#会自定在kafka创建该topic供logstash订阅读取时使用
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

官方地址wiki:https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html#_password_4

3.启动

filebeat -e -c /etc/filebeat/filebeat.yml

二、kafka集群搭建并测试

1.下载kafka

官网下载地址为:http://kafka.apache.org/downloads

2.将下载好的压缩包解压

3.配置zookeeper

kafka启动依赖于zookeeper集群

1).使用kafka包里自带的zookeeper

添加如下基本配置(三台上基本一致)

vim /opt/kafka.../config/zookeeper.propertie

tickTime=2000
initLimit=10
syncLimit=5
dataDir = /data/zookeeper
server.1 = 192.168.1.1:2888:3888
server.2 = 192.168.1.2:2888:3888
server.3 = 192.168.1.3:2888:3888

2888 端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;

3888 端口:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader ,而这个端口就是用来执行选举时服务器相互通信的端口。

mkdir -p /data/zookeeper
 echo (1,2,3) > /data/zookeeper/myid	#每天ip对应,不能有重复

2).使用另外的zookeeper

3).启动zookpeer集群

/opt/kafka_2.12-2.7.0/bin/zookeeper-server-start.sh ../config/zookeeper.propertie	#注意这是前台启动

zookeeper-server-stop.sh	#停止脚本

刚开始会报错是由于zookeeper集群在启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以上面日志前面部分的异常是可以忽略的。通过后面部分可以看到,集群在选出一个Leader后,最后稳定了。

4).zookpeer集群检查

netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24 : 3888 0.0.0.0 : * LISTEN 1959 / java

tcp 0 0 0.0.0.0 : 2181 0.0.0.0 : * LISTEN 1959/ java

tcp 0 0 192.168.2.24 : 2888 0.0.0.0 : * LISTEN 950 / java

可以看出,如果哪台是Leader,那么它就拥有2888这个端口

4.配置kafka集群

1).配置kafka(三台大致配置一样,除却ip跟broker_id)

vim /opt/kafka_2.12-2.7.0/config/server.properties

要修改或添加的参数,除却以下参数,采用默认即可

broker.id=1	#对应zookeepr集群id
delete.topic.enable=true	#默认时false,为true时,删除topic时会被立即删除
host.name=192.168.1.1
listeners=PLAINTEXT://192.168.1.1:9092
advertised.host.name=192.168.1.1
advertised.port=9092
log.dirs=/data/kafka/kafkalogs
############################# Zookeeper ############################
zookeeper.connect=192.168.1.1:2181.192.168.1.2:2181,192.168.1.3:2181
zookeeper.connection.timeout.ms=18000
######### Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enable=true  #允许删除topic
auto.create.topics.enable = true #允许自动创建topic

2).启动测试

启动kafka

/opt/kafka_2.12-2.7.0/bin/kafka-server-start.sh -daemon(后台启动) ../config/server.properties

三台依次启动,启动侯无发现错误信息即可测试

测试

创建topic

./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --replication-factor 3 --partitions 1 --topic yanghaoyu

--replication-factor 3 #该参数不能大于集群总数

查看topic列表

 ./kafka-topics.sh --list --zookeeper 192.168.1.2:2181

三、配置logstash

1.安装logstash(这里使用rpm文件直接安装使用)

yum - y install logstash.......

2.编辑配置文件

vim /etc/logstash/conf.d/test.yml
#输入配置,一个input{}里可以配置多个输入源
input {
  #kafka输入源配置
  kafka {
    #kafka集群地址
    bootstrap_servers => ["192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092"]
    #从kafka中哪个topic读取数据,这里的topic名要与filebeat中使用的topic保持一致
    topics => ["node01-system-message"]
    #这是kafka中的消费组者ID,默认值是“logstash”。kafka将消息发到每个消费者组中,同一个组中的消费者收到的数据不重复。例如有两个消费者组G1、G2,G1中有成员A、B,G2中有成员C、D。kafka从输入中收到了10条消息,会将这10条消息同时发送给G1和G2,A和B各会收到这10条消息中的一部分,他们收到消息的并集就是这10条消息,C和D同理。
    group_id => "filebeat-logstash"
    #kafka消费者组中每个消费者的ID,默认值是“logstash”
    client_id => "logstash-node01"
    #logstash的消费线程,一般一个线程对应kafka中的一个partition(分区),同一组logstash的consumer_threads之和应该不大于一个topic的partition,超过了就是资源的浪费,一般的建议是相等。
    consumer_threads => 1
    #由于beat传输数据给kafka集群的时候,会附加很多tag,默认情况下,logstash就会将这串tag也认为是message的一部分。这样不利于后期的数据处理。所有需要添加codec处理。得到原本的message数据。
    codec => json
  }
}

#输出配置,这里表示输出到文件
output {
  file {
    path => "/tmp/logstash.output"
  }
}

3.启动测试

logstash -f /etc/logstash/conf.d/test.yml

tail -f /tmp/logstash.output

能看到filebeat收集的message的相关信息;测试成功,接下来输出到es

output {
        elasticsearch {
                hosts => ["192.168.137.25:9200"]
                index => "node01-system-message-%{+YYYY-MM}"
                }
        }

标签:filebeat,zookeeper,192.168,kafka,topic,logstash
来源: https://www.cnblogs.com/yhy223/p/14465055.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有