

线程锁技术:Lock & Condition 实现线程同步通信所属包:java.util.concurrent.locks

线程锁 说明
Synchronized 同步方法,锁对象是this;同步静态方法,锁对象是字节码.class;同步代码块,锁对象是任意对象,但必须是同一个对象
Lock 同步锁接口
ReentrantLock lock(),unlock(),newCondition()
ReadWriteLock 读写锁接口
ReentrantReadWriteLock readLock()获取读锁,writeLock()获取写锁
Condition 线程间通信 await()等待 signal()唤醒

1. Lock

Lock比传统线程模型中的synchronized方式更加面向对象,相对于synchronized 方法和语句它具有更广泛的锁定操作,此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。



方法声明 功能描述
lock() 获取锁
tryLock() 尝试获取锁
unock() 释放锁
newCondition() 获取锁的Condition


  1. Lock lock = new ReentrantLock();
  2. public void doSth(){
  3. lock.lock();
  4. try {
  5. // 执行某些操作
  6. }finally {
  7. lock.unlock();
  8. }
  9. }




  • 如果代码只读数据,就可以很多人共同读取,但不能同时写。
  • 如果代码修改数据,只能有一个人在写,且不能同时读数据。


  1. class CachedData {
  2.   Object data;
  3. volatile boolean cacheValid;
  4.   ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  5.   void processCachedData() {
  6.    rwl.readLock().lock();
  7.    if (!cacheValid) {
  8.    // Must release read lock before acquiring write lock
  9.    rwl.readLock().unlock();
  10.    rwl.writeLock().lock();
  11.    // Recheck state because another thread might have acquired
  12.     // write lock and changed state before we did.
  13.    if (!cacheValid) {
  14.    data = ...
  15.    cacheValid = true;
  16.    }
  17.    // Downgrade by acquiring read lock before releasing write lock
  18. rwl.readLock().lock();
  19.    rwl.writeLock().unlock(); // Unlock write, still hold read
  20.    }
  21.    use(data);
  22.    rwl.readLock().unlock();
  23.   }
  24. }




  1. import java.util.HashMap;
  2. import java.util.Map;
  3. import java.util.concurrent.locks.ReadWriteLock;
  4. import java.util.concurrent.locks.ReentrantReadWriteLock;
  5. public class CacheDemo {
  6. //定义一个map用于缓存对象
  7. private Map<String, Object> cache = new HashMap<String, Object>();
  8. //获取一个读写锁对象
  9. private ReadWriteLock rwl = new ReentrantReadWriteLock();
  10. //带有缓存的获取指定值的方法
  11. public Object getData(String key){
  12. rwl.readLock().lock(); //上读锁
  13. Object value = null;
  14. try{
  15. value = cache.get(key); //获取要查询的值
  16. if(value == null){ //线程出现安全问题的地方
  17. rwl.readLock().unlock(); //没有数据,释放读锁,上写锁
  18. // 多个线程去上写锁,第一个上成功后,其他线程阻塞,第一个线程开始执行下面的代码,最后
  19. // 释放写锁后,后面的线程继续上写锁,为了避免后面的线程重复写入,进行二次判断
  20. rwl.writeLock().lock();
  21. try{
  22. if(value==null){ //二次判断,防止其他线程重复写数据
  23. value = "aaaa"; //实际是去查询数据库
  24. }
  25. }finally{
  26. rwl.writeLock().unlock(); //写完数据,释放写锁
  27. }
  28. rwl.readLock().lock(); //恢复读锁
  29. }
  30. }finally{
  31. rwl.readLock().unlock(); //最终释放读锁
  32. }
  33. return value; //返回获取到的值
  34. }
  35. }


  1. Lock lock = new ReentrantLock();
  2. try {
  3. lock.lock();
  4. //需要加锁的代码
  5. }finally {
  6. lock.unlock();
  7. }


  1. public class ReadWriteLockTest {
  2. public static void main(String[] args) {
  3. final Queue3 q3 = new Queue3();
  4. for(int i=0;i<3;i++)
  5. {
  6. new Thread(){
  7. public void run(){
  8. while(true){
  9. q3.get();
  10. }
  11. }
  12. }.start();
  13. new Thread(){
  14. public void run(){
  15. while(true){
  16. q3.put(new Random().nextInt(10000));
  17. }
  18. }
  19. }.start();
  20. }
  21. }
  22. }
  23. class Queue3{
  24. private Object data = null;
  25. ReadWriteLock rwl = new ReentrantReadWriteLock
  26. ();
  27. public void get(){
  28. rwl.readLock().lock();
  29. try {
  30. System.out.println(Thread.currentThread().getName() + " be ready to read data!");
  31. Thread.sleep((long)(Math.random()*1000));
  32. System.out.println(Thread.currentThread().getName() + "have read data :" + data);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }finally{
  36. rwl.readLock().unlock();
  37. }
  38. }
  39. public void put(Object data){
  40. rwl.writeLock().lock();
  41. try {
  42. System.out.println(Thread.currentThread().getName() + " be ready to write data!");
  43. Thread.sleep((long)(Math.random()*1000));
  44. this.data = data;
  45. System.out.println(Thread.currentThread().getName() + " have write data: " + data);
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }finally{
  49. rwl.writeLock().unlock();
  50. }
  51. }
  52. }
  1. Thread-0 be ready to read data!
  2. Thread-2 be ready to read data!
  3. Thread-4 be ready to read data!
  4. Thread-0have read data :null
  5. Thread-2have read data :null
  6. Thread-4have read data :null
  7. Thread-5 be ready to write data!
  8. Thread-5 have write data: 7975
  9. Thread-5 be ready to write data!
  10. Thread-5 have write data: 9832
  11. Thread-3 be ready to write data!
  12. Thread-3 have write data: 2813
  13. Thread-3 be ready to write data!
  14. Thread-3 have write data: 7998
  15. Thread-1 be ready to write data!
  16. Thread-1 have write data: 6737
  17. Thread-1 be ready to write data!
  18. ...

