SourceCode Java Code

AQS源码分析

Posted on 2021-03-17,15 min read

AbstractQueuedSynchronizer

  • 注意:本文根据JDK8进行分析,后续版本的实现有一定变化。
  • 队列同步器使用了CLH锁(基于单向链表的自旋锁,申请加锁的线程通过前驱节点的变量进行自旋。在前置节点解锁后,当前节点会结束自旋,并进行加锁)。AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
    • CLH 中的头节点是一个虚拟的头节点,获取了锁的线程是不在队列中的
  • 队列同步器使用了模板方法模式,只需要重写指定的方法就可以实现同步的语义。AQS有两个内部类:ConditionObject和Node
  • 如果一个节点的前驱节点的状态被设置为了signal,那么下一个节点就能够被阻塞了,因为如果前驱节点为signal的时候,在其释放(release)的时候,会唤醒下一个节点。
  • 在AQS中,同步队列,如果一个线程获取到了同步状态,就会从同步队列中"移除"。(当前节点被设置为头节点,但是头节点的thread是为null的)
  • AQS存在着两个队列:一个是同步队列,一个是条件队列
    • 同步队列:双向队列
    • 条件队列:单向队列
  • 有几点需要注意:
    • 当在wait的时候被中断是需要抛出异常的
    • LockSupport调用park阻塞的时候从阻塞状态中恢复的原因有两个,所以需要额外判断。
      • 调用了unpark
      • 当前线程在park的时候被中断了。

源码

AQS方法

方法源码

获取独占同步状态

  • 流程:

    • 调用tryAcquire方法获取同步状态,如果获取失败,则调用addWaiter生成一个新的节点,并将其放在同步队列中。
    • 同步队列中,如果当前节点的前驱节点是头节点,就尝试获取同步状态,如果获取成功就将当前节点设为头节点然后返回。如果失败,则调用shouldParkAfterFailedAcquire来查看当前节点是否需要阻塞,如果需要阻塞就调用parkAndCheckInterrupt阻塞当前节点,并且检查中断。
    • shouldParkAfterFailedAcquire中,如果前驱节点为single,表明前驱节点已经被阻塞了,则当前节点也可以放心的阻塞。返回true表明可以阻塞。如果前驱节点已经被取消了,则将前驱节点移出队列。否者将当前节点的状态设置为single。
  • acquire:获取资源,独占模式

