ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

springboot使用kafka发送日志到elk

2022-01-14 21:01:21  阅读:298  来源: 互联网

标签:elk log spring kibana kafka logstash springboot


单机单节点 篇

win10环境(jdk1.8)

一、环境搭建

1,安装elasticsearch

2,安装kibana

3,安装logstash

4,安装kafka

二、项目搭建并运行验证

1,搭建项目并运行验证

三、安装过程记录

1,简单说明

一.1-3 为elk通用方案的内容,使用任何语言项目都可参考

一.4 为使用kafka作为消息队列异步收集项目日志并传递到logstash,有使用该方案的可参考

二.1 为springboot使用log4j将日志传递到kafka的实例

2,安装步骤

2.1 elk官网下载三个应用的安装包(elasticsearch,kibana,logstash)

https://www.elastic.co/cn/

elasticsearch 7.16.3:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.3-windows-x86_64.zip

kibana 7.16.3:

https://artifacts.elastic.co/downloads/kibana/kibana-7.16.3-windows-x86_64.zip

logstash 7.16.3:

https://artifacts.elastic.co/downloads/logstash/logstash-7.16.3-windows-x86_64.zip

2.2 部署运行elk

以 D:\elk 路径为elk环境的路径

路径下新建三个文件夹 elasticsearch        kinaba        logstash

将下载的三个包分别解压到对用的路径下

2.2.1 启动配置elasticsearch

启动

        终端或者powershell中执行 D:\elk\elasticsearch\bin\elasticsearch.bat

配置 最简配 不做配置

2.2.2 启动配置kibana

编辑

        kibana/conf/kibana.yml

        elasticsearch.hosts: "http://localhost:9200"

启动

        终端或者powershell中执行 D:\elk\kibana\bin\kibana.bat

2.2.3 启动配置logstash

编辑

        D:\elk\logstash\config\logstash.conf

        

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "log_topic"
    client_id => "logstash_01"
    auto_offset_reset => "latest"
    topics => ["kafka_log_topic"]	
    add_field => {"logs_type" => "springboot"}		
    codec => json { charset => "UTF-8" }
  }
}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "springboot"
    #user => "elastic"
    #password => "changeme"
  }
}

启动

        D:        cd \elk\logstash

        

.\bin\logstash.bat -f .\config\log4j2.properties

验证:

访问 localhost:5601 即可进入kibana页面

3. kafka安装启动

        kafka官网 https://kafka.apache.org/

        下载 https://dlcdn.apache.org/kafka/2.8.1/kafka_2.12-2.8.1.tgz

        以路径D:\kafka为例

        解压在该路径下

编辑

        config\zookeeper.properties

       

dataDir=D:\kafka\data

        

       config\server.properties

log.dirs=D:\kafka\logka
zookeeper.connect=localhost:2181

        启动

        进入kafka路径下 分别用两个终端或者powershell执行下列命令启动zk和kafka

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

        创建log信息的topic(路径不变)

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create --topic kafka_log_topic  --partitions 1 --replication-factor 1

看到控制台输出:Created topic kafka_log_topic_2. 则成功

四、建立项目并启动验证

4.1 创建springboot项目

添加依赖:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.6</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>

controll

er

@RestController
@RequestMapping(value = "/log")
public class LogController {

    Logger logger = LoggerFactory.getLogger(LogController.class);
    private int num = 1;

    @RequestMapping(value = "/log", method = RequestMethod.GET)
    public String log() throws InterruptedException {
        while (true) {
            Thread.sleep(100);
            logger.info("just a log");
            if (num++ % 100 == 0) {
                break;
            }
        }
        return "ok";
    }

}

添加kafka配置

application.properties

logging.config=classpath:log4j.xml
# kafka服务器地址,多个集群用逗号分隔
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=default_consumer_group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

log4j.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,
你会看到log4j2内部各种详细输出。可以设置成OFF(关闭)或Error(只输出错误信息) -->
<Configuration status="OFF">
    <Properties>
        <!-- 配置日志文件输出目录 -->
        <Property name="log_path" value="log/" />
        <Property name="file_name">log</Property>
        <Property name="kafka_log_topic">kafka_log_topic</Property>
        <Property name="bootstrap_servers">localhost:9092</Property>
    </Properties>

    <Appenders>
        <!-- 输出控制台日志的配置 -->
        <Console name="console" target="SYSTEM_OUT">
            <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
            <!-- 输出日志的格式 -->
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss SSS} [%t] %-5level %logger{36} - %msg%n" />
        </Console>
        <File name="log_file" fileName="${log_path}/${file_name}.log"  append="true" immediateFlush="true">
            <PatternLayout pattern="%d{yy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
        </File>
        <Kafka name="kafka" topic="${kafka_log_topic}">
            <!--<PatternLayout pattern="%date %message"/>-->
            <Property name="bootstrap.servers">${bootstrap_servers}</Property>
            <!--json格式输出-->
            <JsonLayout compact="true" locationInfo="true" complete="false" eventEol="true"/>
        </Kafka>
    </Appenders>

    <Loggers>
        <Root level="info">
            <AppenderRef ref="kafka"/>
            <AppenderRef ref="console"/>
            <AppenderRef ref="log_file"/>
        </Root>
        <!-- <Logger name="org.apache.kafka" level="INFO" />--> <!-- avoid recursive logging -->
        <logger name="org.springframework" level="INFO"/>
    </Loggers>
</Configuration>

启动项目

浏览器访问 localhost:8080/log/log 记录日志消息

4.2 验证

打开 http://localhost:5601/app/management​​​​​​

点击左侧栏 Kibana->Index Patterns 创建检索的索引

可以看到右边有springboot索引候选

输入springboot*  Timestamp field随便选一个 创建即完成

访问 ​​​​​​http://localhost:5601/app/discover#/​​​​​​

选择 刚建立的索引即可看到springboot项目的日志可以在这里检索。

五、elk kafka涉及知识点

待补充

标签:elk,log,spring,kibana,kafka,logstash,springboot
来源: https://blog.csdn.net/nengyongma/article/details/122500360

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有