ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

无锁队列理解

2021-04-05 20:57:09  阅读:208  来源: 互联网

标签:无锁 end 队列 chunk back pos queue 理解 return


        由于普通锁的粒度比较大,以至于在并发量高的环境下,锁对于并发性能影响很大,本文章对无锁队列做探索,该无锁队列目前只支持单读单写,上干货

         该队列由链表组成,每个节点有N个泛型T组成,该队列实现对T类型元素单读单写的无锁操作,可以方便的用在单生产者消费者模型中

        队列的主要元素如下:

        yqueue_t的实现,每次批量分配⼀批元素,减少内存的分配和释放(解决不断动态内存分配的问题)。
        yqueue_t内部由⼀个⼀个chunk组成,每个chunk保存N个元素

 

        当队列空间不⾜时每次分配⼀个chunk_t,每个chunk_t能存储N个元素。
        在数据出队列后,队列有多余空间的时候,回收的chunk也不是⻢上释放,⽽是根据局部性原理先回收到spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。
       yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:begin_chunk/begin_pos:begin_chunk⽤于指向队列头的chunk,begin_pos⽤于指向队列第⼀个 元素在当前chunk中的位置。                back_chunk/back_pos:back_chunk⽤于指向队列尾的chunk,back_po⽤于指向队列最后⼀个元
      素在当前chunk的位置。
      end_chunk/end_pos:由于chunk是批量分配的,所以end_chunk⽤于指向分配的最后⼀个chunk位
置。
      这⾥特别需要注意区分back_chunk/back_pos和end_chunk/end_pos的作⽤:
      back_chunk/back_pos:对应的是元素存储位置;
     end_chunk/end_pos:决定是否要分配chunk或者回收chunk

 

//Back position may point to invalid memory if the queue is empty,
//while begin & end positions are always valid. Begin position is
//accessed exclusively be queue reader (front/pop), while back and
//end positions are accessed exclusively by queue writer (back/push).
chunk_t *begin_chunk;   // 链表头结点
int begin_pos;          // 起始点
chunk_t *back_chunk;    // 队列中最后一个元素所在的链表结点
int back_pos;           // 尾部
chunk_t *end_chunk;     // 拿来扩容的,总是指向链表的最后一个结点
int end_pos;
// 链表结点称之为chunk_t
struct chunk_t  
{
    T values[N];    //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存
    chunk_t *prev;
    chunk_t *next;
};


inline yqueue_t()
{
    begin_chunk = (chunk_t *)malloc(sizeof(chunk_t));
    alloc_assert(begin_chunk);
    begin_pos = 0;
    back_chunk = NULL;   //back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空
    back_pos = 0;
    end_chunk = begin_chunk;    //end_chunk总是指向链表的最后一个chunk
    end_pos = 0;
}
        队列操作函数
      
inline T &front()   // 返回的是引用,是个左值,调用者可以通过其修改容器的值
{
    return begin_chunk->values[begin_pos];
}

//  Returns reference to the back element of the queue.
//  If the queue is empty, behaviour is undefined.
inline T &back()     // 返回的是引用,是个左值,调用者可以通过其修改容器的值
{
    return back_chunk->values[back_pos];
}

