ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

无锁队列MpscQueue源码分析

2021-06-04 09:03:43  阅读:195  来源: 互联网

标签:扩容 无锁 队列 元素 long 源码 线程 数组 MpscQueue


前言

之前的文章在分析NioEventLoop源码的时候,有提到过Netty没有用JDK提供的阻塞队列,而是使用了高性能无锁队列MpscQueue。因为篇幅原因,那篇文章并没有详细介绍MpscQueue,今天,它来啦!!!

在Netty较早的版本中,使用的是自己实现的任务队列,后来全部替换为JCTools工具的无锁化队列了,为啥呢?没别的,因为它的效率实在是太高了。

何为Mpsc???
JCTools提供了很多队列,大家需要针对不同的应用场景选择合适的队列,避免发生潜在的问题。
这里解释一下「MSPC」的含义,如下:

  • M:Multiple,多个的。
  • S:Single,单个的。
  • P:Producer,生产者。
  • C:Consumer,消费者。

因此MpscQueue其实就是指:适用于多生产者,单消费者的高性能无锁队列!

之前的文章有说过,NioEventLoop是个单线程的线程池,提交到EventLoop的任务会被线程串行化执行。因此EventLoop的任务队列的生产消费模型是:多生产者,单消费者。

所以本篇文章只会重点分析MpscQueue,其他队列大家自行研究。

MpscQueue源码分析

MpscQueue不是Netty提供的,因此在Netty项目里是看不到它的源码的,为了阅读方便,还是建议大家去单独拉JCTools的源码,地址:https://github.com/JCTools/JCTools

Netty默认使用的队列是org.jctools.queues.MpscUnboundedArrayQueue,这里只分析它。
MpscUnboundedArrayQueue是一个适用于「多生产者单消费者」的无界队列,这意味着它没有容量限制,你可以不断的往里面提交任务,即便没有消费者消费数据。

MpscUnboundedArrayQueue类的继承关系比较多,类图比较复杂,但是没关系,我们不用分析所有代码,只重点关注它的直接父类BaseMpscLinkedArrayQueue即可,核心逻辑都在这里了,看懂BaseMpscLinkedArrayQueue基本就知道它大概的一个实现思想了。
在这里插入图片描述

这里说明一下,MpscQueue的代码中存在大量类似如下代码:

byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b

读者不用过分关注,忽略它即可,这些属性没有什么特别的用处,就是做字节填充用的,这涉及到CPU硬件缓存Cache Line,这里简单说下吧。

计算机除了有Memory主存,CPU每个核心还有自己单独的缓存,这些缓存按等级分为一级缓存、二级缓存等等,距离CPU核心越近的缓存效率越高。为啥CPU要有缓存?就是因为CPU的计算速度实在是太快了,相比之下内存的读写速度实在是太慢了,为了填补二者速度上的鸿沟,CPU被加入了多级缓存,CPU会将主存中的数据进行缓存,然后进行运算,运算完成后会在未来的某个时刻写入到主存。

CPU缓存数据的最小单位是Cache Line,在大多数CPU上,它的大小是64字节。只要Cache Line中的任一数据失效,整个Cache Line就会被认为是失效的,需要从主存中重新加载。

因此,如果Java对象/属性被缓存到同一个Cache Line上了,那就有可能因为其他线程修改了这一块的某个数据,导致所有线程的Cache Line全部失效,进而导致所有线程重新从主存中load,这会导致不必要的开销。

综上所述,MpscQueue做了优化,将可能会被频繁读写的数据,分配到不同的Cache Line,避免相互影响。

言归正传,回到MpscUnboundedArrayQueue,先说一下它的一个实现思路吧。

