ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

ReentrantLock 的公平锁源码分析

2019-07-02 18:03:55  阅读:192  来源: 互联网

标签:Node node int pred ReentrantLock 源码 线程 公平 arg


ReentrantLock 源码分析   以公平锁源码解析为例:

1:数据结构:

维护Sync 对象的引用:   private final Sync sync;

Sync对象继承 AQS,  Sync  分为两个类:处理公平锁锁和非公平锁:

FairSync   NonfairSync

 具体的类图如下:

  

2:接下来重点分析AQS这个类:AbstractQueuedSynchronizer:

AQS中的成员变量:

private transient volatile Node head;   //AQS维护队列的头结点

private transient volatile Node tail;     // AQS维护队列的尾结点

private volatile int state;                            // AQS 锁的状态  数量标识锁被获取的次数

下面看看NODE 结点的成员变量:

volatile int waitStatus;   //等待状态

volatile Node prev;      //前继节点

volatile Node next;      //后继节点

volatile Thread thread;   //线程对象

Node nextWaiter;       //下一个等待节点

从NODE的数据结构可以看出来,AQS里面维护的队列的数据结构是双链表的形式;

 
   

 

 

 

 

 

 

 

 

3:接下来分析 ReentrantLock  的构造方法:

ReentrantLock lock = new ReentrantLock(true);   //传入true,说明是构造公平锁

具体的构造方法如下,返回FairSync 对象:

public ReentrantLock(boolean fair) {

        sync = fair ? new FairSync() : new NonfairSync();

    }

4:lock方法的分析:因为是公平锁,所以调用 FairSync 下的lock方法:

final void lock() {

            acquire(1);

        }

   Acquire 的方法如下:

public final void acquire(int arg) {    // arg=1

        if (!tryAcquire(arg) &&    

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

}

接下来分两种场景来分析tryAcquire(arg)  1:同一线程第一次或N次获取锁(其他线程没有获取到锁)   2:其他线程已经获取到锁,当前线程尝试去获取锁;

//场景 1:

protected final boolean tryAcquire(int acquires) {   // acquires=1

            final Thread current = Thread.currentThread();   //当前线程:main-thread

            int c = getState();    //如果为第一次获取锁c=0  如果main线程已经获取过锁,则c为加锁的次数

            if (c == 0) {     //当前线程第一次获取锁

                if (!hasQueuedPredecessors() &&  // hasQueuedPredecessor的分析如下单独分析:

                    compareAndSetState(0, acquires)) {  //cas原子操作 state=1

                    setExclusiveOwnerThread(current);  //独占线程设置为当前线程

                    return true;  //返回true表明加锁成功

                }

            }

            else if (current == getExclusiveOwnerThread()) {  // c=n 的情况下

                int nextc = c + acquires;    // nextc=n+1

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);    //设置 state=n+1

                return true;  //获取到锁,返回true

            }

            return false;

        }

说明:hasQueuedPredecessors主要是判断当前线程所在的节点是不是CLH队列的首个位置,这个判断的目的是公平锁的公平获取锁的机制

hasQueuedPredecessors的源码如下:以该线程是第一次获取锁为例分析:

public final boolean hasQueuedPredecessors() {

        Node t = tail;      // tail=null

        Node h = head;    // head=null

        Node s;

        return h != t &&    //返回false

            ((s = h.next) == null || s.thread != Thread.currentThread());

    }

 

//场景2 分析:有其他线程未释放锁(main-thread 持有锁,thread-1尝试去获取锁)

