线程池
- 线程池在系统启动时即创建大量空闲的线程,程序将一个 Runnable 对象或 Callable 对象传给线程池,线程池就会启动一个线程来执行它们的
run()
或call()
方法 - 当
run()
或call()
方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个 Runnable 对象或 Callable 对象的run()
或call()
方法 - 线程池的组成部分
- 线程管理池(ThreadPool):用于创建并管理线程池,包括创建,销毁,添加新任务
- 工作线程(PoolWorker):线程池中的线程在没有任务的时候处于等待状态,可以循环的执行任务
- 任务接口(Task):每个任务必须实现接口,用来提供工作线程调度任务的执行,规定了任务的入口以及执行结束的收尾工作和任务的执行状态等
- 任务队列(TaskQueue):用于存放没有处理的任务,提供一种缓存机制
- 线程池的使用:1. 创建线程池;2. 向线程池提交任务;3. 关闭线程池
- 如何确定合适数量的线程?对于计算型任务,cpu 数量的 1~2 倍;对于 IO 型任务,则需多一些线程,要根据具体的 IO 阻塞时长进行考量决定
Executors 工具类
- 创建线程池的静态方法
ExecutorService newCachedThreadPool()
:创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中(无界线程池,已有 60 秒钟未被使用的线程会被终止并从缓存中移除)new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
ExecutorService newFixedThreadPool(int nThreads)
:创建一个可重用的、具有固定核心线程数的线程池new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
ExecutorService newSingleThreadExecutor()
:创建一个只有单线程的线程池new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
:创建一个线程池,可在指定延迟后执行或定期执行线程任务new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())
SynchronousQueue 不储存元素的阻塞队列;LinkedBlockingQueue 的默认容量为 Integer.MAX_VALUE
ExecutorService 接口
- Executor 的子接口,代表尽快执行线程的线程池(只要线程池中有空闲线程,就立即执行线程任务),方法
void execute(Runnable command)
:在未来某个时间执行给定的命令Future<?> submit(Runnable task)
:将一个 Runnable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Runnable 对象代表的任务,其中 Future 对象代表 Runnable 任务的返回值——但 run () 方法没有返回值,所以 Future 对象将在run()
方法执行结束后返回 null,但可以调用 Future 的isDone()
、isCancelled()
方法来获得 Runnable 对象的执行状态<T> Future<T> submit(Runnable task, T result)
:将一个 Runnable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Runnable 对象代表的任务,其中 result 显式指定线程执行结束后的返回值,所以 Future 对象将在run()
方法执行结束后返回 resultFuture<T> submit(Callable<T> task)
:将一个 Callable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Callable 对象代表的任务,其中 Future 代表 Callable 对象里call()
方法的返回值List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
:执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表T invokeAny(Collection<? extends Callable<T>> tasks)
:执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果void shutdown()
:启动线程池的关闭序列,调用该方法后的线程池不再接收新任务,但会将以前所有已提交任务执行完成,当线程池中的所有任务都执行完成后,池中的所有线程都会死亡List<Runnable> shutdownNow()
:试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
ScheduledExecutorService 接口
- ExecutorService 的子接口,代表可在指定延迟后或周期性地执行线程任务的线程池,方法
ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
:指定 callable 任务将在 delay 延迟后执行ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
:指定 command 任务将在 delay 延迟后执行ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:指定 command 任务将在 delay 延迟后执行,而且以设定频率重复执行(在 initialDelay 后开始执行,依次在 initialDelay+period、initialDelay+2*period… 处重复执行)ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都存在给定的延迟(如果任务在任一次执行时遇到异常,就会取消后续执行;否则,只能通过程序来显式取消或终止该任务)
ThreadPoolExecutor
线程池的实现类
构造器ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 构造器中几个参数的含义:
- corePoolSize(线程池基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建(如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前创建并启动所有基本线程)
- maximumPoolSize(线程池最大线程数):线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务
- keepAliveTime(线程保持活动的时间):当线程数大于 corePoolSize 时,线程空闲之后保持存活的时间
- TimeUnit(线程保持活动时间的单位):可以使用 TimeUnit 时间单位来设置
- BlockingQueue(任务队列):用于保存等待执行的任务的阻塞队列,可以选择以下几个:
- ArrayBlockingQueue:基于数组的阻塞队列,按照 FIFO 原则进行排序
- LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。吞吐量通常要高于 ArrayBlockingQueue,Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor() 使用了这个队列
- SynchronousQueue:一个不储存元素的阻塞队列,每一个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量通常要高于 LinkedBlockingQueue,Executors.newCachedThreadPool() 使用了这个队列
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列
- ThreadFactory(线程工厂):用于设置创建线程的工厂
- RejectedExecutionHandler(饱和策略):这个本身是 Java 的一个接口,当队列和线程池都满了,需要一种策略处理新的任务,在这个类的最下部提供了四种内置的实现类:
- AbortPolicy:直接抛出异常(默认策略)
- CallerRunsPolicy:只用调用者所在的线程来运行任务
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前的任务
- DiscardPolicy:不处理,直接丢弃
- 自定义策略:实现 RejectedExecutionHandler 接口,自定义策略(如记录日志或持久化不能处理的任务)
提交一个新任务到线程池时,线程池的处理流程如下:
- 判断当前线程池的线程数是否小于设置的线程池基本大小,小于,则创建新线程来执行任务,否则
- 判断任务队列是否满了,没满,则将新提交的任务放到任务队列中,否则
- 判断当前线程池的线程数是否小于设置的线程池最大线程数,小于,则创建新线程来执行任务,否则
- 交给饱和策略来处理新任务
ScheduledThreadPoolExecutor
- ThreadPoolExecutor 的子类
- 构造器ScheduledThreadPoolExecutor(int corePoolSize)ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
通过 ThreadPoolExecutor 的方式创建线程池
- 方式 1:使用 com.google.guava 包
// com.google.common.util.concurrent.ThreadFactoryBuilder
// 设置线程名字
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
// 创建线程池
ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
// 向线程池提交任务
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
Future<String> future = pool.submit(() -> Thread.currentThread().getName());
// 关闭线程池
pool.shutdown(); // gracefully shutdown
- 方式 2:使用 commons-lang3 包
// org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
- 方式 3:使用 Spring 提供的 ThreadPoolTaskExecutor 和 ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5); // 默认为 1
pool.setMaxPoolSize(50); // 默认为 Integer.MAX_VALUE
pool.setQueueCapacity(1000);
pool.setThreadNamePrefix("taskExecutor-");
// 当线程池关闭时等待当前被调度的任务完成
pool.setWaitForTasksToCompleteOnShutdown(true); // 默认为 false
return pool;
}
JUC
原子类
- java.util.concurrent.atomic
- AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 的实例各自提供对相应类型单个变量的访问和更新
- LongAdder、DoubleAdder、LongAccumulator、DoubleAccumulator
CAS 机制
- CAS(Compare and Swap),即比较并替换,是一种实现并发算法时常用到的技术,Java 并发包中的很多类都使用了 CAS 技术
- CAS 使用了 3 个基本操作数:内存地址 V,旧的预期值 A,要修改的新值 B
- CAS 指令执行时,当且仅当内存地址 V 中的实际值与预期值 A 相等时,将内存地址 V 的值修改为 B,否则就什么都不做
- 整个比较并替换的操作是一个原子操作(利用 Unsafe 提供的原子性操作方法)
- CAS 的缺点:
- 循环时间长开销很大
- 只能保证一个共享变量的原子操作
- ABA 问题: 解决方法:通过控制变量值的版本来保证 CAS 的正确性,如 AtomicStampedReference
并发容器类
- ConcurrentHashMap 通常优于同步的 HashMap
- ConcurrentSkipListMap 通常优于同步的 TreeMap
- 当列表读多写少时,CopyOnWriteArrayList 优于同步的 ArrayList
- ArrayBlockingQueue、SynchronousQueue
ConCurrentHashMap 原理
- jdk1.7 使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁(Segment 继承了 ReentrantLock),当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问
- 共享变量都使用了 volatile 修饰
- get 方法不需要加锁,除非读到的 value 是 null 时才会加锁重读
- put 方法首先定位到 Segment,然后在 Segment 里进行插入操作
- jdk1.8 以后采用了 CAS + synchronized 来保证并发安全性
- 不允许 null 作为 key 或 value
CopyOnWriteArrayList 原理
- 数组 array 都使用了 volatile 修饰
- 当向容器添加、删除或修改元素的时候,先将当前容器进行 Copy,复制出一个新的容器,然后在新的容器里添加删除或修改元素,添加、删除或修改完元素之后,再将原容器的引用指向新的容器,整个过程加锁,保证了写的线程安全
- 读操作不需要获得锁
BlockingQueue 阻塞队列
- 支持两个附加操作的 Queue:获取元素时等待队列变为非空,存储元素时等待队列变得可用
- 已知实现类:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
对于不能立即满足但可能在将来某一时刻可以满足的操作,BlockingQueue 的方法的处理方式有四种:1. 抛出异常;2. 返回特殊值(null 或 false,具体取决于操作);3. 在操作可以成功前,无限期地阻塞当前线程;4. 在放弃前只在给定的最大时间限制内阻塞
安全失败(fail-safe)
- 采用安全失败机制的集合容器,在遍历时不是直接在集合内容上访问的,而是先复制原有集合内容,在拷贝的集合上进行遍历
- java.util.concurrent 包下的容器都是安全失败,可以在多线程下并发使用,并发修改
CountDownLatch 计数器
- 一个同步辅助类,在完成一组正在其它线程中执行的操作之前,让一个或多个线程一直等待
- 构造器 CountDownLatch(int count):构造一个用给定计数初始化的 CountDownLatch
- 实例方法
void await()
:使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断boolean await(long timeout, TimeUnit unit)
void countDown()
:递减锁存器的计数,如果计数到达零,则释放所有等待的线程long getCount()
:返回当前计数
CyclicBarrier 栅栏
- 一个同步辅助类,让一组线程互相等待,直到所有参与者(线程)都到达某个公共屏障点(common barrier point),该 barrier 在释放等待线程后可以重用
- 构造器
- CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动
- CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
- 实例方法
int await()
:在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待int await(long timeout, TimeUnit unit)
int getNumberWaiting()
int getParties()
Semaphore 信号量
- 一般用于控制对某组资源的访问权限
- 可以控制同时访问的线程个数(限流)
- 通过 acquire() 获取一个许可,如果没有就等待,直到获取许可为止;通过 release() 释放一个许可
AQS 框架
- AQS(AbstractQueuedSynchronizer.java)是 Java 实现同步锁的基础框架
- AQS 工作原理:用 volatile int state 变量来表示当前同步锁的状态,用 getState() 获取同步锁的状态,用
compareAndSetState(int expect, int update)
来对 state 的状态进行修改
ReentrantLock 的实现
获取锁
- lock() 方法:首先尝试 CAS 获得锁
compareAndSetState(0, 1)
,成功则把占有锁的线程设置为当前线程,返回,失败则调用 acquire 方法 - acquire 方法为 AQS 实现的模板方法,它调用 tryAcquire 方法尝试获得锁,成功则返回,不成功则进入等待队列直至获取成功
- tryAcquire() 方法:查询当前 state 值,如果为 0 则说明当前锁还未被其他线程获取,则尝试 CAS 获得锁,成功则把占有锁的线程设置为当前线程,返回 true,失败返回 false;如果 state 不为 0 则说明该锁已经被其他线程获取,则检查获得锁的线程是否是当前线程以实现可重入特性,如果是,则更新 state 的值,并返回 true
- lock() 方法:首先尝试 CAS 获得锁
释放锁,unlock() 直接调用 tryRelease() 方法:
- 首先检查当前释放锁的线程,如果不是已占有锁的线程则抛出异常,因为 ReentrantLock 是独占式锁,释放锁的线程一定是占有锁的线程
- 将 state 值减 1,判断:如果 state 值等于 0 的,说明获取锁的所有方法都已经返回,则锁释放成功;如果 state 值不等于 0,说明只是部分递归的方法返回,部分递归方法还未返回,则释放失败,锁依然被占有
在并发环境下,获取锁和释放锁需要以下三个部件的协调:
- 锁状态:state 为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁;如果锁重入,state 进行 +1,释放锁就 -1,直到 state 又变为 0,代表释放锁,然后唤醒等待队列中的第一个线程,让其来占有锁
- 线程的阻塞和解除阻塞:AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程
- 等待队列:AQS 的等待队列基于是双向链表实现的,每个线程被包装成一个 Node 实例(属性:thread、waitStatus、pre、next),其中 head 节点不关联线程
非公平锁 NonfairSync 和公平锁 Sync 的不同之处:
- 非公平锁在调用 lock() 后,首先就会调用 CAS 进行一次抢锁,如果这个时候锁没有被占用,就直接获取到锁并返回
- 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现这个时候锁被释放了(state == 0),非公平锁会直接 CAS 抢锁,而公平锁会通过 hasQueuedPredecessors() 判断等待队列是否有线程处于等待状态,如果有则将自己放在等待队列的队尾
锁的类型
乐观锁
- 乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据:采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的操作
- Java 中的乐观锁基本都是通过 CAS 操作实现的,CAS 是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败
悲观锁
- 悲观锁是就是悲观思想,即认为写多,遇到并发写的可能性高,每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会 block 直到拿到锁
- Java 中的悲观锁就是 synchronized,AQS 框架下的锁则是先尝试 CAS 乐观锁去获取锁,获取不到,才会转换为悲观锁,如 RetreenLock
Java 对象的数据结构
- 在 HotSpot 虚拟机中,Java 对象在内存中存储的布局可以分为 3 块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)
- 对象头中的 Mark Word 部分,用于存储对象自身的运行时数据,如哈希码(HashCode)、GC 分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等,其中的最后 2bit 是锁状态标志位(无锁、偏向锁、轻量级锁、重量级锁、GC 标识)
Java 中的锁
- JVM 从 1.5 开始,引入了偏向锁与轻量级锁,默认启用了自旋锁,它们都属于乐观锁
偏向锁
- 如果在运行过程中,同步锁只有当前线程访问,不存在多线程争用的情况,则线程是不需要触发同步的,这种情况下,就会给线程加一个偏向锁
- 如果在运行过程中,遇到了其它线程抢占锁,则持有偏向锁的线程会被挂起,JVM 会消除它身上的偏向锁,将锁恢复到标准的轻量级锁
- 通过消除资源无竞争情况下的同步原语,进一步提高了程序的运行性能
轻量级锁
- 由偏向所升级来的,偏向锁运行在一个线程进入同步块的情况下,当第二个线程加入锁争用的时候,偏向锁就会升级为轻量级锁
自旋锁
- 为了不让当前线程进入阻塞状态,让当前线程循环去获取锁
- 在经过若干次循环后,如果可以得到锁,那么就顺利进入临界区,如果还不能获得锁,才会将线程在操作系统层面挂起
- 自旋锁的优缺点
- 使等待竞争锁的线程不需要做用户态和内核态之间的切换进入阻塞挂起状态,减少了不必要的上下文切换,执行速度快
- 需要消耗 CUP,让 CUP 在做无用功,所以需要设定一个自旋等待的最大时间(如设置为一个线程上下文切换的时间)
重量级锁 synchronized
- 重量级锁把除了拥有锁的线程都阻塞,防止 CPU 空转
- 非公平锁,可以重入
- synchronized 的执行过程:
- 检测 Mark Word 中的偏向锁标识是否为 1,线程 ID 是不是当前线程 ID,如果是,表示当前线程处于偏向锁
- 如果不是,则使用 CAS 将 Mark Word 中线程 ID 设置为当前线程 ID,如果成功则表示当前线程获得偏向锁
- 如果失败,则说明发生竞争,撤销偏向锁,进而升级为轻量级锁
- 当前线程使用 CAS 将对象头的 Mark Word 替换为指向当前线程的栈帧中 Lock Record 的指针,如果成功,当前线程获得锁
- 如果失败,表示其它线程竞争锁,当前线程尝试使用自旋来获取锁
- 如果自旋成功则依然处于轻量级状态
- 如果自旋失败,则膨胀为重量级锁
- 在所有的锁都启用的情况下,线程进入临界区时会先去获取偏向锁,如果已经存在偏向锁了,则会尝试获取轻量级锁,启用自旋锁,如果自旋也没有获取到锁,则使用重量级锁,没有获取到锁的线程阻塞挂起,直到持有锁的线程执行完同步块唤醒他们
锁优化
- 减少锁的时间:不需要同步执行的代码,能不放在同步快里面执行就不要放在同步快内,可以让锁尽快释放
- 减少锁的粒度:将物理上的一个锁,拆成逻辑上的多个锁,增加并行度,从而降低锁竞争,如 ConcurrentHashMap 在 jdk1.8 之前的版本
- 锁粗化:将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁,避免频繁的加锁解锁操作
- 使用读写锁:读操作加读锁,可以并发读,写操作使用写锁,只能单线程写,如 ReentrantReadWriteLock
- 使用 volatile + CAS 操作