【Java 并发编程】— AQS 源码探索之共享式

2022/6/9 1:21:40

本文主要是介绍【Java 并发编程】— AQS 源码探索之共享式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

在【Java 并发编程】——AQS 源码探索之独占式一文中从源码详细介绍了 AQS 独占式的实现方式。本文将介绍 AQS 的共享式,顾名思义,共享式就是允许多个线程同时访问同一个资源。

共享式实例

在独占式中,AQS 中的状态用来表示可获取或者已独占(比如 0 表示可获取,1 表示已被占用)。共享式中,状态已不再是具体数值,而是一个范围:大于等于 0 表示可获取,小于 0 表示已被占满。

下面是一个自定义共享式同步工具类 TwinsLock,同一时刻最多允许两个线程访问:

public class TwinsLock implements Lock {

    private Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        private Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must larger than 0");
            }
            // 初始化 state 的值,表示可同时访问的线程数量
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for(;;) {
                // 获取当前
                int currentCount = getState();
                // 计算剩余可用数量
                int newCount = currentCount - reduceCount;
                if (newCount < 0 || compareAndSetState(currentCount, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for(;;) {
                int currentCount = getState();
                int newCount = currentCount + returnCount;
                if (compareAndSetState(currentCount, newCount)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }
    
    // 省略其他方法
}

按照惯例,定义静态内部类实现 AbstractQueuedSynchronizer,共享式需要重写 tryAcquireShared() 和 tryReleaseShared() 方法。TwinsLock 类的作用是同时允许两个线程通过,其他线程需要等待。获取和释放的具体逻辑可以看上面代码注释,使用方式如下

TwinsLock lock = new TwinsLock();
lock.lock();
try {
    // do sth...
} finaly {
    lock.unlock();
}

使用方式上和 ReentrantLock 一毛一样有木有。

获取

接下来分析获取的流程,lock.lock() 调用的是 sync.acquireShared() 方法

public final void acquireShared(int arg) {
    // tryAcquireShared 需要子类重写
    if (tryAcquireShared(arg) < 0)
        // 获取失败后调用
        doAcquireShared(arg);
}

/**
 * 共享模式获取,不响应中断
 */
private void doAcquireShared(int arg) {
    // 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 为头节点
            if (p == head) {
                // 重新获取
                int r = tryAcquireShared(arg);
                // 获取成功
                if (r >= 0) {
                    // 设置头节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 如果检测到中断,就中断自己
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            
            // 判断是否应该阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 阻塞并检查中断
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

共享式获取失败后的操作 doAcquireShared() 和独占式中操作很相似,获取的前提是前驱节点必须是头节点,否则进行阻塞。获取成功后执行 setHeadAndPropagate() 方法,并检查中断,如果需要中断,那就中断当前线程,最后返回。看看 setHeadAndPropagate 方法

private void setHeadAndPropagate(Node node, int propagate) {
    // 把获取成功的节点设置为头头节点
    Node h = head; // Record old head for check below
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        // 获取下一个节点
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

重点瞅瞅 doReleaseShared()

/**
 * Release action for shared mode -- signal successor and ensure
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 *
 * 共享模式下的唤醒动作 -- 唤醒后继者并保证这种方式传播下去
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 1. h = null,队列还没初始化
        // 2. h == tail,队列刚初始化,就一个头节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 头节点的状态为 SIGNAL
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果头节点没有发生变化,退出循环
        if (h == head)                   // loop if head changed
            break;
    }
}

这个方法是将头节点的的状态先从 SIGNAL 改为 0,再从 0 改为 PROPAGATE,至于为什么不一步到位,可以看看 unparkSuccessor() 中的代码

if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

这里将状态小于 0 的改成 0,如果 doReleaseShared() 方法直接将头节点的状态改为 PROPAGATE,那这里相当于做了一次无用功。如果改变状态失败,说明头节点被改变了,那么进行下一次循环,重新获取头节点。从if (h == head) {break;}可以知道每次只会唤醒头节点的后继节点。

释放

释放代码

public final boolean releaseShared(int arg) {
    // tryReleaseShared() 由子类实现
    if (tryReleaseShared(arg)) {
        // 释放操作,分析如上
        doReleaseShared();
        return true;
    }
    return false;
}

共享式和独占式区别

状态

独占式中状态表示可获取已占用,比如 0 表示可以获取,获取成功后将状态改为 1,这种改变通过 CAS 实现,代码如下

if (compareAndSetState(0, 1)) {
    // 获取成功
}

释放的时候将状态改为 0 即可

setState(0);

而共享式中状态一般用来表示的可用许可数量,当许可大于或等于 0 表示允许获取,每次获取成功后减掉指定许可数量并改变状态,直到状态小于 0 表示不可获取

for(;;) {
    int currentCount = getState();
    int newCount = currentCount - reduceCount;
    if (newCount < 0 || compareAndSetState(currentCount, newCount)) {
        return newCount;
    }
}

可以看到,不管是独占式还是共享式,核心还是状态的改变。

唤醒

独占式中,队列中阻塞线程需要前驱节点唤醒,而只有前驱节点在释放操作是才会去唤醒。

而共享式中,除了释放的时候唤醒,重新获取成功的时候也会去唤醒后继节点。

入队

独占式中获取代码如下

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

代码中可以知道,如果线程获取同步状态失败,那么就将加入队列尾部。

而共享式则不同,

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

共享式中获取失败就直接返回,不会再加入队列。因为共享式一般用来允许指定数量的线程同时访问共享资源,当同步状态小于 0,则表示访问的线程数已达上限,后来的线程只能拒之门外。



这篇关于【Java 并发编程】— AQS 源码探索之共享式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程