Java并发之ReentrantLock源码解析(四)-编程思维

Condition

在上一章中,我们大概了解了Condition的使用,下面我们来看看Condition再juc的实现。juc下Condition本质上是一个接口,它只定义了这个接口的使用方式,具体的实现其实是交由子类完成。

public interface Condition {
	void await() throws InterruptedException;
	void awaitUninterruptibly();
	long awaitNanos(long nanosTimeout) throws InterruptedException;
	boolean await(long time, TimeUnit unit) throws InterruptedException;
	boolean awaitUntil(Date deadline) throws InterruptedException;
	void signal();
	void signalAll();
}

  

相信这里不需要笔者对await()、signal()、signalAll()这三个方法做介绍了,甚至有的人看到其他方法也知道这些方法的作用。这里笔者就简单介绍下:

  • awaitUninterruptibly():释放锁后并挂起当前线程,此方法会忽略中断,直到收到条件成立的信号被唤醒,执行条件成立的代码。
  • awaitNanos(long nanosTimeout):传入一个时间,单位为:纳秒。如果返回的值>0,则代表等待线程在既定时间内获得条件成立的信号,可以执行条件成立之后的代码;如果小于等于0,则表示线程在既定时间内没有收到条件成立的信号,不能执行条件成立的代码。
  • await(long time, TimeUnit unit)和awaitUntil(Date deadline)类似,也是限定一个超时时间,在超时时间内如果收到条件成立的信号,则返回true执行条件成立之后的代码,否则不能执行条件成立后的代码。

下面,我们来看看Condition接口在juc包下的具体实现,在juc包下,其实有两个内部类分别实现了Condition接口,第一个还是我们的老朋友:AbstractQueuedSynchronizer.ConditionObject,第二个是AbstractQueuedLongSynchronizer.ConditionObject,这两个内部类的实现十分类似,甚至可以时候是一模一样,这里笔者介绍AbstractQueuedSynchronizer.ConditionObject的实现。

我们先来看await()方法,看看在这个方法中是如何释放锁在阻塞线程的,以及当线程被唤醒后又是如何重新抢锁。当线程进入await()后,会先在<1>处创建一个封装了线程本身Thread对象的Node节点,然后在<2>处释放线程占有的锁。如果有多个线程都进入同一条件对象的await()方法,每个线程都会有一个对应的Node节点,并且这些Node节点会形成一个队列。线程在释放锁后会进入<3>处陷入阻塞,直到收到条件成立的信号或者线程中断的信号,于是线程从<3>处被唤醒并跳出循环。在<4>处我们又看到我们的老朋友acquireQueued(final Node node, long arg),如果记性好的朋友会知道这个方法也可以阻塞线程,直到线程抢到了锁。如此,await方法便完成了类似wait()方法的工作。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
    public class ConditionObject implements Condition, java.io.Serializable {

	    public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();//<1>
            long savedState = fullyRelease(node);//<2>
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);//<3>
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//<4>
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
	}
	//...
}

  

当然,上面的await()方法远不止笔者所说的那么简单,刚才笔者所说的知识一个大概的脉络,其中还有很多细节值得深究。下面就请大家跟笔者一起逐行逐句地了解await()的原理。

在await()开始,我们可以看到在ConditionObject类本身,会维护两个字段firstWaiter和lastWaiter,通过这两个字段可以针对某一条件对象形成一个等待队列。当有线程调用了addConditionWaiter()方法时,会先在<1>处当前线程是否是独占锁的线程,如果不是则抛异常,是的话则继续执行,之后会获取条件对象的尾节点,如果尾节点不为空,且尾节点的状态不为Node.CONDITION,这里会判断该条件节点已经不是有效的条件节点,会调用<2>处的unlinkCancelledWaiters()将条件节点从队列中移除,然后再重新获取尾节点。之后会为调用线程创建一个Node节点,等待状态为CONDITION(-2),表明这是一个条件节点,如果尾节点为空,代表当前队列中没有节点,于是执行<3>处的代码把当前节点赋值给头节点(firstWaiter),如果尾节点不为空,则将当前节点赋值给原先尾节点的nextWaiter,之后将当前节点赋值给尾节点。由于这段代码是只有占有锁的线程才可以执行的,所以这段代码是线程安全的。