//  Adds an element to the back end of the queue.
inline void push()
{
    back_chunk = end_chunk;
    back_pos = end_pos;             // 

    if (++end_pos != N) //end_pos==N表明这个链表结点已经满了
        return;

    chunk_t *sc = spare_chunk.xchg(NULL);       // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
    if (sc)     // 如果有spare chunk则继续复用它
    {
        end_chunk->next = sc;
        sc->prev = end_chunk;
    }
    else        // 没有则重新分配
    {
        end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t));   // 分配一个chunk
        alloc_assert(end_chunk->next);
        end_chunk->next->prev = end_chunk;
    }
    end_chunk = end_chunk->next;
    end_pos = 0;
}

    //  Removes element from the back end of the queue. In other words
    //  it rollbacks last push to the queue. Take care: Caller is
    //  responsible for destroying the object being unpushed.
    //  The caller must also guarantee that the queue isn't empty when
    //  unpush is called. It cannot be done automatically as the read
    //  side of the queue can be managed by different, completely
    //  unsynchronised thread.
    // 必须要保证队列不为空,参考ypipe_t的uwrite
    inline void unpush()
    {
        //  First, move 'back' one position backwards.
        if (back_pos)           // 从尾部删除元素
            --back_pos;
        else
        {
            back_pos = N - 1;       // 回退到前一个chunk
            back_chunk = back_chunk->prev;
        }

        //  Now, move 'end' position backwards. Note that obsolete end chunk
        //  is not used as a spare chunk. The analysis shows that doing so
        //  would require free and atomic operation per chunk deallocated
        //  instead of a simple free.
        if (end_pos)            // 意味着当前的chunk还有其他元素占有
            --end_pos;
        else
        {
            end_pos = N - 1;    // 当前chunk没有元素占用,则需要将整个chunk释放
            end_chunk = end_chunk->prev;
            free(end_chunk->next);
            end_chunk->next = NULL;
        }
    }

    //  Removes an element from the front end of the queue.
    inline void pop()
    {
        if (++begin_pos == N)       // 删除满一个chunk才回收chunk
        {
            chunk_t *o = begin_chunk;
            begin_chunk = begin_chunk->next;
            begin_chunk->prev = NULL;
            begin_pos = 0;

            //  'o' has been more recently used than spare_chunk,
            //  so for cache reasons we'll get rid of the spare and
            //  use 'o' as the spare.
            chunk_t *cs = spare_chunk.xchg(o);  //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
            free(cs);
        }
    }
这⾥的front()或者back()函数,需要注意的返回的是左值引⽤,我们可以修改其值。
对于先进后出队列⽽⾔:
begin_chunk->values[begin_pos]代表队列头可读元素, 读取队列头元素即是读取begin_pos位置
的元素;
back_chunk->values[back_pos]代表队列尾可写元素,写⼊元素时则是更新back_pos位置的元素,
要确保元素真正⽣效,还需要调⽤push函数更新back_pos的位置,避免下次更新的时候⼜是更新当前
back_pos位置对应的元素。

下面为无锁同步的重要操作方法

 // 设置新值,返回旧值
        inline T *xchg (T *val_)    //原子操
        {
#if defined ZMQ_ATOMIC_PTR_ATOMIC_H
            return (T*) atomic_swap_ptr (&ptr, val_);
#elif defined ZMQ_ATOMIC_PTR_TILE
            return (T*) arch_atomic_exchange (&ptr, val_);
#elif defined ZMQ_ATOMIC_PTR_X86
            T *old;
            __asm__ volatile (
                "lock; xchg %0, %2"
                : "=r" (old), "=m" (ptr)
                : "m" (ptr), "0" (val_));
            return old;
#elif defined ZMQ_ATOMIC_PTR_MUTEX
            sync.lock ();
            T *old = (T*) ptr;
            ptr = val_;
            sync.unlock ();
            return old;
#else
#error atomic_ptr is not implemented for this platform
#endif
        }

        //  Perform atomic 'compare and swap' operation on the pointer.
        //  The pointer is compared to 'cmp' argument and if they are
        //  equal, its value is set to 'val'. Old value of the pointer
        //  is returned.
        // 原来的值(ptr指向)如果和 comp_的值相同则更新为val_,并返回原来的ptr
        inline T *cas (T *cmp_, T *val_)//原子操作
        {
#if defined ZMQ_ATOMIC_PTR_ATOMIC_H
            return (T*) atomic_cas_ptr (&ptr, cmp_, val_);
#elif defined ZMQ_ATOMIC_PTR_TILE
            return (T*) arch_atomic_val_compare_and_exchange (&ptr, cmp_, val_);
#elif defined ZMQ_ATOMIC_PTR_X86
            T *old;
            __asm__ volatile (
                "lock; cmpxchg %2, %3"
                : "=a" (old), "=m" (ptr)
                : "r" (val_), "m" (ptr), "0" (cmp_)
                : "cc");
            return old;
#else
#error atomic_ptr is not implemented for this platform
#endif
        }


 

下面讲一下无锁队列的读写操作方法

