ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

线程安全的Collection相关类介绍和使用

2021-05-25 11:00:04  阅读:165  来源: 互联网

标签:elements 队列 lock 元素 Collection 安全 线程 数组


原文1:https://blog.csdn.net/lkp1603645756/article/details/85016035

原文2:https://www.cnblogs.com/shamao/p/11065885.html

目录

1 引言

在我们使用多线程时,有时会使用到集合,但是多线程多集合进行读写操作时,会报ConcurrentModificationException并发异常的错误,原因的不允许读的同时进行写操作。

2 并发包下的集合类介绍

如下是java.util.concurrent包下集合组成结构的类图
在这里插入图片描述

3 线程安全集合的特性

下列介绍的集合统一特性:线程安全,支持并发操作

非阻塞队列(队列无数据,操作队列产生异常或返回null,不具备等待/阻塞的特色)

  • ConcurrentHashMap:相对于线程安全的HashTable,优势在于HashTable不支持在循环(iterator)中对结构调整(增、删),否则会有ConcurrentModificationException异常。采用了分段锁的设计, 将一个HashMap分成N段,使用key的hashCode来确定分配到那个字段,只有在同一分段内才存在竞态关系,每个分段相当于一个HashTable,执行效率相当于提升了N倍。

  • ConcurrentSkipListMap:支持排序。

  • ConcurrentSkipListSet:支持排序且不允许重复元素。

注:上面两个,排序的实现要求集合中的对象实现Comparable接口,不重复的实现为重写hashCode和equals方法

  • ConcurrentLinkedQueue:队列操作(只操作队头),poll() / peek() / element()

  • ConcurrentLinkedDeque:双端队列(支持操作队头和队尾),pollFirst() / pollLast()

  • CopyOnWriteArrayList:适用于读操作>>写操作的情况。在写时拷贝,也就是如果需要对CopyOnWriteArrayList的内容进行改变,首先会拷贝一份新的List并且在新的List上进行修改,最后将原List的引用指向新的List。线程安全地遍历,因为如果另外一个线程在遍历的时候修改List的话,实际上会拷贝出一个新的List上修改,而不影响当前正在被遍历的List。ArrayList非线程安全

  • CopyOnWriteArraySet:HashSet非线程安全

阻塞队列(取空队列需要等待直到有元素,塞满队列需要等待直到有空间)

  • ArrayBlockingQueue:有界阻塞队列

  • LinkedBlockingQueue:无界阻塞队列,基于单向链表的实现

  • PriorityBlockingQueue:无界有序的阻塞队列,基于数组

  • SynchronousQueue:同步队列,插入需等待移除,移除需等待插入

  • DelayQueue:延时执行任务的队列,集合元素需实现java.util.concurrent.Delayed接口

  • LinkedTransferQueue:与SynchronousQueue功能类似,但有嗅探功能,能尝试性的添加数据(tryTransfer()方法)

什么是CopyOnWrite容器

CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

4 线程安全类的详细介绍与使用

  • CopyOnWriteArrayList:实现了List接口,相当于线程安全的ArrayList。

  • CopyOnWriteArraySet:继承于AbstractSet类,相当于线程安全的HashSet。CopyOnWriteArraySet内部包含一个CopyOnWriteArrayList对象,它是通过CopyOnWriteArrayList实现的。

  • ConcurrentSkipListSet:继承于AbstractSet类,相当于线程安全的TreeSet。ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的。

  • ArrayBlockingQueue:继承于AbstractQueue类,是数组实现的线程安全的有界的阻塞队列。

  • LinkedBlockingQueue:继承于AbstractQueue类,是单向链表实现的(指定大小)阻塞队列,该队列按FIFO(先进先出)排序元素。

  • LinkedBlockingDeque:继承于AbstractQueue类,是双向链表实现的(指定大小)双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式。

  • ConcurrentLinkedQueue:继承于AbstractQueue类,是单向链表实现的无界队列,该队列按FIFO(先进先出)排序元素。

  • ConcurrentLinkedDeque:继承于AbstractQueue类,是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。