下面,我们再来看看unlinkCancelledWaiters()是如何将一个状态非等待的节点从队列中移除,在条件队列中,只要状态不等于CONDITION,就是非条件节点。所有进入条件队列的节点最开始都是条件节点,但可能因为各种原因从条件节点变成非条件节点,所以要从队列中移除,至于是哪些原因笔者会在后面细说,这里只要知道存在条件节点会变成非条件节点的情况,一旦出现这种情况就要把非条件节点从条件队列移除就行了。这和原先AQS的节点有些类似,也有些许差别,原先的AQS认为只要节点的等待状态>0,就是一个失效节点,就要从队列中移除。

当要从条件队列中移除非条件节点,会从头节点开始循环遍历,next指向当前节点的下一个节点,用于下一轮判断是否有条件节点。如果遍历到的节点是条件节点,则会执行<5>处的代码将当前节点赋值给trail,所以trail永远指向遍历过程中最靠后的条件节点。如果当前节点不是条件节点,则会清空当前节点的nextWaiter便于GC,同时检查最靠近当前节点的有效节点trail是否为null,为null代表从头结点到当前节点都不是有效节点,于是把下一个节点next赋值给头节点,否则将最靠后的条件节点trail.nextWaiter指向下一个节点,一直到next为null,跳出循环。如果从头节点到尾节点都不是条件节点,trail则永远为null,而执行<4>处的分支到最后,firstWaiter和lastWaiter会都为null,条件队列中没有任何节点。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	private transient volatile Node head;
	private transient volatile Node tail;
	//...
	static final class Node {
		static final Node SHARED = new Node();
		static final Node EXCLUSIVE = null;
		static final int CANCELLED =  1;//节点状态为取消
		static final int SIGNAL    = -1;//当前节点的后继节点等待唤醒
		static final int CONDITION = -2;//节点等待条件成立
		volatile int waitStatus;//等待状态
		volatile Thread thread;//节点指向线程对象
		/**
		 * 一般用于连接到下一个条件节点,或者用特殊值Node.SHARED
		 * 表明这是一个共享节点。
		 */
		Node nextWaiter;
		//...
		Node(int waitStatus) {
            WAITSTATUS.set(this, waitStatus);
            THREAD.set(this, Thread.currentThread());
        }
		//...
	}
	public class ConditionObject implements Condition, java.io.Serializable {
		//...
		private transient Node firstWaiter;
		private transient Node lastWaiter;
		//...
		private Node addConditionWaiter() {
			if (!isHeldExclusively())//<1>
				throw new IllegalMonitorStateException();
			Node t = lastWaiter;
			// If lastWaiter is cancelled, clean out.
			if (t != null && t.waitStatus != Node.CONDITION) {
				unlinkCancelledWaiters();//<2>
				t = lastWaiter;
			}

			Node node = new Node(Node.CONDITION);

			if (t == null)
				firstWaiter = node;//<3>
			else
				t.nextWaiter = node;
			lastWaiter = node;
			return node;
		}
		//...
		private void unlinkCancelledWaiters() {
			Node t = firstWaiter;
			Node trail = null;
			while (t != null) {
				Node next = t.nextWaiter;
				if (t.waitStatus != Node.CONDITION) {//<4>
					t.nextWaiter = null;
					if (trail == null)
						firstWaiter = next;
					else
						trail.nextWaiter = next;
					if (next == null)
						lastWaiter = trail;
				}
				else
					trail = t;//<5>
				t = next;
			}
		}
	}
	//...
}

  

