第6章 Java并发包中锁原理剖析

LockSupport工具类

LockSupport是创建锁和其他同步类的基础。

LockSupport类与每个使用它的线程都会关联一个许可证,默认情况下调用LockSupport类的方法的线程是不持有许可证的。

下面介绍LockSupport类中的几个主要函数。

1. void park()

如果park方法拿到了与LockSupport关联的许可证,则调用LockSupport.park()时会马上返回,否则调用线程会被禁止参与线程的调度,也就是会被阻塞挂起。

如下代码直接在main函数里面调用park方法,最终只会输出 “begin park!”,然后当前线程被挂起,这时因为在默认情况下调用线程是不持有许可证的。

  1. public static void main(String[] args) {
  2. System.out.println("begin park!");
  3. LockSupport.park();
  4. System.out.println("end park!");
  5. }

在其他线程调用unpark(Thread thread)方法并且将当前线程作为参数时,调用park方法而被阻塞的线程会返回。另外,如果其他线程调用了阻塞线程的interrupt()方法,设置了中断标志或者线程被虚假唤醒,则线程也会返回。所以在调用park方法时最好也使用循环条件判断方式。

注意: 因调用park方法而被阻塞的线程被其他线程中断而返回时并不会抛出InterruptedException异常。

2. void unpark(Thread thread)

当一个线程调用unpark时,如果参数thread线程没有持有thread与LockSupport类相关联的许可证,则让thread线程持有。如果thread因调用park()而被挂起,则unpark方法会使其被唤醒。如果thread之前没有调用park,则调用unpark方法后再调用park方法会立即返回,代码如下。

  1. public static void main(String[] args) {
  2. System.out.println("begin park!");
  3. LockSupport.unpark(Thread.currentThread());
  4. LockSupport.park();
  5. System.out.println("end park!");
  6. }

输出如下:

  1. begin park!
  2. end park!

下面再来看一个例子来加深对park和unpark的理解。

  1. public static void main(String[] args) throws InterruptedException {
  2. Thread thread = new Thread(new Runnable() {
  3. @Override
  4. public void run() {
  5. System.out.println("child thread begin park!");
  6. // 挂起自己
  7. LockSupport.park();
  8. System.out.println("child thread unpark!");
  9. }
  10. });
  11. thread.start();
  12. // 确保调用unpark前子线程已经将自己挂起
  13. Thread.sleep(1000);
  14. System.out.println("main thread begin unpark!");
  15. LockSupport.unpark(thread);
  16. }

子线程将自己挂起,主线程中调用了unpark方法使得子线程得以继续运行。

3. void parkNanos(long nanos)

和park方法类似,如果调用park方法的线程已经拿到了与LockkSupport关联的许可证,则调用LockSupport.parkNanos(long nanos)方法会立即返回。不同之处在于,如果没有拿到许可证,则调用线程会被挂起nanos时间后自动返回。

抽象同步队列AQS概述

AQS——锁的底层支持

AbstractQueuedSynchronizer抽象同步队列简称AQS,是实现同步器的基础组件。

以下为AQS的类结构图:

第6章 Java并发包中锁原理剖析 - 图1

AQS是一个FIFO的双向队列,内部通过head和tail两个节点来对队列进行维护。

Node是AQS的一个静态内部类,属性SHARED和EXCLUSIVE分别代表用来标识线程是获取共享资源和独占资源时被阻塞挂起放入AQS队列的。thread为Node持有的Thread;waitStatus用于记录当前线程的状态,CANCELLED表示线程被取消,SIGNAL表示线程需要唤醒,CONDITION表示线程在条件队列里面等待,PROPAGATE表示释放共享资源时需要通知其他节点。

AQS维护了一个单一的状态信息state,可以通过getState、setState、compareAndSetState函数修改其值。

AQS内部类ConditionObject用来结合锁实现线程同步。

AQS实现线程同步的关键是对state进行操作,根据state是否属于一个线程,操作state的方式可分为独占方式和共享方式。

独占方式下获取和释放资源的方法为:

void acquire(int arg) void acauireInterruptibly(int arg) boolean release(int arg)

共享方式下获取和释放资源的方法为:

void acauireShared(int arg) void acauireSharedInterruptibly(int arg) boolean releaseShared(int arg)

独占方式下,获取和释放资源的流程如下:

当一个线程调用acquire(int arg)获取独占资源时,会首先使用tryAcquire方法进行尝试,具体就是设置state的值,成功则世界返回,失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)挂起自己。

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

但一个线程调用release(int arg)会尝试使用tryRelease操作释放资源,这里也是改变state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread)。被阻塞的线程使用tryAcquire尝试,看当前state的值是否满足自己的需要,满足则该线程被激活,继续向下运行,否则还是会被放入AQS队列并被挂起。

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

注意; AQS类并没有提供tryAcquire和tryRelease方法的实现,因为AQS是一个基础框架,这两个方法需要由子类自己实现来实现自己的特性。

共享方式下,获取和释放资源的流程如下;

当线程调用acquireShared(int arg)获取共享资源时,首先使用tryAcquireShared尝试获取资源并修改state,成功则直接放回,否则将当前线程封装为Node.SHARED类型的节点插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己。

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

当一个线程调用releaseShared(int arg)时会尝试使用tryReleasedShared操作释放资源并修改state,然后使用LockSupport.unpark(thread)激活AQS队列中的一个线程(thread)。被激活的线程会调用tryReleaseShared查看当前state是否满足自己需求,满足则该线程被激活,否则继续挂起。

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

注意: 同上,AQS没有提供tryAcquiredShared和tryReleaseShared方法的实现,这两个方法也需要由子类实现。

AQS——条件变量的支持

以下是使用条件变量的例子:

  1. ReentrantLock lock = new ReentrantLock();
  2. Condition condition = lock.newCondition();
  3. lock.lock();
  4. try{
  5. System.out.println("begin wait");
  6. condition.await();
  7. System.out.println("end wait");
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }finally {
  11. lock.unlock();
  12. }
  13. lock.lock();
  14. try{
  15. System.out.println("begin signal");
  16. condition.signal();
  17. System.out.println("end signal");
  18. }catch (Exception e){
  19. e.printStackTrace();
  20. }finally {
  21. lock.unlock();
  22. }

上述代码中,condition是由Lock对象调用newCondition方法创建的条件变量,一个Lock对象可以创建多个条件变量。

lock.lock()方法相当于进入synchronized同步代码块,用于获取独占锁;await()方法相当于Object.wait()方法,用于阻塞挂起当前线程,当其他线程调用了signal方法(相当于Object.notify()方法)时,被阻塞的线程才会从await处返回。

lock.newCondition()作用是new一个在AQS内部类ConditionObject对象。每个条件变量内部都维护了一个条件队列,用来存放调用该条件变量的await方法时被阻塞的线程。

注意: 这个条件队列和AQS队列不是一回事。

以下是await的源码:

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. // 创建新的node节点,并插入到条件队列末尾
  5. Node node = addConditionWaiter();
  6. // 释放当前线程的锁
  7. int savedState = fullyRelease(node);
  8. int interruptMode = 0;
  9. // 调用park方法阻塞挂起当前线程
  10. while (!isOnSyncQueue(node)) {
  11. LockSupport.park(this);
  12. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  13. break;
  14. }
  15. ...
  16. }

首先会构造一个类型为Node.CONDITION的node节点,然后将该节点处插入条件队列末尾,之后当前线程会释放获取的锁,并被阻塞挂起。这时如果有其他线程调用lock.lock()方法尝试获取锁,就会有一个线程获取到锁。

再来看signal源码:

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. // 将条件队列头元素移动到AQS队列等待执行
  7. doSignal(first);
  8. }

调用signal时,会把条件队列队首元素放入AQS中并激活队首元素对应的线程。

基于AQS实现自定义同步器