CopyOnWriteArrayList

说明

  • CopyOnWriteArrayList的内部有个“volatile数组”来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”,这就是它叫做CopyOnWriteArrayList的原因。CopyOnWriteArrayList就是通过这种方式实现的动态数组,不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList效率很低,但是单单只是进行遍历查找的话,效率比较高。

  • CopyOnWriteArrayList是通过“volatile数组”来保存数据的。一个线程读取volatile数组时,总能看到其它线程对该volatile变量最后的写入,就这样,通过volatile提供了“读取到的数据总是最新的”这个机制的
    保证。

  • CopyOnWriteArrayList通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”,这样,就达到了保护数据的目的。

  • 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。迭代器支持hasNext()、next()等不可变操作,但不支持add()、remove()等可变操作。

构造方法

public CopyOnWriteArrayList() {
    setArray(new Object[0]);
}

public CopyOnWriteArrayList(E[] toCopyIn) {
    setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

public CopyOnWriteArrayList(Collection<? extends E> c) {
    Object[] elements;
    if (c.getClass() == CopyOnWriteArrayList.class)
        elements = ((CopyOnWriteArrayList<?>)c).getArray();
    else {
        elements = c.toArray();
        // c.toArray might (incorrectly) not return Object[] (see 6260652)
        if (elements.getClass() != Object[].class)
            elements = Arrays.copyOf(elements, elements.length, Object[].class);
    }
    setArray(elements);
}

获取和设置array的方法

array是被volatile和transient修饰的一个数组。
关于volatile关键字,我们知道“volatile能让变量变得可见”,即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。正在由于这种特性,每次更新了“volatile数组”之后,其它线程都能看到对它所做的更新。
关于transient关键字,它是在序列化中才起作用,transient变量不会被自动序列化。

private transient volatile Object[] array;

final Object[] getArray() {
    return array;
}

final void setArray(Object[] a) {
    array = a;
}

添加元素

因为array数组是volatile修饰的,不能保证线程安全,所以在添加元素时使用锁来保证线程安全。

又因为array数组是volatile修饰的,所以在调用了setArray()方法后,能保证其它线程都能看到新添加的元素。

public void add(int index, E element) {
    // 使用锁来保证线程安全。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获得array指向的引用地址。
        Object[] elements = getArray();
        int len = elements.length;
        // 如果指定位置越界,则抛出异常。
        if (index > len || index < 0)
            throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);
        Object[] newElements;
        // 如果插入位置是末尾。
        int numMoved = len - index;
        if (numMoved == 0)
            // 将原数组进行拷贝并扩大一个容量。
            newElements = Arrays.copyOf(elements, len + 1);
        else {
            // 如果不是插入到末尾,则创建扩大一个容量的数组。
            newElements = new Object[len + 1];
            // 分段复制原数组,并空出指定位置。
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index, newElements, index + 1, numMoved);
        }
        // 设置指定位置的指定元素。
        newElements[index] = element;
        // 将array引用的地址指向新的数组。
        setArray(newElements);
    } finally {
        lock.unlock();
    }
}

删除元素

删除元素就是将array数组中指定位置的元素删除。

它的实现方式是,如果被删除的是最后一个元素,则直接通过Arrays.copyOf()进行处理,而不需要新建数组。否则,新建数组,然后将array数组中被删除元素之外的其它元素拷贝到新数组中。最后,将新数组赋值给array数组。

public E remove(int index) {
    // 使用锁来保证线程安全。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获得array指向的引用地址。
        Object[] elements = getArray();
        int len = elements.length;
        // 根据指定的位置获取元素。
        E oldValue = get(elements, index);
        // 如果指定的元素是最后一个元素。
        int numMoved = len - index - 1;
        if (numMoved == 0)
            // 将原数组进行拷贝截取并将array的引用地址指向新的数组。
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            // 如果不是最后一个元素,则创建减少一个容量的数组。
            Object[] newElements = new Object[len - 1];
            // 分段复制原数组,并空出指定位置。
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index, numMoved);
            // 将array的引用地址指向新的数组。
            setArray(newElements);
        }
        // 返回该位置上的元素。
        return oldValue;
    } finally {
        lock.unlock();
    }
}

