ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Suricata6.0流表管理源码注释四:流的建立02

2022-01-08 19:03:28  阅读:173  来源: 互联网

标签:02 Suricata6.0 tv bucket flow 获取 源码 内存 NULL


这篇文章接着上篇继续解释流的建立,其中最重要的一个函数FlowGetNew,主要目的是获取一个flow,获取过程中也是非常曲折的,求爷爷告奶奶才能求来一个flow。

1. FlowGetNew 这个函数也分几个方面理解:

FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew

a。从线程自己的flow队列里获取flow,如果获取成功则返回flow,如果没有就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。

b。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存超过配置上限,则设置进入紧急模式。

c。接b,设置紧急模式之后,调用函数FlowGetUsedFlow获取flow,这个函数是从正在使用的全局flow_hash的bucket链表里获取一个,后续会注释这个函数。

d。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc。

static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, const Packet *p)
{
    //获取紧急模式标志
    const bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

    //检查是否可以生成flow,icmp错误包不生成flow
    //如果是紧急模式,tcp非sync的包不生成flow,可以看出sync包优先生成flow
    if (FlowCreateCheck(p, emerg) == 0) {
        return NULL;
    }

    //从线程自己的flow队列里获取flow,如果获取成功则返回flow
    /* get a flow from the spare queue */
    Flow *f = FlowQueuePrivateGetFromTop(&fls->spare_queue);
    if (f == NULL) {
        //如果获取flow失败,就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。
        //FlowSpareSync这个函数主要是从空闲的全局flow内存池获取一个flow队列,给自己用
        f = FlowSpareSync(tv, fls, p, emerg);
    }

    //全局flow内存池也没有可用的flow队列
    if (f == NULL) {
        /* If we reached the max memcap, we get a used flow */
        //判断如果flow内存超过配置上限,则进入紧急模式。
        if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
            /* declare state of emergency */
            if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
                //没有设置紧急模式标志才会设置,设置过就没必要再设置了
                SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
                //这个函数把当前的流老化的正常超时时间设置为紧急模式超时时间
                FlowTimeoutsEmergency();
            }
        
            //没有空闲flow,flow占用内存也超过配置上限,试着从正在使用的全局flow_hash
            //的bucket链表上获取一个,只有引用计数为0则拿出来,不管它是否满足超时条件
            f = FlowGetUsedFlow(tv, fls->dtv, &p->ts);
            if (f == NULL) {
                return NULL;
            }
#ifdef UNITTESTS
            if (tv != NULL && fls->dtv != NULL) {
#endif
                StatsIncr(tv, fls->dtv->counter_flow_get_used);
#ifdef UNITTESTS
            }
#endif
            /* flow is still locked from FlowGetUsedFlow() */
            FlowUpdateCounter(tv, fls->dtv, p->proto);
            return f;
        }

        //如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,
        //且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc
        /* now see if we can alloc a new flow */
        f = FlowAlloc();
        if (f == NULL) {
#ifdef UNITTESTS
            if (tv != NULL && fls->dtv != NULL) {
#endif
                StatsIncr(tv, fls->dtv->counter_flow_memcap);
#ifdef UNITTESTS
            }
#endif
            return NULL;
        }

        /* flow is initialized but *unlocked* */
        //FlowAlloc里会初始化flow,没人用它,也没人锁它,谁用谁上锁
    } else {
        /* flow has been recycled before it went into the spare queue */

        /* flow is initialized (recylced) but *unlocked* */
        //这的意思就是,获取到flow了,这个flow回收时会被初始化FlowInit(f),
        //的确如此,回收时会这么做,也会释放锁
    }

    FLOWLOCK_WRLOCK(f);
    FlowUpdateCounter(tv, fls->dtv, p->proto);
    return f;
}

2. FlowTimeoutsEmergency 函数

FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowTimeoutsEmergency

这个函数功能单一,把老化超时时间设置为紧急模式的老化时间。

void FlowTimeoutsEmergency(void)
{
    //两个参数都是全局变量,
    //flow_timeouts 是正在使用的超时时间
    //flow_timeouts_emerg 是初始化时设置的紧急超时时间

    //把老化超时时间设置为紧急模式的老化时间,设置后在流老化线程中,
    //检查超时时获取到的时间就是这个紧急模式的时间
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}

3. FlowGetUsedFlow 函数

FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowGetUsedFlow

这个函数是在FlowGetNew中获取flow时,既没有空闲flow可用,flow内存也到达了配置的内存上限的时候调用这个函数。

函数主要是从全局变量flow_hash已经使用的flow中获取一个flow,这个flow的引用计数必须为0,不检查是否超时,只要引用计数为0就拿出来。

获取flow时,选取bucket时,有个小算法,如果每次都从flow_hash前边的bucket中拿flow出来,那么前边的势必很快取完,取完后,每次就要从前往后遍历,每次遍历都是无用功,耗费时间,因为在前边的bucket中的符合条件的flow先被取走,就会依次遍历每个bucket。