在把线程封装为Node节点并加入条件队列后,await()还会调用fullyRelease(Node node)释放锁,可以看到这里先调用getState(),获取锁的引用计数,然后再调用release(int arg)清空锁的引用计数,如果不是占用锁的线程调用此方法,则release(int arg)会返回false,这里会抛出IllegalMonitorStateException异常,如果释放锁成功,则返回原先的引用计数savedState。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }
	//...
	public class ConditionObject implements Condition, java.io.Serializable {
		//...
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
		//...
	}
	//...
}

    

之后会判断当前节点是否在同步队列,即节点是否在AQS的header、tail的队列中,这里肯定是不在的,因为在下面代码的<1>处就可以判定当前节点的等待状态是CONDITION,且节点没有前驱节点,所以isOnSyncQueue(Node node)将返回false,!isOnSyncQueue(node)为true,这里会进入<2>处的循环将当前线程陷入阻塞。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)//<1>
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
	//...
	public class ConditionObject implements Condition, java.io.Serializable {
		//...
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {//<2>
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
		//...
	}
	//...
}

  

既然当前线程已经阻塞,那我们就要想办法唤醒线程,让线程继续往下执行,我们都知道有两个办法可以唤醒阻塞在await()的线程,一种是调用signal()方法,一种是中断线程。我们先从signal()开始讲起。

当我们调用signal()时,依旧先判断调用线程现在是否占有了锁,如果不是占有锁的线程调用此方法则抛出IllegalMonitorStateException异常,如果线程为锁的独占线程,才接着往下执行。之后获取条件队列的头节点(firstWaiter),头节点非空的情况下将头节点传入到doSignal(Node first)方法中。

观察doSignal(Node first)这个方法,我们知道第一次传入的first一定会执行一次transferForSignal(Node node),这个方法的返回结果决定doSignal(Node first)方法体内的循环能否继续。我们先来看看transferForSignal(Node node)这个方法。

如果我们传入transferForSignal(Node node)的节点是一个条件节点,即等待状态为CONDITION,那么<1>处的结果一定为true,此时原先的条件节点变为非条件节点了,换句话说,这个节点应该从条件队列中移除。之后执行<2>处的代码将节点进入到同步队列,即AQS的队列,如果入队成功,会在<3>处返回当前节点的前驱节点。之后判断节点的前驱节点状态是否失效,或者前驱节点的等待状态能否用CAS的方式改为SIGNAL,如果失效或者CAS修改失败,则唤醒节点的线程,最后返回true。

如果是按照这样的流程执行,那么transferForSignal(first)的返回未true,(!transferForSignal(first) && (first = firstWaiter) != null)将无法成立,退出循环。此外我们注意到doSignal(Node first)方法中在每一次执行循环时都会把原先头节点(first)的下一个节点(first.nextWaiter)赋值给等待对象的头节点(firstWaiter),不管是transferForSignal(Node node)的返回是true或者false,都会移除原先的头节点,如果我们传入条件队列的节点已经不是条件节点,且队列中还有别的节点,(!transferForSignal(first) && (first = firstWaiter) != null)就会成立,直到遇到队列中最靠前的条件节点,或者队列中已无节点。在循环的时候,如果发现头节点(firstWaiter)为空,表示队列中没有多余的节点,也会置空尾节点(firstWaiter),同时每次循环时都会置空原先头节点(first)的nextWaiter的引用,方便垃圾回收。

