ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

atomic框架:AtomicReference

2022-04-25 17:02:10  阅读:176  来源: 互联网

标签:return 框架 AtomicReference current pair atomic Pair public


一、AtomicReference简介

AtomicReferenceAtomicInteger非常类似,不同之处就在于AtomicInteger是对整数的封装,而AtomicReference则对应普通的对象引用。也就是说它可以保证你在修改对象引用时的线程安全性。

AtomicReference是作用是对"对象"进行原子操作。提供了一种读和写都是原子性的对象引用变量。原子意味着多个线程试图改变同一个AtomicReference(例如比较和交换操作)将不会使得AtomicReference处于不一致的状态。

为什么需要AtomicReference?难道多个线程同时对一个引用变量赋值也会出现并发问题?
引用变量的赋值本身没有并发问题,也就是说对于引用变量var,类似下面的赋值操作本身就是原子操作:
Foo var = ... ;
AtomicReference的引入是为了可以用一种类似乐观锁的方式操作共享资源,在某些情景下以提升性能。

我们知道,当多个线程同时访问共享资源时,一般需要以加锁的方式控制并发:

volatile Foo sharedValue = value;
Lock lock = new ReentrantLock();

lock.lock();
try{
    // 操作共享资源sharedValue
}
finally{
    lock.unlock();
}

上述访问方式其实是一种对共享资源加悲观锁的访问方式。而AtomicReference提供了以无锁方式访问共享资源的能力,看看如何通过AtomicReference保证线程安全,来看个具体的例子:

public class TestAtomicRef {
    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Integer> ref = new AtomicReference<>(new Integer(1000));

        List<Thread> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Thread t = new Thread(new Task(ref), "Thread-" + i);
            list.add(t);
            t.start();
        }

        for (Thread t : list) {
            t.join();
        }
        System.out.println(ref.get());    // 打印2000
    }
}

class Task implements Runnable {
    private AtomicReference<Integer> ref;

    Task(AtomicReference<Integer> ref) {
        this.ref = ref;
    }
    
    @Override
    public void run() {
        for (; ; ) {    //自旋操作
            Integer oldV = ref.get();  
            if (ref.compareAndSet(oldV, oldV + 1))  // CAS操作
                break;
        }
    }
}

上述示例,最终打印“2000”。

该示例并没有使用锁,而是使用自旋+CAS的无锁操作保证共享变量的线程安全。1000个线程,每个线程对金额增加1,最终结果为2000,如果线程不安全,最终结果应该会小于2000

通过示例,可以总结出AtomicReference的一般使用模式如下:

AtomicReference<Object> ref = new AtomicReference<>(new Object());
Object oldCache = ref.get();

// 对缓存oldCache做一些操作
Object newCache = someFunctionOfOld(oldCache); 

// 如果期间没有其它线程改变了缓存值,则更新
boolean success = ref.compareAndSet(oldCache , newCache);

上面的代码模板就是AtomicReference的常见使用方式,看下compareAndSet方法:

public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

该方法会将入参的expect变量所指向的对象和AtomicReference中的引用对象进行比较,如果两者指向同一个对象,则将AtomicReference中的引用对象重新置为update,修改成功返回true,失败则返回false。也就是说,AtomicReference其实是比较对象的引用

二、AtomicReference源码

