[TOC]
Java-锁与并发
一切还要从一个场景出发:
ArrayList是一个线程不安全的集合,请封装它的增删改查方法,实现线程安全。
一、开始
这很简单,我们有两个实现思路:synchronized
或者lock
。不多解释,直接看代码:
1 | package safeList; |
不管是用synchronized
还是lock
,都是给四个方法简单粗暴的上了锁,这样能够实现写写互斥、读写互斥、读读互斥——唉,这不对吧,读读是不应该互斥的。
lock
还有另一个实现类:ReentrantReadWriteLock
,可以帮助我们实现读写锁分离,读读之间不互斥,直接看代码:
1 | package safeList; |
这样,我们就完成了对一个ArrayList
的增删改查封装,保证了线程安全。
这是我在面试时被问的一道题目,其实我对上面的做法还是很在意:直接使用ReentrantReadWriteLock
似乎有点过于轮椅了,只要知道这个类,就可以轻松的实现要求。
那么能不能给自己上点难度,提高一点程序设计理解呢?OK,于是一个机灵,我决定自己实现一个简单的MyReadWriteLock
,不求细节,思路为主。
二、MyReadWriteLock
1、联想到信号量
我想到了一个很类似的场景:操作系统进程与线程的【读者-写者】问题:有读者和写者两组并发进程,共享一个文件,要求:
- 允许多个读者可以同时对文件执行读操作
- 只允许一个写者往文件中写信息
- 任何一个写者在完成写操作之前不逊于其他读者或写者工作。
要求2、3很好实现,只要为文件设一个信号量rw
,然后用P
、V
操作同步即可。稍难一点的地方在于:怎么保证读读之间不互斥?
我们的做法是:设置一个变量readCount
表示当前读进程的数量。当每次读进程要对信号量rw
进行P
操作时,先检查readCount
是否等于0,如果等于,说明这是第一个读进程,那么执行P(rw)
;如果不等于0,说明已经有读进程在占用文件了,其他读进程可以直接读,但是不用P(rw)
,只需要把readCount++
就可以了。
我们就参考这种思路来实现一下。
2、实现
下面的方法:
- 用
synchronized
关键字来保证原子性。 - 用
wait()
和notifuAll()
实现等待和唤醒线程。 - 附上一提,我们不需要用
volatile
保证变量的可见性,因为synchronized
也包括了保证可见性。
1 | public class MyReadWriteLock{ |
OK,这样我就根据自己的理解,设计了一个MyReadWriteLock
的实现方案——然而,这其实是一坨。
看看我是使用什么保证操作之间的原子性的?
synchronized——它是Java底层实现的锁机制,特点是重量级,随意使用
synchronized
会导致并发性能严重降低;相对来说,lock
的机制就要稍轻量一些。我有保证线程上的锁只能被自己释放吗?
没有,如果有两个线程用我这套锁去同步,那一个线程完全可以不讲武德:不进行
writeLock()
或者readLock()
,直接先writeUnlock()
或者readUnlock()
,把别人的锁释放了,自己再锁上;另一个线程也可以如此。那所谓的锁机制不就成了君子协定了吗?进程间是怎么样通信的?
使用的是 Java 自带的
wait()
和notifyAll()
,这还是要建立在使用synchronized
的基础上——这些方法只能在synchronized
代码块或synchronized
方法中使用,否则会抛出IllegalMonitorStateException
。而且,
notifyAll()
是唤醒所有线程,那么这些线程又要全部起来竞争一下这把锁,又消耗了一波CPU资源。
所以我又想:那lock
的底层实现是什么?能不能借鉴学习一下?
三、Lock的底层实现原理
Java 主要提供了两种方式来实现锁:
synchronized
关键字(JVM 层面)Lock
接口(ReentrantLock 等)
这两种方式的底层实现都依赖于 CAS、自旋、锁升级机制等技术。
1、CAS(Compare And Swap)
CAS 是 无锁编程(Lock-Free)中常用的原子操作,它通过硬件指令来保证 某个变量的更新是线程安全的,避免使用互斥锁(synchronized
),提高性能。如果你知道数据库中的【乐观锁】,那下面的原理一定非常好理解。
核心原理:
1)CAS需要三个值
V
:当前内存中的实际值E
:旧值N
:新值
2)只有当V=E
时,才会将V
更新为N
;否则说明已经有其他线程修改了V
,本次操作失败。
也就是说,CAS操作其实就是根据【乐观锁思想】保证了下面操作的原子性:
1 | if(V==E){ |
3)示例:
1 | AtomicINteger atomicInt = new AtomicInteger(0); |
2、自旋锁(Spin Lock)
自旋锁是一种非阻塞锁概念,当线程想要占用锁时:
- 如果锁被占用,线程不会进入阻塞,而是循环检查锁是否被释放,这种忙等待的方式称为自旋。
3、Lock溯源
Lock的实现过程有这么几个关键词:
- int 类型状态
- 双向链表
- CAS+自旋。
在lock的构造函数中,定义了一个NonFairSync,
1 | public ReentrantLock() { |
NonfairSync 又是继承于Sync
1 | static final class NonfairSync extends Sync |
一步一步往上找,找到了AbstractQueuedSynchronizer
1 | abstract static class Sync extends AbstractQueuedSynchronizer |
最后这个鬼,又是继承于AbstractOwnableSynchronizer(AOS)。
他们之间的关系如图:
- AOS主要是保存获取当前锁的线程对象。
- FairSync 与 NonfairSync的区别在于,是不是保证获取锁的公平性,因为默认是NonfairSync。
- AQS中包含了主要的代码。
4、手写MyLock
这是我们的测试main
函数:
1 | public class LockExample { |
这是目前的锁:
1 | public class MyLock { |
我们顺着思路,一步一步填充MyLock。
1、多线程同步的基础是锁,那么锁到底做了什么呢?
先lock(),这里lock是需要阻塞的,如果没有阻塞则成功返回,表示拿到了锁。
那么我们写一个循环,并且调用一个原子操作 CAS,如果原子操作成功了,就返回;如果失败了,就一直循环,相当于阻塞。
那释放锁自然是把flag
置回 false。
1 | public class MyLock { |
2、如果能完成第一步,那么我们 LockExample 中的多线程执行就能够得到正确的结果,说明我们已经实现了一个简易锁。
看看我们还有没有其他的问题?有的。
我们的锁现在还不能辨识是哪个线程抢到了自己,换句话说,一个线程可以释放已经被其他线程占用的锁——只要它不守规矩,不先lock(),直接unlock(),就可以了。
我们需要做到: lock 只能被当前占用的线程释放。
实现:
- 给 MyLock 新增一个成员变量
owner
。 - 当线程抢到锁时,把
owner
设置当前线程。 - 当释放锁时,判断当前线程是否为
owner
,不是的话,就抛个异常出来。
1 | private Thread owner = null; |
3、现在有点像样了,但是还不行。目前,所有线程在没拿到锁后,都会不断循环尝试获取锁,这是一种对CPU的不必要消耗。
我们可以设计:当线程没拿到锁时,进行阻塞:LockSupport.park()
;直到被别人唤醒,才继续争抢锁。
这个唤醒的工作自然是由释放锁的线程来完成了,但是这又紧接着许多问题:
- 问题一:我们上哪去找正在等待的线程?
- 问题二:如果有多个正在等待的线程,应该唤醒哪一个呢?
带着这两个问题,我们需要维护一个【等待链表】(为了方便查询,设计成双向),这个链表的节点结构是这样的:
1 | class Node{ |
在 MyLock 中维护这个链表的【头结点】 Node head
以及【尾节点】Node tail
。而唤醒线程的原则,我们就遵守【先进先出】。
这样我们就可以回答上面的两个问题了:
- 问题一解:用一个双向链表维护被阻塞的线程。
- 问题二解:遵守【先进先出】原则。
这里还有一个重点:
head
和tail
是所有线程都能访问的,意味着也面临多线程操作,举个例子:
1 | current.prev=tail; |
这段代码在多线程情况下是肯定会出问题的。
所以当我们对head
、tail
进行各种引用操作时,也要保证原子性——用【原子引用】包装head
、tail
:
1 | private AtomicReference<Node> head=new AtomicReference<>(new Node()); |
OK,继续改造!
每次阻塞一个线程时,我们设计以下思路:
- 先创建一个节点
node
,插入到链表,并维护双向链表结构。 - 让线程休眠:
LockSupport.park()
- 当被唤醒时,要进行判断:是不是轮到自己了?以及是不是真能抢到锁?如果均为肯定,则加锁;如果不是,则继续休眠,等待被唤醒。
这里的第1点是有问题的:可能有多个线程并发的要插入到这个等待队列中,这也是我们使用原子引用封装head
、tail
的原因
所以第1点应该改为:先创建一个节点node
,循环的尝试插入到链表,直到成功,并维护双向链表结构。
4、OK,整理一下上述思路,我们就可以得到一个实现了 lock() 的 MyLock,让我们直接用代码概括吧。
1 | public class MyLock { |
5、我们现在开始补充释放锁 unlock() 的逻辑。
1 | public void unlock() { |
我们在lock()
中已经实现了:
- 线程如果拿到锁,则直接返回
- 如果没拿到锁的话,则要安全的把自己放在链表上,然后休眠。
- 被唤醒后,如果能拿到锁的话,要安全的把自己从链表中删除。
而在unlock()
中,我们其实就只要实现【当一个线程释放锁时,唤醒等待队列中的第一个线程】。
直接上代码:
1 | public void unlock() { |
这里的flag.set(flag)
不需要使用CAS操作,因为我们分析可知,此处是不可能有竞争的。
这里有两点非常重要:
flag.set(false)
一定要在LockSupport.unpark(headNode.next.thread);
之前,因为你要唤醒别的线程去抢锁,你肯定得先把锁释放了。flag.set(false)
一定要在Node headNode = head.get();
之后,因为一旦flag.set(false)
,那就意味着所有的线程都可以并发,执行下面的代码;如果代码写成这样:
1
2
3
4
5flag.set(false);
Node headNode=head.get(); //(1)
if(headNode.next!=null){
LockSupport.unpark(headNode.next.thread); //(2)
}这里(2)处的
headNode
与(1)处的headNode
就可能指向的不是同一个对象了。
6、OK,整理一下我们当前的版本。
1 | public class MyLock { |
我在这里还应该提醒一点:
我们这里实现的并不是一个严格保证【先到先得】机制的锁。我们能保证的是:如果一个进程进入阻塞队列了,那么在这个阻塞队列的范围内,我们保证【先到先得】。
如何理解?假设这样一个场景:
当线程A执行完
unlock()
中的LockSupport.unpark(headNode.next.thread);
这一条语句,此时锁被释放,等待队列中的线程B被唤醒;同时新来了一个线程C,因为锁被释放了,所以它直接快B一步,抢到了锁;
线程B发现还是抢不到锁,所以再次休眠,等待线程C的唤醒;
也许被线程C唤醒时,又会跑来一个线程D,重复发生上述故事………
这在我们的代码中是完全可能发生的。事实上,这就是所谓的【非公平锁】。
7、如何实现一个公平锁呢?
其实很简单,引起不公平的原因其实就是这一段代码,我们把它注释掉,就成为【公平锁】了。
1 | public void lock() { |
8、目前这个版本是存在死锁可能性的。
我们用注释标注几行代码:
1 | public class MyLock { |
假设这样的场景:
有线程A和线程B并发,线程A先抢到了锁,执行完了自己的任务,准备释放锁,此时它在(2)处,还没有执行;
线程B没有抢到锁,所以准备把自己放到等待队列中,此时它在(1)处,没有执行;
此时线程A执行(2)(3)处代码,问题发生了——线程B执行了
tail.compareAndSet(currentTail, current)
,所以(2)(3)处headNode
是存在的;但是线程B没有执行(1)处,所以headNode.next
是空,所以线程A不会去唤醒线程B。线程A完成了
lock
与unlock
,拍拍屁股走了,但是线程B却一直在休眠。除非有新的线程进来,否则线程B将一直休眠,死锁产生。
怎么解决呢?其实也很简单。
把这段代码改成这样即可:
1 | // 拿到锁的话,要安全的把自己从链表中删除 |
细品即可。
9、最终版本
总结Lock的本质:状态变量 + 双向链表 + CAS + 自选
1 | package safeList; |
四、可重入锁
1、可重入锁概念
【可重入锁】指的是:当一个线程获得一把锁时,可再次获得这把锁,而不会发生死锁。
在 Java 中,synchronized
是可重入锁,支持同一个线程多次进入同步代码块。
举例:
1 | class ReentrantExample { |
输出:
1 | methodA() |
Java 提供的 ReentrantLock
也是可重入锁,允许同一个线程多次获取同一把锁。
其实reentrantLock
就翻译为【可重入锁】。
2、什么时候会有一个线程重复获得锁?
- 方法递归调用
- 同一个锁保护的多个方法之间的调用(如上)
- 在一个线程中循环获得锁
3、手写可重入锁MyReentrantLock
核心思路:
1)将 AtomicBoolean flag = new AtomicBoolean(false)
改为AtomicInteger state = new AtomicInteger(0)
state
描述的是当前【锁被同一个线程获取的次数】。
线程能直接获取锁的条件也就变成了:【锁当前是自由的】或者【锁已经被当前线程获取】
2)在lock()
的入口处,修改【线程直接获取锁】的逻辑,支持通过【锁已经被当前线程获取】获取到锁。
1 | // 拿到锁直接返回(注掉这一段,就变成【公平锁】) |
3)修改锁的释放逻辑:
1 | public void unlock() { |
注意(1)处:
在不可重入锁版本中,
owner=null
没有存在的必要,不过就算加上也不会有错。在当前可重入锁版本中,
owner=null
是必须的,否则会有两个线程获取到锁的可能。假设可重入场景下,不写
owner=null
:线程A获取到锁,执行完任务后,释放掉锁,但此时 owner 仍指向线程A;
线程B通过
if(state.get()==0)
获取到锁,但还没来得及将 owner 指向自己;线程A又尝试争抢锁,由于
else if(owner==Thread.currentThread())
是成立的,所以线程A也获得锁;A、B同时获得锁,锁失效。
MyReentrantLock最终版本
1 | package safeList; |
测试main
函数:
1 | package safeList; |
五、无法理解的BUG
在用上面的测试函数对MyReentrantLock
进行测试时,我遇到了这样的情况:
它居然显示这里会有空指针异常,我无法理解:
1 | if(headNode.next!=null){ |
更离谱的是,前期试出过几次后,我在程序里循环执行10000次这个测试,但都没有复现!
更离谱的是,当我放弃循环,而是大概手点了50次之后,它又出现了……….
我发誓我没有修改任何一行代码!我发誓上面这张图也不是P图!
但是我隐约觉得这应该不是代码逻辑的问题。。。。