Java并发之ReentrantLock源码解析(一)-编程思维

Java并发之ReentrantLock源码解析(一)

ReentrantLock

ReentrantLock是一种可重入的互斥锁,它的行为和作用与关键字synchronized有些类似,在并发场景下可以让多个线程按照一定的顺序访问同一资源。相比synchronized,ReentrantLock多了可扩展的能力,比如我们可以创建一个名为MyReentrantLock的类继承ReentrantLock,并重写部分方法使其更加高效。

当一个线程调用ReentrantLock.lock()方法时,如果ReentrantLock没有被其他线程持有,且不存在额外的线程与当前线程竞争ReentrantLock,调用ReentrantLock.lock()方法后当前线程会占有此锁并立即返回,ReentrantLock内部会维护当前线程对锁的引用计数,当线程获取锁时会增加其线程对锁的引用计数,当线程释放锁时会减少线程对锁的引用计数,当前线程如果在占有锁之后,又重复获取锁,则会增加锁的引用计数,当锁的引用计数为0的时候,代表当前线程完全释放锁。需要注意的是,只有占有锁的线程才会增加锁的引用计数,当锁被占据时,如果有其他线程要竞争锁,ReentrantLock会把其他线程加入一个竞争锁的队列,并让线程陷入阻塞,直到占据锁的线程释放了锁,ReentrantLock才会唤醒队列中的线程重新竞争锁。

我们用下面的例子来加深对于锁的理解,假设我们的进程内目前没有任何线程竞争lock,此时锁的引用计数为0,有一个线程Thread-1调用完下面<1>处的lock()方法成功占有锁,此时锁的引用计数由0变为1。之后Thread-1调用了<2>处的methodB()方法,methodB()的<4>处又获取了一次锁,由于lock已经被Thread-1占据,所以这里简单的对锁的引用计数+1即可,此时锁的引用计数为2,Thread-1执行完methodB()的方法体后,执行<5>处的unlock()方法释放锁,这里对锁的引用计数-1,由2变为1。在调用完methodB后,执行methodA的方法体,最后执行<3>处的unlock()方法,将锁的引用计数由1变为0,Thread-1完全释放锁。此时,锁变为无主状态。

    private final ReentrantLock lock = new ReentrantLock();

    public void methodA() {
        try {
            lock.lock();//<1>
            methodB();//<2>
            //methodA body...
        } finally {
            lock.unlock();//<3>
        }

    }

    public void methodB() {
        try {
            lock.lock();//<4>
            //methodB body...
        } finally {
            lock.unlock();//<5>
        }
    }

  

ReentrantLock提供了isHeldByCurrentThread()和getHoldCount()两个方法,前者用于判断锁是否被当先调用线程持有,如果被当前调用线程持有则返回true;后者不仅会判断锁是否被当前线程持有,还会返回锁相对于当前线程的引用计数,毕竟锁是可重入的,如果锁没有被任何线程持有,或者被不是持有锁的线程调用getHoldCount()方法,就会返回0。

这两个方法的实现原理也很简单,我们知道在Java中可以调用Thread.currentThread()来获取当前线程对象。当我们调用ReentrantLock.lock()方法成功获取锁之后,ReentrantLock内部会用一个独占线程(exclusiveOwnerThread)字段来标识当前占用锁的Thread线程对象,如果线程释放了锁且锁的引用计数为0,则将独占线程字段标记为null。当要判断锁是否被当前线程持有,或者锁相对于当前线程的引用计数,则获取调用方线程的Thread对象,和内部的独占线程字段做下对比,如果两者的引用相等,代表当前线程占用了锁,如果引用不相等,则表示当前所可能处于无主状态,或者锁被其他线程持有。

如下面的代码,我们希望只有持有lock的线程才可以执行methodB()和methodC()方法,就可以用isHeldByCurrentThread()和getHoldCount()进行判断。

 

    private final ReentrantLock lock = new ReentrantLock();

    public void methodA() {
        try {
            lock.lock();
            methodB();
            methodC();
            //methodA body...
        } finally {
            lock.unlock();
        }
    }

    public void methodB() {
        if (lock.getHoldCount() != 0) {
            //methodB body...
        }
    }

    public void methodC() {
        if (lock.isHeldByCurrentThread()) {
            //methodC body...
        }
    }

  