MpscUnboundedArrayQueue基本的数据结构由「数组+链表」组成,它有两个指针:producerBuffer和consumerBuffer,分别指向生产者生产和消费者消费对应的数组。它还有两个索引指针:producerIndex和consumerIndex,分别代表生产者生产和消费者消费的索引,这两个索引会以2为步长不断递增。
另外对于生产者,它还有一个producerLimit指针,它代表生产者生产消息的上限,达到该上限,Queue就要扩容了,扩容的方式是创建一个长度一样的新数组,然后旧数组的最后一个元素指向新数组,形成单向链表。

笔者画了一个简图,描述了MpscQueue的数据结构变化:
在这里插入图片描述

整体流程说的差不多了,下面开始分析源码,先看几个比较重要的属性:

// 生产者索引
private volatile long producerIndex;
// 元素生产的限制,当producerIndex == producerLimit,代表队列需要扩容
private volatile long producerLimit;
protected long producerMask;
// 当前生产者指向的数组
protected E[] producerBuffer;

// 消费者索引
private volatile long consumerIndex;
protected long consumerMask;
// 当前消费者指向的数组
protected E[] consumerBuffer;

// 数组被生产者填满后,会填充一个JUMP,代表队列扩容了,消费者遇到JUMP会消费下一个数组。
private static final Object JUMP = new Object();

// 消费者消费完一个完整的数组后,会将最后一个元素设为BUFFER_CONSUMED。
private static final Object BUFFER_CONSUMED = new Object();

再看构造函数,需要给定一个chunkSize,指定块大小,MpscQueue由一系列数组构成,chunkSize就是数组的大小,它必须是一个2的幂次方数。

public MpscUnboundedArrayQueue(int chunkSize) {
    super(chunkSize);
}

在父类的构造函数中,计算了mask,初始化了一个数组,并将producerBuffer和consumerBuffer都指向了同一个数组,然后根据mask设置producerLimit。

假设initialCapacity为8,数组的长度就是9,因为最后一个元素会用来存放扩容数组的地址,形成链表。每个数组还会预留一个槽位存放JUMP元素,代表队列扩容了,消费者遇到JUMP元素就会通过最后一个元素找到扩容后的数组继续消费,因此一个数组最多保留7个元素。

/**
 * 初始化
 * @param initialCapacity 数组容量,要求是2的幂次方数
 */
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
	// initialCapacity必须大于等于2
	RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");

	// 容量确保是2的幂次方数,找到initialCapacity下一个2的幂次方数
	int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);

	// index以2为步长递增,预留一个元素存JUMP,所以limit为:(capacity-1)*2
	long mask = (p2capacity - 1) << 1;
	// need extra element to point at next array
	// 需要一个额外元素来链接下一个数组
	E[] buffer = allocateRefArray(p2capacity + 1);
	// 生产者和消费者Buffer指向同一个数组
	producerBuffer = buffer;
	producerMask = mask;
	consumerBuffer = buffer;
	consumerMask = mask;
	// 设置producerLimit = mask
	soProducerLimit(mask);
}

Queue初始化完成后,就是不断的生产数据和消费数据了,所以接下来重点分析offer()poll()方法。

offer()分析

offer(e)会将元素e添加到队列中,即生产数据。在MpscQueue中,队列是不能存放空数据的,所以首先会检查非空。然后线程通过CAS的方式以步长为2递增producerIndex,CAS会保证只有一个线程操作成功,CAS成功就代表线程抢到了数组中的槽位,它可以将元素e添加到数组的指定槽位。CAS失败代表并发失败了,会自旋重试。

上面说的是producerIndex还没有达到producerLimit的情况,如果达到producerLimit,代表生产达到上限,队列可能需要扩容了。offerSlowPath()方法会判断队列是否需要扩容,如果需要扩容,也只会交给一个线程去扩容,这里又是一个CAS操作,线程以1为步长递增producerIndex,只有CAS成功的线程才会去执行扩容逻辑。

因此,在offer(e)的逻辑中,还会判断producerIndex是否是奇数,如果为奇数就代表队列正在扩容。因为MpscQueue的扩容非常快速,它不需要迁移元素,只需要创建一个新数组,再和旧数组建立连接就可以了,所以没有必要让其他线程挂起,线程发现队列在扩容时,会进行自旋重试,直到扩容完成。