下面基于AQS实现一个不可重入的独占锁。自定义AQS重写一系列函数,还需要定义原子变量state的含义。这里定义state为0表示目前锁没有被线程持有,state为1表示锁已经被某一个线程持有。

  1. public class NonReentrantLock implements Lock, Serializable {
  2. // 内部帮助类
  3. private static class Sync extends AbstractQueuedSynchronizer {
  4. // 锁是否被持有
  5. @Override
  6. protected boolean isHeldExclusively() {
  7. return getState() == 1;
  8. }
  9. // 尝试获取锁
  10. @Override
  11. protected boolean tryAcquire(int arg) {
  12. if (compareAndSetState(0, 1)) {
  13. setExclusiveOwnerThread(Thread.currentThread());
  14. return true;
  15. }
  16. return false;
  17. }
  18. // 尝试释放锁
  19. @Override
  20. protected boolean tryRelease(int arg) {
  21. if(getState() == 0) {
  22. throw new IllegalMonitorStateException();
  23. }
  24. setExclusiveOwnerThread(null);
  25. setState(0);
  26. return true;
  27. }
  28. // 提供条件变量接口
  29. Condition newCondition() {
  30. return new ConditionObject();
  31. }
  32. }
  33. // 创建一个Sync来做具体工作
  34. private final Sync sync = new Sync();
  35. @Override
  36. public void lock() {
  37. sync.acquire(1);
  38. }
  39. @Override
  40. public boolean tryLock() {
  41. return sync.tryAcquire(1);
  42. }
  43. @Override
  44. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  45. return sync.tryAcquireNanos(1, unit.toNanos(time));
  46. }
  47. @Override
  48. public void unlock() {
  49. sync.tryRelease(1);
  50. }
  51. @Override
  52. public void lockInterruptibly() throws InterruptedException {
  53. sync.acquireInterruptibly(1);
  54. }
  55. @Override
  56. public Condition newCondition() {
  57. return sync.newCondition();
  58. }
  59. }

NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作,Sync继承了AQS。由于是独占锁,:Sync只重写了tryAcquire、tryRelease、isHeldExclusively。此外,Sync提供了newCondition方法来支持条件变量。

下面使用自定义的锁来实现简单的生产-消费模型

  1. final static NonReentrantLock lock = new NonReentrantLock();
  2. final static Condition notFull = lock.newCondition();
  3. final static Condition notEmpty = lock.newCondition();
  4. final static Queue<String> queue = new LinkedBlockingQueue<>();
  5. final static int queueSize = 10;
  6. public static void main(String[] args) {
  7. Thread producer = new Thread(new Runnable() {
  8. @Override
  9. public void run() {
  10. // 获取独占锁
  11. lock.lock();
  12. try {
  13. while (true) {
  14. // 队列满了则等待
  15. while (queue.size() >= queueSize) {
  16. notEmpty.await();
  17. }
  18. queue.add("ele");
  19. System.out.println("add...");
  20. notFull.signalAll();
  21. }
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. } finally {
  25. // 释放锁
  26. lock.unlock();
  27. }
  28. }
  29. });
  30. Thread consumer = new Thread(new Runnable() {
  31. @Override
  32. public void run() {
  33. // 获取独占锁
  34. lock.lock();
  35. try {
  36. while (true) {
  37. // 队列空则等待
  38. while (queue.size() == 0) {
  39. notFull.await();
  40. }
  41. String ele = queue.poll();
  42. System.out.println("poll...");
  43. notEmpty.signalAll();
  44. }
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. } finally {
  48. // 释放锁
  49. lock.unlock();
  50. }
  51. }
  52. });
  53. producer.start();
  54. consumer.start();
  55. }

代码使用了NonReentrantLock来创建lock,并调用lock.newCondition创建了两个条件变量用来实现生产者和消费者线程的同步。

ReentrantLock的原理

类图结构

第6章 Java并发包中锁原理剖析 - 图2

构造函数如下:

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }
  4. public ReentrantLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. }

可以看到,ReentrantLock最终还是依赖AQS,并且根据传入的参数来决定其内部是一个公平锁还是非公平锁(默认为公平锁)。

Sync类直接继承自AQS,它的子类NonfairSync和FairSync分别实现了获取锁的非公平与公平策略。

AQS的state表示线程获取锁的可重入次数。state为0表示当前锁没有被任何线程持有。当一个线程第一次获取该所是会尝试使用CAS设置state为1,成功后记录该锁的持有者为当前线程。以后每一次加锁state就增加1,表示可重入次数。当该线程释放该锁时,state减1,如果减1后state为0,则当前线程释放该锁。

获取锁

void lock()

当一个线程调用该方法时,如果锁当前没有被其他线程占有并且当前线程之前没有获取过该锁,则当前线程会获取到该锁,然后设置当前锁的拥有者为当前线程,并且将state置为1;如果当前线程已经获取过该锁,则将state的值增加1;如果该锁已经被其他线程持有,则调用该方法的线程会被放入AQS队列中阻塞挂起等待,

  1. public void lock() {
  2. sync.lock();
  3. }

ReentrantLock的lock()委托给了sync,根据创建ReentrantLock构造函数选择sync的实现时NonfairSync还是FairSync,这个锁是一个公平锁或者非公平锁。