需要注意的一点是,官方有给出isHeldByCurrentThread()和getHoldCount()两个方法的使用范围,仅针对于debug和测试。真正的生产环境如果有重入锁的需要,官方还是推荐用try{}finally{}这一套,在try代码块里获取锁,在finally块中释放锁。

创建ReentrantLock对象时,如果使用的是无参构造方法,则默认创建非公平锁(NonfairSync),如果调用的是ReentrantLock(boolean fair)有参构造方法,fair为true则创建公平锁(FairSync)。

public class ReentrantLock implements Lock, java.io.Serializable {
	//...
	//默认创建非公平锁
	public ReentrantLock() {
        sync = new NonfairSync();
    }
	
	//根据参数指定创建公平锁或非公平锁,true为公平锁。
	public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
	//...
}

 

之前说过,当有多个线程竞争锁时,获取锁失败的线程,会形成一个队列。如果有多个线程竞争公平锁时,会优先把锁分配给等待锁时间最长的线程,即队头的线程,队列中越往后的线程等待锁的时间越短,排在队尾的线程等待时间最短。如果使用的是非公平锁,则不保证会按照等待时长顺序将锁分配。在多线程的场景下,公平锁在吞吐量方面的表现不如非公平锁,但两者在获得锁和保证不饥饿的差异并不大。

需要注意的是,公平锁不能保证线程调度的公平性,竞争公平锁的多个线程中,可能会出现一个线程连续多次获得锁的情况。比如:Thread-1、Thread-2都要竞争同一个锁(lock),但此时锁已经被其他线程占据,Thread-1、Thread-2竞争失败,预备进入等待队列,这时Thread-1、Thread-2的CPU时间片消耗完毕被挂起,而其他线程刚好释放锁将锁变为无主状态,此时Thread-3抢锁成功,并调用下面的doThread3()方法,连续10次获取锁并释放锁将锁变为无主状态。这种情况,就是上面说的公平锁无法保证线程调度的公平性,按照顺序,Thread-3在Thread-1、Thread-2竞争失败后才开始竞争,按理锁的分配顺序应该是Thread-1->Thread-2->Thread-3,但由于线程的调度问题,Thread-1、Thread-2尚未入队,而锁被释放后刚好被Thread-3“捡漏”

    public void methodA() {
        try {
            lock.lock();
            //methodA body...
        } finally {
            lock.unlock();
        }
    }

    public void doThread3() {
        for (int i = 0; i < 10; i++) {
            methodA();
        }
    }

  

除了调用ReentrantLock.lock()以阻塞的方式直到获取锁,ReentrantLock还提供了tryLock()和tryLock(long timeout, TimeUnit unit)两个方法来抢锁。我们看下面的代码,相信很多同学看到这两个方法后也能知道这两个方法和lock()方法的区别,tryLock()会尝试竞争锁,如果锁已被其他线程占用,则竞争失败,返回false,如果竞争成功,则返回true。tryLock(long timeout, TimeUnit unit)如果竞争锁失败后,会先进入等待队列,如果在过期前能竞争到锁,则返回true,如果在过期时间内都无法抢到锁,则返回false。

    public void methodD() {
        boolean hasLock = false;
        try {
            hasLock = lock.tryLock();//<1>非计时
            if (!hasLock) {//没有抢到锁则退出
                return;
            }
            //methodD body...
        } finally {
            if (hasLock) {
                lock.unlock();
            }
        }
    }

    public void methodE() {
        boolean hasLock = false;
        try {
            hasLock = lock.tryLock(5, TimeUnit.SECONDS);//<2>计时
            if (!hasLock) {//没有抢到锁则退出
                return;
            }
            //methodE body...
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (hasLock) {
                lock.unlock();
            }
        }
    }

  

需要注意的是:不管是公平锁还是非公平锁,不计时tryLock()都不能保证公平性,如果锁可用,即时其他线程正在等待锁,也会抢锁成功。

