并发编程-读写锁ReentranReadWriteLock应用场景及源码解析
2021/11/8 17:39:54
本文主要是介绍并发编程-读写锁ReentranReadWriteLock应用场景及源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
目录
- 前言
- 读写锁介绍
- 什么是读写锁
- 特性
- 具体实现
- 源码解析
- readLock.lock()
- tryAcquireShared
- fullTryAcquireShared
- doAcquireShared
- unLock
- 总结
- 加锁流程
- 解锁流程
- 结尾
前言
假设你现在需要实现这样一个需求
给你一个Map集合(共享资源),实现如下需求
- 可以允许两个线程同时调用Map的get方法读取数据
- 不允许两个线程同时调用Map的put方法修改数据
- 不允许两个线程一个调用put方法,一个调用get方法,也就是说当一个线程调用put方法时,另一个想调用get方法的线程需要阻塞等put方法完成
你会如何实现?
这里可能会有同学想到用ConcurentHashMap,确实ConcurentHashMap可以满足第一点和第二点,但是它的put方法和get方法是相互隔离的,也就是说满足不了第三点,这种情况下就需要用到读写锁了
读写锁介绍
什么是读写锁
读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。
特性
这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读线程来访问共享资源,最大可能的读线程数为实际的逻辑CPU数。写线程是排他性的,一个读写锁同时只能有一个写线程或多个读线程(与CPU数相关),但不能同时既有读线程又有写线程。
- 读-读能共存(可以用多个线程同时的读)
- 读-写不能共存(读的时候不能有其他线程去修改,或者修改的时候不能有其他线程去读)
- 写-写不能共存(修改的时候不能再有其他线程去修改)
具体实现
读写锁在Java中的具体实现是ReentranReadWriteLock
ReentranReadWriteLock实现了ReadWriteLock接口,里面定义了读锁和写锁
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock(); }
然后我们来粗略看一下ReentranReadWriteLock的基本结构
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; //内部类,读锁 private final ReentrantReadWriteLock.ReadLock readerLock; //内部类,写锁 private final ReentrantReadWriteLock.WriteLock writerLock; //这个sync大家应该很熟悉了吧,继承了AQS final Sync sync; //可以指定是否公平锁 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } //内部类ReadLock public static class ReadLock implements Lock, java.io.Serializable { //里面也有一个sync private final Sync sync; //注意这里的sync是外面的ReentrantReadWriteLock传进来的 protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //加锁的方法 public void lock() { sync.acquireShared(1); } //解锁 public void unlock() { sync.releaseShared(1); } ......... }
粗略看了下,我们可以得到几个信息
- ReentrantReadWriteLock支持指定是否公平锁
- 和ReentrantLock同样间接继承了AQS,通过sync做锁的控制
- 通过构造器传入的同一个sync和两个不同的静态内部类来控制读写锁的分离
现在我们基本知道了ReentranReadWriteLock的内部结构,下一步我们先来看看它的一些基本使用,
实践出真知,我们来看看如何用它来解决上面的共享Map的问题
public class ReadWriteLockTest { public static void main(String[] args) { //这里使用我们自定义的map,主要是为了给大家模拟当put比较慢时会出现的情况 TestMap<Integer, Integer> map = new TestMap<>(); for (int i = 0; i < 5; i++) { int finalI = i; new Thread(() -> { try { map.put(finalI, finalI); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } for (int i = 0; i < 5; i++) { int finalI = i; new Thread(() -> { map.get(finalI); }).start(); } } //定义一个map static class TestMap<K, T>{ //定义一个map private Map<K, T> map = new HashMap<>(); //取值的方法 T get(K key) { T t = map.get(key); System.out.println("取到值" + t); return t; } //存值的方法 void put(K key, T value) throws InterruptedException { //模拟map插入的过程,方便演示问题 Thread.sleep(500); map.put(key, value); System.out.println("插入值" + key); } } } 运行结果如下: 取到值null 取到值null 取到值null 取到值null 取到值null 插入值0 插入值1 插入值4 插入值2 插入值3
上图代码会出现的问题就是,当我在往map里面存值还没存完的时候,其他线程同时取值时,这时取出来的是空的(因为还没存进去)
那么如果要实现当有人正在存值时,此时取值的人必须先等待所有存值完毕后,再去取,该如何实现?下面我们来改造下TestMap
//定义一个支持读写锁的map static class TestMap<K, T>{ //定义一个map private Map<K, T> map = new HashMap<>(); private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); //读锁 private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); //写锁 private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock(); //取值的方法 T get(K key) { readLock.lock(); T t = map.get(key); System.out.println("取到值" + t); readLock.unlock(); return t; } //存值的方法 void put(K key, T value) throws InterruptedException { writeLock.lock(); //模拟map插入的过程,方便演示问题 Thread.sleep(500); map.put(key, value); System.out.println("插入值" + key); writeLock.unlock(); } } 运行结果如下: 插入值0 插入值1 插入值2 插入值3 插入值4 取到值0 取到值1 取到值2 取到值3 取到值4
可以看到加锁之后的结果是符合我们的预期的,也就是写线程是优先的,这里提醒下大家unlock方法最好放到finally里面执行,防止不必要的死锁,我这里是为了方便
那么ReentranReadWriteLock源码里是怎么做到写线程独享资源, 读线程共享资源的呢?
下面让源码来解答大家的疑问
源码解析
在文章的开头我们已经了解了ReentranReadWriteLock的内部结构,首先我们来看一下读锁的lock方法
readLock.lock()
public void lock() { //从名字可以发现这个是共享锁 sync.acquireShared(1); } public final void acquireShared(int arg) { //尝试获取共享锁 if (tryAcquireShared(arg) < 0) //获取锁失败的话会进入这里 doAcquireShared(arg); }
tryAcquireShared
从名称可以看到这个是共享锁,然后我们来看看tryAcquireShared方法
Thread current = Thread.currentThread(); int c = getState(); //这里把int类型的c分成了上下两部分前16位和后16位 //前16位代表读锁 后16位代表写锁 //这里是把state & 65535 得到后 16 位的值 //如果不等于0说明写锁已经被持有并且不是当前线程的话就返回-1 //如果是当前线程的话说明还有资格继续获取读锁 //这里就体现了锁降级,也就是说获取了写锁的同时也能获取读锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //右移16位 得到c的高16位 int r = sharedCount(c); //这里调用hasQueuedPredecessors方法,这个我们在ReentrantLock里已经讲解过了 //判断队列中是否有优先级更高的线程,hasQueuedPredecessors方法要返回false才会进行下一步 //此时compareAndSetState设置读锁 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //r==0说明此时读锁是没有被占用的 if (r == 0) { //把当前线程设置成第一个读者 firstReader = current; //并且设置读锁的拥有次数 firstReaderHoldCount = 1; } else if (firstReader == current) { //这里是判断如果读锁此时已经被占用,并且第一个读线程是当前线程,次数加1 firstReaderHoldCount++; } else { //只有当读锁已经被占用,并且第一个读者不是当前线程的时候会进入下面 //代表的是最后一个获取读锁的线程的计数器 HoldCounter rh = cachedHoldCounter; //如果计数器==null 或者 不是当前线程 if (rh == null || rh.tid != getThreadId(current)) //readHolds.get()获取的是当前线程的读取锁的次数 //把当前线程的计数器设置为cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); //说明计数器已经存在而且是rh是当前线程 else if (rh.count == 0) //这里如果数量==0的话,把rh设置到readHolds中 //这个地方困惑了我挺久,看其他文章都是些啥覆盖本地..没明白 //我认为这里是因为count==0时,在fullTryAcquireShared方法中已经把readHolds给清除了,所以这里重新设置一次 readHolds.set(rh); //入锁次数加1 rh.count++; } return 1; } //这里是死循环获取读锁 return fullTryAcquireShared(current);
给大家描述下这个方法的流程:
- 写锁存在并且不是当前线程,直接返回-1,意味着如果写锁存在并且是当前线程的话,是可以继续获取读锁的,这里体现了锁降级
- 判断是否有优先级获取读锁
- 获取读锁之后,设置firstReader ,firstReaderHoldCount, cachedHoldCounter 等参数, 这三个参数其实是为了记录读锁的获取次数和类似一个缓存的作用,主要是为了性能考虑的
- 获取失败后进入fullTryAcquireShared方法死循环获取读锁
fullTryAcquireShared
然后看一下fullTryAcquireShared方法
final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); //判断是否存在写锁 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. //读者是否应该堵塞,也就是说是否有优先级更高的线程 //这里分为公平锁和非公平锁的两种实现 //返回true说明应该堵塞,返回false说明可以有资格去抢锁 //公平锁:判断队列中是否有优先级更高的节点hasQueuedPredecessors方法 //非公平锁:主要是看队列的head节点的后面,是否是写锁的等待线程,作用主要是为了防止写锁饥饿 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly //是否是重入 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { //不是重入的话 if (rh == null) { //找到最后一个持有读锁的线程 rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //读锁次数超过65535就抛出异常 这里对应16位 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //尝试加锁,和之前的逻辑一样 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
这里主要就是注意下readerShouldBlock的非公平实现的apparentlyFirstQueuedIsExclusive方法,这里的非公平其实也不是完全非公平,这里为了防止写锁饥饿做了处理
doAcquireShared
然后看看获取锁失败放入队列的doAcquireShared方法
private void doAcquireShared(int arg) { //这里和reentrantLock的区别就是加了个shared final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //这里判断是头结点的话再次去获取锁 int r = tryAcquireShared(arg); if (r >= 0) { //获取锁成功后,设置head并且唤醒下一个share结点 //这里也是共享锁和独占锁的区别 //独占锁这里是不会唤醒下一个节点的 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //唤醒队列后的直到写锁节点前的所有节点 doReleaseShared(); } }
可以发现这个方法和reenTrantLock的acquireQueued方法差不多
差别主要是两点:
- 指定了node的shared状态
- 获取锁成功后会唤醒队列后的多个share节点
unLock
加锁的方法看完了,接下来看看解锁unLock的方法
public final boolean releaseShared(int arg) { //对共享变量进行操作-- if (tryReleaseShared(arg)) { //成功后唤醒下一个share线程 doReleaseShared(); return true; } return false; } //对共享变量进行操作--,这里不具体展开了 protected final boolean tryReleaseShared(int unused) { //...... } //这里是唤醒队列后的多个节点 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //①唤醒下一个节点 //如果下一个节点是读节点:抢到锁后会执行setHeadAndPropagate(node, r)方法,把自身设置成头结点,并且继续唤醒下一个读节点 //如果下一个节点是写节点: //这里有两个情况: //1.1,此时是setHeadAndPropagate中调用的doReleaseShared,这是读节点未解锁,写节点依然会堵塞 //1.2,此时是releaseShared解锁方法调用的doReleaseShared,读锁未被占用,写节点抢锁成功 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //什么时候会有h==head的情况呢? //对应上面的1.1:此时写节点依然堵塞,所有head节点没有更新,此时退出循环 //对应上面的1.2:此时写节点设置head成功,所以h!=head,此时会继续循环唤醒队列中的所有的读节点 //第三种情况:队列遍历完成,此时h==head if (h == head) // loop if head changed break; } }
注意这里的unLock会唤醒队列中的所有的读节点!!!
因为这里是循环遍历,退出条件是h == head
到这里为止,读锁的加解锁的过程就分析完成了,至于写锁的过程和ReenTrantock的大同小异,这里就不再赘述了
总结
加锁流程
1,判断是否有写锁存在,如果有写锁并且写锁的持有线程不是当前线程,直接加锁失败
2,判断此时线程是否是优先级最高的线程,是的话就设置state的高位(前16位存放读锁)
3,设置锁成功后设置各种参数例如firstReader,firstReaderHoldCount,cachedHoldCounter等参数
4,如果设置锁不成功的话就把当前线程加入到CLH队列中,再次尝试抢锁,失败的话就挂起自己,等待被唤醒
5,被唤醒后,继续尝试抢锁,抢锁成功后,把自己设置为head节点,如果下一个节点也是share节点的话,会一直沿着队列唤醒直到队列的下一个写节点为止
解锁流程
1,更改共享变量
2,唤醒队列中的所有的读节点
结尾
有不当之处欢迎指出,一起学习,共勉
这篇关于并发编程-读写锁ReentranReadWriteLock应用场景及源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-29Elasticsearch慢查询日志配置
- 2024-05-29揭秘华为如此多成功项目的产品关键——Charter模板
- 2024-05-29海外IDC业务拓展的7大挑战
- 2024-05-29InLine Chat功能优化对标Github Copilot,CodeGeeX带来更高效、更直观的编程体验!
- 2024-05-29CodeGeeX 智能编程助手 6 项功能升级,在Visual Studio插件市场霸榜2周!
- 2024-05-29AutoMQ 生态集成 Apache Doris
- 2024-05-292024年IDC行业的深度挖掘:机遇、挑战与未来展望
- 2024-05-29五款扩展组件齐发 —— Volcano、Keda、Crane-scheduler 等,邀你体验
- 2024-05-29AutoMQ 对象存储数据高效组织的秘密: Compaction
- 2024-05-29活动预告|来 GIAC 大会听大数据降本利器:AutoMQ 基于云原生重新设计的 Kafka