所以,设置一个原子变量flow_prune_idx,用它控制每次从哪个bucket开始获取符合条件的flow,这里是每次加5,即每次取flow的bucket的索引间隔为5,如果这个bucket上没有符合条件的flow,则索引加1,检查下一个bucket,直到找到flow或者查找计数为5(全局变量固定值),如果遍历到达最后一个bucket,则再从第一个bucket开始搜索。

目的是为每个bucket保留符合条件的flow,每次取时可以较快的找到flow,不需要遍历所有bucket.

/** \internal
 *  \brief Get a flow from the hash directly.
 *
 *  Called in conditions where the spare queue is empty and memcap is reached.
 *
 *  Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt
 *  is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the
 *  top each time since that would clear the top of the hash leading to longer
 *  and longer search times under high pressure (observed).
 *
 *  \param tv thread vars
 *  \param dtv decode thread vars (for flow log api thread data)
 *
 *  \retval f flow or NULL
 */
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv, const struct timeval *ts)
{
    //这个函数对控制bucket索引的全局原子变量加上FLOW_GET_NEW_TRIES,值是5,
    //就是每次搜索的bucket索引是上次的值加5,不会每次都从开始搜索,这样可以最大概率的给每个bucket
    //保留符合条件的flow,以后再搜索这些bucket,成功概率较大,不需要遍历所有bucket.
    //这是个人理解,为什么是最大概率呢,因为不是所有bucket都有符合条件的flow,你不搜索它,它也没有,
    //搜索它,它也没有,也谈不上给那个bucket保留符合条件的flow,
    uint32_t idx = GetUsedAtomicUpdate(FLOW_GET_NEW_TRIES) % flow_config.hash_size;
    uint32_t tried = 0;

    while (1) {
        //查找次数其实也是查找的bucket个数,如果到达5次,则更新计数器,返回.
        if (tried++ > FLOW_GET_NEW_TRIES) {
            STATSADDUI64(counter_flow_get_used_eval, tried);
            break;
        }

        //bucket索引到最大时,从0开始
        if (++idx >= flow_config.hash_size)
            idx = 0;

        FlowBucket *fb = &flow_hash[idx];

        //next_ts超时变量设置INT_MAX时,说明bucket上没有flow在未来next_ts要超时
        //就是bucket是空的,FlowManager老化线程设置的INT_MAX
        if (SC_ATOMIC_GET(fb->next_ts) == INT_MAX)
            continue;

        if (GetUsedTryLockBucket(fb) != 0) {
            STATSADDUI64(counter_flow_get_used_eval_busy, 1);
            continue;
        }

        Flow *f = fb->head;
        if (f == NULL) {
            FBLOCK_UNLOCK(fb);
            continue;
        }

        if (GetUsedTryLockFlow(f) != 0) {
            STATSADDUI64(counter_flow_get_used_eval_busy, 1);
            FBLOCK_UNLOCK(fb);
            continue;
        }

        /** never prune a flow that is used by a packet or stream msg
         *  we are currently processing in one of the threads */
        if (f->use_cnt > 0) {
            //计数器大于0,说明有packe引用这个flow,不能抢别人的flow
            STATSADDUI64(counter_flow_get_used_eval_busy, 1);
            FBLOCK_UNLOCK(fb);
            FLOWLOCK_UNLOCK(f);
            continue;
        }

        //这个函数判断这个flow,最近是否使用过,
        //0判断的依据就是最新时间和这个flow的时间差,小于一定秒数则最近使用过,不能拿走
        if (StillAlive(f, ts)) {
            STATSADDUI64(counter_flow_get_used_eval_reject, 1);
            FBLOCK_UNLOCK(fb);
            FLOWLOCK_UNLOCK(f);
            continue;
        }

        //好了,可喜可贺,这个flow计数为0,最近没被更新,就它了
        //把它从bucket上拿走
        /* remove from the hash */
        fb->head = f->next;
        f->next = NULL;
        f->fb = NULL;
        FBLOCK_UNLOCK(fb);

        //这flow的终止标志,被强行拿走标志FLOW_END_FLAG_FORCED,
        //flow终止时是在紧急模式,设置标志FLOW_END_FLAG_EMERGENCY
        /* rest of the flags is updated on-demand in output */
        f->flow_end_flags |= FLOW_END_FLAG_FORCED;
        if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
            f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY;

        /* invoke flow log api */
#ifdef UNITTESTS
        if (dtv) {
#endif
            //输出flow日志
            if (dtv->output_flow_thread_data) {
                (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);
            }
#ifdef UNITTESTS
        }
#endif
        //清除flow中的一些资源,主要是是否flow关联的tcp会话的内存即f->protoctx
        //flow结构体后的storage空间,void *数组,重新初始化这个flow
        FlowClearMemory(f, f->protomap);

        /* leave locked */

        STATSADDUI64(counter_flow_get_used_eval, tried);
        return f;
    }

    STATSADDUI64(counter_flow_get_used_failed, 1);
    return NULL;
}

标签:02,Suricata6.0,tv,bucket,flow,获取,源码,内存,NULL
来源: https://blog.csdn.net/ljq32/article/details/122383335

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有