所以总结一下doSignal(Node first)的作用,就是从条件队列的头节点开始,判断节点是否是条件节点,如果是的话则修改其等待状态,并将其放入AQS的同步队列中,并从条件队列移除该节点。如果头节点不是条件节点,则会一边遍历一边移除非条件的节点,直到遇到一个条件节点,或者条件队列中没有节点。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
		//如果传入的节点的等待状态不是CONDITION,代表节点已经不是条件节点,返回false
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))//<1>
            return false;
		
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
		//如果传入的节点是条件节点,到这里节点的状态会由CONDITION改为0,将原先的条件节点转化为非条件节点,并进入AQS的同步队列
        Node p = enq(node);//<2>
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))//<4>
            LockSupport.unpark(node.thread);
        return true;
    }
	//...
	private Node enq(Node node) {
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;//<3>
                }
            } else {
                initializeSyncQueue();
            }
        }
    }
	//...
	public class ConditionObject implements Condition, java.io.Serializable {
		//...
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
		//...
		private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
		//...
	}
	//...
}

  

调用signal()方法将条件节点从条件队列移除放入AQS同步队列后,当先线程释放锁,唤醒同步队列头节点下一个节点的线程,于是原先调用await()陷入阻塞的线程会从循环<1>处的LockSupport.park(this)唤醒,接着执行checkInterruptWhileWaiting(node)方法,如果结果不为0则跳出循环,如果结果为0,则判断isOnSyncQueue(node)条件是否成立,如果isOnSyncQueue(node)范围结果为true则跳出循环,否则继续循环。

那么我们先来看看阻塞线程被唤醒后第一个执行的方法checkInterruptWhileWaiting(node),这个方法也简单,仅仅是判断线程是否是被中断的,如果不是则返回0,我们的线程不是中断唤醒的,这里的返回结果一定是0,所以不能进入<2>处的分支跳出循环。之后我们判断isOnSyncQueue(node)的条件是否成立,首先节点的等待状态不是CONDITION,其次当前节点进入同步队列后,一定会有前驱节点,所以<3>处的分支一定进不去。如果节点有后继节点,那么可以判定这个节点一定是同步队列里的节点,但我们的节点进入同步队列后,我们假定没有其他线程请求锁,因此我们的节点没有后继节点,这里进不去分支<4>。之后,我们就只能调用findNodeFromTail(node)从尾节点开始寻找当前节点,可以确定的是从同步队列的尾部开始遍历一定能找到当前节点,因为当前节点就是尾节点。所以isOnSyncQueue(node)最终会返回true,表示当前节点已经在同步队列,因此while (!isOnSyncQueue(node))判定为false,跳出循环。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	public class ConditionObject implements Condition, java.io.Serializable {
		public final void await() throws InterruptedException {
			if (Thread.interrupted())
				throw new InterruptedException();
			Node node = addConditionWaiter();
			int savedState = fullyRelease(node);
			int interruptMode = 0;
			while (!isOnSyncQueue(node)) {//<1>
				LockSupport.park(this);
				if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//<2>
					break;
			}
			if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
				interruptMode = REINTERRUPT;
			if (node.nextWaiter != null) // clean up if cancelled
				unlinkCancelledWaiters();
			if (interruptMode != 0)
				reportInterruptAfterWait(interruptMode);
		}
		//...
		private int checkInterruptWhileWaiting(Node node) {
			return Thread.interrupted() ?
				(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
				0;
		}
	}
	//...
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)//<3>
            return false;
        if (node.next != null) // If has successor, it must be on queue <4>
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
	//...
    private boolean findNodeFromTail(Node node) {
        // We check for node first, since it's likely to be at or near tail.
        // tail is known to be non-null, so we could re-order to "save"
        // one null check, but we leave it this way to help the VM.
        for (Node p = tail;;) {//<5>
            if (p == node)
                return true;
            if (p == null)
                return false;
            p = p.prev;
        }
    }
	//...
}

  

在了解了如何通过signal()唤醒阻塞线程后,我们再来看看如果是中断阻塞线程的流程是怎么走的。阻塞线程被中断后,从LockSupport.park(this)被唤醒执行checkInterruptWhileWaiting(node)里面的逻辑,这里判断Thread.interrupted()结果为true,所以checkInterruptWhileWaiting(node)的返回结果还要根据transferAfterCancelledWait(node)决定是返回THROW_IE(-1)或是REINTERRUPT(1)。

