一、JUC介绍
JUC 全称 Java Util Concurrent,是 JDK 1.5 正式引入的并发编程工具包(java.util.concurrent 包及子包),基于 volatile、CAS、AQS 三大底层基石,全面替代原始 synchronized、wait/notify、原生 Thread 手动创建的粗糙多线程方案,提供高性能、细粒度、功能丰富的并发工具,是 Java 后端高并发开发、面试核心体系。
1、JUC 底层三大基石
所有 JUC 工具的底层原理,全部基于这三个基础机制:
1. volatile(内存屏障)
- 作用:保证可见性、禁止指令重排序,不保证原子性
- 可见性:线程修改主内存变量,立即刷新;其他线程缓存失效,强制从主内存读取
- 有序性:插入内存屏障,禁止 CPU 指令重排,支撑单例 DCL、状态标志位
- 局限:
i++ 这类复合操作依然线程不安全
2. CAS(Compare And Swap,比较并交换,无锁算法)
- 原理:内存值 V、旧预期值 A、新值 B;仅当
V==A 时,才把 V 更新为 B,全程无锁、重试循环
- 优点:无阻塞、无线程上下文切换开销,并发性能远高于重量级锁
- 缺点:ABA 问题、循环自旋消耗 CPU、仅保证单个变量原子性
- 解决方案:
AtomicStampedReference(加版本号解决 ABA)
3. AQS(AbstractQueuedSynchronizer,抽象队列同步器)
JUC 所有锁、同步工具的底层父类(ReentrantLock、CountDownLatch、Semaphore 全部基于 AQS)
- 核心结构
- state 状态变量(volatile 修饰):标记锁占用、计数器数值
- 双向 CLH 队列:存储所有抢锁失败、阻塞等待的线程
- 工作流程:线程抢锁 → 成功修改 state;失败 → 入队阻塞;锁释放 → 唤醒队首线程
- 两大模式:独占模式(锁,如 ReentrantLock)、共享模式(计数器,如 CountDownLatch)
二、线程基础(Thread + Runnable)
这里不写!!!!!
三、Callable、FutureTask
为什么需要Callable 和 FuntrueTask??
Runnable缺点:
没有返回值,线程干完活拿不到结果;
不能抛出受检异常,内部异常只能自己 try-catch
一旦出问题,主线程完全感知不到
Callable源码:
1 2 3 4 5 6 7 8 9 10 11 12
| @FunctionalInterface public interface Callable<V> { V call() throws Exception; }
@FunctionalInterface public interface Runnable { void run(); }
|
FutrueTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Callable 不能直接交给 Thread 执行!!!Callable 不能直接交给 Thread 执行!!! Thread 类的所有构造方法,只认 Runnable 接口,不认 Callable。 所以需要一个包装类
FutureTask ↳ 实现 RunnableFuture 接口 ↳ 同时继承 **Runnable** + **Future**
// 1. 包装 Callable(最常用) public FutureTask(Callable<V> callable)
// 2. 包装 Runnable,并且手动指定返回结果 public FutureTask(Runnable runnable, V result)
|
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class MyCallable implements Callable<String> { @Override public String call() throws Exception { int[] arr = {1,2,3,4,5,6,7}; for (int j : arr) { try { Thread.sleep(500); System.out.println(j); } catch (InterruptedException e) { throw new RuntimeException(e); } } return "遍历完毕"; } }
public class Test { public static void main(String[] args) { FutureTask<String> stringFutureTask = new FutureTask<String>(new MyCallable());
new Thread(() -> { String name = Thread.currentThread().getName(); System.out.println("====" + name + "===="); new Thread(stringFutureTask).start(); }).start();
System.out.println("正常执行");
try{ String s = stringFutureTask.get(); System.out.println(s); }catch(Exception e){ System.out.println(e.getMessage()); } } }
|
FutureTask
相关方法:
| 方法 |
返回值 |
作用详解 |
阻塞? |
抛出异常 |
| V get() |
V |
无限阻塞获取任务结果任务没执行完,主线程就一直等待,直到任务结束拿到返回值 |
强阻塞 |
InterruptedException``ExecutionException |
| V get(long timeout, TimeUnit unit) |
V |
限时阻塞获取结果等待指定时间,时间到任务还没完成直接放弃、抛异常,不无限卡死 |
限时阻塞 |
InterruptedException``ExecutionException``TimeoutException |
| boolean cancel(boolean mayInterruptIfRunning) |
boolean |
取消正在执行的任务true:中断正在运行的线程false:只取消未开始的任务,不中断正在跑的任务返回true:取消成功 |
无阻塞 |
无 |
| boolean isCancelled() |
boolean |
判断任务是否在完成之前被取消只有任务结束前被 cancel才返回 true正常执行完毕、异常结束都返回 false |
无阻塞 |
无 |
| boolean isDone() |
boolean |
判断任务是否已经结束****正常完成 / 异常结束 / 被取消,全部都算完成,都返回 true |
无阻塞 |
无 |
底层原理:
1. 内部状态(非常重要)
FutureTask 内部有一个 volatile 状态常量,标记任务生命周期
1 2 3 4 5 6 7
| NEW 初始状态,任务刚创建 COMPLETING 任务执行完毕,正在赋值结果 NORMAL 正常完成,有结果 EXCEPTIONAL 执行抛出异常 CANCELLED 任务被取消 INTERRUPTING 正在中断 INTERRUPTED 已中断
|
2. 执行原理底层
Thread.start() → 调用 FutureTask.run()
run() 内部执行 callable.call()
- 执行成功:把返回值存入成员变量,状态改为
NORMAL
- 执行失败:捕获异常,存入异常对象,状态改为
EXCEPTIONAL
- 主线程调用
get():
- 任务没完成 → 阻塞挂起主线程
- 任务完成 → 直接返回结果 / 抛出内部异常
3. 为什么要设计 FutureTask?
因为 Java 早期线程框架 Thread 只兼容 Runnable,为了不改动原有 Thread 源码,新增 Callable,用包装类兼容。
4. Future 体系的所有缺点(必考,必背,引出 CompletableFuture)
1 2 3 4 5 6 7 8
| 这是面试连环问核心,90% 考点全在这
1. **get () 方法阻塞主线程** 调用就卡住,主线程啥都干不了,只能干等,违背异步初衷。
2. **没有回调机制** 任务做完了不能自动触发后续逻辑,只能主动轮询
|
isDone()
1 2 3 4 5 6 7 8 9 10 11
| 判断
3. **多个任务无法组合**
不能实现:任务 A 做完再做任务 B、所有任务全部完成汇总、谁先完成用谁
4. **异常处理繁琐**
任务内部异常,只有调用
|
get()
1 2 3
| 时才会抛出,前期无法感知
5. **无法批量管理任务**
|
四、CompletableFuture
它是 Java 8+ 用来写异步代码的神器,比 Thread、Runnable 强太多,自动开线程、自动回调、自动等待、自动异常处理,写起来超级简单。
核心作用:
- 自动用线程池,不用手动 new Thread
- 链式编程,代码干净
- 支持等待多个任务完成
- 支持任务完成后自动回调
- 支持异常处理
- 支持先完成谁就用谁
一句话:异步任务想怎么写就怎么写,不用管线程怎么开。
继承
1 2
| Future<V> ↳ CompletableFuture<V> 继承Future所有方法
|
创建异步方法
无返回值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void Nos(){ CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { int[] arr = {1,2,3,4,5,6,7}; for (int j : arr) { try { Thread.sleep(500); System.out.println(j); } catch (InterruptedException e) { throw new RuntimeException(e); } } });
completableFuture.join(); }
|
有返回值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void Yes(){ CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { int[] arr = {1,2,3,4,5,6,7}; for (int j : arr) { try { Thread.sleep(500); System.out.println(j); } catch (InterruptedException e) { throw new RuntimeException(e); } } return "完成"; });
String context = null; try { context = completableFuture.get(); } catch (Exception e) { throw new RuntimeException(e); } System.out.println(context);
}
|
获取结构方法:
| 方法 |
是否阻塞 |
异常类型 |
作用 |
get() |
强阻塞 |
受检异常(需 try-catch) |
等待任务完成获取结果(继承 Future,开发不用) |
join() |
阻塞 |
运行时异常(无需捕获) |
开发首选,等待完成拿结果 |
getNow(T defaultValue) |
不阻塞 |
无 |
任务完成返回结果,没完成直接返回默认值 |
单任务链式调用:
| 方法 |
上一步结果入参 |
是否返回新值 |
对应函数式接口 |
作用 |
thenApply(Function) |
有 |
有 |
Function |
处理结果,转换数据,继续链式 |
thenApplyAsync(Function) |
有 |
有 |
Function |
同上,异步新线程执行 |
thenAccept(Consumer) |
有 |
无 |
Consumer |
只消费结果,打印 / 使用,不返回 |
thenAcceptAsync(Consumer) |
有 |
无 |
Consumer |
同上,异步新线程执行 |
thenRun(Runnable) |
无(不拿结果) |
无 |
Runnable |
只执行后续动作,完全不关心上一步结果 |
thenRunAsync(Runnable) |
无(不拿结果) |
无 |
Runnable |
同上,异步新线程执行 |
多任务批量组合
| 方法 |
参数 |
等待规则 |
返回值 |
allOf(...) |
多个 CF 可变参数 |
所有任务全部完成才继续 |
CompletableFuture<Void> 无返回 |
anyOf(...) |
多个 CF 可变参数 |
任意一个任务完成就结束 |
CompletableFuture<Object> 返回最先完成的结果 |
异常处理方法
| 方法 |
触发时机 |
返回值 |
作用 |
exceptionally(Function) |
仅任务出错时执行 |
兜底默认值 |
异常兜底、返回默认值,不中断流程 |
handle(BiFunction) |
无论成功、失败都执行 |
新结果 |
统一处理结果 + 异常,全能型 |
五、锁
多线程操作同一个共享资源,会出现:脏数据、数据覆盖、线程安全问题锁的作用:同一时间只允许一个线程进入临界区代码,保证安全。
1. synchronized 内置锁
四种使用方式
- 修饰普通成员方法:锁当前对象
this
- 修饰静态方法:锁类.class
- 代码块锁对象:锁自定义任意对象
- 代码块锁类:锁类字节码对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Object obj = new Object(); public void test(){ synchronized (obj){ } }
public synchronized void method1(){}
public static synchronized void method2(){}
|
2. Lock类
1 2 3 4 5 6 7 8
| public interface Lock { void lock(); void unlock(); boolean tryLock(); boolean tryLock(时间,单位); void lockInterruptibly(); Condition newCondition(); }
|
3. ReentrantLock 可重入锁
可重入独占锁,synchronized 的功能全面升级版。
1 2 3 4 5 6 7 8 9 10 11
| Lock lock = new ReentrantLock(false);
public void add(){ lock.lock(); try{ }finally { lock.unlock(); } }
|
Renntrantlock继承了Lock的方法,这里是特有方法:
| 方法 |
作用 |
说明 |
boolean isLocked() |
判断锁是否被占用 |
只要有线程拿着锁就返回 true |
boolean isFair() |
判断是否是公平锁 |
构造方法传入true为公平锁,默认非公平 |
boolean isHeldByCurrentThread() |
判断当前线程是否持有该锁 |
用于防止重复解锁异常 |
int getHoldCount() |
获取当前线程重入次数 |
重入锁专用,加几次锁,数字就是几,解锁对应次数才完全释放 |
int getQueueLength() |
获取等待队列线程总数 |
所有阻塞等待这把锁的线程数量 |
boolean hasQueuedThreads() |
是否有线程在排队等待锁 |
粗略判断有无阻塞线程 |
4. ReentrantReadWriteLock 可读写锁
读多写少(大量读、少量修改) 普通锁:读写互斥、读读也互斥,性能极差。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest {
private static final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private static final Lock readLock = rwLock.readLock(); private static final Lock writeLock = rwLock.writeLock();
private static String data = "初始数据";
public static void read() { readLock.lock(); try { System.out.println(Thread.currentThread().getName() + " 读取数据:" + data); } finally { readLock.unlock(); } }
public static void write(String newData) { writeLock.lock(); try { data = newData; System.out.println(Thread.currentThread().getName() + " 修改数据:" + data); } finally { writeLock.unlock(); } }
public static void main(String[] args) { new Thread(() -> read(), "读线程1").start(); new Thread(() -> read(), "读线程2").start(); new Thread(() -> read(), "读线程3").start();
new Thread(() -> write("新数据"), "写线程").start(); } }
|
相关方法(读写锁继承Lock):
| 方法 |
作用 |
ReentrantReadWriteLock() |
构造,默认非公平锁 |
ReentrantReadWriteLock(boolean fair) |
构造,可指定公平 / 非公平 |
Lock readLock() |
获取读锁对象 |
Lock writeLock() |
获取写锁对象 |
int getReadLockCount() |
获取读锁被持有次数 |
boolean isWriteLocked() |
判断是否被写锁占用 |
int getWriteHoldCount() |
当前线程写锁重入次数 |
int getReadHoldCount() |
当前线程读锁重入次数 |
boolean isFair() |
是否为公平锁 |
5. StampedLock 邮戳锁
读写锁的升级版、性能最强 ** 解决读写锁写锁饥饿问题 自带乐观读模式**,读几乎无锁,性能极高。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| import java.util.concurrent.locks.StampedLock;
public class StampedLockTest { private static final StampedLock lock = new StampedLock(); private static int data = 100;
public void write(int newVal) { long stamp = lock.writeLock(); try { data = newVal; System.out.println(Thread.currentThread().getName() + " 写数据:" + data); } finally { lock.unlock(stamp); } }
public void readPessimistic() { long stamp = lock.readLock(); try { System.out.println(Thread.currentThread().getName() + " 悲观读:" + data); } finally { lock.unlock(stamp); } }
public void readOptimistic() { long stamp = lock.tryOptimisticRead(); int temp = data;
if (!lock.validate(stamp)) { stamp = lock.readLock(); try { temp = data; } finally { lock.unlock(stamp); } } System.out.println(Thread.currentThread().getName() + " 乐观读结果:" + temp); }
public static void main(String[] args) { StampedLockTest test = new StampedLockTest();
new Thread(test::readOptimistic,"乐观读线程1").start(); new Thread(test::readOptimistic,"乐观读线程2").start();
new Thread(test::readPessimistic,"悲观读线程").start();
new Thread(() -> test.write(200),"写线程").start(); } }
|
锁互斥规则(对比读写锁)
| 模式组合 |
是否互斥 |
| 乐观读 ↔ 乐观读 |
不互斥(完全并行) |
| 乐观读 ↔ 悲观读 |
不互斥 |
| 乐观读 ↔ 写锁 |
互斥(写锁会打断乐观读) |
| 悲观读 ↔ 悲观读 |
共享不互斥 |
| 悲观读 ↔ 写锁 |
互斥 |
| 写锁 ↔ 写锁 |
互斥 |
相关方法:
| 方法 |
返回值 |
作用详解 |
long writeLock() |
long 邮戳 |
加独占写锁,阻塞;返回本次锁邮戳 |
boolean tryWriteLock() |
boolean |
尝试获取写锁,不阻塞,成功返回 true |
long tryWriteLock(long time, TimeUnit unit) |
long |
限时尝试获取写锁,超时返回 0 |
void unlockWrite(long stamp) |
void |
释放写锁,必须传入加锁得到的邮戳 |
long readLock() |
long 邮戳 |
悲观读锁(共享),阻塞;返回邮戳 |
boolean tryReadLock() |
boolean |
尝试悲观读锁,不阻塞 |
long tryReadLock(long time, TimeUnit unit) |
long |
限时悲观读锁 |
void unlockRead(long stamp) |
void |
释放悲观读锁,传入邮戳 |
long tryOptimisticRead() |
long |
乐观读(核心),无锁、不阻塞,直接返回状态邮戳 |
boolean validate(long stamp) |
boolean |
乐观读校验:判断读期间有没有被写锁修改过 |
void unlock(long stamp) |
void |
万能解锁:自动识别写锁 / 读锁邮戳,统一释放(推荐) |
boolean isWriteLocked(long stamp) |
boolean |
判断当前是否被写锁占用 |
6.锁的核心理论概念
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| 9.1 悲观锁 vs 乐观锁
悲观锁 认为并发一定会冲突,上来就加锁。 代表:synchronized、ReentrantLock 乐观锁 认为不会冲突,不上锁,修改时校验版本。 代表:CAS、Atomic 原子类、StampedLock
9.2 公平锁 vs 非公平锁
公平锁:排队先来先得,不插队 非公平锁:新来线程可以直接插队抢锁(默认,性能更好)
9.3 可重入锁 同一个线程多次获取同一把锁不会死锁,释放时对应解锁即可。synchronized、ReentrantLock 全部默认可重入。
9.4 独占锁(排他锁)& 共享锁
独占锁:一把锁同一时间只能一个线程持有 synchronized、ReentrantLock、写锁 共享锁:多个线程可同时持有 读锁
9.5 死锁 四个必要条件(全部满足才会死锁) 互斥 占有且等待 不可抢占 循环等待 破坏任意一个即可避免。
|
六、并发底层核心原理
1.AQS
AQS = JUC 所有锁、工具类的底层父类、底层基石ReentrantLock、ReentrantReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore 底层全都是 AQS。
它用一套统一框架,实现所有并发同步控制。
核心结构:
1 2 3 4 5
| volatile int state;
Node head, tail;
|
核心方法:
| 方法 |
说明 |
acquire(int arg) |
尝试获取资源,失败则进入队列等待 |
release(int arg) |
释放资源,唤醒后续节点 |
tryAcquire(int arg) |
需要子类实现 |
tryRelease(int arg) |
需要子类实现 |
你只需要重写 tryAcquire / tryRelease,AQS 帮你处理 线程排队、阻塞、唤醒。
案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import java.util.concurrent.locks.AbstractQueuedSynchronizer;
class NonReentrantLock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(int arg) { if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; }
@Override protected boolean isHeldExclusively() { return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread(); } }
public void lock() { sync.acquire(1); }
public void unlock() { sync.release(1); } }
|
2.CAS
CAS = Compare And Swap(比较并交换),是一种无锁的原子操作。
一句话:CAS 先比较内存值是否等于预期值,如果是就更新为新值,整个过程是原子的。
三个参数:
使用 CAS(AtomicInteger)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import java.util.concurrent.atomic.AtomicInteger;
public class CASTest { private static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { count.incrementAndGet(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { count.incrementAndGet(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count.get()); } }
|
3.ABA问题
ABA 问题:CAS 操作时,内存值从 A → B → A,CAS 检查时发现还是 A,就认为没变过,但实际上中间被改过。
1 2 3 4 5 6 7 8 9 10
| // 场景:你账户有 100 元 // 线程1:转入 100 元(100 → 200) // 线程2:转出 100 元(100 → 0) // 线程3:检查余额是否还是 100,是就执行某个操作
// 问题: // 1. 线程1 转入 100(100→200) // 2. 线程2 转出 100(200→100) // 3. 线程3 CAS 发现还是 100,认为没变化 ✓ 错误! // 实际已经经历 100→200→100
|
解决(使用比较+版本号):
1 2
| AtomicStampedReference<V> ref = new AtomicStampedReference<>(initialValue, initialStamp);
|
七、原子操作类 Atomic
把普通变量变成”原子变量”,多线程下不用加锁就能安全操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| java.util.concurrent.atomic ├── 基本类型 │ ├── AtomicInteger │ ├── AtomicLong │ └── AtomicBoolean ├── 数组类型 │ ├── AtomicIntegerArray │ ├── AtomicLongArray │ └── AtomicReferenceArray ├── 引用类型 │ ├── AtomicReference │ ├── AtomicStampedReference (解决ABA) │ └── AtomicMarkableReference └── 字段更新器 ├── AtomicIntegerFieldUpdater ├── AtomicLongFieldUpdater └── AtomicReferenceFieldUpdater
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.atomic_;
import java.util.concurrent.atomic.AtomicInteger;
public class integer { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(); atomicInteger.set(12); System.out.println(atomicInteger.get());
boolean success = atomicInteger.compareAndSet(10, 20); System.out.println("CAS结果: " + success + ", 值: " + atomicInteger.get());
System.out.println("incrementAndGet: " + atomicInteger.incrementAndGet()); System.out.println("getAndIncrement: " + atomicInteger.getAndIncrement()); System.out.println("decrementAndGet: " + atomicInteger.decrementAndGet());
System.out.println("addAndGet: " + atomicInteger.addAndGet(5));
atomicInteger.updateAndGet(x -> x * 2); System.out.println("updateAndGet: " + atomicInteger.get()); } }
|
八、线程池
线程池 = 提前创建好一堆线程,反复使用,避免频繁创建销毁线程的开销。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.ThreadPool_;
import java.util.concurrent.*;
public class Executors_ { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 3, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); pool.execute(() -> { System.out.println(Thread.currentThread().getName() + "执行任务" + taskId); }); } }
|
相关参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| 队列类型 特点 使用场景 ArrayBlockingQueue 有界队列,必须指定大小 需要控制内存 LinkedBlockingQueue 无界队列(默认Integer.MAX) 任务数量未知,Executors.newFixedThreadPool用的这个 SynchronousQueue 不存储任务,直接交给线程 缓存线程池用的这个 PriorityBlockingQueue 优先级队列 任务有优先级 DelayQueue 延迟队列 定时任务
4种拒绝策略(面试必问) // 1. AbortPolicy(默认):直接抛异常 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 2. CallerRunsPolicy:让提交任务的线程自己执行 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 3. DiscardPolicy:直接丢弃,不抛异常 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 4. DiscardOldestPolicy:丢弃队列中最老的任务 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
|
相关方法:
| 方法 |
作用 |
使用频率 |
必背指数 |
execute(Runnable) |
提交任务(无返回值) |
⭐️⭐️⭐️⭐️⭐️ |
✅ 必背 |
submit(Callable/T) |
提交任务(有返回值) |
⭐️⭐️⭐️⭐️ |
✅ 必背 |
shutdown() |
优雅关闭线程池 |
⭐️⭐️⭐️⭐️⭐️ |
✅ 必背 |
shutdownNow() |
立即关闭线程池 |
⭐️⭐️ |
了解 |
awaitTermination() |
等待线程池关闭 |
⭐️⭐️⭐️ |
掌握 |
getActiveCount() |
获取活跃线程数 |
⭐️⭐️⭐️ |
掌握 |
getQueue().size() |
获取排队任务数 |
⭐️⭐️⭐️ |
掌握 |
九、线程安全集合
普通集合(ArrayList、HashMap)在多线程下会出问题,需要用线程安全集合代替。
| 场景 |
普通集合(❌不安全) |
线程安全集合(✅安全) |
| List |
ArrayList |
CopyOnWriteArrayList |
| Set |
HashSet |
CopyOnWriteArraySet、ConcurrentSkipListSet |
| Map |
HashMap |
ConcurrentHashMap |
| Queue |
LinkedList |
ConcurrentLinkedQueue、BlockingQueue |
1.ConcurrentHashMap(最常用)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapDemo { public static void main(String[] args) { ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); map.put("a", 1); map.put("b", 2); Integer value = map.get("a"); map.putIfAbsent("c", 3); map.putIfAbsent("c", 4); map.replace("c", 3, 5); map.compute("count", (k, v) -> v == null ? 1 : v + 1); map.merge("count", 1, Integer::sum); map.forEach((k, v) -> System.out.println(k + "=" + v)); } }
|
2.CopyOnWriteArrayList(读多写少场景)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteDemo { public static void main(String[] args) { CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); list.add("a"); list.add("b"); list.get(0); list.remove("a"); Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { list.add("t1-" + i); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { list.add("t2-" + i); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("大小: " + list.size()); } }
|
3.BlockingQueue(阻塞队列,生产者消费者模式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); static class Producer implements Runnable { @Override public void run() { int i = 0; while (true) { try { queue.put(i); System.out.println(Thread.currentThread().getName() + " 生产: " + i++); Thread.sleep(500); } catch (InterruptedException e) { break; } } } } static class Consumer implements Runnable { @Override public void run() { while (true) { try { Integer value = queue.take(); System.out.println(Thread.currentThread().getName() + " 消费: " + value); Thread.sleep(1000); } catch (InterruptedException e) { break; } } } } public static void main(String[] args) { new Thread(new Producer(), "生产者1").start(); new Thread(new Consumer(), "消费者1").start(); new Thread(new Consumer(), "消费者2").start(); }
|
4.ConcurrentLinkedQueue(高性能无锁队列)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentQueueDemo { public static void main(String[] args) { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); queue.offer("a"); queue.add("b"); String peek = queue.peek(); String poll = queue.poll(); int size = queue.size(); for (String item : queue) { System.out.println(item); } } }
|
十、JUC 三大并发控制工具类
| 工具类 |
作用 |
比喻 |
| CountDownLatch |
等待多个线程完成 |
老师等所有学生交卷 |
| CyclicBarrier |
多个线程互相等待到齐 |
等人齐了再开饭 |
| Semaphore |
控制同时访问的线程数 |
停车场的限流杆 |
1. CountDownLatch - 等待所有人完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) { int person = i; new Thread(() -> { System.out.println("第" + person + "个人开始干活"); try { Thread.sleep(2000); } catch (Exception e) {} System.out.println("第" + person + "个人干完了"); latch.countDown(); }).start(); } latch.await(); System.out.println("所有人都干完了,继续执行"); } }
|
2. CyclicBarrier - 等人齐了一起走
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(3, () -> { System.out.println("所有人都到齐了,出发!"); }); for (int i = 1; i <= 3; i++) { int person = i; new Thread(() -> { System.out.println("第" + person + "个人到达集合点"); try { barrier.await(); System.out.println("第" + person + "个人出发了"); } catch (Exception e) {} }).start(); } } }
|
3. Semaphore - 限流/控制并发数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import java.util.concurrent.Semaphore;
public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 10; i++) { int car = i; new Thread(() -> { try { semaphore.acquire(); System.out.println("第" + car + "辆车停进来了"); Thread.sleep(2000); System.out.println("第" + car + "辆车开走了"); semaphore.release(); } catch (Exception e) {} }).start(); } } }
|
十一、线程安全总结与方案选型
1.并发问题根源演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class ConcurrencyProblems { private static boolean visibleFlag = true; public static void visibilityProblem() throws InterruptedException { Thread t1 = new Thread(() -> { while (visibleFlag) { } System.out.println("线程1退出"); }); Thread t2 = new Thread(() -> { try { Thread.sleep(1000); } catch (Exception e) {} visibleFlag = false; System.out.println("线程2修改了标志"); }); t1.start(); t2.start(); t1.join(); t2.join(); } private static int atomicCount = 0; public static void atomicityProblem() throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) atomicCount++; }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) atomicCount++; }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("期望: 20000, 实际: " + atomicCount); } private static int a = 0, b = 0, x = 0, y = 0; public static void orderingProblem() throws InterruptedException { for (int i = 0; i < 100000; i++) { a = 0; b = 0; x = 0; y = 0; Thread t1 = new Thread(() -> { a = 1; x = b; }); Thread t2 = new Thread(() -> { b = 1; y = a; }); t1.start(); t2.start(); t1.join(); t2.join(); if (x == 0 && y == 0) { System.out.println("发现指令重排序!x=" + x + ", y=" + y); break; } } } }
|
2.解决方案选型对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| public class SolutionComparison { private static int count = 0; private static final Object lock = new Object(); private static final AtomicInteger atomicCount = new AtomicInteger(0); private static final ReentrantLock reentrantLock = new ReentrantLock(); public static void syncSolution() throws InterruptedException { count = 0; Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { synchronized (lock) { count++; } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { synchronized (lock) { count++; } } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("synchronized 结果: " + count); } public static void atomicSolution() throws InterruptedException { atomicCount.set(0); Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { atomicCount.incrementAndGet(); } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { atomicCount.incrementAndGet(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Atomic 结果: " + atomicCount.get()); } public static void lockSolution() throws InterruptedException { count = 0; Thread t1 = new Thread(() -> { for (int i = 0; i < 10000; i++) { reentrantLock.lock(); try { count++; } finally { reentrantLock.unlock(); } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 10000; i++) { reentrantLock.lock(); try { count++; } finally { reentrantLock.unlock(); } } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Lock 结果: " + count); } private static volatile boolean flag = true; public static void volatileSolution() throws InterruptedException { flag = true; Thread t1 = new Thread(() -> { while (flag) { } System.out.println("线程1看到flag变化,退出"); }); Thread t2 = new Thread(() -> { try { Thread.sleep(1000); } catch (Exception e) {} flag = false; System.out.println("线程2修改flag"); }); t1.start(); t2.start(); t1.join(); t2.join(); } }
|
3.集合选型规范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class CollectionSelection { public static void mapSelection() { ConcurrentHashMap<String, String> map1 = new ConcurrentHashMap<>(); Map<String, String> map2 = Collections.synchronizedMap(new HashMap<>()); map1.put("key", "value"); map1.putIfAbsent("key2", "value2"); map1.compute("key3", (k, v) -> v == null ? "new" : v + "updated"); } public static void listSelection() { CopyOnWriteArrayList<String> list1 = new CopyOnWriteArrayList<>(); List<String> list2 = Collections.synchronizedList(new ArrayList<>()); list1.add("item"); for (String item : list1) { System.out.println(item); } } public static void queueSelection() { BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(100); BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(); ConcurrentLinkedQueue<String> queue3 = new ConcurrentLinkedQueue<>(); new Thread(() -> { try { queue1.put("task"); } catch (InterruptedException e) {} }).start(); new Thread(() -> { try { String task = queue1.take(); } catch (InterruptedException e) {} }).start(); } public static void setSelection() { CopyOnWriteArraySet<String> set1 = new CopyOnWriteArraySet<>(); ConcurrentSkipListSet<String> set2 = new ConcurrentSkipListSet<>(); set1.add("element"); set2.add("element"); } }
|
4.锁选型规范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| public class LockSelection { public void simpleSync() { synchronized (this) { count++; } } private final ReentrantLock lock = new ReentrantLock(); public void advancedLock() throws InterruptedException { if (lock.tryLock(1, TimeUnit.SECONDS)) { try { doSomething(); } finally { lock.unlock(); } } else { System.out.println("获取锁失败,执行降级策略"); } } private final ReentrantLock fairLock = new ReentrantLock(true); public void fairLockExample() { fairLock.lock(); try { } finally { fairLock.unlock(); } } private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); private Object data; public Object read() { readLock.lock(); try { return data; } finally { readLock.unlock(); } } public void write(Object newData) { writeLock.lock(); try { this.data = newData; } finally { writeLock.unlock(); } } private final AtomicInteger atomicCounter = new AtomicInteger(0); public void lockFree() { atomicCounter.incrementAndGet(); } }
|
5.异步选型规范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| public class AsyncSelection { private static final ExecutorService threadPool = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() ); public void asyncWithoutResult() { threadPool.execute(() -> { sendLog("用户登录"); }); } public void asyncWithResult() throws Exception { Future<String> future = threadPool.submit(() -> { Thread.sleep(1000); return "查询结果"; }); String result = future.get(); System.out.println("结果: " + result); } public void asyncCompose() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { return "用户信息"; }, threadPool); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { return 100; }, threadPool); CompletableFuture<String> combined = future1.thenCombine(future2, (user, score) -> user + " 分数: " + score ); combined.thenAccept(result -> { System.out.println("最终结果: " + result); }); } public void scheduledTask() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); scheduler.schedule(() -> { System.out.println("5秒后执行"); }, 5, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(() -> { System.out.println("每10秒执行一次"); }, 0, 10, TimeUnit.SECONDS); } @org.springframework.scheduling.annotation.Async public void springAsync() { System.out.println("Spring 异步执行"); } private void sendLog(String msg) { System.out.println(msg); } }
|
6.完整选型决策树
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class SelectionDecisionTree { public void decision1() { } public void decision2() { } public void decision3() { } public void decision4() { } public void decision5() { } }
|
7.快速选型表
| 场景 |
首选方案 |
备选方案 |
禁止方案 |
| 计数器 |
AtomicInteger |
LongAdder |
synchronized + int |
| Map操作 |
ConcurrentHashMap |
Collections.synchronizedMap |
HashMap |
| List操作 |
CopyOnWriteArrayList |
Collections.synchronizedList |
ArrayList |
| 生产者消费者 |
ArrayBlockingQueue |
LinkedBlockingQueue |
自己实现 |
| 简单同步 |
synchronized |
ReentrantLock |
- |
| 需要超时 |
ReentrantLock |
- |
synchronized |
| 读多写少 |
ReadWriteLock |
CopyOnWriteArrayList |
synchronized |
| 异步无返回 |
execute() |
@Async |
new Thread() |
| 异步有返回 |
CompletableFuture |
submit() |
- |