前言:为什么线程同步是并发编程的 "定海神针"
在多核 CPU 主导的时代,并发编程已经从 "高级特性" 变成了 "必备技能"。想象一下:当多个线程像多台挖掘机同时挖掘一个矿藏时,如果没有协调机制,必然会导致数据混乱、重复劳动甚至系统崩溃。线程同步就是这样一套 "交通规则",确保多个线程能够有序访问共享资源。
根据 Oracle 官方文档统计,约 70% 的并发 bug 根源都在于不正确的线程同步。本文将系统剖析 Java 中 12 种线程同步方式,从基础的synchronized到高级的StampedLock,从底层原理到实战案例,帮你构建完整的线程同步知识体系。
一、线程同步的本质:解决 "可见性、原子性、有序性" 三大难题
在深入具体同步方式之前,我们必须先理解线程同步要解决的核心问题。Java 内存模型(JMM)定义了线程和主内存之间的抽象关系,正是这种抽象导致了三大并发问题:
可见性:一个线程对共享变量的修改,其他线程不能立即看到原子性:一个操作不能被打断,要么全部执行,要么都不执行有序性:指令执行顺序可能被编译器或 CPU 优化打乱
所有线程同步机制的设计,都是为了解决这三个问题中的一个或多个。
二、synchronized:Java 原生的 "一键同步" 方案
synchronized是 Java 语言内置的同步机制,无需手动释放锁,是最常用也最基础的同步方式。
2.1 synchronized 的三种使用形式
2.1.1 同步实例方法
import lombok.extern.slf4j.Slf4j;
/**
* 同步实例方法演示
*
* @author ken
*/
@Slf4j
public class SynchronizedInstanceMethodDemo {
private int count = 0;
/**
* 同步实例方法,锁对象是当前实例
*/
public synchronized void increment() {
// 临界区代码
count++;
log.info("线程{}:count = {}", Thread.currentThread().getName(), count);
}
public static void main(String[] args) {
SynchronizedInstanceMethodDemo demo = new SynchronizedInstanceMethodDemo();
// 创建10个线程同时执行increment方法
for (int i = 0; i < 10; i++) {
new Thread(demo::increment, "Thread-" + i).start();
}
}
}
2.1.2 同步静态方法
import lombok.extern.slf4j.Slf4j;
/**
* 同步静态方法演示
*
* @author ken
*/
@Slf4j
public class SynchronizedStaticMethodDemo {
private static int staticCount = 0;
/**
* 同步静态方法,锁对象是当前类的Class对象
*/
public static synchronized void staticIncrement() {
// 临界区代码
staticCount++;
log.info("线程{}:staticCount = {}", Thread.currentThread().getName(), staticCount);
}
public static void main(String[] args) {
// 创建10个线程同时执行staticIncrement方法
for (int i = 0; i < 10; i++) {
new Thread(SynchronizedStaticMethodDemo::staticIncrement, "Thread-" + i).start();
}
}
}
2.1.3 同步代码块
import lombok.extern.slf4j.Slf4j;
/**
* 同步代码块演示
*
* @author ken
*/
@Slf4j
public class SynchronizedBlockDemo {
private int count = 0;
// 自定义锁对象,推荐使用专门的锁对象而非this或Class对象
private final Object lock = new Object();
public void increment() {
// 同步代码块,锁对象是lock
synchronized (lock) {
// 临界区代码
count++;
log.info("线程{}:count = {}", Thread.currentThread().getName(), count);
}
}
public static void main(String[] args) {
SynchronizedBlockDemo demo = new SynchronizedBlockDemo();
// 创建10个线程同时执行increment方法
for (int i = 0; i < 10; i++) {
new Thread(demo::increment, "Thread-" + i).start();
}
}
}
2.2 synchronized 的底层实现:从对象头到监视器锁
synchronized的实现依赖于 JVM 中的监视器锁(Monitor),而 Monitor 又依赖于操作系统的互斥锁(Mutex)实现。在 JDK 6 之后,HotSpot 虚拟机对synchronized进行了重大优化,引入了偏向锁、轻量级锁和重量级锁三种状态。
偏向锁:适用于单线程重复获取同一把锁的场景,通过记录线程 ID 避免每次加锁解锁的开销轻量级锁:适用于多线程交替执行的场景,通过 CAS 操作避免重量级锁的系统调用重量级锁:适用于多线程同时竞争的场景,需要操作系统介入,开销最大
2.3 synchronized 的使用场景与最佳实践
锁粒度要小:同步代码块应尽可能小,只包含必要的临界区代码避免锁嵌套:可能导致死锁避免使用 String 常量或基本类型包装类作为锁对象:可能导致不同地方的锁实际上是同一把锁优先使用同步代码块而非同步方法:灵活性更高,可选择更合适的锁对象
三、volatile:轻量级的 "可见性保证" 机制
volatile是 Java 提供的一种轻量级同步机制,它不能保证原子性,但能保证可见性和有序性。
3.1 volatile 的核心作用
import lombok.extern.slf4j.Slf4j;
/**
* volatile可见性演示
*
* @author ken
*/
@Slf4j
public class VolatileVisibilityDemo {
// 使用volatile修饰共享变量
private volatile boolean flag = false;
public void setFlag() {
this.flag = true;
log.info("线程{}:设置flag为true", Thread.currentThread().getName());
}
public void checkFlag() {
while (!flag) {
// 循环等待flag变为true
}
log.info("线程{}:检测到flag为true,退出循环", Thread.currentThread().getName());
}
public static void main(String[] args) {
VolatileVisibilityDemo demo = new VolatileVisibilityDemo();
// 线程1负责检测flag
new Thread(demo::checkFlag, "Checker").start();
// 休眠100ms,确保Checker线程先启动
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 线程2负责设置flag
new Thread(demo::setFlag, "Setter").start();
}
}
如果flag变量没有被volatile修饰,Checker线程可能永远不会退出循环,因为它看不到Setter线程对flag的修改。
3.2 volatile 的底层实现:内存屏障
volatile的可见性和有序性保证是通过内存屏障(Memory Barrier)实现的:
写屏障(Store Barrier):在volatile变量写操作之后插入,确保所有之前的操作都能被其他线程看到读屏障(Load Barrier):在volatile变量读操作之前插入,确保能看到其他线程的最新修改
3.3 volatile 的局限性:不能保证原子性
import lombok.extern.slf4j.Slf4j;
/**
* volatile不能保证原子性演示
*
* @author ken
*/
@Slf4j
public class VolatileAtomicityDemo {
// 使用volatile修饰count变量
private volatile int count = 0;
public void increment() {
// count++不是原子操作,包含读取、加1、写入三个步骤
count++;
}
public static void main(String[] args) throws InterruptedException {
VolatileAtomicityDemo demo = new VolatileAtomicityDemo();
int threadCount = 10;
Thread[] threads = new Thread[threadCount];
// 创建10个线程,每个线程执行1000次increment
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
demo.increment();
}
});
threads[i].start();
}
// 等待所有线程执行完毕
for (Thread thread : threads) {
thread.join();
}
// 理想结果是10000,但实际结果往往小于10000
log.info("最终count值:{}", demo.count);
}
}
上述代码中,虽然count被volatile修饰,但count++操作不是原子的,因此最终结果可能小于 10000。
3.4 volatile 的正确使用场景
状态标志:如示例中的flag变量,用于标记某种状态双重检查锁定(DCL):在单例模式中使用,如懒汉式单例的实现独立观察:确保多个线程看到的变量值是一致的
四、Lock 接口:更灵活的 "手动锁" 机制
java.util.concurrent.locks.Lock接口提供了比synchronized更灵活的锁定操作,它允许更精细的控制,如尝试获取锁、超时获取锁等。
4.1 ReentrantLock:可重入锁的实现
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ReentrantLock演示
*
* @author ken
*/
@Slf4j
public class ReentrantLockDemo {
private int count = 0;
// 创建可重入锁,默认是非公平锁
private final Lock lock = new ReentrantLock();
/**
* 使用ReentrantLock进行同步
*/
public void increment() {
// 获取锁
lock.lock();
try {
// 临界区代码
count++;
log.info("线程{}:count = {}", Thread.currentThread().getName(), count);
} finally {
// 确保锁被释放
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockDemo demo = new ReentrantLockDemo();
// 创建10个线程同时执行increment方法
for (int i = 0; i < 10; i++) {
new Thread(demo::increment, "Thread-" + i).start();
}
}
}
4.2 ReentrantLock 的高级特性
4.2.1 尝试获取锁(tryLock)
/**
* 尝试获取锁演示
*
* @return 是否成功获取锁并执行了增量操作
*/
public boolean tryIncrement() {
// 尝试获取锁,立即返回结果
if (lock.tryLock()) {
try {
count++;
log.info("线程{}:count = {}", Thread.currentThread().getName(), count);
return true;
} finally {
lock.unlock();
}
} else {
log.info("线程{}:获取锁失败", Thread.currentThread().getName());
return false;
}
}
4.2.2 超时获取锁
import java.util.concurrent.TimeUnit;
/**
* 超时获取锁演示
*
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否成功获取锁并执行了增量操作
* @throws InterruptedException 如果线程在等待时被中断
*/
public boolean tryIncrementWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {
// 在指定时间内尝试获取锁
if (lock.tryLock(timeout, unit)) {
try {
count++;
log.info("线程{}:count = {}", Thread.currentThread().getName(), count);
return true;
} finally {
lock.unlock();
}
} else {
log.info("线程{}:超时,获取锁失败", Thread.currentThread().getName());
return false;
}
}
4.2.3 中断响应
/**
* 可中断的锁获取演示
*
* @throws InterruptedException 如果线程在等待时被中断
*/
public void interruptibleIncrement() throws InterruptedException {
// 可中断地获取锁
lock.lockInterruptibly();
try {
count++;
log.info("线程{}:count = {}", Thread.currentThread().getName(), count);
} finally {
lock.unlock();
}
}
4.3 公平锁与非公平锁
ReentrantLock提供了公平锁和非公平锁两种实现:
// 公平锁:线程获取锁的顺序与请求顺序一致
Lock fairLock = new ReentrantLock(true);
// 非公平锁:线程获取锁的顺序不一定与请求顺序一致,可能有线程插队
Lock nonFairLock = new ReentrantLock(false); // 默认是非公平锁
公平锁保证了线程获取锁的公平性,但性能通常比非公平锁差,因为需要维护等待队列的顺序。
4.4 synchronized 与 ReentrantLock 的对比
特性synchronizedReentrantLock锁获取方式隐式获取显式调用 lock ()锁释放方式自动释放显式调用 unlock (),需在 finally 中执行可中断性不可中断可中断(lockInterruptibly ())超时获取不支持支持(tryLock (timeout, unit))尝试获取不支持支持(tryLock ())公平性非公平可选择公平或非公平条件变量只有一个(wait/notify)多个(newCondition ())性能JDK 6 + 后与 ReentrantLock 相当略好于 synchronized(极端场景)
五、ReadWriteLock:读写分离的 "智能锁"
ReadWriteLock维护了一对相关的锁:一个用于只读操作,一个用于写入操作。允许多个读线程同时访问,但写线程访问时会阻止所有读和写线程。
5.1 ReentrantReadWriteLock 的使用
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 读写锁演示
*
* @author ken
*/
@Slf4j
public class ReadWriteLockDemo {
// 共享数据
private final Map
// 创建读写锁
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读锁
private final java.util.concurrent.locks.Lock readLock = rwLock.readLock();
// 写锁
private final java.util.concurrent.locks.Lock writeLock = rwLock.writeLock();
/**
* 读取数据
*
* @param key 键
* @return 对应的值
*/
public Object get(String key) {
readLock.lock();
try {
log.info("线程{}:读取key = {}", Thread.currentThread().getName(), key);
return data.get(key);
} finally {
readLock.unlock();
}
}
/**
* 写入数据
*
* @param key 键
* @param value 值
*/
public void put(String key, Object value) {
writeLock.lock();
try {
log.info("线程{}:写入key = {}, value = {}", Thread.currentThread().getName(), key, value);
data.put(key, value);
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
ReadWriteLockDemo demo = new ReadWriteLockDemo();
// 创建5个读线程
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> demo.get("key" + num), "ReadThread-" + i).start();
}
// 创建2个写线程
for (int i = 0; i < 2; i++) {
final int num = i;
new Thread(() -> demo.put("key" + num, "value" + num), "WriteThread-" + i).start();
}
}
}
5.2 读写锁的适用场景
读写锁非常适合 "读多写少" 的场景,如缓存系统、配置中心等。它的核心思想是:
读操作不修改数据,多个线程可以同时进行写操作会修改数据,必须独占访问读操作和写操作不能同时进行
六、StampedLock:Java 8 引入的 "乐观读" 锁
StampedLock是 Java 8 新增的一种锁机制,它提供了乐观读模式,在某些场景下比ReentrantReadWriteLock具有更好的性能。
6.1 StampedLock 的三种模式
写锁(WriteLock):独占锁,获取后其他线程无法获取任何锁悲观读锁(ReadLock):共享锁,多个线程可以同时获取乐观读(Optimistic Read):不获取锁,仅验证数据是否被修改
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.StampedLock;
/**
* StampedLock演示
*
* @author ken
*/
@Slf4j
public class StampedLockDemo {
private double x = 0.0;
private double y = 0.0;
private final StampedLock stampedLock = new StampedLock();
/**
* 移动点的位置
*
* @param deltaX x方向的增量
* @param deltaY y方向的增量
*/
public void move(double deltaX, double deltaY) {
// 获取写锁,返回一个戳记
long stamp = stampedLock.writeLock();
try {
x += deltaX;
y += deltaY;
log.info("线程{}:移动到({}, {})", Thread.currentThread().getName(), x, y);
} finally {
// 释放写锁,需要传入获取锁时的戳记
stampedLock.unlockWrite(stamp);
}
}
/**
* 计算到原点的距离(使用悲观读锁)
*
* @return 到原点的距离
*/
public double distanceFromOriginWithReadLock() {
// 获取悲观读锁
long stamp = stampedLock.readLock();
try {
double currentX = x;
double currentY = y;
// 模拟一些计算时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
double distance = Math.sqrt(currentX * currentX + currentY * currentY);
log.info("线程{}:距离 = {}", Thread.currentThread().getName(), distance);
return distance;
} finally {
// 释放悲观读锁
stampedLock.unlockRead(stamp);
}
}
/**
* 计算到原点的距离(使用乐观读)
*
* @return 到原点的距离
*/
public double distanceFromOriginWithOptimisticRead() {
// 获取乐观读的戳记,不阻塞其他线程
long stamp = stampedLock.tryOptimisticRead();
// 读取数据
double currentX = x;
double currentY = y;
// 模拟一些计算时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 验证在乐观读期间数据是否被修改
if (!stampedLock.validate(stamp)) {
log.info("线程{}:数据已被修改,使用悲观读锁重新读取", Thread.currentThread().getName());
// 数据已被修改,使用悲观读锁重新读取
stamp = stampedLock.readLock();
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp);
}
}
double distance = Math.sqrt(currentX * currentX + currentY * currentY);
log.info("线程{}:距离 = {}", Thread.currentThread().getName(), distance);
return distance;
}
public static void main(String[] args) {
StampedLockDemo demo = new StampedLockDemo();
// 创建移动线程
new Thread(() -> {
for (int i = 0; i < 5; i++) {
demo.move(1.0, 1.0);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "MoveThread").start();
// 创建使用悲观读的线程
new Thread(() -> {
for (int i = 0; i < 5; i++) {
demo.distanceFromOriginWithReadLock();
}
}, "ReadLockThread").start();
// 创建使用乐观读的线程
new Thread(() -> {
for (int i = 0; i < 5; i++) {
demo.distanceFromOriginWithOptimisticRead();
}
}, "OptimisticReadThread").start();
}
}
6.2 StampedLock 与 ReentrantReadWriteLock 的对比
特性ReentrantReadWriteLockStampedLock可重入性支持不支持乐观读不支持支持锁升级支持(读锁升级为写锁)支持(通过 tryConvertToWriteLock ())中断性支持写锁和悲观读锁支持,乐观读不支持性能较好读多写少场景下性能更优复杂度较低较高,需要处理戳记
StampedLock不支持可重入性,这是它与ReentrantReadWriteLock的主要区别之一,也是使用时需要特别注意的地方。
七、原子类:无锁化的 "原子操作" 方案
Java 的java.util.concurrent.atomic包提供了一系列原子类,它们利用 CAS(Compare-And-Swap)操作实现了无锁化的线程安全。
7.1 基本类型原子类
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 基本类型原子类演示
*
* @author ken
*/
@Slf4j
public class BasicAtomicDemo {
// 使用AtomicInteger替代普通int
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
// 原子性地增加1并返回新值
int newValue = count.incrementAndGet();
log.info("线程{}:count = {}", Thread.currentThread().getName(), newValue);
}
public static void main(String[] args) {
BasicAtomicDemo demo = new BasicAtomicDemo();
// 创建10个线程同时执行increment方法
for (int i = 0; i < 10; i++) {
new Thread(demo::increment, "Thread-" + i).start();
}
}
}
7.2 数组类型原子类
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* 数组类型原子类演示
*
* @author ken
*/
@Slf4j
public class AtomicArrayDemo {
// 创建长度为5的原子整数数组
private final AtomicIntegerArray array = new AtomicIntegerArray(5);
/**
* 原子性地为指定索引的元素加1
*
* @param index 数组索引
*/
public void increment(int index) {
int newValue = array.incrementAndGet(index);
log.info("线程{}:array[{}] = {}", Thread.currentThread().getName(), index, newValue);
}
public static void main(String[] args) {
AtomicArrayDemo demo = new AtomicArrayDemo();
// 创建10个线程,随机更新数组元素
for (int i = 0; i < 10; i++) {
final int threadNum = i;
new Thread(() -> {
// 随机选择一个索引(0-4)
int index = (int) (Math.random() * 5);
demo.increment(index);
}, "Thread-" + threadNum).start();
}
}
}
7.3 引用类型原子类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
/**
* 引用类型原子类演示
*
* @author ken
*/
@Slf4j
public class AtomicReferenceDemo {
// 定义一个用户类
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private String name;
private int age;
}
public static void main(String[] args) {
// 创建初始用户
User initialUser = new User("Alice", 20);
// 创建原子引用
AtomicReference
log.info("初始用户:{}", userRef.get());
// 创建线程更新用户信息
new Thread(() -> {
User oldUser = userRef.get();
User newUser = new User("Bob", 25);
// CAS操作:如果当前值是oldUser,则更新为newUser
boolean success = userRef.compareAndSet(oldUser, newUser);
log.info("线程1更新{}:新用户 = {}", success ? "成功" : "失败", userRef.get());
}, "UpdateThread-1").start();
new Thread(() -> {
User oldUser = userRef.get();
User newUser = new User("Charlie", 30);
// CAS操作
boolean success = userRef.compareAndSet(oldUser, newUser);
log.info("线程2更新{}:新用户 = {}", success ? "成功" : "失败", userRef.get());
}, "UpdateThread-2").start();
}
}
7.4 原子类的底层实现:CAS 操作
CAS 操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相等,那么处理器会自动将该位置值更新为新值,否则不做任何操作。
CAS 是一种乐观锁策略,它不阻塞线程,而是通过重试机制解决冲突。但在高并发场景下,可能会出现大量重试,影响性能,这就是所谓的 "ABA 问题"。
7.5 解决 ABA 问题:AtomicStampedReference
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* AtomicStampedReference解决ABA问题演示
*
* @author ken
*/
@Slf4j
public class AtomicStampedReferenceDemo {
public static void main(String[] args) {
// 创建带版本戳的原子引用,初始值为100,版本号为0
AtomicStampedReference
// 线程1:模拟ABA问题
new Thread(() -> {
int stamp = money.getStamp();
log.info("线程1获取的初始值:{},版本号:{}", money.getReference(), stamp);
// 先将值从100改为200
money.compareAndSet(100, 200, stamp, stamp + 1);
log.info("线程1将值改为200,新版本号:{}", money.getStamp());
// 再将值从200改回100
stamp = money.getStamp();
money.compareAndSet(200, 100, stamp, stamp + 1);
log.info("线程1将值改回100,新版本号:{}", money.getStamp());
}, "Thread-1").start();
// 线程2:尝试更新值
new Thread(() -> {
int stamp = money.getStamp();
log.info("线程2获取的初始值:{},版本号:{}", money.getReference(), stamp);
// 休眠1秒,等待线程1完成ABA操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 尝试将值从100改为300
boolean success = money.compareAndSet(100, 300, stamp, stamp + 1);
log.info("线程2更新{}:当前值 = {},当前版本号 = {}",
success ? "成功" : "失败", money.getReference(), money.getStamp());
}, "Thread-2").start();
}
}
AtomicStampedReference通过引入版本号解决了 ABA 问题,每次修改不仅要比较值,还要比较版本号,只有两者都匹配时才会执行更新。
八、等待 / 通知机制:线程间的 "消息传递"
Java 的Object类提供了wait()、notify()和notifyAll()方法,用于实现线程间的协作。
8.1 等待 / 通知的基本使用
import lombok.extern.slf4j.Slf4j;
/**
* 等待/通知机制演示
*
* @author ken
*/
@Slf4j
public class WaitNotifyDemo {
// 共享对象,作为锁
private static final Object lock = new Object();
// 标志位,控制等待线程
private static boolean flag = true;
public static void main(String[] args) {
// 等待线程
Thread waiter = new Thread(() -> {
synchronized (lock) {
// 循环检查条件,防止虚假唤醒
while (flag) {
try {
log.info("线程{}:条件不满足,进入等待状态", Thread.currentThread().getName());
// 释放锁并进入等待状态
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
log.info("线程{}:条件满足,执行后续操作", Thread.currentThread().getName());
}
}, "Waiter");
// 通知线程
Thread notifier = new Thread(() -> {
synchronized (lock) {
log.info("线程{}:准备修改条件", Thread.currentThread().getName());
// 修改条件
flag = false;
// 通知等待线程
lock.notify();
log.info("线程{}:已发送通知", Thread.currentThread().getName());
// 休眠2秒,演示通知后不会立即释放锁
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("线程{}:即将释放锁", Thread.currentThread().getName());
}
}, "Notifier");
waiter.start();
// 确保等待线程先启动
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
notifier.start();
}
}
8.2 等待 / 通知的工作流程
8.3 等待 / 通知的注意事项
必须在同步块中使用:wait()、notify()和notifyAll()必须在synchronized块或方法中调用使用循环检查条件:防止虚假唤醒(spurious wakeup),这是 JVM 的一种优化机制notify () 与 notifyAll () 的选择:notify()唤醒一个等待线程,notifyAll()唤醒所有等待线程锁对象必须一致:等待线程和通知线程必须使用同一个锁对象
九、CountDownLatch:线程间的 "倒计时器"
CountDownLatch允许一个或多个线程等待其他线程完成操作后再继续执行。
9.1 CountDownLatch 的基本使用
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch演示
*
* @author ken
*/
@Slf4j
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 创建CountDownLatch,计数器初始值为3
CountDownLatch latch = new CountDownLatch(3);
// 创建3个工作线程
for (int i = 0; i < 3; i++) {
final int taskNum = i + 1;
new Thread(() -> {
try {
log.info("任务{}:开始执行", taskNum);
// 模拟任务执行时间
Thread.sleep((long) (Math.random() * 1000));
log.info("任务{}:执行完成", taskNum);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 计数器减1
latch.countDown();
}
}, "Worker-" + taskNum).start();
}
log.info("主线程:等待所有任务完成...");
// 等待计数器变为0
latch.await();
log.info("主线程:所有任务已完成,继续执行");
}
}
9.2 CountDownLatch 的工作原理
CountDownLatch内部维护了一个计数器,初始化时设置一个非负整数。countDown()方法会将计数器减 1,await()方法会阻塞当前线程,直到计数器变为 0。
9.3 CountDownLatch 的高级应用:超时等待
import java.util.concurrent.TimeUnit;
/**
* CountDownLatch超时等待演示
*
* @param latch 倒计时器
*/
public void waitWithTimeout(CountDownLatch latch) {
try {
log.info("开始等待,最多等待3秒");
// 最多等待3秒,如果超时则继续执行
boolean isCompleted = latch.await(3, TimeUnit.SECONDS);
if (isCompleted) {
log.info("所有任务都已完成");
} else {
log.info("等待超时,仍有任务未完成");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("等待被中断");
}
}
十、CyclicBarrier:线程间的 "循环屏障"
CyclicBarrier允许一组线程相互等待,直到所有线程都到达某个屏障点后才继续执行。与CountDownLatch不同的是,CyclicBarrier可以被重置并重复使用。
10.1 CyclicBarrier 的基本使用
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier演示
*
* @author ken
*/
@Slf4j
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 创建CyclicBarrier,需要5个线程到达屏障点,
// 当所有线程到达后,执行指定的 Runnable 任务
int threadCount = 5;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
log.info("所有线程都已到达屏障点,执行汇总任务");
});
// 创建5个线程
for (int i = 0; i < threadCount; i++) {
final int threadNum = i + 1;
new Thread(() -> {
try {
log.info("线程{}:开始执行任务", threadNum);
// 模拟任务执行时间
Thread.sleep((long) (Math.random() * 1000));
log.info("线程{}:任务执行完成,等待其他线程", threadNum);
// 到达屏障点,等待其他线程
barrier.await();
log.info("线程{}:所有线程都已到达,继续执行后续任务", threadNum);
} catch (Exception e) {
log.error("线程{}:发生异常", threadNum, e);
}
}, "Thread-" + threadNum).start();
}
}
}
10.2 CyclicBarrier 的循环使用特性
/**
* CyclicBarrier循环使用演示
*/
public static void cyclicUsageDemo() {
int threadCount = 3;
// 创建可循环使用的CyclicBarrier
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
log.info("阶段完成,准备进入下一阶段");
});
// 创建3个线程,执行多个阶段的任务
for (int i = 0; i < threadCount; i++) {
final int threadNum = i + 1;
new Thread(() -> {
try {
// 执行3个阶段的任务
for (int phase = 1; phase <= 3; phase++) {
log.info("线程{}:开始执行第{}阶段任务", threadNum, phase);
// 模拟任务执行时间
Thread.sleep((long) (Math.random() * 1000));
log.info("线程{}:第{}阶段任务完成,等待其他线程", threadNum, phase);
// 到达屏障点
barrier.await();
}
log.info("线程{}:所有阶段任务完成", threadNum);
} catch (Exception e) {
log.error("线程{}:发生异常", threadNum, e);
}
}, "Thread-" + threadNum).start();
}
}
10.3 CountDownLatch 与 CyclicBarrier 的对比
特性CountDownLatchCyclicBarrier用途一个或多个线程等待其他线程完成一组线程相互等待,直到都到达屏障点可重用性不可重用,计数器到 0 后就不能再使用可重用,调用 reset () 方法可重置计数器由外部线程调用 countDown () 递减由等待线程自身到达屏障点而递减等待线程等待的线程与执行 countDown () 的线程可以不同等待的线程就是要到达屏障点的线程回调功能不支持支持,所有线程到达后执行一个 Runnable
十一、Semaphore:线程间的 "信号量" 控制
Semaphore用于控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理地使用公共资源。
11.1 Semaphore 的基本使用
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
/**
* Semaphore演示
*
* @author ken
*/
@Slf4j
public class SemaphoreDemo {
public static void main(String[] args) {
// 创建Semaphore,允许3个线程同时访问
int permits = 3;
Semaphore semaphore = new Semaphore(permits);
// 创建10个线程尝试访问资源
for (int i = 0; i < 10; i++) {
final int threadNum = i + 1;
new Thread(() -> {
try {
log.info("线程{}:尝试获取许可", threadNum);
// 获取许可
semaphore.acquire();
log.info("线程{}:获取许可成功,开始访问资源", threadNum);
// 模拟访问资源的时间
Thread.sleep(1000);
log.info("线程{}:访问资源完成", threadNum);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("线程{}:被中断", threadNum, e);
} finally {
// 释放许可
semaphore.release();
log.info("线程{}:释放许可,当前可用许可数:{}",
threadNum, semaphore.availablePermits());
}
}, "Thread-" + threadNum).start();
}
}
}
11.2 Semaphore 的高级用法
11.2.1 尝试获取许可
/**
* 尝试获取许可
*
* @param semaphore 信号量
* @param threadNum 线程编号
*/
public void tryAcquireDemo(Semaphore semaphore, int threadNum) {
try {
log.info("线程{}:尝试获取许可", threadNum);
// 尝试获取许可,立即返回结果
boolean acquired = semaphore.tryAcquire();
if (acquired) {
log.info("线程{}:获取许可成功,开始访问资源", threadNum);
// 模拟访问资源
Thread.sleep(1000);
log.info("线程{}:访问资源完成", threadNum);
} else {
log.info("线程{}:获取许可失败,无法访问资源", threadNum);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("线程{}:被中断", threadNum, e);
} finally {
// 只有获取到许可的线程才需要释放
if (semaphore.hasQueuedThreads()) {
semaphore.release();
log.info("线程{}:释放许可", threadNum);
}
}
}
11.2.2 超时获取许可
import java.util.concurrent.TimeUnit;
/**
* 超时获取许可
*
* @param semaphore 信号量
* @param threadNum 线程编号
*/
public void tryAcquireWithTimeoutDemo(Semaphore semaphore, int threadNum) {
try {
log.info("线程{}:尝试获取许可,最多等待2秒", threadNum);
// 尝试在2秒内获取许可
boolean acquired = semaphore.tryAcquire(2, TimeUnit.SECONDS);
if (acquired) {
log.info("线程{}:获取许可成功,开始访问资源", threadNum);
// 模拟访问资源
Thread.sleep(1000);
log.info("线程{}:访问资源完成", threadNum);
} else {
log.info("线程{}:超时,获取许可失败", threadNum);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("线程{}:被中断", threadNum, e);
} finally {
// 只有获取到许可的线程才需要释放
if (semaphore.availablePermits() < 3) { // 假设初始许可数为3
semaphore.release();
log.info("线程{}:释放许可", threadNum);
}
}
}
11.3 Semaphore 的应用场景
限流:控制同时访问某个资源的线程数量实现互斥锁:当 permits=1 时,Semaphore 就相当于一个互斥锁资源池管理:如数据库连接池,控制同时使用的连接数量
十二、并发容器:线程安全的 "数据仓库"
Java 的java.util.concurrent包提供了一系列线程安全的容器类,它们内部实现了线程同步机制,无需我们手动同步。
12.1 ConcurrentHashMap:高效的线程安全哈希表
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* ConcurrentHashMap演示
*
* @author ken
*/
@Slf4j
public class ConcurrentHashMapDemo {
public static void main(String[] args) {
// 创建ConcurrentHashMap
Map
// 创建10个线程同时操作map
for (int i = 0; i < 10; i++) {
final int threadNum = i;
new Thread(() -> {
// 每个线程向map中添加10个元素
for (int j = 0; j < 10; j++) {
String key = "key-" + threadNum + "-" + j;
map.put(key, j);
log.info("线程{}:添加元素{} = {}", threadNum, key, j);
}
}, "Thread-" + i).start();
}
// 等待所有线程完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("最终map大小:{}", map.size());
}
}
ConcurrentHashMap在 JDK 1.7 中使用分段锁(Segment)实现,JDK 1.8 及以后使用 CAS+synchronized 实现,大大提高了并发性能。
12.2 CopyOnWriteArrayList:读多写少场景的列表
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* CopyOnWriteArrayList演示
*
* @author ken
*/
@Slf4j
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) {
// 创建CopyOnWriteArrayList
List
// 添加初始元素
list.add("元素1");
list.add("元素2");
list.add("元素3");
// 创建读线程
Runnable reader = () -> {
for (String element : list) {
log.info("线程{}:读取到元素:{}", Thread.currentThread().getName(), element);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
// 创建写线程
Runnable writer = () -> {
try {
// 等待读线程开始
Thread.sleep(200);
log.info("线程{}:添加新元素", Thread.currentThread().getName());
list.add("新元素");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
// 启动读线程
new Thread(reader, "Reader-1").start();
// 启动写线程
new Thread(writer, "Writer-1").start();
// 再启动一个读线程,观察是否能读到新元素
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
new Thread(reader, "Reader-2").start();
}
}
CopyOnWriteArrayList的核心思想是 "写时复制":当需要修改列表时,会创建一个新的数组,修改操作在新数组上进行,完成后再将引用指向新数组。这使得读操作不需要加锁,非常适合读多写少的场景。
12.3 其他常用并发容器
ConcurrentLinkedQueue:线程安全的无界队列,基于链表实现LinkedBlockingQueue:线程安全的有界或无界队列,基于链表实现ArrayBlockingQueue:线程安全的有界队列,基于数组实现ConcurrentSkipListMap:线程安全的有序映射,支持并发排序操作CopyOnWriteArraySet:基于CopyOnWriteArrayList实现的线程安全集合
十三、线程同步方式的选择指南
面对众多的线程同步方式,如何选择合适的方案是开发中的关键问题。以下是一个决策指南:
十四、线程同步的性能优化策略
减少锁持有时间:只在必要的代码段上加锁,减少锁的持有时间降低锁粒度:将大对象拆分为小对象,减少锁竞争使用无锁机制:在合适场景下使用原子类替代锁读写分离:使用 ReadWriteLock 或 StampedLock 分离读写操作避免锁嵌套:锁嵌套容易导致死锁和性能问题使用并发容器:优先使用 JUC 提供的并发容器,而非手动同步普通容器选择合适的锁类型:根据场景选择公平锁或非公平锁
十五、常见线程同步问题及解决方案
15.1 死锁
import lombok.extern.slf4j.Slf4j;
/**
* 死锁演示及解决方案
*
* @author ken
*/
@Slf4j
public class DeadlockDemo {
// 创建两个锁对象
private static final Object lockA = new Object();
private static final Object lockB = new Object();
/**
* 死锁场景演示
*/
public static void deadlockScenario() {
// 线程1:先获取lockA,再获取lockB
new Thread(() -> {
synchronized (lockA) {
log.info("线程1:获取了lockA,尝试获取lockB");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lockB) {
log.info("线程1:获取了lockB");
}
}
}, "Thread-1").start();
// 线程2:先获取lockB,再获取lockA
new Thread(() -> {
synchronized (lockB) {
log.info("线程2:获取了lockB,尝试获取lockA");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lockA) {
log.info("线程2:获取了lockA");
}
}
}, "Thread-2").start();
}
/**
* 死锁解决方案:按顺序获取锁
*/
public static void solveDeadlock() {
// 线程1:按顺序获取lockA,再获取lockB
new Thread(() -> {
synchronized (lockA) {
log.info("线程1:获取了lockA,尝试获取lockB");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lockB) {
log.info("线程1:获取了lockB");
}
}
}, "Thread-1").start();
// 线程2:同样按顺序获取lockA,再获取lockB
new Thread(() -> {
synchronized (lockA) {
log.info("线程2:获取了lockA,尝试获取lockB");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lockB) {
log.info("线程2:获取了lockB");
}
}
}, "Thread-2").start();
}
public static void main(String[] args) {
log.info("演示死锁场景:");
deadlockScenario();
// 等待死锁演示完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("\n演示死锁解决方案:");
solveDeadlock();
}
}
死锁预防策略:
按顺序获取锁避免锁嵌套使用 tryLock () 设置超时定期释放锁并重试
15.2 活锁
活锁是指线程不断重试一个永远不会成功的操作,虽然没有阻塞,但也无法继续执行。解决方法是引入随机性或限制重试次数。
15.3 饥饿
饥饿是指某些线程长期得不到 CPU 时间或资源。解决方法是使用公平锁或为线程设置合理的优先级。
结语:构建线程安全的并发系统
线程同步是并发编程的核心挑战,也是衡量一个 Java 开发者技术水平的重要标志。从基础的synchronized到高级的StampedLock,从简单的原子操作到复杂的线程协作,每种同步方式都有其适用场景和局限性。