nginx节点日志通过syslog发送至syslog server,syslog server进行格式处理后作为生产者,把日志流send至kafka 对应的topic上。


import faust
import redis
from feishuRobot import feishuRobot
from datetime import timedelta
from log import logger
from config.config import ReadConfig

    obj = ReadConfig()
    conf = obj.get_config()
    logger.info("load config file successful")
except Exception as r:
    logger.error('Fail to load config file: {reason}', reason=r)

    pool = redis.ConnectionPool(host=conf['redis']['address'], port=conf['redis']['port'], decode_responses=True, password=conf['redis']['password'] )
    obj_r = redis.Redis(connection_pool=pool)

except Exception as r:
    logger.error("Fail to connection redis poll: {reason}", reason=r)

app = faust.App(
    broker= conf['kafka']['access_broker'],

class Transfer(faust.Record):
    from_host_ip: str
    level: str
    #message: str
    reason: str
    logtime: str

#def master_processor(key, events):
    #timestamp = key[1][0]
    #for event in events:

record_error = app.Table(
#).tumbling(timedelta(minutes=1), expires=timedelta(minutes=1)).relative_to_stream()
).tumbling(conf['faust']['window_size'], expires=conf['faust']['expires'], key_index=True).relative_to_stream()

error_topic = app.topic('sec-waf-error-log', value_type=Transfer)

async def greet(stream):
    #async for value in stream.filter(lambda x: x.status == '200' ).group_by(Transfer.from_host_ip):
    async for value in stream:
        master_to_total[value.from_host_ip] += 1
    #async for value in stream.group_by(Transfer.from_host_ip):
    #upstream timed out (110: Connection timed out) while reading response header from upstream
    #httpApi_action(): httpApi_action[push_count_dict] error: failed to commit the pipelined (push_count_dict) requests: timeout, context: ngx.timer
    #upstream prematurely closed connection while reading response header from upstream
    # connect() failed (111: Connection refused) while connecting to upstream
    #access forbidden by rule,
    #recv() failed (104: Connection reset by peer) while proxying upgraded connection

        feishu = feishuRobot()
    except Exception as r:
        logger.error("Fail to init feishuRobot object: {reason}", reason=r)

    async for value in stream.filter(lambda x: x.level == "error"):
        #print("attack: ", value)
        record_error['{value.from_host_ip}'] += 1

        v = record_error['{value.from_host_ip}']
        if v.now() >= 10:
            msg = ""
            err_key = ""

            if "recv() failed (110: Connection timed out) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_timed_out" 
                msg += "级别: 中\r\n" 

            elif "recv() failed (104: Connection reset by peer) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_reset_peer" 
                msg += "级别: 中\r\n" 

            elif "upstream prematurely closed connection while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_prematurely_closed_connection"
                msg += "级别: 中\r\n" 

            elif "access forbidden by rule" in value.reason:
                err_key = "error_" + value.from_host_ip + "_access_forbidden_rule"
                msg += "级别: 低\r\n" 

            elif "connect() failed (111: Connection refused) while connecting to upstream" in value.reason:
                errr_key = "error_" + value.from_host_ip + "_Connection_refused"
                msg += "级别: 中\r\n" 

            elif "client intended to send too large body" in value.reason:
                err_key = "error_" + value.from_host_ip + "_send_too_large_body"
                msg += "级别: 低\r\n" 

            elif "failed to commit the pipelined (push_count_dict)" in value.reason:
                err_key = "error_" + value.from_host_ip + "_commit_the_pipelined"
                msg += "级别: 低\r\n" 

            elif "could not build optimal server_names_hash" in value.reason: #warning
                err_key = "warn_" + value.from_host_ip + "_optimal_server_names_hash"
                msg += "级别: 低\r\n" 

            elif "no live upstreams while connecting to upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_no_live_upstreams"
                msg += "级别: 高\r\n" 

            elif "SSL_do_handshake() failed" in value.reason:
                err_key = "error_" + value.from_host_ip + "_SSL_do_handshake"
                msg += "级别: 高\r\n" 

            if obj_r.get(err_key) is not None:
                obj_r.set(err_key, str(v.now()))
                obj_r.set(err_key, str(v.now()))
                obj_r.expire(err_key, conf['redis']['record_expire'])

                msg += "waf节点: " + value.from_host_ip + "\r\n"
                msg += "错误信息" + value.reason + "\r\n"
                msg += "时间: " + value.logtime + "\r\n"
                msg += "错误日志频率: 2分钟" + str(v.now()) + "次\r\n"
if __name__ == '__main__':