先来看非公平锁的情况:

  1. final void lock() {
  2. // CAS设置state为1
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. // 调用AQS的acquire方法
  7. acquire(1);
  8. }

默认state为0,所以第一个调用Lock的吸纳成会通过CAS设置状态值为1,CAS成功则表示当前线程获取到了锁,然后设置该锁持有者为当前线程。

如果此时有其他线程企图过去该锁,CAS会失败,然后会调用AQS的acquire方法。

再贴下acquire的源码:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

之前说过,AQS并没有提供可用的tryAcquire方法,tryAcquire方法需要子类自己定制。这里会调用ReentrantLock重写的tryAcquire方法。下面先看非公平锁的代码。

  1. protected final boolean tryAcquire(int acquires) {
  2. return nonfairTryAcquire(acquires);
  3. }
  4. final boolean nonfairTryAcquire(int acquires) {
  5. final Thread current = Thread.currentThread();
  6. int c = getState();
  7. // (1)锁未被持有
  8. if (c == 0) {
  9. if (compareAndSetState(0, acquires)) {
  10. // 设置锁的持有者为当前线程
  11. setExclusiveOwnerThread(current);
  12. return true;
  13. }
  14. }
  15. // (2)锁已经被某个线程持有,如果该线程为当前线程
  16. else if (current == getExclusiveOwnerThread()) {
  17. int nextc = c + acquires;
  18. if (nextc < 0) // overflow
  19. throw new Error("Maximum lock count exceeded");
  20. // 增加重入数
  21. setState(nextc);
  22. return true;
  23. }
  24. return false;
  25. }

源码比较简单,分析见注释。

下面来看非公平性体现在哪儿。首先非公平指的是先尝试获取锁的线程并不一定首先获取该锁。

假设线程A执行到代码(1)发现线程已经被持有然后执行到(2)发现当前线程不是锁持有者,则返回false被放入AQS中进行等待。假设这时线程B也执行到了代码(1),发现state为0(假设占有该锁的其他线程释放了该锁), 就成功获取了锁,而比B先请求锁的线程A还在等待,这就是非公平性的体现。

下面看FairSync重写的tryAcquire方法。

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. // 公平性策略
  6. if (!hasQueuedPredecessors() &&
  7. compareAndSetState(0, acquires)) {
  8. setExclusiveOwnerThread(current);
  9. return true;
  10. }
  11. }
  12. else if (current == getExclusiveOwnerThread()) {
  13. int nextc = c + acquires;
  14. if (nextc < 0)
  15. throw new Error("Maximum lock count exceeded");
  16. setState(nextc);
  17. return true;
  18. }
  19. return false;
  20. }

由代码可知,公平的tryAcquire方法与非公平的区别在于增加了一个hasQueuedPredecessors方法来判断是否有线程在当前线程前尝试获取锁。

下面是hasQueuedPredecessors的具体实现。

  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail; // Read fields in reverse initialization order
  3. Node h = head;
  4. Node s;
  5. return h != t &&
  6. ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }

如果当前线程节点有前驱节点则返回true,否则如果当前AQS队列为空或者当前线程节点是AQS的第一个节点则返回false。其中h==t说明当前队列为空,直接返回false;如果h!=t并且s==null说明有一个元素将要作为AQS的第一个节点入队(AQS入队包含两步操作:首先创建一个哨兵头节点,然后将第一个元素插入哨兵节点后面),那么返回true;如果h!=t并且s!=null且s.thread!=Thread.currentThread()说明队列里面的第一个元素不是当前线程,那么返回true。

void lockInterruptibly()

与lock()方法类似,不同之处在于,它对中断进行相应,就是当前线程在调用该方法时,如果其他线程调用了当前线程的interrupt方法,则当前线程会抛出inetrruptedException 异常,然后返回。

  1. public void lockInterruptibly() throws InterruptedException {
  2. sync.acquireInterruptibly(1);
  3. }
  4. public final void acquireInterruptibly(int arg)
  5. throws InterruptedException {
  6. // 如果当前线程被中断,则直接抛出异常并返回
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. if (!tryAcquire(arg))
  10. doAcquireInterruptibly(arg);
  11. }

boolean tryLock()

尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁并返回true,否则返回false。该方法不会引起当前线程阻塞。

  1. public boolean tryLock() {
  2. return sync.nonfairTryAcquire(1);
  3. }
  4. final boolean nonfairTryAcquire(int acquires) {
  5. final Thread current = Thread.currentThread();
  6. int c = getState();
  7. if (c == 0) {
  8. // 非公平策略
  9. if (compareAndSetState(0, acquires)) {
  10. setExclusiveOwnerThread(current);
  11. return true;
  12. }
  13. }
  14. else if (current == getExclusiveOwnerThread()) {
  15. int nextc = c + acquires;
  16. if (nextc < 0) // overflow
  17. throw new Error("Maximum lock count exceeded");
  18. setState(nextc);
  19. return true;
  20. }
  21. return false;
  22. }

上述代码与非公平锁的tryAcquire方法类似,所以tryLock使用的是非公平策略。

boolean tryLock(long timeout, TimeUnit unit)

  1. public boolean tryLock(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  4. }

设置了超时时间,如果超时时间到了还没有获取到该锁则返回false。

释放锁

void unlock()

尝试释放锁,如果当前线程持有该锁,则调用该方法会让该线程对该线程持有的AQS状态值减1,如果减去1后状态值为0,则当前线程会释放该锁。

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. protected final boolean tryRelease(int releases) {
  5. int c = getState() - releases;
  6. // 如果当前线程不是该锁持有者直接抛出异常
  7. if (Thread.currentThread() != getExclusiveOwnerThread())
  8. throw new IllegalMonitorStateException();
  9. boolean free = false;
  10. // 若state变为0,则清空锁持有线程
  11. if (c == 0) {
  12. free = true;
  13. setExclusiveOwnerThread(null);
  14. }
  15. // 设置可重入次数减1
  16. setState(c);
  17. return free;
  18. }

读写锁ReentrantReadWriteLock原理

解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,同一时间只能有一个线程获取该锁,而实际中会出现读多写少的情况,显然使用ReentrantLock满足不了这个需求,这时就需要用到ReentrantReadWriteLock。ReentrantReadWriteLock采用读写分离的策略,允许多个线程同时获取写锁。

类图结构

第6章 Java并发包中锁原理剖析 - 图3

读写锁内部维护了一个ReadLock和一个WriteLock,它们依赖Sync实现具体功能。而Sync继承自AQS,并且也提供了公平与非公平的实现。下面只介绍非公平读写锁的实现。

我们知道AQS中只维护了一个state状态,ReentrantReadWriteLock巧妙地使用state的高16位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数。

  1. static final int SHARED_SHIFT = 16;
  2. // 读锁状态值65536
  3. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  4. // 写锁状态值65535
  5. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  6. // 写锁掩码,二进制,16个1
  7. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  8. // 读锁线程数
  9. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  10. // 写锁可重入数
  11. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

读写锁中的firstReader用来记录第一个获取到读锁的线程,firstReaderHoldCount记录第一个和获取到读锁的线程获取读锁的可重入次数。HoldCounter类型的cachedHoldCounter用来记录最后一个获取读锁的线程获取读锁的可重入次数。

  1. static final class HoldCounter {
  2. int count = 0;
  3. // Use id, not reference, to avoid garbage retention
  4. final long tid = getThreadId(Thread.currentThread());
  5. }

readHolds是ThreadLocal变量,用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数。ThreadLocalHoldCounter继承了ThreadLocal,因此initialValue方法返回一个HoldCounter对象。

  1. static final class ThreadLocalHoldCounter
  2. extends ThreadLocal<HoldCounter> {
  3. public HoldCounter initialValue() {
  4. return new HoldCounter();
  5. }
  6. }

写锁的获取与释放

void lock()