/**
 * 向队列中添加一个元素e,生产数据
 * @param e
 * @return
 */
@Override
public boolean offer(final E e) {
	if (null == e) { // 非空校验
		throw new NullPointerException();
	}

	long mask;
	E[] buffer;//生产者指向的数组
	long pIndex;//生产索引

	while (true) {
		long producerLimit = lvProducerLimit();
        // 获取生产者索引
		pIndex = lvProducerIndex();
		// 生产索引以2为步长递增,一般不会是奇数,在offerSlowPath()中扩容线程会将其设为奇数
		if ((pIndex & 1) == 1) {
			// 奇数代表正在扩容,自旋,等待扩容完成
			continue;
		}

		mask = this.producerMask;
		buffer = this.producerBuffer;
		// 生产索引达到producerLimit,代表可能需要扩容。
		if (producerLimit <= pIndex) {
			int result = offerSlowPath(mask, pIndex, producerLimit);
			switch (result) {
				case CONTINUE_TO_P_INDEX_CAS:
					//producerLimit虽然达到了limit,但是当前数组已经被消费了部分数据,暂时不会扩容,会使用已被消费的槽位。
					break;
				case RETRY://CAS失败,重试
					continue;
				case QUEUE_FULL://队列满,offer失败
					return false;
				case QUEUE_RESIZE://需要扩容
					resize(mask, buffer, pIndex, e, null);
					return true;
			}
		}

		if (casProducerIndex(pIndex, pIndex + 2)) {
			// CAS递增producerIndex成功,抢到槽位,跳出自旋
			break;
		}
	}
	final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
	// 将buffer数组的指定位置替换为e,不是根据下标来设置的,是根据槽位的地址偏移量offset,UNSAFE操作。
	soRefElement(buffer, offset, e); // release element e
	return true;
}

offerSlowPath()会告诉线程队列是满了,还是需要扩容,还是需要自旋重试。虽然producerIndex达到了producerLimit,但不代表队列就非得扩容,如果消费者已经消费了部分生产者指向的数组元素,就意味这当前数组还是有槽位可以继续用的,暂时不用扩容。

/**
 * @param mask
 * @param pIndex 生产者索引
 * @param producerLimit 生产者limit
 * @return
 */
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
	// 消费者索引
	final long cIndex = lvConsumerIndex();
	// 数组缓冲的容量,(长度-1) * 2
	long bufferCapacity = getCurrentBufferCapacity(mask);

	// 消费索引+当前数组的容量 > 生产索引,代表当前数组已有部分元素被消费了,不会扩容,会使用已被消费的槽位。
	if (cIndex + bufferCapacity > pIndex) {
		if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
			// CAS失败,自旋重试
			return RETRY;
		} else {
			// 重试CAS修改 生产索引
			return CONTINUE_TO_P_INDEX_CAS;
		}
	}
	// 根据生产者和消费者索引判断Queue是否已满,无解队列永不会满
	else if (availableInQueue(pIndex, cIndex) <= 0) {
		return QUEUE_FULL;
	}
	// grab index for resize -> set lower bit
	// CAS的方式将producerIndex加1,奇数代表正在resize
	else if (casProducerIndex(pIndex, pIndex + 1)) {
		return QUEUE_RESIZE;
	} else {
		// resize失败,重试
		return RETRY;
	}
}

如果需要扩容,线程会CAS操作将producerIndex改为奇数,让其它线程能感知到队列正在扩容,要生产数据的线程先自旋,等待扩容完成再继续操作。

resize()是扩容的核心方法,它首先会创建一个相同长度的新数组,将producerBuffer指向新数组,然后将元素e放到新数组中,旧元素的最后一个元素指向新数组,形成链表。还会将旧元素的槽位填充JUMP元素,代表队列扩容了。