读写队列操作的重要元素如下:

 yqueue_t<T, N> queue;

 //  Points to the first un-flushed item. This variable is used
 //  exclusively by writer thread.
 T *w;//指向第一个未刷新的元素,只被写线程使用

 //  Points to the first un-prefetched item. This variable is used
 //  exclusively by reader thread.
 T *r;//指向第一个还没预提取的元素,只被读线程使用

 //  Points to the first item to be flushed in the future.
 T *f;//指向下一轮要被刷新的一批元素中的第一个

//  The single point of contention between writer and reader thread.
//  Points past the last flushed item. If it is NULL,
//  reader is asleep. This pointer should be always accessed using
//  atomic operations.
atomic_ptr_t<T> c;//读写线程共享的指针,指向每一轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)

无锁队列重要的操作方法

//  Write an item to the pipe.  Don't flush it yet. If incomplete is
    //  set to true the item is assumed to be continued by items
    //  subsequently written to the pipe. Incomplete items are neverflushed down the stream.
    // 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
    inline void write(const T &value_, bool incomplete_)
    {
        //  Place the value to the queue, add new terminator element.
        queue.back() = value_;
        queue.push();

        //  Move the "flush up to here" poiter.
        if (!incomplete_)
            f = &queue.back();      // 记录要刷新的位置
    }

#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif

    //  Pop an incomplete item from the pipe. Returns true is such
    //  item exists, false otherwise.
    inline bool unwrite(T *value_)
    {
        if (f == &queue.back())
            return false;
        queue.unpush();
        *value_ = queue.back();
        return true;
    }

    //  Flush all the completed items into the pipe. Returns false if
    //  the reader thread is sleeping. In that case, caller is obliged to
    //  wake the reader up before using the pipe again.
    // 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
    inline bool flush()
    {
        //  If there are no un-flushed items, do nothing.
        if (w == f)     // 不需要刷新,即是还没有新元素加入
            return true;

        //  Try to set 'c' to 'f'.
        if (c.cas(w, f) != w)   // 尝试将c设置为f,即是准备更新w的位置
        {

            //  Compare-and-swap was unseccessful because 'c' is NULL.
            //  This means that the reader is asleep. Therefore we don't
            //  care about thread-safeness and update c in non-atomic
            //  manner. We'll return false to let the caller know
            //  that reader is sleeping.
            c.set(f);
            w = f;
            return false;       // 线程看到flush返回false之后会发送一个消息给读线程,这个是需要写业务去做处理
        }

        //  Reader is alive. Nothing special to do now. Just move
        //  the 'first un-flushed item' pointer to 'f'.
        w = f;
        return true;
    }

    //  Check whether item is available for reading.
    // 这里面有两个点,一个是检查是否有数据可读,一个是预取
    inline bool check_read()
    {
        //  Was the value prefetched already? If so, return.
        if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
            return true;

        //  There's no prefetched value, so let us prefetch more values.
        //  Prefetching is to simply retrieve the
        //  pointer from c in atomic fashion. If there are no
        //  items to prefetch, set c to NULL (using compare-and-swap).
        r = c.cas(&queue.front(), NULL);//尝试预取数据

        //  If there are no elements prefetched, exit.
        //  During pipe's lifetime r should never be NULL, however,
        //  it can happen during pipe shutdown when items
        //  are being deallocated.
        if (&queue.front() == r || !r)//判断是否成功预取数据
            return false;

        //  There was at least one value prefetched.
        return true;
    }

    //  Reads an item from the pipe. Returns false if there is no value.
    //  available.
    inline bool read(T *value_)
    {
        //  Try to prefetch a value.
        if (!check_read())
            return false;

        //  There was at least one value prefetched.
        //  Return it to the caller.
        *value_ = queue.front();
        queue.pop();
        return true;
    }

    //  Applies the function fn to the first elemenent in the pipe
    //  and returns the value returned by the fn.
    //  The pipe mustn't be empty or the function crashes.
    inline bool probe(bool (*fn)(T &))
    {
        bool rc = check_read();
        // zmq_assert(rc);

        return (*fn)(queue.front());
    }

标签:无锁,end,队列,chunk,back,pos,queue,理解,return
来源: https://blog.csdn.net/lyt_dawang/article/details/115446749

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有