获取元素

获取元素很简单,就是返回array数组的指定位置的元素。

public E get(int index) {
    return get(getArray(), index);
}

private E get(Object[] a, int index) {
    return (E) a[index];
}

设置元素

在设置元素之前判断指定位置的旧元素是否和新元素相等,如果相等则不进行替换,但仍然要调用setArray()方法。

public E set(int index, E element) {
    // 使用锁来保证线程安全。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获得array指向的引用地址。
        Object[] elements = getArray();
        // 获取指定位置的旧元素。
        E oldValue = get(elements, index);
        // 如果旧元素的引用和新元素的引用不同。
        if (oldValue != element) {
            // 创建新的数组并拷贝array数组的值,替换新数组指定位置的元素。
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len);
            newElements[index] = element;
            // 将array的引用地址指向新的数组
            setArray(newElements);
        } else {
            // 为了确保voliatile的语义,任何一个读操作都应该是写操作的结构,所以尽管写操作没有改变数据,还是调用set方法,当然这仅仅是语义的说明,去掉也是可以的。
            setArray(elements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

遍历

CopyOnWriteArrayList类的迭代方法返回的是一个COWIterator类的对象。

public Iterator<E> iterator() {
     return new COWIterator<E>(getArray(), 0);
}

CopyOnWriteArrayList在类里维护了一个用于遍历的COWIterator类,COWIterator类实现了ListIterator接口。

static final class COWIterator<E> implements ListIterator<E> {
    // 数组的快照。
    private final Object[] snapshot;
    // 指定下标。
    private int cursor;

    // 构造方法。
    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }

    // 判断是否存在下一个元素。
    public boolean hasNext() {
        return cursor < snapshot.length;
    }

    // 判断是否存在上一个元素。
    public boolean hasPrevious() {
        return cursor > 0;
    }

    // 获取下一个元素。
    @SuppressWarnings("unchecked")
    public E next() {
        if (! hasNext())
            throw new NoSuchElementException();
        return (E) snapshot[cursor++];
    }

    // 获取上一个元素。
    @SuppressWarnings("unchecked")
    public E previous() {
        if (! hasPrevious())
            throw new NoSuchElementException();
        return (E) snapshot[--cursor];
    }

    // 获取下一个元素的位置。
    public int nextIndex() {
        return cursor;
    }

    // 获取上一个元素的位置。
    public int previousIndex() {
        return cursor-1;
    }

    // 不支持删除元素。
    public void remove() {
        throw new UnsupportedOperationException();
    }

    // 不支持修改元素。
    public void set(E e) {
        throw new UnsupportedOperationException();
    }

    // 不支持添加元素。
    public void add(E e) {
        throw new UnsupportedOperationException();
    }

    // JDK1.8新增的方法,使用迭代器Iterator的所有元素,并且第二次调用它将不会做任何事情。
    @Override
    public void forEachRemaining(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        Object[] elements = snapshot;
        final int size = elements.length;
        for (int i = cursor; i < size; i++) {
            @SuppressWarnings("unchecked") E e = (E) elements[i];
            action.accept(e);
        }
        cursor = size;
    }
}

ArrayBlockingQueue

说明

  • ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是创建ArrayBlockingQueue时指定的。
  • ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定,ArrayBlockingQueue默认会使用非公平锁。
  • ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象。而且,Condition又依赖于ArrayBlockingQueue而存在,通过Condition可以实现对ArrayBlockingQueue的更精确的访问。

构造方法

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);
    // 加锁是为了保证可见性,因为可能存在其他线程在初始化之后修改集合。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

添加元素

ArrayBlockingQueue提供了offer()方法和put()方法两种方式添加元素。

offer()方法添加失败会立即返回false,并且添加过程中不允许被其他线程中断。