// 扩容:新建一个E[],将oldBuffer和newBuffer建立连接。
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
	assert (e != null && s == null) || (e == null || s != null);
	// 下一个Buffer的长度,MpscQueue会构建一个相同长度的Buffer
	int newBufferLength = getNextBufferSize(oldBuffer);
	final E[] newBuffer;
	try {
		// 创建一个新的E[]
		newBuffer = allocateRefArray(newBufferLength);
	} catch (OutOfMemoryError oom) {
		assert lvProducerIndex() == pIndex + 1;
		soProducerIndex(pIndex);
		throw oom;
	}

	// 生产者Buffer指向新的E[]
	producerBuffer = newBuffer;
	// 计算新的Mask,Buffer长度不变的情况下,Mask也不变
	final int newMask = (newBufferLength - 2) << 1;
	producerMask = newMask;

	// 根据该偏移量设置oldBuffer的JUMP元素,会递增然后重置,不断循环
	final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
	// Mask不变的情况下,oldBuffer的JUMP对应的位置,就是newBuffer中要消费的位置.
	final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

	// 元素e放到新数组中
	soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
	// 旧数组和新数组建立连接,旧数组的最后一个元素保存新数组的地址。
	soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);

	// 消费者索引
	final long cIndex = lvConsumerIndex();
	// 根据消费者和生产者索引,校验Queue是否已满。对于无界队列,返回Integer.MAX_VALUE,永远都不会满。
	final long availableInQueue = availableInQueue(pIndex, cIndex);
	RangeUtil.checkPositive(availableInQueue, "availableInQueue");

	// 设置新的producerLimit
	soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

	/*
	扩容的时候会将producerIndex设为pIndex+1,奇数代表正在扩容,非扩容线程会自旋重试,等待扩容完成。
	现在元素已经放入队列,将producerIndex设为pIndex+2,让其他线程知道扩容完成。
	 */
	soProducerIndex(pIndex + 2);

	/*
	将旧数组的指定位置设为JUMP,消费者遇到JUMP就知道队列扩容了,会寻找next连接的数组。
	 */
	soRefElement(oldBuffer, offsetInOld, JUMP);
}

offer()主要流程就这样,CAS抢槽位,确保只有单个线程能生产,CAS失败的线程自旋重试。如果遇到队列需要扩容,则将producerIndex设为奇数,其他线程自旋等待扩容完成,扩容后再设为偶数,通知其它线程继续生产。

poll()分析

元素生产好了,就是为了调用poll()来进行消费的。

poll()首先还是找到consumerBuffer指向的当前消费数组,根据消费索引consumerIndex计算要消费的元素相较于Array的内存地址偏移量,根据这个偏移量来获取元素。

如果元素为null,并不代表队列是空的,还要比较consumerIndex和producerIndex,如果两者索引不同,那么producerIndex肯定是大于consumerIndex的,说明生产者已经在生产了,移动了producerIndex,只是还没来得及将元素填充到数组而已。因为生产者是先CAS递增producerIndex,再将元素填充到数组的,两步之间存在一个非常短的时间差,如果消费者恰好在这个时间差内去消费数据,那么就自旋等待一下,等待生产者填充元素到数组。

如果元素为JUMP,说明队列扩容了,消费者需要根据数组的最后一个元素找到扩容后的新数组,消费新数组的元素。