写锁与写锁、写锁与读锁是互斥的,如果当前已经有线程获取了读锁或写锁,则请求获取写锁的线程会被阻塞挂起。写锁是可重入锁,如果当前线程已经获取了该锁,再次获取只是简单地把可重入次数加1后返回。

  1. public void lock() {
  2. sync.acquire(1);
  3. }
  4. public final void acquire(int arg) {
  5. if (!tryAcquire(arg) &&
  6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  7. selfInterrupt();
  8. }

lock()内部调用了acquire方法,其中tryAcquire是ReentrantReadWriteLock内部的Sync类重写的。

  1. protected final boolean tryAcquire(int acquires) {
  2. Thread current = Thread.currentThread();
  3. int c = getState(); // 总状态
  4. int w = exclusiveCount(c); // 读锁状态
  5. // (1)c!=0说明读锁或写锁已经被获取
  6. if (c != 0) {
  7. //(2)w==0说明已经有线程获取了读锁,w!=0并且当前线程不是写锁拥有者,则返回false
  8. if (w == 0 || current != getExclusiveOwnerThread())
  9. return false;
  10. // (3)当前线程已经获取了写锁,判断可重入次数
  11. if (w + exclusiveCount(acquires) > MAX_COUNT)
  12. throw new Error("Maximum lock count exceeded");
  13. // (4)设置可重入次数
  14. setState(c + acquires);
  15. return true;
  16. }
  17. // (5)c==0说明锁还没有被获取,此处第一次获取
  18. if (writerShouldBlock() ||
  19. !compareAndSetState(c, c + acquires))
  20. return false;
  21. setExclusiveOwnerThread(current);
  22. return true;
  23. }

(1)如果当前AQS状态值不为0,说明当前已经有线程获取到了读锁或写锁。如果w==0说明state的低16位为0,而state不为0,那么高16位必不为0,说明有线程获取了读锁,所以直接返回false(读写互斥,保障数据一致性)。

(2)如果w!=0说明当前已经有线程获取了该写锁,再看当前线程是不是该锁的持有者,不是则返回false。

执行到(3)说明当前线程已经获取到了该锁,所以判断该线程的可重入次数是否超过了最大值,是则抛出异常,否则执行(4)增加可重入次数。

如果state为0说明目前没有线程获取到读锁和写锁,所以执行(5)。对于writerShouldBlock(),非公平锁的实现为

  1. final boolean writerShouldBlock() {
  2. return false;
  3. }

说明(5)抢占式地执行CAS尝试获取写锁。

公平锁的实现为

  1. final boolean writerShouldBlock() {
  2. return hasQueuedPredecessors();
  3. }

还是使用hasQueuedPredecessors来判断当前线程节点是否有前驱节点。

void lockInterruptibly()

会对中断进行相应

  1. public void lockInterruptibly() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }

boolean tryLock()

非阻塞方法,尝试获取写锁,如果当前没有其他线程持有读锁或写锁,则当前线程获取写锁并返回true,否则返回false。如果当前线程已经持有了该写锁则增加state的值并返回true。

  1. public boolean tryLock( ) {
  2. return sync.tryWriteLock();
  3. }
  4. final boolean tryWriteLock() {
  5. Thread current = Thread.currentThread();
  6. int c = getState();
  7. if (c != 0) {
  8. int w = exclusiveCount(c);
  9. if (w == 0 || current != getExclusiveOwnerThread())
  10. return false;
  11. if (w == MAX_COUNT)
  12. throw new Error("Maximum lock count exceeded");
  13. }
  14. // 非公平策略
  15. if (!compareAndSetState(c, c + 1))
  16. return false;
  17. setExclusiveOwnerThread(current);
  18. return true;
  19. }

此处代码于tryAcquire方法类似,只是使用了非公平策略。

void unlock()

使state减1,如果减1后state为0,则当前线程会释放锁。

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. public final boolean release(int arg) {
  5. if (tryRelease(arg)) {
  6. Node h = head;
  7. // 激活AQS队列里面的一个线程
  8. if (h != null && h.waitStatus != 0)
  9. unparkSuccessor(h);
  10. return true;
  11. }
  12. return false;
  13. }
  14. protected final boolean tryRelease(int releases) {
  15. // 检查是否使锁持有者调用的unlock
  16. if (!isHeldExclusively())
  17. throw new IllegalMonitorStateException();
  18. // 获取可重入值,没有考虑高16位,因为获取写锁时读锁状态值 肯定为。
  19. int nextc = getState() - releases;
  20. boolean free = exclusiveCount(nextc) == 0;
  21. if (free)
  22. setExclusiveOwnerThread(null);
  23. setState(nextc);
  24. return free;
  25. }

读锁的获取与释放

读锁通过ReadLock来实现。

void lock()

获取的锁,如果写锁没有被其他线程持有,则可以获取读锁,并将state的高16位加1;否则阻塞。

  1. public void lock() {
  2. sync.acquireShared(1);
  3. }
  4. // 来自AQS
  5. public final void acquireShared(int arg) {
  6. if (tryAcquireShared(arg) < 0)
  7. doAcquireShared(arg);
  8. }