public final void acquire(int arg) {
    // 尝试获取,获取失败则生成一个生成一个新的节点添加到尾部,然后排他获取锁
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果进入了if,则代表acquireQueued方法被中断了,于是恢复终端状态
        selfInterrupt();
}
  • acquireQueued:尝试获取同步状态。如果不被阻塞,则代表当前线程争抢不激烈,就CAS获取同步状态。如果阻塞,并且在阻塞的过程中(如果是doAcquireInterruptibly的话,则在中断后会直接抛出异常。其他和acquireQueued差不多
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点为头节点,就尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                // 获取成功将当前节点设置为头节点然后返回。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 检测当前节点获取失败后是否应该阻塞,如果应该的话就阻塞并且检测中断状态。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // return前调用,进入此处则证明出现了异常。则取消当前尝试。
        if (failed)
            cancelAcquire(node);
    }
}
  • shouldParkAfterFailedAcquire:当获取同步状态失败的时候是否应该阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
		// 如果前驱节点为Single,则表明前驱节点已经被阻塞了。则当前节点就能够安全的被阻塞了。
        return true;
    if (ws > 0) {
		// 如果前驱节点已经被取消了,则将被取消的节点移出队列。然后返回false
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        // 将前驱节点设置为Signal,然后返回false。因为前驱节点为signal的时候,在其release的时候会通知后续节点唤醒。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  • addWaiter:添加一个指定模式的节点,通过mode指定,并将其添加到队尾,Node.EXECUSIVENode.SHARED
private Node addWaiter(Node mode) {
    // 创建一个新的节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 尝试快速添加,如果失败则调用enq进行cas添加
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • Enq:cas添加到队尾,成功则返回当前节点的前驱节点
private Node enq(final Node node) {
    // cas添加到队尾,成功则返回当前节点的前驱节点
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化,一开始的时候可能为空。当尾部为空的时候,则代表头也为空
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

释放同步状态

  • release:释放同步状态
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒一个阻塞的线程,如果存在的话。
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • unparkSuccesser:唤醒一个阻塞中的节点,如果存在的话。
private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 默认唤醒下一个节点,如果下一个节点已经取消或者为空,则从尾部找到一个没有被取消的节点唤醒。
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

await等待

https://blog.csdn.net/kailuan2zhong/article/details/105391230

  • 大致流程:

    • 如果当前线程被中断了的话,就抛出异常。否则在条件队列中添加一个新的节点,添加新节点的同时去除非condition的节点,然后释放当前的同步状态。
    • 不断的轮询当前节点是否在同步队列中(判断同步队列是通过当前节点的状态和有没有前后节点来判断的),如果不在同步队列中就继续阻塞当前节点。同时还需要判断当前的线程是否中断。判断线程中断的时候同时还要判断是否调用了signal,如果调用了就恢复中断状态,否则抛出异常。(类似synchronized,如果在wait的时候中断线程是会抛出异常的)
      • 如果调用了signal将当前节点加入到了同步队列会跳出循环。
      • 如果当前线程因为中断而被加入到了同步队列中会跳出循环。
        • 在循环中线程被唤醒的原因有两个:1是因为调用了signal,在signal中调用了LockSupport的unpark方法导致线程被唤醒;2是因为在LockSupport的park途中线程被中断了,LockSupport会自动从阻塞状态中恢复过来。所以还需要额外的判断被唤醒的原因。
    • 跳出循环后则代表当前线程已经被添加到了同步队列中,于是调用acquireQueued重新抢夺同步状态。
    • 抢夺到同步状态后根据mode判断是否要抛出异常(在等待过程中被中断需要抛出异常)或恢复中断状态
  • await:等待,并释放资源。

    • 注意:此处为什么不是将当前的节点添加到条件队列,而是在条件队列里生成一个新的节点?
      • 因为调用await的时候已经获取到了同步状态,此时当前节点已经不算在同步队列中了。
public final void await() throws InterruptedException {
    // 如果线程被中断了的话直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加一个新的节点到等待队列。在addConditionWaiter中,如果发现尾节点的状态不对,则调用unlinkCancelledWaiters将等待队列中的非condition状态的节点移除。
    Node node = addConditionWaiter();
    // 释放所有资源。到此时,锁已经被释放了。
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    /**
         * 不断轮询当前是否在同步队列中,如果不在同步队列中就继续阻塞(除非等待时候被中断了)。通过当前节点的状态来进行判断,如果当前节点的状态为condition或前驱节点为空,则代表还在条件队列中。如果后续节点不为空(next,同步队列的下一个是通过nextWaiter判断的,所以不冲突),则代表已经在同步队列中了。否则从尾节点找到前驱当前节点。
         * 如果在同步队列中那么就代表被唤醒了(调用了singal方法)
         */
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 此处线程已经被唤醒,唤醒的原因有两个:一是因为线程被中断,二是因为signal
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            // 判断在等待过程中线程是否被中断,如果被中断了就跳出循环重新抢夺时间片,抢夺到了后然后报告中断状态。
            break;
    }
    // 重新竞争锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // 如果当前节点有后续节点,清除被取消的节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 恢复中断状态
        reportInterruptAfterWait(interruptMode);
}
  • addConditionWaiter:在条件队列中添加一个节点。同时会清除非condition的节点
private Node addConditionWaiter() {    
	Node t = lastWaiter; // 获取尾部指针,看来是采用尾插法    
	// If lastWaiter is cancelled, clean out.    
	if (t != null && t.waitStatus != Node.CONDITION) {        
		unlinkCancelledWaiters(); // 如果尾节点的状态不为condition,则遍历队列。清除非condition的节点。
		t = lastWaiter;    
	}    
	Node node = new Node(Thread.currentThread(), Node.CONDITION); // 创建一个新节点    
	if (t == null) // 尾结点为空,说明队列是空的        
		firstWaiter = node; // 初始化队列    
	else        
		t.nextWaiter = node; // 尾插    
	lastWaiter = node; // 调整尾指针指向    
	return node; // 返回新增节点对象
}
  • fullRelease:释放所有状态
  • isOnSyncQueue:判断节点是否在同步队列中
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            // 如果当前节点的状态为condition,或前去节点为null,则当前节点是一个独立的节点,即不在同步队列中
            return false;
        if (node.next != null) // 如果当前节点的下一个不为null,则代表已经在同步队列中了(此处是同步队列的判断,条件队列的判断是通过nextWaiter来判断的)
            return true;
		// 前置节点不为空。此时不代表当前节点一定在同步队列中,因为CAS可能会失败,需要不断的从尾节点轮询来获取当前节点的确切状态
        return findNodeFromTail(node);
    }
  • checkInterruptWhileWaiting:判断在等待过程中线程是否中断,同时报告线程中断的原因
// 判断当前线程在等待的过程中是否被中断
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}