当调用transferAfterCancelledWait(node)方法,如果能成功在<1>处用CAS的方式修改节点的等待状态从条件节点(CONDITION)改为0,则把当前节点放入同步队列,并返回true,(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT)将把THROW_IE作为结果返回,表示中断期间到节点入队,节点的状态都是条件节点。如果在<1>处节点的状态已经不是CONDITION,那么表示节点的状态已经被改了,这里不能确定是别的线程先于中断之前执行signal()导致节点的等待状态被改,亦或是当前线程先被中断,之后别的线程调用signal()导致节点的等待状态被改,一旦出现这种情况,则不能进入<1>处的分支,会继续往下走,在<2>处会循环判断当前节点是否进入同步队列,如果不在同步队列,则一直循环直到别的线程将当前节点放入同步队列,最后返回false,(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT)将把REINTERRUPT作为结果返回。但不管是返回THROW_IE或是REINTERRUPT,我们都可以进入分支<3>跳出循环。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	public class ConditionObject implements Condition, java.io.Serializable {
		//...
		private static final int REINTERRUPT =  1;
		private static final int THROW_IE    = -1;
		//...
		public final void await() throws InterruptedException {
			if (Thread.interrupted())
				throw new InterruptedException();
			Node node = addConditionWaiter();
			int savedState = fullyRelease(node);
			int interruptMode = 0;
			while (!isOnSyncQueue(node)) {
				LockSupport.park(this);
				if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//<3>
					break;
			}
			if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
				interruptMode = REINTERRUPT;
			if (node.nextWaiter != null) // clean up if cancelled
				unlinkCancelledWaiters();
			if (interruptMode != 0)
				reportInterruptAfterWait(interruptMode);
		}
		//...
		private int checkInterruptWhileWaiting(Node node) {
			return Thread.interrupted() ?
				(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
				0;
		}
	}
	//...
	final boolean transferAfterCancelledWait(Node node) {
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {//<1>
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))//<2>
            Thread.yield();
        return false;
    }
	//...
}

  

我们已经了解了阻塞线程被唤醒、阻塞线程被中断后是如何离开循环的。下面我们来看看用这两种方式离开循环的线程又会如何执行之后的代码,首先线程对应的节点已经从条件队列进入同步队里,所以<1>处会调用acquireQueued(node, savedState)尝试获取锁,如果竞争锁失败则阻塞当前线程,直到被唤醒重新开始新一轮的竞争,直到抢锁成功。并且acquireQueued(node, savedState)会返回当前线程是否被中断,如果是用signal()唤醒的线程,是没有中断标记的,抢锁成功从acquireQueued(node, savedState)退出后,返回结果为false,这里不会进入<1>处的分支,所以interruptMode为0。然后判断节点如果nextWaiter不为空,则清理条件队列,移除队列中非条件节点。由于interruptMode为0,这里不会进入<3>处的分支。

如果线程是被中断的,在抢锁成功退出acquireQueued(node, savedState)方法后,返回结果为true,如果判断interruptMode不为THROW_IE,代表当前线程无法判断中断是先signal()或者后signal()执行,于是标记中断模式interruptMode为REINTERRUPT,进入<3>处的分支执行reportInterruptAfterWait(int interruptMode)方法,设置当前线程的中断标记为true。如果interruptMode为THROW_IE,表示在中断期间到用CAS修改节点的等待状态为非条件成功,都没有别的线程修改节点状态,所以进入<3>处的分支执行reportInterruptAfterWait(int interruptMode)会抛出中断异常。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
	public class ConditionObject implements Condition, java.io.Serializable {
		//...
		private static final int REINTERRUPT =  1;
		private static final int THROW_IE    = -1;
		//...
		public final void await() throws InterruptedException {
			if (Thread.interrupted())
				throw new InterruptedException();
			Node node = addConditionWaiter();
			int savedState = fullyRelease(node);
			int interruptMode = 0;
			while (!isOnSyncQueue(node)) {
				LockSupport.park(this);
				if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
					break;
			}
			if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//<1>
				interruptMode = REINTERRUPT;
			if (node.nextWaiter != null) // clean up if cancelled <2>
				unlinkCancelledWaiters();
			if (interruptMode != 0)//<3>
				reportInterruptAfterWait(interruptMode);
		}
		//...
	}
	//...
	private void reportInterruptAfterWait(int interruptMode)
		throws InterruptedException {
		if (interruptMode == THROW_IE)
			throw new InterruptedException();
		else if (interruptMode == REINTERRUPT)
			selfInterrupt();
	}
	//...
}

  