lock方法调用了AQS的acquireShared方法,其内部又调用了Sync重写的tryAcquireShared方法。

  1. protected final int tryAcquireShared(int unused) {
  2. Thread current = Thread.currentThread();
  3. int c = getState();
  4. // 如果有其他线程获取了写锁,返回-1
  5. // 如果写锁被当前线程持有,那么也可以获取读锁,因为同一个线程同时最多只能执行读或写中的一个操作
  6. if (exclusiveCount(c) != 0 &&
  7. getExclusiveOwnerThread() != current)
  8. return -1;
  9. int r = sharedCount(c);
  10. // 公平策略
  11. if (!readerShouldBlock() &&
  12. r < MAX_COUNT &&
  13. compareAndSetState(c, c + SHARED_UNIT)) {
  14. // 读锁被第一次获取
  15. if (r == 0) {
  16. firstReader = current;
  17. firstReaderHoldCount = 1;
  18. // 读锁被获取过,且当前线程就是第一次获取读锁的线程
  19. } else if (firstReader == current) {
  20. firstReaderHoldCount++;
  21. } else {
  22. // 记录最后一个获取读锁的线程或记录其他线程读锁的可重入次数
  23. HoldCounter rh = cachedHoldCounter;
  24. // 如果rh为空或者rh不是当前线程,需要通过get方法创建一个新的HoldCounter用来记录当前线程的可重入次数
  25. // 并将其设为cachedHoldCounter
  26. if (rh == null || rh.tid != getThreadId(current))
  27. cachedHoldCounter = rh = readHolds.get();
  28. // 运行到此处说明当前线程已经被设为最后一个获取读锁的线程,rh.count==0说明当前线程已经完全释放了读锁,
  29. // 现在又要获取读锁,需要更新自己对应的HoldCounter
  30. else if (rh.count == 0)
  31. readHolds.set(rh);
  32. // 增加重入数
  33. rh.count++;
  34. }
  35. return 1;
  36. }
  37. // 尝试一次失败后自旋获取
  38. return fullTryAcquireShared(current);
  39. }

代码中readerShouldBlock用于决定代码公平与否。非公平锁的实现如下。

  1. final boolean readerShouldBlock() {
  2. return apparentlyFirstQueuedIsExclusive();
  3. }
  4. final boolean apparentlyFirstQueuedIsExclusive() {
  5. Node h, s;
  6. return (h = head) != null &&
  7. (s = h.next) != null &&
  8. !s.isShared() &&
  9. s.thread != null;
  10. }

仅当AQS队列存在元素且第一个元素在尝试获取写锁时才会阻塞当前线程,否则就算有线程在尝试获取读锁也不会让步(非公平性的体现)。

void unlock()

  1. public void unlock() {
  2. sync.releaseShared(1);
  3. }

具体操作委托给sync。

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }
  8. protected final boolean tryReleaseShared(int unused) {
  9. Thread current = Thread.currentThread();
  10. ...
  11. for (;;) {
  12. int c = getState();
  13. int nextc = c - SHARED_UNIT;
  14. if (compareAndSetState(c, nextc))
  15. return nextc == 0;
  16. }
  17. }

将state减去一个单位,如果结果为0,则返回true,调用doReleaseShared方法释放一个由于获取读锁而被阻塞的线程;如果不为0,说明仍有线程持有读锁,返回false。

案例介绍

下面基于ReentrantLock实现线程安全的list,适用于读多写少的情况

  1. public class ReentrantLockList {
  2. private ArrayList<String> array = new ArrayList<>();
  3. private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  4. private final Lock readLock;
  5. private final Lock writeLock;
  6. public ReentrantLockList() {
  7. readLock = lock.readLock();
  8. writeLock = lock.writeLock();
  9. }
  10. public void add(String e) {
  11. writeLock.lock();
  12. try{
  13. array.add(e);
  14. }finally {
  15. writeLock.unlock();
  16. }
  17. }
  18. public void remove(String e) {
  19. writeLock.lock();
  20. try{
  21. array.remove(e);
  22. }finally {
  23. writeLock.unlock();
  24. }
  25. }
  26. public String get(int index) {
  27. readLock.lock();
  28. try{
  29. return array.get(index);
  30. }finally {
  31. readLock.unlock();
  32. }
  33. }
  34. public int size() {
  35. return array.size();
  36. }
  37. }

更多

相关笔记:《Java并发编程之美》阅读笔记