// 如果被中断了判断被中断的时候是否已经调用了signal。即判断被唤醒的原因。因为如果还在等待过程中被中断了的话需要抛出异常。
final boolean transferAfterCancelledWait(Node node) {

    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        // 此时代表被唤醒的时候还没有调用signal,如果CAS替换成功,则表明当前线程被唤醒的原因是因为在阻塞的过程中调用了interrupt
        enq(node);
        return true;
    }
    // 此时代表由于signal被唤醒。
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

signal通知

  • signal:通知
// 从条件队列中获取一个等待时间最长的唤醒
public final void signal() {
    if (!isHeldExclusively()) //模板方法,判断是持有同步状态。
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); 
}
  • doSignal:
private void doSignal(Node first) {
    do {
        
        if ( (firstWaiter = first.nextWaiter) == null)
            // 如果节点没有后续节点了,就将lastWaiter置为空
            lastWaiter = null;
        // 将first节点孤立出来(从条件队列中移除)
        first.nextWaiter = null;
        // 将其从条件队列中转换到同步队列
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

  • transferForSignal:将当前节点从条件队列中移交到同步队列
final boolean transferForSignal(Node node) {
    /*
         *如果当前节点状态无法被替换,那么当前节点就被取消了
         */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
    // 将当前节点添加到同步队列并且返回前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果当前节点的前驱节点被取消了或尝试替换等待状态失败就唤醒当前线程。
    // 前者:如果同步状态为取消,则唤醒线程,在await()逻辑中,被唤醒的线程会检查线程状态,此时的取消会导致在transferAfterCancelledWait()方法中,无法将node状态由CONDITION转为0,也就进而不停让出线程cpu时间,导致线程被取消。
    // 后者:无法更改等待状态的值,就说明有其他线程在操作该node。
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

内部类

ConditionObject

  • 成员变量:
    • firstWaiter:条件队列第一个节点
    • lastWaiter:条件队列最后一个节点
  • 参数:
    • REINTERRUPT:表示在退出的时候重新中断
    • THROW_IE:表示在退出的时候抛出InterruptedException

Node

  • CLH中的一个节点

  • 参数:

    • waitStatus:如果waitStatus非负表示节点不需要发信号。
      • cancelled:值为1,表明当前线程被取消了。
      • initial:值为0,初始状态
      • signal:值为-1,表明了当前节点已经被阻塞了,等待唤醒
      • condition:值为-2,表明当前节点在等待Condition。当其他线程对Condition调用了signal方法后,该节点会从等待队列中转移到同步队列中,加入到对同步状态的获取中
      • propagate:值为-3,表示下一次共享式同步状态获取将会无条件的被传播下去
    • prev:前结点的引用,如果前节点移除后会找到一个新的前节点。在队列中的线程是前节点永远存在
    • next:后节点的引用,后节点为空并不一定表示在队尾。
    • thread:当前节点绑定的线程
    • nextWaiter:下一个等待节点的链接,用于共享或者条件队列的时候。

下一篇: Spring解决循环依赖→

loading...