ReentrantLock内部会用一个int字段来标识锁的引用次数,因此,ReentrantLock虽然作为可重入锁,但它的最大可重入次数为2147483647(即:MaxInt32,2^31-1),不管我们是以递归或者是循环亦或者其他方式,一旦我们重复获取锁的次数超过这个次数,ReentrantLock就会抛出异常。

至此,我们了解了ReentrantLock的简单应用。下面,就请大家一起跟随笔者了解ReentrantLock的实现原理。下面的代码是笔者从ReentrantLock节选的部分代码,可以看到先前我们调用加锁(lock、lockInterruptibly、tryLock)、解锁(unlock)的代码,最后都会调用sync对象的方法,sync对象的类型是一个抽象类,在我们创建ReentrantLock对象时,会根据构造函数决定sync是公平锁(FairSync),还是非公平锁(NonfairSync),FairSync和NonfairSync都继承自Sync,所以ReentrantLock在创建好具体的Sync对象后,便不再管关心公平锁的逻辑或者是非公平锁的逻辑,ReentrantLock只知道抽象类Sync实现了它所需要的功能,这个功能是公平亦或是非公平,由具体的实现子类来关心。

public class ReentrantLock implements Lock, java.io.Serializable {
	//...
	private final Sync sync;
	
	abstract static class Sync extends AbstractQueuedSynchronizer {//...}
	
	static final class NonfairSync extends Sync {//...}
	
	static final class FairSync extends Sync {//...}
	
	public ReentrantLock() {
        sync = new NonfairSync();
    }
	
	public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
	
	public void lock() {
        sync.acquire(1);
    }
	
	public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
	
	public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
	
	public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
	