下面,笔者再介绍下awaitNanos(long nanosTimeout)方法,在<1>处根据当前系统时间加上外部传入的等待时间nanosTimeout,算出过期时间。然后用initialNanos保存一下初始的等待时间,因为这个等待时间在<3>处的循环是有可能减少的,后面还需要初始的剩余时间。然后我们调用addConditionWaiter()方法将当前线程作为Node节点放入条件队列,再调用fullyRelease(node)释放当前线程对锁的占用,之后进入<3>处的循环,在这个循环中剩余等待时间nanosTimeout会不断减少,如果小于等于0则退出循环;如果循环的过程中判断剩余等待时间大于1000ns(SPIN_FOR_TIMEOUT_THRESHOLD)则选入计时阻塞,如果线程在阻塞期间内被中断,或者到达过期时间后线程被唤醒,则会执行之后的checkInterruptWhileWaiting(node),这里就不再对这个方法做介绍,先前介绍过这个方法,并且也讲解过中断和唤醒是如何离开循环的。
离开循环后,线程调用acquireQueued(node, savedState)重新抢锁,抢锁成功后根据中断模式interruptMode判断是要抛出异常,还是标记线程中断状态,亦或是无事发生。

如果线程是被唤醒或是重新标记线程中断状态,则会执行<5>处之后的代码,这里会根据过期时间和当前时间算出剩余时间remaining,一般这个剩余时间(remaining)都是小于等待时间(initialNanos),除非发生溢出。如果剩余时间小于等待时间,则返回剩余时间,否则出现溢出则返回Long.MIN_VALUE。

public class ConditionObject implements Condition, java.io.Serializable {
	//...
	static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
	//...
	public final long awaitNanos(long nanosTimeout)
			throws InterruptedException {
		if (Thread.interrupted())
			throw new InterruptedException();
		// We don't check for nanosTimeout <= 0L here, to allow
		// awaitNanos(0) as a way to "yield the lock".
		final long deadline = System.nanoTime() + nanosTimeout;//<1>
		long initialNanos = nanosTimeout;//<2>
		Node node = addConditionWaiter();
		int savedState = fullyRelease(node);
		int interruptMode = 0;
		while (!isOnSyncQueue(node)) {//<3>
			if (nanosTimeout <= 0L) {
				transferAfterCancelledWait(node);
				break;
			}
			if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
				LockSupport.parkNanos(this, nanosTimeout);
			if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
				break;
			nanosTimeout = deadline - System.nanoTime();//<4>
		}
		if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
			interruptMode = REINTERRUPT;
		if (node.nextWaiter != null)
			unlinkCancelledWaiters();
		if (interruptMode != 0)
			reportInterruptAfterWait(interruptMode);
		long remaining = deadline - System.nanoTime(); // avoid overflow<5>
		return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
	}
	//...
}

  

最后,笔者再介绍下signalAll()便结束了ReentrantLock源码解析的篇章。当然,这里并没有完全介绍完Condition在AQS的实现,还剩:awaitUninterruptibly()、await(long time, TimeUnit unit)、awaitUntil(Date deadline),但这些方法和awaitNanos(long nanosTimeout)都大同小异,笔者相信各位看官一定也能独立思考这几个方法。

