ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

ArrayBlockingQueue源码剖析

2021-06-18 22:34:49  阅读:197  来源: 互联网

标签:notEmpty lock 剖析 源码 new ArrayBlockingQueue items final notFull


生产者-消费者
ArrayBlockingQueue是一个实现了BlockingQueue接口的类,其可以很方便的实现生产者-消费者模式。用法如下:


class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}

class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}

class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
two-condition算法来进行并发控制
在ArrayBlockingQueue中有如下三个变量声明(定义):


/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
1
2
3
4
5
6
7
8
9
10
11
12
13
实现生产者-消费者的并发控制很简单,一把锁,两个条件!再来看ArrayBlockingQueue的构造函数代码:


public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
1
2
3
4
5
6
7
8
9
10
在初始化的时候,ArrayBlockingQueue对lock、notEmpty、notFull进行了初始化。

生产者进行生产
首先查看生产者生产时候需要调用的put(E e)方法:


public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
首先通过ReentrantLock的lockInterruptibly()方法来尝试获得锁,该方法在获取锁之后,可以继续响应线程的interrupt操作,注意lock.unlock()一定要写在finally块中,不然在出现异常之后,有可能永远也释放不了锁了!

当发现当前数量已经满的时候:while(count == items.length),那么将会让生产者(当前线程)进行等待:notFull.await(),否则进行insert(e)操作。

继续跟踪insert(e)操作不难想到,在插入成功之后,会通知notEmpty来唤醒消费者(某一个正在等待notEmpty条件的线程),告知有了新的产品可消费了!


private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
1
2
3
4
5
6
7
8
如上可知:如果队列已满(full),那么notFull进行等待,否则插入成功之后,唤醒notEmpty告知不用等待了。同理:消费者进行消费的take操作也是类似的。

消费者进行消费

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
概括
整体而言,在有了ReentrantLock、Condition之后,生产者-消费者模式实现起来还是很简单的。ReentrantLock负责加锁释放锁,Condition负责等待唤醒线程。
————————————————
版权声明:本文为CSDN博主「赵坤的个人网站」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/anxiaoyi520/article/details/46670675

标签:notEmpty,lock,剖析,源码,new,ArrayBlockingQueue,items,final,notFull
来源: https://www.cnblogs.com/hanease/p/14901428.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有