ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

faust从kafka消费nginx日志流分析告警

2022-01-23 15:31:50  阅读:154  来源: 互联网

标签:ip reason value kafka nginx host faust key error


faust从kafka消费nginx日志流分析告警

nginx节点日志通过syslog发送至syslog server,syslog server进行格式处理后作为生产者,把日志流send至kafka 对应的topic上。

基于faust框架编写数据流消费程序,从kafka指定的topic上消费数据流,通过stream.filter+lambda表达式,指定错误界别的数据流进行分析,使用域名和ip为key进行计数,当错误超过阈值时发送告警通知相关人员。

import faust
import redis
from feishuRobot import feishuRobot
from datetime import timedelta
from log import logger
from config.config import ReadConfig

try:
    obj = ReadConfig()
    conf = obj.get_config()
    logger.info("load config file successful")
except Exception as r:
    logger.error('Fail to load config file: {reason}', reason=r)

try:
    pool = redis.ConnectionPool(host=conf['redis']['address'], port=conf['redis']['port'], decode_responses=True, password=conf['redis']['password'] )
    obj_r = redis.Redis(connection_pool=pool)

except Exception as r:
    logger.error("Fail to connection redis poll: {reason}", reason=r)

app = faust.App(
    'error_log_alarm',
    store='rocksdb://',
    broker= conf['kafka']['access_broker'],
    stream_wait_empty=False, 
    broker_max_poll_records=conf['kafka']['max_poll'], 
    topic_partitions=1,
    #vaule_type=json,
    #value_serializer='raw',
)

class Transfer(faust.Record):
    from_host_ip: str
    level: str
    #message: str
    reason: str
    logtime: str


#def master_processor(key, events):
    #timestamp = key[1][0]
    #for event in events:


record_error = app.Table(
    'record_error',
    default=int,
    #on_window_close=master_processor,
#).tumbling(timedelta(minutes=1), expires=timedelta(minutes=1)).relative_to_stream()
).tumbling(conf['faust']['window_size'], expires=conf['faust']['expires'], key_index=True).relative_to_stream()


error_topic = app.topic('sec-waf-error-log', value_type=Transfer)

@app.agent(error_topic)
async def greet(stream):
    
    '''
    #async for value in stream.filter(lambda x: x.status == '200' ).group_by(Transfer.from_host_ip):
    async for value in stream:
        master_to_total[value.from_host_ip] += 1
    '''
    #async for value in stream.group_by(Transfer.from_host_ip):
    #upstream timed out (110: Connection timed out) while reading response header from upstream
    #httpApi_action(): httpApi_action[push_count_dict] error: failed to commit the pipelined (push_count_dict) requests: timeout, context: ngx.timer
    #upstream prematurely closed connection while reading response header from upstream
    # connect() failed (111: Connection refused) while connecting to upstream
    #access forbidden by rule,
    #recv() failed (104: Connection reset by peer) while proxying upgraded connection

    try:
        feishu = feishuRobot()
    except Exception as r:
        logger.error("Fail to init feishuRobot object: {reason}", reason=r)

    async for value in stream.filter(lambda x: x.level == "error"):
        #print("attack: ", value)
        record_error['{value.from_host_ip}'] += 1
        

        #域名_ip统计计数
        v = record_error['{value.from_host_ip}']
  
        if v.now() >= 10:
            msg = ""
            err_key = ""

            if "recv() failed (110: Connection timed out) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_timed_out" 
                msg += "级别: 中\r\n" 

            elif "recv() failed (104: Connection reset by peer) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_reset_peer" 
                msg += "级别: 中\r\n" 

            elif "upstream prematurely closed connection while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_prematurely_closed_connection"
                msg += "级别: 中\r\n" 

            elif "access forbidden by rule" in value.reason:
                err_key = "error_" + value.from_host_ip + "_access_forbidden_rule"
                msg += "级别: 低\r\n" 

            elif "connect() failed (111: Connection refused) while connecting to upstream" in value.reason:
                errr_key = "error_" + value.from_host_ip + "_Connection_refused"
                msg += "级别: 中\r\n" 

            elif "client intended to send too large body" in value.reason:
                err_key = "error_" + value.from_host_ip + "_send_too_large_body"
                msg += "级别: 低\r\n" 

            elif "failed to commit the pipelined (push_count_dict)" in value.reason:
                err_key = "error_" + value.from_host_ip + "_commit_the_pipelined"
                msg += "级别: 低\r\n" 

            elif "could not build optimal server_names_hash" in value.reason: #warning
                err_key = "warn_" + value.from_host_ip + "_optimal_server_names_hash"
                msg += "级别: 低\r\n" 

            elif "no live upstreams while connecting to upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_no_live_upstreams"
                msg += "级别: 高\r\n" 

            elif "SSL_do_handshake() failed" in value.reason:
                err_key = "error_" + value.from_host_ip + "_SSL_do_handshake"
                msg += "级别: 高\r\n" 


            if obj_r.get(err_key) is not None:
                #如果存在更新计数统计
                obj_r.set(err_key, str(v.now()))
            else:
                obj_r.set(err_key, str(v.now()))
                obj_r.expire(err_key, conf['redis']['record_expire'])

                msg += "waf节点: " + value.from_host_ip + "\r\n"
                msg += "错误信息" + value.reason + "\r\n"
                msg += "时间: " + value.logtime + "\r\n"
                msg += "错误日志频率: 2分钟" + str(v.now()) + "次\r\n"
                feishu.send_card_text(msg)
               
if __name__ == '__main__':
    app.main()

标签:ip,reason,value,kafka,nginx,host,faust,key,error
来源: https://blog.csdn.net/realmardrid/article/details/122651853

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有