	public void unlock() {
        sync.release(1);
    }
	//...

}

  

鉴于ReentrantLock的无参构造函数是创建一个非公平锁,可见官方更倾向于我们使用非公平锁,这里,我们就先从非公平锁开始介绍。

当ReentrantLock为非公平锁时,调用lock()方法会直接调用sync.acquire(1),NonfairSync和Sync两个类都没有实现acquire(int arg),这个方法是由AbstractQueuedSynchronizer(抽象队列同步器,下面简称:AQS)实现的,也就是Sync的父类。

当线程竞争锁时,会先调用tryAcquire(arg)方法试图占有锁,AQS将tryAcquire(int arg)的实现交由子类,由子类决定是以公平还是非公平的方式占有锁,如果竞争成功tryAcquire(arg)则返回true,!tryAcquire(arg)的结果为false,于是就不会再调用<1>处后续的判断,直接返回。如果占有锁失败,这里会先调用addWaiter(Node mode)方法,将当前调用线程封装成一个Node对象,再调用acquireQueued(final Node node, int arg)将Node对象加入到等待队列中,并使线程陷入阻塞。

//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//<1>
            selfInterrupt();
    }

//AbstractQueuedSynchronizer将tryAcquire(int arg)的实现交由子类
//java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

  

我们先来看NonfairSync实现的tryAcquire(int acquires)方法,这里NonfairSync也是调用其父类Sync的nonfairTryAcquire(int acquires)方法。在AQS内部会维护一个volatile int state,可重入互斥锁会用这个字段存储占有锁的线程对锁的引用计数,即重复获取锁的次数。如果state为0,代表锁目前没有被任何线程占有,这里会用CAS的方式设置锁的引用计数,如果设置成功,则执行<2>处的代码将独占线程(exclusiveOwnerThread)的引用指向当前调用线程,然后返回true表示加锁成功。

如果当前state不为0,代表有线程正独占此锁,会在<3>处判断当前线程是否是独占线程,如果是的话则在<4>处增加锁的引用计数,这里同样是修改state的值,但不需要像<1>处那样用CAS的方式,因为<4>处的代码只有独占线程才可以执行,其他线程都无法执行。需要注意的一点是,state为int类型,最大值为:2^31-1,如果超过这个值state就会变为负数,就会报错。如果一个线程在竞争锁的时候,发现state不为0,且当前线程不是独占线程,则会返回false,表示抢锁失败。

//当调用AQS的acquire(int arg)时,会先调用由子类实现的tryAcquire(int acquires)方法
//java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
	protected final boolean tryAcquire(int acquires) {
			//这里会调用父类Sync的nonfairTryAcquire(int acquires)方法
            return nonfairTryAcquire(acquires);
        }
//java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
        final boolean nonfairTryAcquire(int acquires) {
			//获取当前线程对象
            final Thread current = Thread.currentThread();
			//这里会获取父类AQS的state字段,在可重入互斥锁里,state表示占有锁的线程的引用计数
            int c = getState();
			//如果state为0,表示目前锁是无主状态
            if (c == 0) {
				//如果锁处于无主状态,则用CAS修改state,如果修改成功,表示占有锁成功
                if (compareAndSetState(0, acquires)) {//<1>
					//占有锁成功后,这里会设置锁的独占线程
                    setExclusiveOwnerThread(current);//<2>
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//<3>如果state不为0,代表现在有线程占据锁,如果请求锁的线程和独占线程是同一个线程,则增加当前线程对锁的引用计数
				//锁的最大可重入次数为(2^31-1),超过这个最大范围,int就会变为负数,判断nextc为负数时报错。
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
				//重新设置state的值
                setState(nextc);//<4>
                return true;
            }
            return false;
        }

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	//在可重入互斥锁中,state代表独占线程当前的重入次数
	private volatile int state;
	
	protected final int getState() {
        return state;
    }
	
    protected final void setState(int newState) {
        state = newState;
    }
	//...
}

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	//独占线程,当有线程占据可重入互斥锁时,会用此字段存储占有锁的线程
	private transient Thread exclusiveOwnerThread;
	
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
	
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

  

按照AbstractQueuedSynchronizer.acquire(int arg)的逻辑,如果抢锁失败,会继而执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这段代码。这里我们需要先来了解下Node的数据结构,Node类是AQS的一个静态内部类。如果眼尖的同学看到下面的prev和next,一定能很快猜出这就是我们先前所说的等待队列,等待队列实质上是一个双端链表,即每个节点都可以知道自己的前驱,也可以知道自己的后继。

//java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
    static final class Node {
		static final Node SHARED = new Node();
		static final Node EXCLUSIVE = null;
		static final int CANCELLED =  1;
		static final int SIGNAL    = -1;
		//...
		volatile int waitStatus;
		volatile Node prev;
		volatile Node next;
		volatile Thread thread;
		Node nextWaiter;
		//...
		//返回当前节点的前驱节点
        final Node predecessor() {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {}
		//...
		//创建Node节点
		Node(Node nextWaiter) {//<1>
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }
	}

    

这里简单介绍下Node的字段:

  • prev指向当前节点的前驱节点,next指向当前节点的后继节点。
  • thread字段在调用<1>处的构造方法时,会将thread指向当前调用线程的Thread对象。
  • waitStatus(等待状态)初始值为0,当waitStatus为SIGNAL(-1)时,表示当前节点的后继节点所指向的线程(node.next.thread)陷入阻塞,当前节点如果被移除(CANCELLED)或在占有锁后要释放锁的时候,需要唤醒后继节点的线程。这里有多种可能导致当前节点的等待状态变为移除,比如调用tryLock(long timeout, TimeUnit unit) 超时会获取到锁,或者调用lockInterruptibly()后线程被中断。
  • nextWaiter可以用来表示一个节点的线程到底是独占线程(EXCLUSIVE)还是共享线程(SHARED),独占线程一般用于可重入互斥锁(ReentrantLock)或者可重入读写锁(ReentrantReadWriteLock )的写锁,而共享线程则表示当前线程是可以和其他共享线程一起共享资源的,一般用于可重入读写锁的读锁。

如果对上面Node字段还有不理解的地方不用心急,笔者在后面还会和大家一起深入了解这几个字段。

在简单了解了Node的数据结构后,我们来看看AQS是如何将一个线程封装成一个Node对象,并将其加入到等待队列。addWaiter(Node mode)会根据传入的参数node,决定创建的节点是独占节点还是共享节点,先前ReentrantLock传入的是Node.EXCLUSIVE,所以这里是独占节点,在执行完<1>处的代码后,节点创建完毕,节点的thread字段也保存了当前线程对象的引用。之后会进入<2>处的循环,这里是通过CAS自旋的方式将节点加入到等待队列,之所以用这种方式是因为可能存在多个线程同时要入队的情况,用CAS自旋保证每个节点的前驱和后继的有序性。当节点要入队时,会先获取尾节点,如果在<3>处判断尾节点不为null,则将当前节点的前驱指向尾节点,并用CAS的方式设置当前节点为设置为尾节点,如果原先的尾节点(oldTail)的指向没有被任何线程修改,这里用CAS将当前节点设置成尾节点就会成功,于是原先尾节点的后继指向当前节点,当前节点入队成功。但我们也要考虑尾节点为null的情况,即第一个进入等待队列的节点,此时头节点(header)和尾节点(tail)都为null,这里就会执行<4>处的分支,进行队列初始化。初始化队列的时候,同样存在并发问题,所以这里依旧用CAS初始化头节点成功,再将头节点指向的Node对象赋值给尾节点。初始化队列完毕后,会再开始新的一轮循环,用CAS的方式尝试将节点入队,入队成功后,则返回当前节点。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
    private transient volatile Node head;//等待队列的头节点
	private transient volatile Node tail;//等待队列的尾节点
	//...
    private Node addWaiter(Node mode) {
		//为竞争锁的线程创建一个Node对象,并用Node.thread字段存储调用线程Thread对象
        Node node = new Node(mode);//<1>

        for (;;) {//<2>
            Node oldTail = tail;
            if (oldTail != null) {//<3>
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {//<4>
                initializeSyncQueue();
            }
        }
    }

    private final void initializeSyncQueue() {
        Node h;
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }
	
    private final boolean compareAndSetTail(Node expect, Node update) {
        return TAIL.compareAndSet(this, expect, update);
    }
	//...
}

  

在执行完addWaiter(Node.EXCLUSIVE)确定节点入队后,就要将返回节点传入到方法:acquireQueued(final Node node, int arg)。之前我们说过,抢锁失败的节点会进入一个等待队列,等待锁的分配,我们已经在addWaiter(Node mode)看到线程是如何入队的,那接下来就要看看线程是如何等待锁的分配。在看acquireQueued(final Node node, int arg)之前,我们先来思考下如果是我们自己会如何设计将锁分配给线程?最简单的做法是每个线程都在一个死循环中去轮询锁的状态,如果发现锁处于无主状态并抢锁成功,线程则跳出循环访问资源。但这个做法有个缺点就是会消耗CPU时间片,尤其对于一些优先级不高的线程,相比于优先级高的线程它们可能永远无法竞争到锁,永远访问不到资源处于饥饿状态。那么有没有相比死循环更好的做法呢?我们是否可以先把一个入队的线程阻塞起来,先让它不要消耗宝贵的CPU时间片,当占据锁的线程完全释放锁(state变为0)时,则去唤醒队列中等待时长最长的线程,这样也不用担心优先级低的线程无法与优先级高的线程竞争锁,导致处于饥饿状态,一举两得。

这里我们还要再加深下对等待队列Node的理解才能往下看acquireQueued(final Node node, int arg),大家思考下,Node中的thread字段是用来指向竞争锁的线程对象,通过这个对象,我们可以用释放锁的线程唤醒等待锁的线程,占用锁的线程在完全释放锁将锁变为无主状态后,唤醒等待锁的线程,这个等待锁的线程如果成功占据了锁,是否可以将本身线程中Node.thread置为null?此刻线程已经占据了锁,它不会再陷入阻塞,也不需要有其他的线程来唤醒自身。所以等待队列的头节点的thread(header.thread)字段永远为null,因为锁被头节点的线程所占用。

当然,也可能出现锁被占用但头节点(header)本身就为null,这种情况一般出现在我们初始化好一个ReentrantLock后,只有一个线程占有了锁,此时调用tryAcquire(int acquires)会调用ReentrantLock.Sync.nonfairTryAcquire(int acquires)方法,这个方法只会简单修改state状态,并不会新增一个头节点。除非锁已有线程占据,且出现新的线程竞争锁,这时候新的线程在进入等待队列的时候,会初始化队列,为本身占据锁的线程补上一个头节点,初始化队列的时候调用的是Node的无参构造方法,所以头节点的thread字段为null,表示锁被当前头节点原先指向的线程所占据。

在了解这些基本知识后,下面我们终于可以来看看大家迫不及待的acquireQueued(final Node node, int arg)了。当把封装了当前线程的Node对象传入到acquireQueued(final Node node, int arg)方法时,并不会立即阻塞当前线程等待其他线程唤醒。这里会先在<1>处获取当前节点的前驱节点p,判断p是不是头节点,如果p是头节点,则当前线程即有占有锁的可能。因为占据锁的线程会先释放锁,再通知队列中的线程抢锁。所以会存在当前节点入队前锁已被释放的情况,于是判断前驱节点p是头节点,会再调用tryAcquire(int acquires)方法抢锁,如果抢锁成功,就可以按照我们上面所说的套路,调用setHead(Node node)将当前节点设置为头节点,设置当前节点的线程引用为null,然后返回。

如果当前节点的前驱节点不是头节点,这里就要调用shouldParkAfterFailedAcquire(Node pred, Node node)设置前驱节点的等待状态(waitStatus),先前说过,这个等待状态可以用来表示下个节点的阻塞状态。假设有一个锁已经被其他线程占有,Thread-1、Thread-2要来抢锁,此时必然是抢锁失败的,这里会把Thread-1、Thread-2分别封装成Node1和Node2并进行入队,Node1和Node2初始的等待状态都为0,假定Node1先Node2入队,Node1为Node2的前驱节点(即:Node2.prev=Node1),Node1不是头节点,所以不会去抢锁,这里直接进入<2>处分支的shouldParkAfterFailedAcquire(Node pred, Node node)方法,Node1的初始等待状态为0,所以<3>处和<5>处的分支是进不去的,只能进入<4>处的分支,将Node1的等待状态设置为SIGNAL,表示Node1的后继节点处于等待唤醒状态,然后返回false,于是<2>处的判断不成立,又开始新的一轮循环,假定头节点的线程依旧没释放锁,Node1依旧不是头节点,还是直接执行shouldParkAfterFailedAcquire(Node pred, Node node)方法,此时判断Node2的前驱节点Node1的等待状态为-1,表示可以阻塞Node1后继节点Node2所指向的线程,所以这里会返回true,进入<2>处的分支,调用parkAndCheckInterrupt()方法,在这个方法中会调用LockSupport.park(Object blocker)阻塞当前的调用线程,直到有其他线程调用LockSupport.unpark(Node2.thread)唤醒Node2被阻塞的线程,或Node2.thread被中断才会退出parkAndCheckInterrupt()。我们注意到在<5>处有一个判断,前驱节点的等待状态>0,一般状态为CANCELLED(1),表示前驱节点被移除。之所以会存在被移除的节点,是因为我们可能以tryLock(long timeout, TimeUnit unit)的方式往等待队列中添加节点,如果超时还未获得锁,这个节点就要被移除;我们还可能用lockInterruptibly()的方式往等待队列中添加节点,如果节点所对应的线程被中断,这个节点也处于被移除状态。所以<5>处如果发现前驱节点的等待状态大于0,会一直往前驱节点遍历直到找到等待状态<=0的节点将其作为前驱节点,并将前驱节点的后继指向当前节点。要注意的是,等待状态为-1时,代表当前节点的后继节点等待唤醒,>0的时候,代表当前节点被移除,前者的状态与后继节点有关,后者的状态仅与自身有关。如果在自旋期间线程出现其他异常,则会调用<6>处的代码将节点从等待队列移除,并抛出异常。cancelAcquire(Node node)会在后面介绍,这里我们只要先知道这是一个将节点从队列中移除的方法。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
    private transient volatile Node head;
	//...
	final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();//<1>
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))//<2>
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);//<6>
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
	//...
	//设置当前节点为头节点,此时可以清空头节点指向的线程引用
	private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
	//...
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)//<3>
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {//<5>
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {//<4>
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }
	//...
	private final boolean parkAndCheckInterrupt() {
		//阻塞调用线程,可调用LockSupport.unpark(Thread thread)唤醒或由线程中断唤醒。
        LockSupport.park(this);
		//返回线程是否由中断唤醒,返回true为被中断唤醒,但此方法会清除线程的中断标记
        return Thread.interrupted();
    }
	//...
}

  