put()方法添加失败会等待,并且在添加过程中可以被其他线程中断,抛出InterruptedException异常。

// 不允许被其他线程中断,添加失败则立即返回false。
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

// 允许被其他线程中断,抛出InterruptedException,并且添加失败会等待。
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

// 实际上的添加方法,添加成功后会唤醒一个等待删除元素的线程。
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

删除元素

ArrayBlockingQueue提供了poll()方法和take()方法两种方式删除元素。

poll()方法删除失败会立即返回false,并且添加过程中不允许被其他线程中断。

take()方法删除失败会等待,并且在删除过程中可以被其他线程中断,抛出InterruptedException异常。

// 不允许被其他线程中断,删除失败则立即返回null。
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

// 允许被其他线程中断,抛出InterruptedException,并且删除失败会等待。
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

// 实际上的删除方法,删除成功后会唤醒一个等待添加元素的线程。
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

LinkedBlockingQueue

说明

  • LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
  • LinkedBlockingQueue是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
  • LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
  • LinkedBlockingQueue在实现多线程对竞争资源的互斥访问时,对于插入和取出操作分别使用了不同的锁。此外,插入锁putLock和非满条件notFull相关联,取出锁takeLock和非空条件notEmpty相关联。通过notFull和notEmpty更细腻的控制锁。

属性

head是链表的表头。取出数据时,都是从表头head处插入。
last是链表的表尾。新增数据时,都是从表尾last处插入。
count是链表的实际大小,即当前链表中包含的节点个数。
capacity是列表的容量,它是在创建链表时指定的。
putLock是插入锁。
takeLock是取出锁。
notEmpty是非空条件。
notFull是非满条件。

构造方法

1 // 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue。
2 LinkedBlockingQueue()
3 // 创建一个指定容量的LinkedBlockingQueue。
4 LinkedBlockingQueue(int capacity)
5 // 创建一个容量是Integer.MAX_VALUE的LinkedBlockingQueue,最初包含给定collection的元素,元素按该collection迭代器的遍历顺序添加。
6 LinkedBlockingQueue(Collection<? extends E> c)

其他方法

// 将指定元素插入到此队列的尾部,如果队列已满,则等待。
void put(E e)
// 将指定元素插入到此队列的尾部,如果队列已满,则返回false。
boolean offer(E e)
// 将指定元素插入到此队列的尾部,如果队列已满,则等待指定的时间。
boolean offer(E e, long timeout, TimeUnit unit)
// 获取并移除此队列的头部,如果队列为空,则等待。
E take()
// 获取并移除此队列的头部,如果队列为空,则返回null。
E poll()
// 获取并移除此队列的头部,如果队列为空,则等待指定的时间。
E poll(long timeout, TimeUnit unit)
// 获取但不移除此队列的头,如果此队列为空,则返回null。
E peek()
// 返回在队列中的元素上按适当顺序进行迭代的迭代器。
Iterator<E> iterator()

ConcurrentLinkedQueue

说明

  • ConcurrentLinkedQueue是线程安全的队列,它适用于“高并发”的场景。ConcurrentLinkedQueue使用CAS来保证更新的线程安全,是一个非阻塞队列。
  • ConcurrentLinkedQueue是一个基于链表的无界线程安全队列,按照FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。

构造方法

1 // 创建一个最初为空的ConcurrentLinkedQueue。
2 ConcurrentLinkedQueue()
3 // 创建一个最初包含给定collection元素的ConcurrentLinkedQueue,按照此collection迭代器的遍历顺序来添加元素。
4 ConcurrentLinkedQueue(Collection<? extends E> c)

其他方法

// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取并移除此队列的头,如果队列为空,则返回null。
E poll()
// 获取但不移除此队列的头,如果队列为空,则返回null。
E peek()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 返回此队列中的元素数量。
int size()

标签:elements,队列,lock,元素,Collection,安全,线程,数组
来源: https://blog.csdn.net/weixin_43702146/article/details/117250238

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有