ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Logstash 教程

2022-08-10 13:51:35  阅读:193  来源: 互联网

标签:教程 jdbc 插件 logstash input 数据 Logstash


Logstash 教程

1.简介

Logstash是一个数据同步工具,在ELK(Elasticsearch + Logstash + Kibana)技术栈中解决数据同步问题。日常项目中数据主要存储在MYSQL、日志文件中,通过Logstash可以将MYSQL、日志文件、redis等多种数据源的数据同步到ES,这样就可以通过ES搜索数据。

img

MYSQL同步数据到Elasticsearch,主要有下面几种策略:

  • 双写策略,更新MYSQL数据的同时通过ES API直接写入数据到ES (同步方式)

  • 通过Logstash同步数据到ES (异步方式)

  • 通过订阅MYSQL Binlog,将数据同步到ES (异步方式)

这里主要介绍Logstash如何同步数据。

2.安装

2.1.环境依赖

依赖Java 8 或者 Java 11环境,可以是更高的版本。

2.2.安装方式

2.2.1. centos

更新key

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

创建文件 /etc/yum.repos.d/logstash.repo 内容如下

[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

安装Logstash

sudo yum install logstash

2.2.2. ubuntu

按顺序执行下面命令

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

sudo apt-get install apt-transport-https

echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list

sudo apt-get update && sudo apt-get install logstash

2.2.3. 通过压缩包安装

通过下面地址下载最新版本的压缩包(linux/mac系统下载tar.gz, windows下载zip)

https://www.elastic.co/cn/downloads/logstash

将压缩包解压到自定义目录即可。

linux系统例子:

tar -zxvf logstash-7.7.1.tar.gz

3.测试安装

下面验证logstash安装是否成功

# 切换到安装目录
cd logstash-7.7.1
# 执行命令
bin/logstash -e 'input { stdin { } } output { stdout {} }'

等一会,logstash启动后在控制台输入tizi365.com 按回车,可以看到类似下面的输出

tizi365.com
{
  "@timestamp" => 2020-06-09T15:45:38.147Z,
      "message" => "tizi365.com",
    "@version" => "1",
        "host" => "jogindembp"
}

添加config.reload.automatic命令参数,自动加载配置,不需要重新启动logstash

bin/logstash -f tizi.conf --config.reload.automatic

4.配置文件

可以将Logstash的配置都写入一个配置文件中,下面是配置文件的格式,主要有三部分组成

# 输入插件配置, 主要配置需要同步的数据源,例如:MYSQL
input {
}
# 过滤器插件配置, 主要用于对输入的数据进行过滤,格式化操作,filter是可选的。
filter {
}
# 输出插件配置,主要配置同步数据的目的地,例如同步到ES
output {
}

提示:logstash的input、filter、output都是由各种插件组成。

例子:

创建一个tizi.conf配置文件,内容如下:

input {
  stdin {}
}
output {
  stdout { codec => rubydebug }
}

说明:

这个配置文件的意思是,从控制台标准输入(stdin)接收输入,然后直接将结果在控制台标准输出(stdout)打印出来。

通过配置文件启动logstash

bin/logstash -f tizi.conf

5.同步nginx日志到ES

下面是将Nginx的访问日志同步到ES中的配置

配置文件名:tizi.conf

input {
  # 实时监控日志文件的内容,类似tail -f 命令的作用
  file {
      # nginx日志文件路径
      path => [ "/data/nginx/logs/nginx_access.log" ]
      start_position => "beginning"
      ignore_older => 0
  }
}
# 配置过滤器对日志文件进行格式化
filter {
  # 使用grok插件对日志内容进行格式化,提取日志内容,方便转换成json格式
  # %COMBINEDAPACHELOG 是grok插件内置的apache日志内容处理模板,其实就是一些表达式,用来格式日志文本内容,也可以格式化Nginx日志
  grok {
      match => {
          "message" => "%{COMBINEDAPACHELOG}"
      }
  }
}
# 配置输出目的地,这里配置同步到ES中
output {
  elasticsearch {
      # es服务器地址
      hosts => ["127.0.0.1:9200"]
      # 目标索引
      index => "nginx-access"
  }
}

启动logstash

bin/logstash -f tizi.conf

Logstash 工作原理

Logstash同步数据,主要有三个核心环节:inputs → filters → outputs,流程如下图。

img

inputs模块负责收集数据,filters模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。

提示:inputs/filters/outputs是通过插件机制扩展各种能力。

inputs

inputs可以收集多种数据源的数据,下面是常见的数据源:

  • file - 扫描磁盘中的文件数据,例如: 扫描日志文件。

  • mysql - 扫描Mysql的表数据

  • redis

  • Filebeat - 轻量级的文件数据采集器,可以取代file的能力。

  • 消息队列kafka、rabbitmq等 - 支持从各种消息队列读取数据。

filters

filters是一个可选模块,可以在数据同步到目的地之前,对数据进行一些格式化、过滤、简单的数据处理操作。

常用的filters功能:

  • grok - 功能强大文本处理插件,主要用于格式化文本内容。

  • drop - 丢弃一些数据

outputs

Logstatsh的最后一个处理节点,outputs负责将数据同步到目的地。

下面是常见的目的地:

  • elasticsearch

  • file - 也可以将数据同步到一个文件中

Codecs

codecs就是编码器,负责对数据进行序列号处理,主要就是json和文本两种编码器。

Logstash - 同步MYSQL数据到Elasticsearch

在实际项目场景中,业务数据主流的存储方案还是MYSQL,但是MYSQL处理海量数据的搜索能力较差,目前MYSQL搭配ES,为业务提供强大的数据搜索能力是业界主流的方案,因此需要解决如何将MYSQL中的数据导入到ES中,下面介绍通过Logstash准实时的将MYSQL数据导入到ES中。

1.jdbc插件介绍

Logstash通过jdbc input插件实现定时同步MYSQL数据,了解JAVA的同学应该对jdbc不陌生,就是访问数据库的API标准,我们常见的数据库都可以使用jdbc接口进行访问。

使用jdbc访问数据库,通常都需要安装对应数据库的jdbc驱动,例如:MYSQL的jdbc驱动,到MYSQL官网下载对应的jar包就可以。

MYSQL jdbc驱动下载地址:

https://mvnrepository.com/artifact/mysql/mysql-connector-java

找到MYSQL对应的版本下载JAR包即可,例如下面下载8.0.15版本。

img

2.简单的同步例子

关键配置有两点:

  • 配置input jdbc输入插件

  • 配置output elasticsearch输出插件

完整的配置如下

input {
# 配置JDBC数据源
jdbc {
  # mysql jdbc驱动路径
  jdbc_driver_library => "/Users/tizi365/.m2/repository/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar"
  # mysql jdbc驱动类
  jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  # MYSQL连接地址,格式: jdbc:mysql://服务器地址:端口/数据库名
  jdbc_connection_string => "jdbc:mysql://localhost:3306/wordpress"
  # •MYSQL 账号
  jdbc_user => "root"
  # MYSQL 密码
  jdbc_password => "123456"
  # 定时任务配置,下面表示每分钟执行一次SQL
  # 具体语法请参考下一个章节内容
  schedule => "* * * * *"
  # 定时执行的SQL语句,Logstash会根据schedule配置,定时执行这里的SQL语句
  # 将SQL语句查询的结果,传给output插件
  statement => "SELECT * FROM `wp_posts`"
}
}

output {
  stdout {
    # 配置将数据导入到ES中
    elasticsearch {
      # 索引名,logstash会将数据导入到这个索引中
      index => "wp_posts"
      # ES服务器地址,支持多个地址
      hosts => ["127.0.0.1:9200","127.0.0.2:9200"]
      # 设置ES文档的唯一Id值为SQL语句返回的id
      # 建议将document_id设置为MYSQL表的主键
      document_id => "%{id}"
    }
  }
}

3.定时任务配置

jdbc schedule的配置规则,类似linux的crontab的写法,具体语法规则如下:

语法格式,总共由5个字段组成,含义如下:

  *    *   *   *  * 

分   时 天   月 星期

各个字段取值范围:

  • 分 - 0-59

  • 时 - 0-23

  • 天 - 1-31

  • 月 - 1-12

  • 星期 - 0-7

特殊字符含义:

  • 星号() :代表所有值,例如:第一个字段是星号(),则代表每分钟。

  • 逗号(,):指定一个数值范围,例如:1,2,3,4

  • 横杠(-):另外一种表示一个整数范围的方法,例如:1-4 表示1,2,3,4

  • 斜线(/):可以用斜线指定时间的间隔频率,例如:*/5,如果用在分钟字段,表示每5分钟执行一次。

例子:

# 每分钟执行一次
* * * * *
# 每10分钟执行一次
*/10 * * * *
# 每小时执行一次
* */1 * * *
# 每天0点执行一次
0 0 * * *
# 每天凌晨2点1分执行一次
1 2 * * *

4.增量同步数据

前面的例子同步数据的SQL如下:

input {
  # 配置JDBC数据源
  jdbc {
    # 忽略其他配置
    statement => "SELECT * FROM `wp_posts`"
  }
}

同步数据的SQL语句,直接扫描全表的数据,如果数据量比较小,问题不大,如果数据量比较大,会直接卡死,logstash OOM挂了,因此需要实现增量同步,每次仅同步新增的数据。

Logstash提供了sql_last_value字段值,帮助我们实现增量同步;增量同步的核心思路就是,logstash每次执行SQL的时候,会将SQL查询结果的最后一条记录的某个值保存到sql_last_value字段中,下一次执行SQL的时候,以sql_last_value值作为参考,从这个值往后查询新数据。

例子:

input {
  jdbc {
    # 注意where条件id > :sql_last_value
    # 每次执行SQL的时候,id大于sql_last_value的值
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    # 允许sql_last_value的值来自查询结果的某个字段值。
    use_column_value => true
    # sql_last_value的值来自查询结果中的最后一个id值
    tracking_column => "id"
    # ... 忽略其他配置
  }
}

说明:

sql_last_value的默认值是0或者1970-01-01,具体是什么值跟数据类型有关,上面的例子,定时任务执行SQL如下

# 第一次执行,sql_last_value=0
SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 0

# 第二次执行,sql_last_value=100,假设上面的SQL最后的id值是100
SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 100

# 第三次执行,sql_last_value=200,,假设上面的SQL最后的id值是200
SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 200

提示:

上面的例子,使用id作为增量同步数据的依据,不一定适合所有的业务场景,例如:同步文章数据,文章更新了,但是文章的id没有更新,这个时候使用id作为增量同步的依据,会导致更新的文章没有同步到ES,这种场景适合使用更新时间作为增量同步的依据,用法一样,sql_last_value换一个字段值即可。

5.分页

前面的章节在实现增量同步的时候,也存在一个问题,如果增量同步的数据太多的时候,logstash也会卡死,尤其是首次增量同步,例如:一个MYSQL表的数据有100万,首次增量同步数据,会扫描全表的数据。

logstash jdbc插件执行分页查询,避免一次查询太多数据,配置如下:

input {
jdbc {
  # 激活分页处理
  jdbc_paging_enabled => true
  # 分页大小,每次查询1000条数据
  jdbc_page_size => 1000
  # sql语句
  statement => "SELECT * FROM my_table"
  # ... 忽略其他配置
}
}

6.大表同步

在实际业务场景中,有些数据表的数据会有几百万,甚至上亿的数据,那么在使用logstash同步这些大表数据的时候,结合前面两个章节的增量同步和分页处理就可以解决,不过需要注意深度分页的性能问题。

例如:

# 每次查询1000条数据,但是翻页从第500万条数据偏移开始
SELECT * FROM my_table limit 5000000, 1000

这条SQL会非常慢,可以借助索引覆盖优化性能。

例子:

SELECT * FROM my_table WHERE id in (SELECT id FROM my_table limit 5000000, 1000)

因为id是主键,在主键索引中已经包含id的值,不需要回表扫描磁盘的数据,所以性能比较好,上面的SQL首先借助索引覆盖将id值查询出来,然后根据id查询具体的数据。

Filebeat 教程

logstash虽然也支持从磁盘文件中收集数据,但是logstash自己本身还是比较重,对资源的消耗也比较大,尤其是在容器化环境,每个容器都部署logstash也太浪费资源,因此出现了轻量级的日志文件数据收集方案Filebeat,Filebeat将收集到的文件数据传给Logstatsh处理即可。

Filebeat部署架构

img

可以在每一台服务器或者每一个容器中安装Filebeat,Filebeat负责收集日志数据,然后将日志数据交给Logstash处理,Logstash在将数据导入ES。

安装Filebeat

下载安装包,然后解压即可。

官网下载地址:

https://www.elastic.co/cn/downloads/beats/filebeat

下面以7.7.1版本为例

mac

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.1-darwin-x86_64.tar.gz

tar xzvf filebeat-7.7.1-darwin-x86_64.tar.gz

linux

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.1-linux-x86_64.tar.gz

tar xzvf filebeat-7.7.1-linux-x86_64.tar.gz

Filebeat配置

Filebeat的配置结构类似Logstash,也需要配置input和output,分别配置输入和输出,Filebeat使用yaml格式编写配置文件。

默认配置文件路径:

${安装目录}/filebeat.yml
/etc/filebeat/filebeat.yml
/usr/share/filebeat/filebeat.yml

因为我们使用的是tar安装包安装,所以选择${安装目录}/filebeat.yml 路径。

配置例子:

# 配置采集数据源
filebeat.inputs:
- type: log
  paths:
    - /var/log/messages
    - /var/log/*.log
# 配置输出目标,这里将数据投递给logstash
output.logstash:
  # logstash地址
  hosts: ["127.0.0.1:5044"]

说明:

type为log类型,表示收集日志文件数据,paths是一个文件路径数组,这里扫描/var/log/messages文件和/var/log/目录下所有以log为扩展名的日志文件。

Logstash beat配置

配置Logstash的input,让Logstash可以接收Filebeat投递过来的数据。

input {
# 配置接收Filebeat数据源,监听端口为5044
# Filebeat的output.logstash地址保持跟这里一致
beats {
  port => 5044
}
}

output {
# 将数据导入到ES中
elasticsearch {
  hosts => ["http://localhost:9200"]
  index => "tizi365"
}
}

启动Filebeat

进入filebeat安装目录

./filebeat -c filebeat.yml

如果配置PATH,直接启动即可。

input插件

Logstash Beats插件

Beats input插件让Logstash可以接收来自Elastic Beats framework发送过来的数据,Elastic Beats framework用的比较多的就是Filebeat.

例子

input {
# 在5044端口监听来自beats框架的数据
beats {
  port => 5044
}
}

output {
elasticsearch {
  hosts => ["http://localhost:9200"]
  index => "%{[@metadata][beat]}-%{[@metadata][version]}"
}
}

接收来自beats的数据,并且将数据导入到ES中。

Beats Input插件参数

参数名类型默认值说明
host string 0.0.0.0 监听地址
port number 监听端口

Logstash File input插件

Logstash的file input插件可以实现从磁盘文件中采集数据,通常用于收集日志文件数据,file input插件一行行的从文件中读取数据,然后交给logstash。

提示: file input插件的作用跟linux命令tail -f 的作用类似,可以实时收集文件的最新数据。

例子

input {
  # 扫描指定文件日志数据
  file {
    # 指定需要扫描的日志文件,支持多个文件,也支持星号(*)通配符
    # 含义:扫描/var/log/messages文件和/var/log/目录下的所有以log为扩展名的日志文件。
    path => [ "/var/log/messages", "/var/log/*.log" ]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "tizi365" 
  }
}

file Input插件参数

参数名类型默认值说明
path array 需要扫描的文件路径,数组格式:[ "/var/log/messages", "/var/log/tizi.log" ]
delimiter string \n 指定文件换行符
exclude string 指定需要排除的文件,例如排除压缩包:*.gz , 这个参数通常在path参数包含通配符的时候,一起配合使用

Logstash Exec input插件

Exec input插件可以定时的执行一个命令,然后采集命令输出的结果,通过exec插件,我们可以轻松的采集linux系统状态,例如:定时的采集linux服务的内存使用情况。

例子:

input {
# 通过exec插件,定时的通过命令
exec {
  # 需要执行的命令
  command => "free -m"
  # 30秒执行一次
  interval => 30
}
}

output {
elasticsearch {
  hosts => ["http://localhost:9200"]
  index => "tizi365"
}
}

说明:

30秒执行一次free -m命令,命令输出的结果,会被Logstash同步到ES中。

exec Input插件参数

参数名类型默认值说明
command string 设置需要执行的命令
interval number 单位是秒,多长时间执行一次命令
schedule string 使用类型linux crontab的语法,设置定时任务,例如:/10 * * * 代表每10分钟跑一次,interval和schedule参数二选一即可

Logstash jdbc input插件

jdbc插件用于解决Logstash采集数据库数据问题,基本上所有的关系数据库都支持jdbc接口,例如: MYSQL、Oracle等。

jdbc插件通过定时任务,定时的执行SQL语句,从数据库中读取数据,定时任务语法类似linux的crontab的写法。

例子

input {
  # 配置jdbc数据源
  jdbc {
    # 指定jdbc驱动路径
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    # jdbc驱动类
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 数据库连接配置
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    # 数据库账号
    jdbc_user => "mysql"
    # 数据库密码
    jdbc_password => "123456"
    # SQL绑定的参数
    parameters => { "favorite_artist" => "Beethoven" }
    # 定时任务配置
    schedule => "* * * * *"
    # SQL语句
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "tizi365" 
  }
}

详细的例子可以参考:同步MYSQL数据到Elasticsearch

jdbc Input插件参数

参数名类型默认值说明
jdbc_driver_library string   指定jdbc驱动路径, 不同数据库jdbc驱动不一样
jdbc_driver_class string   jdbc驱动类,新版的MYSQL驱动类为:com.mysql.cj.jdbc.Driver
jdbc_connection_string string   数据库连接配置, 格式: jdbc:数据库类型://地址:端口/数据库,例子:jdbc:mysql://localhost:3306/mydb
jdbc_user string   数据库账号
jdbc_password string   数据库密码
schedule string   定时任务配置,语法可以参考linux的cron
statement string   需要执行的SQL语句
parameters hash   SQL绑定参数,例子:{ "target_id" => "321" }
use_column_value boolean false 当设置为true时,使用tracking_column定义的列作为:sql_last_value的值。当设置为false时,:sql_last_value等于上次执行查询的时间。
tracking_column string   定义使用SQL查询结果中的哪一个字段值作为sql_last_value的值
jdbc_paging_enabled boolean false 激活分页处理
jdbc_page_size number 100000 分页大小

Logstash kafka input插件

kafka input插件 支持Logstash从kafka消息队列中的topic读取数据。

例子

input {
  # 配置kafka数据源
  kafka {
    # kafka服务器地址,多个地址使用逗号分隔
    bootstrap_servers => "localhost:9092"
    # 订阅的主题,支持订阅多个主题
    topics => ["logstash", "tizi365"]
    # 消费者线程数
    consumer_threads => 5
    # 消费组Id
    group_id => "logstash"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "tizi365" 
  }
}

kafka Input插件参数

参数名类型默认值说明
bootstrap_servers string localhost:9092 kafka服务器地址,多个地址使用逗号分隔
topics array ["logstash"] 订阅的主题,支持订阅多个主题
consumer_threads number 1 消费者线程数
group_id string logstash 消费组Id
fetch_min_bytes number   一次最少从服务器读取多少字节数据
fetch_max_bytes number   一次最多从服务器读取多少字节数据

Logstash RabbitMQ input插件

RabbitMQ input插件,支持Logstash通过RabbitMQ消息队列读取数据。

例子:

input {
# 配置rabbitmq数据源
rabbitmq {
  # rabbitmq服务器地址
  host => "localhost"
  # 端口  
  port => 5672
  # RabbitMQ 账号
  user => "guest"
  # RabbitMQ 密码
  password => "guest"
  # 队列名
  queue => "tizi365"
}
}

output {
elasticsearch {
  hosts => ["http://localhost:9200"]
  index => "tizi365"
}
}

RabbitMQ Input插件参数

参数名类型默认值说明
host string   rabbitmq服务器地址
port number 5672 端口
user string guest RabbitMQ 账号
password string guest RabbitMQ 密码
queue string   队列名
auto_delete boolean false 最后一个消费组退出后是否删除消息
prefetch_count number 256 预加载多少条消息到本地

Logstash redis input插件

redis input插件支持Logstash从redis中读取数据,目前仅支持从redis的list和channels两种数据结构中读取数据。

例子

 input {
  # 配置redis数据源
  redis {
    # redis服务器地址
    host => "127.0.0.1"
    # 端口  
    port => 6379
    # redis 密码, 没有设置密码可以不填
    password => "123456"
    # 从哪个key读取数据
    key => "tizi365_list"
    # 设置Key的redis的数据类型
    data_type => "list"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "tizi365" 
  }
}

redis Input插件参数

参数名类型默认值说明
host string 127.0.0.1 redis服务器地址
port number 6379 redis服务器端口号
password string   redis服务密码
key string   配置logstash从哪个key读取数据
data_type string   设置Key的redis的数据类型,支持list, channel
db number 0 redis数据库
threads number 1 并发线程数
timeout number 5 redis连接超时时间,单位秒
ssl boolean false 是否打开ssl支持
batch_count number 125 一次批量从redis加载多少条数据

output插件

Logstash Elasticsearch output插件

通过Elasticsearch output插件Logstash可以将采集到的数据导入到Elasticsearh中。

例子:

input {
  # 扫描指定文件日志数据
  file {
    path => [ "/var/log/messages" ]
  }
}

output {
  # 将数据导入到ES中
  elasticsearch {
    # ES服务地址
    hosts => ["http://localhost:9200"]
    # 索引名
    index => "tizi365" 
  }
}

Elasticsearch output插件参数

参数名类型默认值说明
hosts uri [//127.0.0.1] ES服务地址
index string logstash-%{+yyyy.MM.dd} 索引名
document_id string   设置document的id值,通常使用logstash采集数据的某个字段值作为id值,例如:%{id}
user string   账号
password string   密码
routing string   设置ES的路由参数

Logstash Stdout output插件

在调试Logstash调试的时候,可以将Logstash收集到的数据在命令窗口直接打印出来,通过Stdout output插件可以实现将数据打印到标准输出。

简单例子:

input {
  # 扫描指定文件日志数据
  file {
    path => [ "/var/log/messages" ]
  }
}

output {
  # 将数据直接打印出来
  stdout {}
}

指定输出格式:

output {
  # 以Json格式将数据直接打印出来
  stdout { codec => json }
}

以rubydebug的格式打印数据:

output {
  # 以Json格式将数据直接打印出来
  stdout { codec => rubydebug }
}

filter插件

Logstash grok filter插件

通过grok filter插件我们可以对文本内容进行格式化处理,提取文本中的内容,并将其转换成json格式,在处理日志内容的时候非常有用。

例子:

例如日志内容如下:

55.3.244.1 GET /index.html 15824 0.043

这条日志内容包含了ip、http请求方法、请求路径、响应内容大小、响应时间,这条日志是一行字符串,我们可以通过grok将其格式化为:client、method、request、bytes、duration这几个字段,然后在保存到elasticsearch中。

logstash配置:

input {
# 扫描指定文件日志数据
file {
  path => [ "/var/log/http.log" ]
}
}
# 配置过滤器插件,对Input收集到的数据进行格式化处理
filter {
  # 通过grok插件,格式化文本内容
  grok {
    # grok参数,这里决定如何对每一行日志进行参数提取
    # message 字段的内容就是格式化日志的表达式
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

output {
# 将数据导入到ES中
elasticsearch {
  # ES服务地址
  hosts => ["http://localhost:9200"]
  # 索引名
  index => "tizi365"
}
}

通过grok提取的结果如下:

  • client: 55.3.244.1

  • method: GET

  • request: /index.html

  • bytes: 15824

  • duration: 0.043

grok模式语法

grok的提取字符串内容的语法其实就是在正则表达式基础之上进行封装,Logstash grok内置了120种默认表达式,解决很多日常需求,不需要重头编写复杂的正则表达式。

grok表达式语法:

%{模式名:自定义字段名}

说明:

  • 模式名 - 指的就是预先定义好的正则表达式的别名,例如:IP 可以匹配ip内容。

  • 自定义字段名 - 通过模式匹配到内容后,将内容保存到这个自定义的字段中

例子:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

这是上面例子的grok表达式,下面是对表达式的解读:

  • %{IP:client} - 匹配IP内容,结果保存到client字段

  • %{WORD:method} - 匹配非空字符串内容,结果保存到method字段

  • %{URIPATHPARAM:request} - 匹配url路径,结果保存到request字段

  • %{NUMBER:bytes} - 匹配数字,结果保存到bytes字段

grok filter插件参数

参数名类型默认值说明
match hash {} 定义grok的表达式,格式: message => "表达式"
patterns_dir array [] 自定义模式配置文件的路径,支持多个路径,例子:["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]

grok内置模式

常用模式

表达式标识名称详情匹配例子
USERNAME 或 USER 用户名 由数字、大小写及特殊字符(._-)组成的字符串 1234、Bob、Alex.Wong
EMAILLOCALPART 用户名 首位由大小写字母组成,其他位由数字、大小写及特殊字符(_.+-=:)组成的字符串。注意,国内的QQ纯数字邮箱账号是无法匹配的,需要修改正则 windcoder、windcoder_com、abc-123
EMAILADDRESS 电子邮件   windcoder@abc.comwindcoder_com@gmail.comabc-123@163.com
HTTPDUSER Apache服务器的用户 可以是EMAILADDRESS或USERNAME  
INT 整数 包括0和正负整数 0、-123、43987
BASE10NUM 或 NUMBER 十进制数字 包括整数和小数 0、18、5.23
BASE16NUM 十六进制数字 整数 0x0045fa2d、-0x3F8709
WORD 字符串 包括数字和大小写字母 String、3529345、ILoveYou
NOTSPACE 不带任何空格的字符串    
SPACE 空格字符串    
QUOTEDSTRING 或 QS 带引号的字符串   "This is an apple"、'What is your name?'
UUID 标准UUID   550E8400-E29B-11D4-A716-446655440000
MAC MAC地址 可以是Cisco设备里的MAC地址,也可以是通用或者Windows系统的MAC地址  
IP IP地址 IPv4或IPv6地址 127.0.0.1、FE80:0000:0000:0000:AAAA:0000:00C2:0002
HOSTNAME IP或者主机名称    
HOSTPORT 主机名(IP)+端口   127.0.0.1:3306、api.windcoder.com:8000
PATH 路径 Unix系统或者Windows系统里的路径格式 /usr/local/nginx/sbin/nginx、c:\windows\system32\clr.exe
URIPROTO URI协议   http、ftp
URIHOST URI主机   windcoder.com、10.0.0.1:22
URIPATH URI路径   //windcoder.com/abc/、/api.php
URIPARAM URI里的GET参数   ?a=1&b=2&c=3
URIPATHPARAM URI路径+GET参数 /windcoder.com/abc/api.php?a=1&b=2&c=3  
URI 完整的URI   https://windcoder.com/abc/api.php?a=1&b=2&c=3
LOGLEVEL Log表达式 Log表达式 Alert、alert、ALERT、Error

日期时间模式

表达式标识名称匹配例子
MONTH 月份名称 Jan、January
MONTHNUM 月份数字 03、9、12
MONTHDAY 日期数字 03、9、31
DAY 星期几名称 Mon、Monday
YEAR 年份数字  
HOUR 小时数字  
MINUTE 分钟数字  
SECOND 秒数字  
TIME 时间 00:01:23
DATE_US 美国时间 10-01-1892、10/01/1892/
DATE_EU 欧洲日期格式 01-10-1892、01/10/1882、01.10.1892
ISO8601_TIMEZONE ISO8601时间格式 +10:23、-1023
TIMESTAMP_ISO8601 ISO8601时间戳格式 2016-07-03T00:34:06+08:00
DATE 日期 美国日期%{DATE_US}或者欧洲日期%{DATE_EU} |
DATESTAMP 完整日期+时间 07-03-2016 00:34:06
HTTPDATE http默认日期格式 03/Jul/2016:00:36:53 +0800

Grok自带的模式,具体的规则可以参考下面链接

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

自定义模式

如果grok内置的模式无法满足需求,也可以自定义模式。

模式定义语法:

NAME PATTERN

说明:

  • NAME - 模式名

  • PATTERN - 表达式,包括正则表达式和logstash变量。

例子:

步骤1:

配置文件路径:/opt/logstash/tizi_patterns ,文件内容如下

TIZI_NUMBER \d+

提示:自定义模式配置文件路径,可以根据项目情况自定义即可

步骤2:

在logstash配置文件中引用自定义表达式

filter {
   grok {
      # 指定自定义模式路径
      patterns_dir => ["/opt/logstash/tizi_patterns"]
      # 使用自定义模式
      match => { "message" => "%{TIZI_NUMBER:tizi_data}" }
   }
}

调试grok模式

Kibana支持在线调试grok,如下截图:

img

Logstash java_uuid filter插件

如果我们想给logstash收集到的每一条数据增加一个唯一id,可以通过java_uuid和uuid两个filter插件实现,他们的区别只是底层实现不同,效果类似。

java_uuid

filter {
    # java版的uuid生成插件
    java_uuid {
      # 生成的唯一id,保存到target指定的字段
      target   => "uuid"
      # 如果target指定的字段已经存在,是否覆盖
      overwrite => true
    }
  }

uuid

filter {
    # 定义uuid插件
    uuid {
      # 生成的唯一id,保存到target指定的字段
      target   => "uuid"
      # 如果target指定的字段已经存在,是否覆盖
      overwrite => true
    }
  }

Logstash json filter插件

通常情况,Logstash收集到的数据都会转成json格式,但是默认logstash只是对收集到的格式化数据转成json,如果收到的数据仅仅是一个字符串是不会转换成Json.

例如:

{
    "id":20,
    "domain": "https://www.tizi365.com",
    "data": "{\"type\":1, \"msg\":\"message ok\"}"
}

data字段的内容是一个json字符串,不是格式化的Json格式,如果数据导入到Elasticsearch,data字段也是一个字符串,不是一个Json对象;json filter插件可以解决这种问题。

例子:

filter {
    # 定义json插件
    json {
        # 指定需要转换成json格式的字段
        source => "data"
        # 指定转换成json的数据,保存到那个字段,如果字段存在会覆盖
        target => "data"
        # 如果遇到错误的json,是否跳过json filter过滤器
        skip_on_invalid_json => true
    }
}

json filter格式化数据后,输出如下:

{
  "id":20,
  "domain": "https://www.tizi365.com",
  "data": {
      "type":1,
      "msg":"message ok"
  }
}

Logstash kv filter插件

如果logstash收集到的日志格式是key=value键值对,可以通过kv filter插件对其进行格式化。

例子:

日志内容:

ip=1.2.3.4 error=REFUSED

logstash配置

input {
# 扫描指定文件日志数据
file {
  path => [ "/var/log/http.log" ]
}
}

filter {
  # 使用kv filter格式化键值对日志内容
  kv { }
}

output {
# 将数据直接打印出来
stdout {}
}

logstash输出的内容如下:

{
  "ip": "1.2.3.4",
  "error": "REFUSED"
}

kv filter插件参数

参数名类型默认值说明
prefix string   指定key的前缀,例如:arg_
field_split string " " 指定两个kv值直接的分隔符,默认是空格,例:field_split => "&" , 通过&分隔键值对
default_keys hash   设置key的默认值,例:default_keys => [ "from", "logstash@example.com", "to", "default@dev.null" ]

Logstash drop filter插件

drop filter插件主要用于删除logstash收集到的数据,通常配合条件语句一起使用。

提示:logstash是一条一条数据发给filter处理,所以drop filter也是一条数据,一条数据的删除。

例子:

input {
# 扫描指定文件日志数据
file {
  path => [ "/var/log/http.log" ]
}
}

filter {
  # 如果loglevel字段值等于debug,则删除整条消息
  if [loglevel] == "debug" {
      # 通过drop过滤器删除消息
      drop { }
  }
}

output {
# 将数据直接打印出来
stdout {}
}

标签:教程,jdbc,插件,logstash,input,数据,Logstash
来源: https://www.cnblogs.com/root-123/p/16572116.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有