能从boolean acquireQueued(final Node node, int arg)方法中返回的线程,都是成功占有锁的线程,但返回结果分当前线程是否被中断,true为被中断。可能存在这样一种情况,前一个线程释放锁完毕后,即将唤醒后一个线程,此时后一个线程被中断唤醒,后一个线程发现其Node节点的前驱节点为头节点,且锁为无主状态,于是抢锁成功直接返回。这里要标记线程的中断状态interrupted,因为线程会从parkAndCheckInterrupt()中被唤醒,最后会执行Thread.interrupted()返回当前线程是否由中断唤醒,但Thread.interrupted()会清除中断标记,所以在占据锁之后会根据返回的interrupted状态,决定是否设置线程的中断状态。如果一个线程在调用acquireQueued(final Node node, int arg)方法的后都未被中断,直到前一个线程调用LockSupport.unpark(Thread thread)唤醒该线程,那么这个线程就不是用中断的形式唤醒,也就不用设置线程的中断状态。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
			//根据acquireQueued()的返回,决定是否设置线程的中断标记
            selfInterrupt();
    }
	//...
	static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
	//...
}

  

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议 转载请注明原文链接: https://www.cnblogs.com/beiluowuzheng/p/14948225.html

Java并发之ReentrantLock源码解析(二)-编程思维