// MpscQueue是多生产单消费者的Queue,因此poll()没有做并发控制。
@Override
public E poll() {
	/*
	consumerBuffer和producerBuffer会在Queue的构造函数中被初始化,
	初始化时,两者会指向同一个数组。随着生产者不断生产数据,Queue扩容,producerBuffer会慢慢指向新的数组。
	 */
	final E[] buffer = consumerBuffer;
	// 消费者索引
	final long index = lpConsumerIndex();
	final long mask = consumerMask;

	// 计算消费者需要消费的元素在数组中的地址偏移量
	final long offset = modifiedCalcCircularRefElementOffset(index, mask);
	// 根据offset取出元素e
	Object e = lvRefElement(buffer, offset);
	if (e == null) {
		if (index != lvProducerIndex()) {
			/*
			offer()时生产者先CAS改producerIndex,再设置元素。
			中间会有一个时间差,此时会自旋,等待元素设置完成。
			 */
			do {
				e = lvRefElement(buffer, offset);
			}
			while (e == null);
		} else {//元素已经消费完
			return null;
		}
	}

	if (e == JUMP) {// 代表队列扩容了
		/*
		通过当前数组的最后一个元素,获取下一个待消费的数组,
		同时,消费者还会将最后一个元素设为BUFFER_CONSUMED,代表当前数组已经消费完毕。
		 */
		final E[] nextBuffer = nextBuffer(buffer, mask);
		// 从新数组中消费元素
		return newBufferPoll(nextBuffer, index);
	}

	// 取出元素后,将原来的槽位设为null
	soRefElement(buffer, offset, null);
	// 递增consumerIndex
	soConsumerIndex(index + 2);
	return (E) e;
}

如果队列扩容了,nextBuffer()会找到扩容后的新数组,同时它还会将旧数组的最后一个元素设为BUFFER_CONSUMED,代表当前数组已经被消费完了,也就从链表中剔除了。

// 找到扩容后的新数组
private E[] nextBuffer(final E[] buffer, final long mask) {
    // 计算数组最后一个元素的地址偏移量
    final long offset = nextArrayOffset(mask);
    // 找到下一个数组
    final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
    // 消费者Buffer指向新数组
    consumerBuffer = nextBuffer;
    // 重新计算Mask,数组长度不变的则Mask不变
    consumerMask = (length(nextBuffer) - 2) << 1;
    // 将旧数组的最后一个元素设为BUFFER_CONSUMED,代表消费完毕。
    soRefElement(buffer, offset, BUFFER_CONSUMED);
    return nextBuffer;
}

得到新数组后,会调用newBufferPoll()从新数组中消费数据:

// 从扩容后的新数组里消费数据,索引下标不变
private E newBufferPoll(E[] nextBuffer, long index) {
	// 根据consumerIndex计算要消费的元素相较于Array的内存偏移量
	final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
	// 根据offset取出这个元素
	final E n = lvRefElement(nextBuffer, offset);
	if (n == null) {//offer()的元素不可能为null,一般不会进这个if
		throw new IllegalStateException("new buffer must have at least one element");
	}
	// 元素取出后将那个那个槽位设为null
	soRefElement(nextBuffer, offset, null);
	// 递增consumerIndex
	soConsumerIndex(index + 2);
	return n;
}

消费者取出数据后,会将数组原来的槽位中填充null,其实也就代表这个槽位没有使用,可以被复用。

至此,poll()的消费流程也全部分析结束了,可以看到,全程都没有挂起线程,顶多就是自旋等待。

总结

MpscQueue是一个「多生产者单消费者」的高性能无锁队列,符合Netty EventLoop的任务消费模型。
它用到了大量的CAS操作,对于需要做并发控制的地方,确保只有一个线程会执行成功,其他CAS失败的线程会自旋重试,全程都是无锁非阻塞的。不管是扩容,还是等待元素被填充到数组,这些过程都是会极快完成的,因此短暂的自旋会比线程挂起再唤醒效率更高。
MpscQueue由一系列数组构成,数组的最后一个元素指向下一个数组,形成单向链表。数组扩容后会在原槽位填充JUMP元素,消费者遇到该元素就知道要寻找新数组继续消费了。

MpscQueue全程无锁化,非阻塞,相较于JDK提供的同步阻塞队列,性能有很好的提升,这也是Netty后来的版本将任务队列替换为JCtools的重要原因。

标签:扩容,无锁,队列,元素,long,源码,线程,数组,MpscQueue
来源: https://blog.csdn.net/qq_32099833/article/details/117547853

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有