Kernel networking: softirqs


There are two modes for getting packets from the NIC up into the protocol stack:
The first is the traditional interrupt (non-NAPI) mode: for every received packet an interrupt handler runs once (e.g. e100_rx). In that handler a new skb is allocated and swapped in for the skb that already holds the data (DMA has already copied the packet into the pre-allocated skb), netif_rx is called to put the filled skb on a per-CPU queue (with RPS enabled this queue may belong to the local CPU or to another CPU), and finally the softirq is raised. Later the softirq handler net_rx_action calls the poll function process_backlog (if the skb was queued to another CPU, that CPU's softirq also has to be triggered via an IPI), which drains the per-CPU queue and hands the packets to the protocol stack via __netif_receive_skb.
The interrupt mode generates many interrupts and hurts performance, which is why NAPI exists: one interrupt allows multiple packets to be polled (quota 64). Concretely, when an interrupt arrives its handler (e.g. ixgbe_msix_clean_rings) only raises the softirq and does not touch any skb; the softirq handler net_rx_action then calls the poll function registered by the driver, e.g. ixgbe_poll, to receive the packets and hand them up via netif_receive_skb_internal (if RPS is enabled, processing falls back to the non-NAPI style: the skb is placed on a per-CPU queue, possibly the local CPU's, possibly another CPU's, and only after another round of softirq processing does it reach __netif_receive_skb).

The figure below shows the flow of both modes: the blue parts are common to both, red is the non-NAPI path, and green is the NAPI path.


  [figure: packet receive flow; blue = common, red = non-NAPI, green = NAPI]
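
To make the NAPI contract concrete, the following is a minimal sketch of a hypothetical driver (the mydrv_* names, the interrupt enable/disable helpers and mydrv_rx_one are invented for illustration; it is not taken from any real driver). The interrupt handler only masks the device's interrupts and schedules NAPI; the poll function harvests at most budget packets and, when it finishes early, calls napi_complete and unmasks the device.

/* A minimal sketch of the NAPI contract, not taken from any real driver.
 * mydrv_priv, mydrv_disable_irq, mydrv_enable_irq and mydrv_rx_one are
 * hypothetical helpers used only for illustration. */
struct mydrv_priv {
    struct napi_struct napi;
    struct net_device *netdev;
};

static irqreturn_t mydrv_interrupt(int irq, void *dev_id)
{
    struct mydrv_priv *priv = dev_id;

    mydrv_disable_irq(priv);          /* quiet the device... */
    napi_schedule(&priv->napi);       /* ...and let NET_RX_SOFTIRQ do the work */
    return IRQ_HANDLED;
}

static int mydrv_poll(struct napi_struct *napi, int budget)
{
    struct mydrv_priv *priv = container_of(napi, struct mydrv_priv, napi);
    int work = 0;

    while (work < budget) {
        struct sk_buff *skb = mydrv_rx_one(priv);   /* pull one frame off the RX ring */

        if (!skb)
            break;
        napi_gro_receive(napi, skb);  /* hand the packet up the stack */
        work++;
    }

    if (work < budget) {              /* ring drained: leave polled mode */
        napi_complete(napi);
        mydrv_enable_irq(priv);
    }
    return work;                      /* net_rx_action subtracts this from its budget */
}

/* registered at probe time; the weight of 64 is the usual per-poll quota:
 * netif_napi_add(netdev, &priv->napi, mydrv_poll, 64); */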

Softirq processing has two steps: first the softirq is raised (marked pending), and later, at some point, the softirq handler runs. A small sketch of the pending-bit bookkeeping follows the list below.

  1. A softirq can be raised from the following three places:
    a. The generic (non-networking) path:
raise_softirq
  raise_softirq_irqoff(nr);
      __raise_softirq_irqoff(unsigned int nr)
        or_softirq_pending(1UL << nr);

b. In NAPI mode, typically called from the driver's interrupt handler:

napi_schedule
  __napi_schedule(n);
    ____napi_schedule(this_cpu_ptr(&softnet_data), n);
        list_add_tail(&napi->poll_list, &sd->poll_list);
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
          or_softirq_pending(1UL << nr);

c. In non-NAPI mode, called from netif_rx -> enqueue_to_backlog:

enqueue_to_backlog
  sd = &per_cpu(softnet_data, cpu);
  ____napi_schedule(sd, &sd->backlog);
    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
      or_softirq_pending(1UL << nr);
  2. The softirq is executed from the following three places:
    a. On return from hardware interrupt handling:
  irq_exit
    if (!in_interrupt() && local_softirq_pending())
      invoke_softirq
        __do_softirq

b. When the ksoftirqd kernel thread runs:

raise_softirq_irqoff            // or __do_softirq when its time/restart budget is used up
  wakeup_softirqd               // wakes the per-cpu ksoftirqd thread
    run_ksoftirqd
      if (local_softirq_pending()) {
        __do_softirq

c. netif_rx_ni
netif_rx_ni first does the same work as netif_rx; then, if a softirq is pending, it executes it right away:

netif_rx_ni
  if (local_softirq_pending())
    do_softirq();
      do_softirq_own_stack();
        if (local_softirq_pending()) 
          __do_softirq
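
The "raise" step in all three paths above boils down to or_softirq_pending() setting one bit in a per-CPU pending word, which __do_softirq() later snapshots and scans. A tiny user-space sketch (illustration only, not kernel code) of that bookkeeping:

/* User-space sketch of the softirq pending bitmask, for illustration only. */
#include <stdio.h>

#define NET_TX_SOFTIRQ 2
#define NET_RX_SOFTIRQ 3

static unsigned long pending;        /* stands in for the per-cpu __softirq_pending word */

static void raise_softirq_sketch(unsigned int nr)
{
    pending |= 1UL << nr;            /* what or_softirq_pending(1UL << nr) amounts to */
}

int main(void)
{
    raise_softirq_sketch(NET_RX_SOFTIRQ);
    raise_softirq_sketch(NET_TX_SOFTIRQ);
    printf("pending = 0x%lx\n", pending);   /* prints 0xc: bits 2 and 3 set */
    /* __do_softirq() later snapshots this word, clears it, and runs each set bit's handler */
    return 0;
}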

Softirq-related initialization

At kernel boot, the softirq-related structures are initialized:
static int __init net_dev_init(void)
{
    ...
    /*
     *  Initialise the packet receive queues.
     */
    /* initialize the per-cpu softnet_data structures */
    for_each_possible_cpu(i) {
        struct softnet_data *sd = &per_cpu(softnet_data, i);

        skb_queue_head_init(&sd->input_pkt_queue);
        skb_queue_head_init(&sd->process_queue);
        INIT_LIST_HEAD(&sd->poll_list);
        sd->output_queue_tailp = &sd->output_queue;
#ifdef CONFIG_RPS
        sd->csd.func = rps_trigger_softirq; /* runs on the remote cpu and raises its NET_RX_SOFTIRQ */
        sd->csd.info = sd;
        sd->cpu = i;
#endif
        /* backlog reuses the napi structure so that non-NAPI processing fits
         * the NAPI framework; process_backlog serves as its poll function */
        sd->backlog.poll = process_backlog;
        sd->backlog.weight = weight_p;
    }

    ...
    /* register the two network-related softirq handlers */
    open_softirq(NET_TX_SOFTIRQ, net_tx_action);
    open_softirq(NET_RX_SOFTIRQ, net_rx_action);
    ...
}
The following softirq types are supported:
enum
{
    HI_SOFTIRQ=0,
    TIMER_SOFTIRQ,
    NET_TX_SOFTIRQ,
    NET_RX_SOFTIRQ,
    BLOCK_SOFTIRQ,
    BLOCK_IOPOLL_SOFTIRQ,
    TASKLET_SOFTIRQ,
    SCHED_SOFTIRQ,
    HRTIMER_SOFTIRQ,
    RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */

    NR_SOFTIRQS
};
Registering a softirq handler:
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
    softirq_vec[nr].action = action;
}

non-NAPI processing flow

  1. Raising the softirq
    After the NIC receives a packet it notifies the CPU with an interrupt; the CPU runs the driver's interrupt handler, e.g. dm9000_interrupt, which calls netif_rx to put the skb on a per-CPU queue and raise the softirq. The code below walks through the details.
static irqreturn_t dm9000_interrupt(int irq, void *dev_id)
    /* Received the coming packet */
    if (int_status & ISR_PRS)
        dm9000_rx(dev);
            //allocate the skb
            skb = netdev_alloc_skb(dev, RxLen + 4)
            //copy the received data into the skb
            rdptr = (u8 *) skb_put(skb, RxLen - 4);
            (db->inblk)(db->io_data, rdptr, RxLen);
            //hand the skb to netif_rx
            netif_rx(skb);

int netif_rx(struct sk_buff *skb)
{
    //static tracepoint
    trace_netif_rx_entry(skb);

    return netif_rx_internal(skb);
}

Pick a suitable CPU and call enqueue_to_backlog to put the skb on that CPU's per-CPU queue:
static int netif_rx_internal(struct sk_buff *skb)
{
    int ret;

    net_timestamp_check(netdev_tstamp_prequeue, skb);

    trace_netif_rx(skb);
#ifdef CONFIG_RPS
    /* If the kernel is built with CONFIG_RPS and RPS is enabled on the queue
     * (e.g. echo f > /sys/class/net/eth0/queues/rx-0/rps_cpus), get_rps_cpu()
     * picks a suitable cpu (which may be the local cpu or a remote one);
     * otherwise the local cpu is used. */
    if (static_key_false(&rps_needed)) {
        struct rps_dev_flow voidflow, *rflow = &voidflow;
        int cpu;

        preempt_disable();
        rcu_read_lock();

        cpu = get_rps_cpu(skb->dev, skb, &rflow);
        if (cpu < 0)
            cpu = smp_processor_id();

        ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

        rcu_read_unlock();
        preempt_enable();
    } else
#endif
    {
        unsigned int qtail;
        /* RPS not in use: queue to the local cpu */
        ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
        put_cpu();
    }
    return ret;
}

Put the skb on the given CPU's softnet_data->input_pkt_queue; if it is the first packet on that queue, the softirq also has to be raised:
/*
 * enqueue_to_backlog is called to queue an skb to a per CPU backlog
 * queue (may be a remote CPU queue).
 */
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
                  unsigned int *qtail)
{
    struct softnet_data *sd;
    unsigned long flags;
    unsigned int qlen;
    /* get the per-cpu sd */
    sd = &per_cpu(softnet_data, cpu);

    local_irq_save(flags);

    rps_lock(sd);
    if (!netif_running(skb->dev))
        goto drop;
    /* If the queue holds no more than netdev_max_backlog skbs (default 1000,
     * tunable via sysctl) and skb_flow_limit() returns false (it throttles
     * large flows so that small flows still get CPU time; its internals are
     * not analyzed here), the skb may be enqueued; otherwise it is dropped. */
    qlen = skb_queue_len(&sd->input_pkt_queue);
    if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
        /* if the queue is non-empty just enqueue; if it is empty, schedule the backlog NAPI first, then enqueue */
        if (skb_queue_len(&sd->input_pkt_queue)) {
enqueue:
            __skb_queue_tail(&sd->input_pkt_queue, skb);
            input_queue_tail_incr_save(sd, qtail);
            rps_unlock(sd);
            local_irq_restore(flags);
            return NET_RX_SUCCESS;
        }

        /* Schedule NAPI for backlog device
         * We can use non atomic operation since we own the queue lock
         */
        /* The queue is empty, i.e. this skb is the first one, so set
         * NAPI_STATE_SCHED (the softirq handler net_rx_action checks this
         * flag) to mark the backlog as ready to be polled. */
        if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
            /* If rps_ipi_queued() returns 0, hook sd->backlog onto
             * sd->poll_list and raise the local softirq;
             * rps_ipi_queued() is analyzed below. */
            if (!rps_ipi_queued(sd))
                ____napi_schedule(sd, &sd->backlog);
        }
        goto enqueue;
    }

drop:
    sd->dropped++;
    rps_unlock(sd);

    local_irq_restore(flags);

    atomic_long_inc(&skb->dev->rx_dropped);
    kfree_skb(skb);
    return NET_RX_DROP;
}

/*
 * Check if this softnet_data structure is another cpu one
 * If yes, queue it to our IPI list and return 1
 * If no, return 0
 */ 
As the comment above says: with RPS configured, check whether sd belongs to the current CPU or to another CPU.
If it belongs to another CPU, link that sd onto the current CPU's mysd->rps_ipi_list, raise the current CPU's softirq, and return 1; later, in the softirq handler net_rx_action, an IPI notifies the remote CPU so it processes the skbs queued for it.
If sd is the current CPU's, or RPS is not configured, return 0; the caller then raises the softirq itself
and puts the current CPU's backlog on sd->poll_list.
static int rps_ipi_queued(struct softnet_data *sd)
{
#ifdef CONFIG_RPS
    struct softnet_data *mysd = this_cpu_ptr(&softnet_data);

    if (sd != mysd) {
        sd->rps_ipi_next = mysd->rps_ipi_list;
        mysd->rps_ipi_list = sd;

        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
        return 1;
    }
#endif /* CONFIG_RPS */
    return 0;
}
  2. Executing the softirq
    __do_softirq executes all pending softirqs on the current CPU:
asmlinkage __visible void __do_softirq(void)
{
    /* MAX_SOFTIRQ_TIME is 2 ms: as long as softirqs keep arriving, run for up to 2 ms */
    unsigned long end = jiffies + MAX_SOFTIRQ_TIME;
    unsigned long old_flags = current->flags;
    /* MAX_SOFTIRQ_RESTART is 10: the processing loop may restart at most 10 times */
    int max_restart = MAX_SOFTIRQ_RESTART;
    struct softirq_action *h;
    bool in_hardirq;
    __u32 pending;
    int softirq_bit;

    /*
     * Mask out PF_MEMALLOC s current task context is borrowed for the
     * softirq. A softirq handled such as network RX might set PF_MEMALLOC
     * again if the socket is related to swap
     */
    current->flags &= ~PF_MEMALLOC;
    /* snapshot all softirqs pending on this cpu */
    pending = local_softirq_pending();
    account_irq_enter_time(current);

    __local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);
    in_hardirq = lockdep_softirq_start();

restart:
    /* Reset the pending bitmask before enabling irqs */
    /* clear the pending mask on this cpu */
    set_softirq_pending(0);
    /* run the handlers with hardware interrupts enabled */
    local_irq_enable();

    h = softirq_vec;
    /* walk the pending bits and run each handler */
    while ((softirq_bit = ffs(pending))) {
        unsigned int vec_nr;
        int prev_count;

        h += softirq_bit - 1;

        vec_nr = h - softirq_vec;
        prev_count = preempt_count();

        kstat_incr_softirqs_this_cpu(vec_nr);

        trace_softirq_entry(vec_nr);
        /* the softirq handler, e.g. net_rx_action */
        h->action(h);
        trace_softirq_exit(vec_nr);
        if (unlikely(prev_count != preempt_count())) {
            pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
                   vec_nr, softirq_to_name[vec_nr], h->action,
                   prev_count, preempt_count());
            preempt_count_set(prev_count);
        }
        h++;
        pending >>= softirq_bit;
    }

    rcu_bh_qs();
    /* handlers done, disable hard interrupts again */
    local_irq_disable();
    /* check whether new softirqs were raised while the handlers ran with irqs enabled */
    pending = local_softirq_pending();
    if (pending) {
        /* If new softirqs are pending, less than 2 ms have elapsed, and fewer
         * than 10 restarts have been used, process them right away. */
        if (time_before(jiffies, end) && !need_resched() &&
            --max_restart)
            goto restart;
        /* otherwise wake the ksoftirqd thread to continue the work */
        wakeup_softirqd();
    }

    lockdep_softirq_end(in_hardirq);
    account_irq_exit_time(current);
    __local_bh_enable(SOFTIRQ_OFFSET);
    WARN_ON_ONCE(in_interrupt());
    tsk_restore_flags(current, old_flags, PF_MEMALLOC);
}

The network receive softirq handler:

static void net_rx_action(struct softirq_action *h)
{
    /* get the per-cpu sd */
    struct softnet_data *sd = this_cpu_ptr(&softnet_data);
    unsigned long time_limit = jiffies + 2;

    /* netdev_budget defaults to 300 and can be changed via sysctl */
    int budget = netdev_budget;
    void *have;

    local_irq_disable();
    /* a non-empty sd->poll_list means there is work to do */
    while (!list_empty(&sd->poll_list)) {
        struct napi_struct *n;
        int work, weight;

        /* If softirq window is exhuasted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
        /* If the budget is exhausted or two jiffies have passed, packet
         * pressure is too high and we must leave the loop before finishing;
         * softnet_break re-raises the softirq (the pending mask was already
         * cleared when this run of the softirq started). */
        if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
            goto softnet_break;

        local_irq_enable();

        /* Even though interrupts have been re-enabled, this
         * access is safe because interrupts can only add new
         * entries to the tail of this list, and only ->poll()
         * calls can remove this head entry from the list.
         */
        n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);

        have = netpoll_poll_lock(n);

        weight = n->weight;

        /* This NAPI_STATE_SCHED test is for avoiding a race
         * with netpoll's poll_napi().  Only the entity which
         * obtains the lock and sees NAPI_STATE_SCHED set will
         * actually make the ->poll() call.  Therefore we avoid
         * accidentally calling ->poll() when NAPI is not scheduled.
         */
        work = 0;
        /* The poll function is called only when NAPI_STATE_SCHED is set.
         * For non-NAPI, poll is process_backlog, which drains the per-cpu
         * input queue. For NAPI, poll is the driver's function, e.g.
         * ixgbe_poll, which builds skbs and pushes them up the stack.
         * If work < weight there is nothing left to do and the poll function
         * removes the napi from sd->poll_list; if work == weight more data
         * remains, so the napi is kept and only moved to the list tail. */
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            work = n->poll(n, weight);
            trace_napi_poll(n);
        }

        WARN_ON_ONCE(work > weight);
        /* work is the number of packets poll actually handled; subtract it from budget */
        budget -= work;

        local_irq_disable();

        /* Drivers must not modify the NAPI state if they
         * consume the entire weight.  In such cases this code
         * still "owns" the NAPI instance and therefore can
         * move the instance around on the list at-will.
         */
        /* work == weight means there is still more data to process */
        if (unlikely(work == weight)) {
            if (unlikely(napi_disable_pending(n))) {
                local_irq_enable();
                napi_complete(n);
                local_irq_disable();
            } else {
                if (n->gro_list) {
                    /* flush too old packets
                     * If HZ < 1000, flush all packets.
                     */
                    local_irq_enable();
                    napi_gro_flush(n, HZ >= 1000);
                    local_irq_disable();
                }
                /* move the napi to the tail of the list */
                list_move_tail(&n->poll_list, &sd->poll_list);
            }
        }

        netpoll_poll_unlock(have);
    }
out:
    net_rps_action_and_irq_enable(sd);

    return;

softnet_break:
    sd->time_squeeze++;
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    goto out;
}
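
As a rough worked example of the two limits in net_rx_action (using the default values mentioned above; the two-jiffy time cap can end a run earlier), a budget of 300 with a per-NAPI weight of 64 allows at most five full polls per invocation. A small illustration, not kernel code:

/* Illustrative arithmetic only, not kernel code; real runs can also be cut
 * short by the two-jiffy time limit. */
#include <stdio.h>

int main(void)
{
    int budget = 300;          /* netdev_budget default */
    int weight = 64;           /* typical NAPI weight (weight_p / driver default) */
    int polls = 0;

    while (budget > 0) {       /* mirrors the budget check at the top of the net_rx_action loop */
        budget -= weight;      /* each ->poll() may consume up to its full weight */
        polls++;
    }
    printf("at most %d full polls per net_rx_action run\n", polls);   /* prints 5 */
    return 0;
}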

/*
 * net_rps_action_and_irq_enable sends any pending IPI's for rps.
 * Note: called with local irq disabled, but exits with local irq enabled.
 */
If sd->rps_ipi_list is not empty, RPS has queued skbs onto other CPUs' per-CPU queues, so those CPUs must be notified via IPI: smp_call_function_single_async remotely raises the softirq on each of them so they process their own queues.
static void net_rps_action_and_irq_enable(struct softnet_data *sd)
{
#ifdef CONFIG_RPS
    struct softnet_data *remsd = sd->rps_ipi_list;

    if (remsd) {
        sd->rps_ipi_list = NULL;

        local_irq_enable();

        /* Send pending IPI's to kick RPS processing on remote cpus. */
        while (remsd) {
            struct softnet_data *next = remsd->rps_ipi_next;

            if (cpu_online(remsd->cpu))
                smp_call_function_single_async(remsd->cpu,
                               &remsd->csd);
            remsd = next;
        }
    } else
#endif
        local_irq_enable();
}

The non-NAPI poll function is process_backlog:

static int process_backlog(struct napi_struct *napi, int quota)
{
    int work = 0;
    struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

#ifdef CONFIG_RPS
    /* Check if we have pending ipi, its better to send them now,
     * not waiting net_rx_action() end.
     */
    /* send any pending IPIs to other cpus now */
    if (sd->rps_ipi_list) {
        local_irq_disable();
        net_rps_action_and_irq_enable(sd);
    }
#endif
    napi->weight = weight_p;
    local_irq_disable();
    while (1) {
        struct sk_buff *skb;

        while ((skb = __skb_dequeue(&sd->process_queue))) {
            rcu_read_lock();
            local_irq_enable();
            /* pass the skb up the protocol stack */
            __netif_receive_skb(skb);
            rcu_read_unlock();
            local_irq_disable();
            input_queue_head_incr(sd);
            /* the quota is reached: more packets remain, return with work == quota */
            if (++work >= quota) {
                local_irq_enable();
                return work;
            }
        }

        rps_lock(sd);
        if (skb_queue_empty(&sd->input_pkt_queue)) {
            /*
             * Inline a custom version of __napi_complete().
             * only current cpu owns and manipulates this napi,
             * and NAPI_STATE_SCHED is the only possible flag set
             * on backlog.
             * We can use a plain write instead of clear_bit(),
             * and we dont need an smp_mb() memory barrier.
             */
            /* input_pkt_queue is empty: remove the napi from poll_list */
            list_del(&napi->poll_list);
            napi->state = 0;
            rps_unlock(sd);

            break;
        }
        /* splice the skbs from input_pkt_queue onto process_queue and reinitialize input_pkt_queue */
        skb_queue_splice_tail_init(&sd->input_pkt_queue,
                       &sd->process_queue);
        rps_unlock(sd);
    }
    local_irq_enable();

    return work;
}

NAPI processing flow

1. Raising the softirq

When the hardware interrupt arrives, the interrupt handler ixgbe_msix_clean_rings is called:
ixgbe_msix_clean_rings
    napi_schedule(&q_vector->napi);
____napi_schedule(this_cpu_ptr(&softnet_data), n);
            //add the napi to the per-cpu softnet_data->poll_list
            list_add_tail(&napi->poll_list, &sd->poll_list);
            //mark the receive softirq pending
            __raise_softirq_irqoff(NET_RX_SOFTIRQ);

2. Executing the softirq

__do_softirq
    net_rx_action
        n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
        work = n->poll(n, weight); //即调用 ixgbe_poll
            ixgbe_clean_rx_irq(q_vector, ring)
                skb = ixgbe_fetch_rx_buffer(rx_ring, rx_desc);
                ixgbe_rx_skb(q_vector, skb);
                    napi_gro_receive(&q_vector->napi, skb);
                        //pass up the stack; with RPS enabled this takes the non-NAPI path instead
                        netif_receive_skb_internal
            /* all work done, exit the polling mode */
            //if fewer skbs than the quota were processed, the work is done:
            //remove the napi from poll_list and clear NAPI_STATE_SCHED
            napi_complete(napi);
                list_del(&n->poll_list);
                clear_bit(NAPI_STATE_SCHED, &n->state);

If RPS is not enabled, __netif_receive_skb is called directly to push the skb up the protocol stack.
If RPS is enabled, get_rps_cpu picks a suitable CPU (possibly the local one, possibly another), enqueue_to_backlog puts the skb on that CPU's per-CPU queue, and that CPU's softirq is raised.

static int netif_receive_skb_internal(struct sk_buff *skb)
{
    int ret;

    net_timestamp_check(netdev_tstamp_prequeue, skb);

    if (skb_defer_rx_timestamp(skb))
        return NET_RX_SUCCESS;

    rcu_read_lock();

#ifdef CONFIG_RPS
    /* note the static_key_false test: this static branch defaults to not taken
     * and is only patched in when RPS is actually enabled */
    if (static_key_false(&rps_needed)) {
        struct rps_dev_flow voidflow, *rflow = &voidflow;
        int cpu = get_rps_cpu(skb->dev, skb, &rflow);

        if (cpu >= 0) {
            ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
            rcu_read_unlock();
            return ret;
        }
    }
#endif
    ret = __netif_receive_skb(skb);
    rcu_read_unlock();
    return ret;
}

References

https://blog.packagecloud.io/eng/2016/06/22/monitoring-tuning-linux-networking-stack-receiving-data/



 

Source: https://www.cnblogs.com/dream397/p/14536368.html