protected final boolean tryAcquire(int acquires) {   // acquires=1

            final Thread current = Thread.currentThread();   //当前线程:thread-1

            int c = getState();    // 由于其他持有锁 state 至少为1        

if (c == 0) {    

                if(!hasQueuedPredecessors()&&  compareAndSetState(0, acquires)) {                    

setExclusiveOwnerThread(current); 

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {  // current=thread1  // getExclusiveOwnerThread()=main

                int nextc = c + acquires;                   

if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);   

                return true; 

            }

            return false;   //此时返回false表明尝试获取锁失败

        }

 

5:上面的场景1 获取到锁后返回true,则lock 方法执行结束。下面分析场景2:

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&     //尝试获取锁失败,返回false

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

!tryAcquire(arg)  返回为true;接下来进入

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  这个逻辑;

这个方法有两层处理:

1:addWaiter(Node.EXCLUSIVE)   将当前线程调价到CLH队列中

2:acquireQueued();逐步执行CLH队列中的线程,如果当前线程获取到锁则返回,否则,当前线程进行休眠,直到唤醒并重新获取到锁才返回;

以下分析这两个方法:场景:main线程获取到锁但未释放,这是线程 thread-1去获取锁:(假设此时没有其他线程在CLH队列中,即CLH队列为null)

addWaiter(Node.EXCLUSIVE)方法:入参 Node.EXCLUSIVE 为null;

 

   private Node addWaiter(Node mode) {   //mode=null  EXCLUSIVE 标识节点为独占锁模型

        Node node = new Node(Thread.currentThread(), mode); //创建新节点:新节点中线程为当前线程,节点模型为独占锁:thread= thread-1   nextWaiter= mode

        // Try the fast path of enq; backup to full enq on failure

        Node pred = tail;   // 此时tail=null

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

        enq(node);  //进入enq方法

        return node;

    }

  Enq() 方法如下:

private Node enq(final Node node) {   //入参为上一步新建的节点

        for (;;) {

            Node t = tail;    // 第一次遍历逻辑 tail=null

            if (t == null) { // Must initialize

                if (compareAndSetHead(new Node()))  //CAS 创建表头 Head

                    tail = head;

            } else {

                node.prev = t;  //第二次遍历逻辑:node为上面新建的节点:thread= thread-1   nextWaiter= mode, node.prev指向表头 t 为表头

                if (compareAndSetTail(t, node)) {  //CAS设置队列尾节点为当前node

                    t.next = node;   // 表头后继节点指向当前节点

                    return t;

                }

            }

        }

}

该场景经过上面的处理之后 CLH队列的数据结构如下:

第一次遍历:

 Head,tail节点

 

 

 

 

 

 

 

 

 

 

 

第二次遍历:

   Head               当前线程node:设置为tail

 
   

 

 

 

 

 

 

 

 

接下来分析

acquireQueued这个方法

final boolean acquireQueued(final Node node, int arg) {  //node为当前线程节点 arg=1

        boolean failed = true;

        try {

            boolean interrupted = false; //当前线程在休眠时,有没有被中断过

            for (;;) {

                final Node p = node.predecessor(); //获取前继节点,这里为head节点

                if (p == head && tryAcquire(arg)) { //这里p== head为true,接下来进入

// tryAcquire(arg)这个方法,tryAcquire(arg)方法前面分析过了,这里返回false;

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

//   接下来会进入以下的逻辑:下面会分析这两个方法

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

 

这里再次回顾下tryAcquire(arg) 方法:返回fasle

protected final boolean tryAcquire(int acquires) {    // acquires=1

            final Thread current = Thread.currentThread(); //当前线程 thread-1

            int c = getState();   //state=1

            if (c == 0) {

                if (!hasQueuedPredecessors() &&

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {  // getExclusiveOwnerThread

获取到的线程是tryAcquire(int acquires)中设置的值 这里是 main; current=thread-1

                int nextc = c + acquires;

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

    }

 

1): acquireQueued(final Node node, int arg) 方法中for循环第一次执行shouldParkAfterFailedAcquire(p, node) 方法分析:源码如下:

入参:pred为前继节点,这里是head  node为当前节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;  // ws=0

        if (ws == Node.SIGNAL) 

                     return true;

        if (ws > 0) {

            do {

                node.prev = pred = pred.prev;

            } while (pred.waitStatus > 0);

            pred.next = node;

        } else {

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

        }

        return false;  //返回false;

    }

   For循环第二次执行 shouldParkAfterFailedAcquire(p, node)方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;  // ws=-1   在第一次已经设置为-1

        if (ws == Node.SIGNAL)   //返回true

                     return true;

        if (ws > 0) {

            do {

                node.prev = pred = pred.prev;

            } while (pred.waitStatus > 0);

            pred.next = node;

        } else {

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

        }

        return false;  //返回false;

    }

 

返回true后,接下进入parkAndCheckInterrupt()这个方法:

源码分析如下:

private final boolean parkAndCheckInterrupt() {

        LockSupport.park(this);  //阻塞当前线程

        return Thread.interrupted(); // 返回线程的中断状态

    }

LockSupport.park(this); //作用:前继线程节点的状态是 Node.SIGNAL;挂起当前线程;

Thread.interrupted(); //当前被挂起的线程被前继线程中断,返回线程的中断状态;

下面解释一个线程的行为:LockSupport.park(this)  线程被挂起:

当线程被挂起的时候唤醒的方式有两种:

1:unpark 的方式唤醒,前继节点线程使用完锁后,通过unpark方式唤醒当前线程

2:中断唤醒,其他线程通过 interrupt 中断当前线程

 

接下来继续分析:acquire(int arg)

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

这个时候的重点放在分析 selfInterrupt(); 这个方法上;

进入这个方法的条件是 当前线程被中断过,并且获取锁成功了;

static void selfInterrupt() {

        Thread.currentThread().interrupt();  //当前线程产生一个中断,真正被唤醒

    }

 

到此为止,ReentrantLock 的公平锁源码分析结束。

标签:Node,node,int,pred,ReentrantLock,源码,线程,公平,arg
来源: https://www.cnblogs.com/beppezhang/p/11122200.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有