下面,我们来看看signalAll(),其实这个方法只要有前面的基础知识,也能轻松理解,signalAll()和signal()的不同是,signal()调用transferForSignal(first)方法只要条件队列中有一个条件节点转化为非条件节点并进入同步队列,则会退出。而signalAll()会调用transferForSignal(first)方法把队列中所有节点传入并加入到同步队列,直到条件队列为空。

public class ConditionObject implements Condition, java.io.Serializable {
	//...
	public final void signalAll() {
		if (!isHeldExclusively())
			throw new IllegalMonitorStateException();
		Node first = firstWaiter;
		if (first != null)
			doSignalAll(first);
	}
	//...
	private void doSignalAll(Node first) {
		lastWaiter = firstWaiter = null;
		do {
			Node next = first.nextWaiter;
			first.nextWaiter = null;
			transferForSignal(first);
			first = next;
		} while (first != null);
	}
	//...
}

  

至此,我们算是了解了ReentrantLock的使用方式以及实现原理。不知道大家有没有这样的感受,本章与其说是在讲ReentrantLock,更像是在讲AQS,ReentrantLock的内部类只是实现了AQS要求的一些接口,有自己的特性而已。

事实上,笔者真正的目的也是带大家去了解AQS,在笔者看来AQS绝对可以说是juc包下的核心且没有之一,只是AQS太过复杂抽象,需要用多个维度去看待。所以笔者这里用ReentrantLock作为切入点,和大家一起学习ReentrantLock和AQS。

ReentrantLock到此就告一段落,在后面的章节,笔者还会和大家一起从更多的角度去看待AQS,我们下一篇文章见。

版权声明:本文版权归作者所有,遵循 CC 4.0 BY-SA 许可协议, 转载请注明原文链接
https://www.cnblogs.com/beiluowuzheng/p/14952922.html

Java并发之Semaphore源码解析(一)-编程思维

Semaphore 前情提要:在学习本章前,需要先了解笔者先前讲解过的ReentrantLock源码解析,ReentrantLock源码解析里介绍的方法有很多是本章的铺垫。下面,我们进入本章正题Semaphore。 从概念上来讲,信号量(Semaphore)会维护一组许可证用于限制线程对资源的访问,当我们有一资源允许线

Java并发之Semaphore源码解析(二)-编程思维

在上一章,我们学习了信号量(Semaphore)是如何请求许可证的,下面我们来看看要如何归还许可证。 可以看到当我们要归还许可证时,不论是调用release()或是release(int permits),都会调用AQS实现的releaseShared(int arg)方法。在releaseShared(int arg

Java并发之ReentrantLock源码解析(三)-编程思维

ReentrantLock和BlockingQueue 首先,看到这个标题,不要怀疑自己进错文章,也不要怀疑笔者写错,哈哈。本章笔者会从BlockingQueue(阻塞队列)的角度,看看juc包下的阻塞队列是如何使用ReentrantLock。这个章节笔者会介绍部分阻塞队列的源码,但不会着墨过多,我们的重点依旧在Ree

Java并发之ReentrantLock源码解析(二)-编程思维

在了解如何加锁时候,我们再来了解如何解锁。可重入互斥锁ReentrantLock的解锁方法unlock()并不区分是公平锁还是非公平锁,Sync类并没有实现release(int arg)方法,这里会实现调用其父类AbstractQueuedSynchronizer的release(int arg)方法。在releas

Java并发之ReentrantLock源码解析(一)-编程思维

ReentrantLock ReentrantLock是一种可重入的互斥锁,它的行为和作用与关键字synchronized有些类似,在并发场景下可以让多个线程按照一定的顺序访问同一资源。相比synchronized,ReentrantLock多了可扩展的能力,比如我们可以创建一个名为MyReentrantLock的类继