在了解如何加锁时候,我们再来了解如何解锁。可重入互斥锁ReentrantLock的解锁方法unlock()并不区分是公平锁还是非公平锁,Sync类并没有实现release(int arg)方法,这里会实现调用其父类AbstractQueuedSynchronizer的release(int arg)方法。在releas

Java并发之ReentrantLock源码解析(三)-编程思维

ReentrantLock和BlockingQueue 首先,看到这个标题,不要怀疑自己进错文章,也不要怀疑笔者写错,哈哈。本章笔者会从BlockingQueue(阻塞队列)的角度,看看juc包下的阻塞队列是如何使用ReentrantLock。这个章节笔者会介绍部分阻塞队列的源码,但不会着墨过多,我们的重点依旧在Ree

Java并发之ReentrantLock源码解析(四)-编程思维

Condition 在上一章中,我们大概了解了Condition的使用,下面我们来看看Condition再juc的实现。juc下Condition本质上是一个接口,它只定义了这个接口的使用方式,具体的实现其实是交由子类完成。 public interface Condition { void await() thr

Java并发之ThreadPoolExecutor源码解析(三)-编程思维

Worker 先前,笔者讲解到ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core),在这个方法中工作线程可能创建成功,也可能创建失败,具体视线程池的边界条件,以及当前内存情况而定。 那么,如果线程池当前的状态,是允许创建Worker对象的,那么创建

Java并发之ThreadPoolExecutor源码解析(二)-编程思维

ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一种实现,可以用若干已经池化的线程执行被提交的任务。使用线程池可以帮助我们限定和整合程序资源,尽可能避免创建新的线程来执行任务从而降低任务调用的开销,在执行大量异步任务的时候反而能获得更好的性能。此外,Threa

线程池源码解析-编程思维

1.创建线程池相关参数 线程池的创建要用ThreadPoolExecutor类的构造方法自定义创建,禁止用Executors的静态方法创建线程池,防止内存溢出和创建过多线程消耗资源。 corePoolSize: 线程池核心线程数量,不会自动销毁,除非设置了参数allowCoreThreadTimeOut=true,那么