public class AtomicReference<V> implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;

    // 获取Unsafe对象,Unsafe的作用是提供CAS操作
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicReference.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
    //通过volatile关键字保证value值的可见性。
    private volatile V value;

    //使用给定的初始值创建新的AtomicReference
    public AtomicReference(V initialValue) {
        value = initialValue;
    }

    //使用null初始值创建新的AtomicReference
    public AtomicReference() {
    }
	
    //获取当前值
    public final V get() {
        return value;
    }

    //设置为给定的新值
    public final void set(V newValue) {
        value = newValue;
    }

    // 懒设置,最终设置为给定的值,newValue:给定的更新值 
    public final void lazySet(V newValue) {
        unsafe.putOrderedObject(this, valueOffset, newValue);
    }

    /**
     * 如果当前值等于预期值,则将当前值设置为给定的更新值
     * 
     * @param expect 预期值
     * @param update 更新值
     * @return boolean 是否更新成功
     */
    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

    /**
     * 如果当前值等于预期值,则以原子的方式将当前值设置为给定的更新值
     *  
     * @param expect 预期值
     * @param update 给定的更新值
     * @return boolean 是否更新成功
     */
    public final boolean weakCompareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

    //使用原子的方式将值更新为给定的值,并返回更新前的值
    public final V getAndSet(V newValue) {
        return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    }

    public final V getAndUpdate(UnaryOperator<V> updateFunction) {
        V prev, next;
        do {
            prev = get();
            next = updateFunction.apply(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final V updateAndGet(UnaryOperator<V> updateFunction) {
        V prev, next;
        do {
            prev = get();
            next = updateFunction.apply(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public final V getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction) {
        V prev, next;
        do {
            prev = get();
            next = accumulatorFunction.apply(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final V accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction) {
        V prev, next;
        do {
            prev = get();
            next = accumulatorFunction.apply(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }
}

AtomicReference是通过volatileUnsafe提供的CAS实现原子操作

  • value值是volatile类型
    当某个线程修改value值时,其余线程获取的值都是最新的value值,也就是修改之后的volatile
  • 通过CAS设置value
    当某个线程池通过CAS函数设置value时,操作是原子的,也就是线程在操作value时不会被中断

三、AtomicStampedReference的引入

CAS操作可能存在的问题:

CAS操作可能存在ABA的问题,就是说:
假如一个值原来是A,变成了B,又变成了A,那么CAS检查时会发现它的值没有发生变化,但是实际上却变化了。

一般来讲这并不是什么问题,比如数值运算,线程其实根本不关心变量中途如何变化,只要最终的状态和预期值一样即可。但是,有些操作会依赖于对象的变化过程,此时的解决思路一般就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A就会变成1A - 2B - 3A

AtomicStampedReference就是上面所说的加了版本号的AtomicReference

3.1 AtomicStampedReference原理

先来看下如何构造一个AtomicStampedReference对象,AtomicStampedReference只有一个构造器:

public class AtomicStampedReference<V> {

    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

    private volatile Pair<V> pair;
    
    //构造方法, 传入引用和戳
    public AtomicStampedReference(V initialRef, int initialStamp) {
        pair = Pair.of(initialRef, initialStamp);
    }

    //返回引用
    public V getReference() {
        return pair.reference;
    }

    //返回版本戳
    public int getStamp() {
        return pair.stamp;
    }

    public V get(int[] stampHolder) {
        Pair<V> pair = this.pair;
        stampHolder[0] = pair.stamp;
        return pair.reference;
    }

    public boolean weakCompareAndSet(V   expectedReference,
                                     V   newReference,
                                     int expectedStamp,
                                     int newStamp) {
        return compareAndSet(expectedReference, newReference,
                             expectedStamp, newStamp);
    }

    //如果当前引用 等于 预期值并且 当前版本戳等于预期版本戳, 将更新新的引用和新的版本戳到内存
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

    //设置当前引用的新引用和版本戳
    public void set(V newReference, int newStamp) {
        Pair<V> current = pair;
        if (newReference != current.reference || newStamp != current.stamp)
            this.pair = Pair.of(newReference, newStamp);
    }

    //如果当前引用 等于 预期引用, 将更新新的版本戳到内存
    public boolean attemptStamp(V expectedReference, int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            (newStamp == current.stamp ||
             casPair(current, Pair.of(expectedReference, newStamp)));
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);

    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }

    static long objectFieldOffset(sun.misc.Unsafe UNSAFE, String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}

可以看到,除了传入一个初始的引用变量initialRef外,还有一个initialStamp变量,initialStamp其实就是版本号(或者说时间戳),用来唯一标识引用变量。在构造器内部,实例化了一个Pair对象,Pair对象记录了对象引用和时间戳信息,采用int作为时间戳,实际使用的时候,要保证时间戳唯一(一般做成自增的),如果时间戳如果重复,还会出现ABA的问题。

AtomicStampedReference的所有方法,其实就是Unsafe类针对这个Pair对象的操作。
AtomicReference相比,AtomicStampedReference中的每个引用变量都带上了pair.stamp这个版本号,这样就可以解决CAS中的ABA问题了。

3.2 AtomicStampedReference使用示例

来看下AtomicStampedReference的使用:

// 创建AtomicStampedReference对象,持有Foo对象的引用,初始为null,版本为0
AtomicStampedReference<Foo> asr = new AtomicStampedReference<>(null,0);  

int[] stamp = new int[1];
Foo oldRef = asr.get(stamp);   // 调用get方法获取引用对象和对应的版本号
int oldStamp = stamp[0];       // stamp[0]保存版本号

//尝试以CAS方式更新引用对象,并将版本号+1
asr.compareAndSet(oldRef, null, oldStamp, oldStamp + 1)

上述模板就是AtomicStampedReference的一般使用方式,注意下compareAndSet方法,AtomicStampedReference内部保存了一个pair对象,该方法的逻辑如下:

如果AtomicStampedReference内部pair的引用变量、时间戳与入参expectedReferenceexpectedStamp都一样,说明期间没有其它线程修改过AtomicStampedReference,可以进行修改。此时,会创建一个新的Pair对象(casPair方法,因为Pair是Immutable类)。

但这里有段优化逻辑,就是如果newReference == current.reference && newStamp == current.stamp,说明用户修改的新值和AtomicStampedReference中目前持有的值完全一致,那么其实不需要修改,直接返回true即可。

四、AtomicMarkableReference

AtomicStampedReference可以给引用加上版本号,追踪引用的整个变化过程,如:A -> B -> C -> D - > A,通过AtomicStampedReference可以知道引用变量中途被更改了3次。但是有时候不需要关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

public class AtomicMarkableReference<V> {

    private static class Pair<T> {
        final T reference;
        final boolean mark;
        private Pair(T reference, boolean mark) {
            this.reference = reference;
            this.mark = mark;
        }
        static <T> Pair<T> of(T reference, boolean mark) {
            return new Pair<T>(reference, mark);
        }
    }

    private volatile Pair<V> pair;

    public AtomicMarkableReference(V initialRef, boolean initialMark) {
        pair = Pair.of(initialRef, initialMark);
    }

    public V getReference() {
        return pair.reference;
    }

    public boolean isMarked() {
        return pair.mark;
    }

    public V get(boolean[] markHolder) {
        Pair<V> pair = this.pair;
        markHolder[0] = pair.mark;
        return pair.reference;
    }

    public boolean weakCompareAndSet(V       expectedReference,
                                     V       newReference,
                                     boolean expectedMark,
                                     boolean newMark) {
        return compareAndSet(expectedReference, newReference,
                             expectedMark, newMark);
    }

    public boolean compareAndSet(V       expectedReference,
                                 V       newReference,
                                 boolean expectedMark,
                                 boolean newMark) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedMark == current.mark &&
            ((newReference == current.reference &&
              newMark == current.mark) ||
             casPair(current, Pair.of(newReference, newMark)));
    }

    public void set(V newReference, boolean newMark) {
        Pair<V> current = pair;
        if (newReference != current.reference || newMark != current.mark)
            this.pair = Pair.of(newReference, newMark);
    }

    public boolean attemptMark(V expectedReference, boolean newMark) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            (newMark == current.mark ||
             casPair(current, Pair.of(expectedReference, newMark)));
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicMarkableReference.class);

    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }

    static long objectFieldOffset(sun.misc.Unsafe UNSAFE, String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}

可以看到,AtomicMarkableReference的唯一区别就是不再用int标识引用,而是使用boolean变量——表示引用变量是否被更改过。从语义上讲,AtomicMarkableReference对于那些不关心引用变化过程,只关心引用变量是否变化过的应用会更加友好。

参考文章

atomic框架:AtomicReference

标签:return,框架,AtomicReference,current,pair,atomic,Pair,public
来源: https://www.cnblogs.com/ciel717/p/16190565.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有