2. Condition


Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法wait和notify的使用


在等待 Condition 时,允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数应用程序,这带来的实际影响很小,因为 Condition 应该总是在一个循环中被等待,并测试正被等待的状态声明。某个实现可以随意移除可能的虚假唤醒,但建议应用程序程序员总是假定这些虚假唤醒可能发生,因此总是在一个循环中等待。

方法声明 功能描述
await() 线程等待
await(long time, TimeUnit unit) 线程等待特定的时间,超过等待时间则为超时
signal() 随机唤醒某个等待线程
signalAll() 唤醒所有等待中的线程


说明:该应用是 java.util.concurrent.locks包中Condition接口中的示例代码。使用了两个Condition分别用于管理取数据的线程,和存数据的线程,这样就可以明确的唤醒需要的一类线程,如果使用一个Condition,当队列满了之后,唤醒的并不一定就是取数据的线程

  1. class BoundedBuffer {
  2. final Lock lock = new ReentrantLock();
  3. final Condition notFull = lock.newCondition();
  4. final Condition notEmpty = lock.newCondition();
  5. final Object[] items = new Object[100];
  6. int putptr, takeptr, count;
  7. public void put(Object x) throws InterruptedException {
  8. lock.lock();
  9. try {
  10. while (count == items.length) //循环判断队列是否已存满
  11. notFull.await(); //如果队列存满了,则要存入数据的线程等待
  12. items[putptr] = x;
  13. if (++putptr == items.length) putptr = 0;//当队列放满,指针回到0
  14. ++count; //添加了一个数据
  15. notEmpty.signal(); //队列中有数据了,所以就唤醒取数据的线程
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public Object take() throws InterruptedException {
  21. lock.lock();
  22. try {
  23. while (count == 0) //循环判断,队列是否有空位
  24. notEmpty.await(); //要取的线程等待
  25. Object x = items[takeptr];
  26. if (++takeptr == items.length) takeptr = 0;
  27. --count; //取走一个,说明队列有空闲的位置,
  28. notFull.signal(); //所以通知存入的线程
  29. return x;
  30. } finally {
  31. lock.unlock();
  32. }
  33. }
  34. }


  1. public class ConditionCommunication {
  2. public static void main(String[] args) {
  3. final Business business = new Business();
  4. new Thread(
  5. new Runnable() {
  6. @Override
  7. public void run() {
  8. for(int i=1;i<=5;i++){
  9. business.sub(i);
  10. }
  11. }
  12. }
  13. ).start();
  14. for(int i=1;i<=5;i++){
  15. business.main(i);
  16. }
  17. }
  18. class Business {
  19. Lock lock = new ReentrantLock();
  20. Condition condition = lock.newCondition();
  21. private boolean bShouldSub = true;
  22. public void sub(int i){
  23. lock.lock();
  24. try{
  25. while(!bShouldSub){
  26. try {
  27. condition.await();
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. for(int j=1;j<=2;j++){
  33. System.out.println("sub thread sequence of " + j + ",loop of " + i);
  34. }
  35. bShouldSub = false;
  36. condition.signal();
  37. }finally{
  38. lock.unlock();
  39. }
  40. }
  41. public void main(int i){
  42. lock.lock();
  43. try{
  44. while(bShouldSub){
  45. try {
  46. condition.await();
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. for(int j=1;j<=4;j++){
  52. System.out.println("main thread sequence of " + j + ",loop of " + i);
  53. }
  54. bShouldSub = true;
  55. condition.signal();
  56. }finally{
  57. lock.unlock();
  58. }
  59. }
  60. }
  61. }


  1. sub thread sequence of 1,loop of 1
  2. sub thread sequence of 2,loop of 1
  3. main thread sequence of 1,loop of 1
  4. main thread sequence of 2,loop of 1
  5. main thread sequence of 3,loop of 1
  6. main thread sequence of 4,loop of 1
  7. sub thread sequence of 1,loop of 2
  8. sub thread sequence of 2,loop of 2
  9. main thread sequence of 1,loop of 2
  10. main thread sequence of 2,loop of 2
  11. main thread sequence of 3,loop of 2
  12. main thread sequence of 4,loop of 2
  13. sub thread sequence of 1,loop of 3
  14. sub thread sequence of 2,loop of 3
  15. main thread sequence of 1,loop of 3
  16. main thread sequence of 2,loop of 3
  17. main thread sequence of 3,loop of 3
  18. main thread sequence of 4,loop of 3
  19. sub thread sequence of 1,loop of 4
  20. sub thread sequence of 2,loop of 4
  21. main thread sequence of 1,loop of 4
  22. main thread sequence of 2,loop of 4
  23. main thread sequence of 3,loop of 4
  24. main thread sequence of 4,loop of 4
  25. sub thread sequence of 1,loop of 5
  26. sub thread sequence of 2,loop of 5
  27. main thread sequence of 1,loop of 5
  28. main thread sequence of 2,loop of 5
  29. main thread sequence of 3,loop of 5
  30. main thread sequence of 4,loop of 5


  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class MyArrayBlockingQueue<T> {
  5. // 数据数组
  6. private final T[] items;
  7. // 锁
  8. private final Lock lock = new ReentrantLock();
  9. // 队满的条件
  10. private Condition notFull = lock.newCondition();
  11. // 队空条件
  12. private Condition notEmpty = lock.newCondition();
  13. // 头部索引
  14. private int head;
  15. // 尾部索引
  16. private int tail;
  17. // 数据的个数
  18. private int count;
  19. public MyArrayBlockingQueue(int maxSize) {
  20. items = (T[]) new Object[maxSize];
  21. }
  22. public MyArrayBlockingQueue() {
  23. this(10);
  24. }
  25. public void put(T t) {
  26. lock.lock();
  27. try {
  28. while (count == getCapacity()) {
  29. System.out.println("数据已满,等待");
  30. notFull.await();
  31. }
  32. items[tail] = t;
  33. if (++tail == getCapacity()) {
  34. tail = 0;
  35. }
  36. ++count;
  37. notEmpty.signalAll(); // 唤醒等待数据的线程
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. } finally {
  41. lock.unlock();
  42. }
  43. }
  44. public T take() {
  45. lock.lock();
  46. try {
  47. while (count == 0) {
  48. System.out.println("还没有数据,请等待");
  49. notEmpty.await();
  50. }
  51. T ret = items[head];
  52. items[head] = null;
  53. if (++head == getCapacity()) {
  54. head = 0;
  55. }
  56. --count;
  57. notFull.signalAll(); // 唤醒添加数据的线程
  58. return ret;
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. } finally {
  62. lock.unlock();
  63. }
  64. return null;
  65. }
  66. public int getCapacity() {
  67. return items.length;
  68. }
  69. public int size() {
  70. lock.lock();
  71. try {
  72. return count;
  73. } finally {
  74. lock.unlock();
  75. }
  76. }
  77. public static void main(String[] args) {
  78. MyArrayBlockingQueue<Integer> aQueue = new MyArrayBlockingQueue<Integer>();
  79. aQueue.put(3);
  80. aQueue.put(24);
  81. for (int i = 0; i < 5; i++) {
  82. System.out.println(aQueue.take());
  83. }
  84. }
  85. }


  1. 3
  2. 24
  3. 还没有数据,请等待

3. Condition练习

一共有3个线程,两个子线程先后循环2次,接着主线程循环3次,接着又回到两 个子线程先后循环2次,再回到主线程又循环3次,如此循环5次。



解决方案:可以将一个文件中的那个同名外部类放进类中,但是静态不能创建内部类的实例对象,所以需要加上static,这样两个类的名称就不一样了。 一个是原来的类名,一个是在自己类名前面加上外部类的类名。

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class ThreeConditionCommunication {
  5. public static void main(String[] args){
  6. final Business business = new Business();
  7.    //创建并启动子线程老二
  8. new Thread(new Runnable(){
  9. @Override
  10. public void run() {
  11. for(int i=1;i<=5;i++){
  12. business.sub2(i);
  13. }
  14. }
  15. }).start();
  16.    //创建并启动子线程老三
  17. new Thread(new Runnable(){
  18. @Override
  19. public void run() {
  20. for(int i=1;i<=5;i++){
  21. business.sub3(i);
  22. }
  23. }
  24. }).start();
  25. //主线程
  26. for(int i=1;i<=5;i++){
  27. business.main(i);
  28. }
  29. }
  30. static class Business{
  31. Lock lock = new ReentrantLock();
  32. Condition condition1 = lock.newCondition();
  33. Condition condition2 = lock.newCondition();
  34. Condition condition3 = lock.newCondition();
  35. //定义一个变量来决定线程的执行权
  36. private int ShouldSub = 1;
  37. public void sub2(int i){
  38. //上锁,不让其他线程执行
  39. lock.lock();
  40. try{
  41. if(ShouldSub != 2){ //如果不该老二执行,就等待
  42. try {
  43. condition2.await();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. for(int j=1;j<=2;j++){
  49. System.out.println("sub thread sequence of"+i+",loop of "+j);
  50. }
  51. ShouldSub = 3; //准备让老三执行
  52. condition3.signal(); //唤醒老三
  53. }finally{
  54. lock.unlock();
  55. }
  56. }
  57. public void sub3(int i){
  58. lock.lock();
  59. try{
  60. if(ShouldSub != 3){
  61. try {
  62. condition3.await();
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. for(int j=1;j<=2;j++){
  68. System.out.println("sub2 thread sequence of"+i+",loop of "+j);
  69. }
  70. ShouldSub = 1; //准备让老大执行
  71. condition1.signal(); //唤醒老大
  72. }finally{
  73. lock.unlock();
  74. }
  75. }
  76.    //主线程
  77. public void main(int i){
  78. lock.lock();
  79. try{
  80. if(ShouldSub!=1){
  81. try {
  82. condition1.await();
  83. } catch (InterruptedException e) {
  84. e.printStackTrace();
  85. }
  86. }
  87. for(int j=1;j<=3;j++){
  88. System.out.println("main thread sequence of"+i+", loop of "+j);
  89. }
  90. ShouldSub = 2; //准备让老二执行
  91. condition2.signal(); //唤醒老二
  92. }finally{
  93. lock.unlock();
  94. }
  95. }
  96. }
  97. }


  1. main thread sequence of1, loop of 1
  2. main thread sequence of1, loop of 2
  3. main thread sequence of1, loop of 3
  4. sub thread sequence of1,loop of 1
  5. sub thread sequence of1,loop of 2
  6. sub2 thread sequence of1,loop of 1
  7. sub2 thread sequence of1,loop of 2
  8. main thread sequence of2, loop of 1
  9. main thread sequence of2, loop of 2
  10. main thread sequence of2, loop of 3
  11. sub thread sequence of2,loop of 1
  12. sub thread sequence of2,loop of 2
  13. sub2 thread sequence of2,loop of 1
  14. sub2 thread sequence of2,loop of 2
  15. main thread sequence of3, loop of 1
  16. main thread sequence of3, loop of 2
  17. main thread sequence of3, loop of 3
  18. sub thread sequence of3,loop of 1
  19. sub thread sequence of3,loop of 2
  20. sub2 thread sequence of3,loop of 1
  21. sub2 thread sequence of3,loop of 2
  22. main thread sequence of4, loop of 1
  23. main thread sequence of4, loop of 2
  24. main thread sequence of4, loop of 3
  25. sub thread sequence of4,loop of 1
  26. sub thread sequence of4,loop of 2
  27. sub2 thread sequence of4,loop of 1
  28. sub2 thread sequence of4,loop of 2
  29. main thread sequence of5, loop of 1
  30. main thread sequence of5, loop of 2
  31. main thread sequence of5, loop of 3
  32. sub thread sequence of5,loop of 1
  33. sub thread sequence of5,loop of 2
  34. sub2 thread sequence of5,loop of 1
  35. sub2 thread sequence of5,loop of 2

4. 多路等待和通知

  1. class BoundedBuffer {
  2. final Lock lock = new ReentrantLock();
  3. final Condition notFull = lock.newCondition();
  4. final Condition notEmpty = lock.newCondition();
  5. final Object[] items = new Object[100];
  6. int putptr, takeptr, count;
  7. public void put(Object x) throws InterruptedException {
  8. lock.lock();
  9. try {
  10. while (count == items.length)
  11. notFull.await();
  12. items[putptr] = x;
  13. if (++putptr == items.length) putptr = 0;
  14. ++count;
  15. notEmpty.signal();
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public Object take() throws InterruptedException {
  21. lock.lock();
  22. try {
  23. while (count == 0)
  24. notEmpty.await();
  25. Object x = items[takeptr];
  26. if (++takeptr == items.length) takeptr = 0;
  27. --count;
  28. notFull.signal();
  29. return x;
  30. } finally {
  31. lock.unlock();
  32. }
  33. }
  34. }