Java 线程间通信详解
目录
- 什么是线程间通信
- 为什么需要线程间通信
- 线程间通信的基本原理
- wait/notify 机制
- Condition 条件变量
- BlockingQueue 阻塞队列
- CountDownLatch 倒计时门闩
- CyclicBarrier 循环屏障
- Semaphore 信号量
- Exchanger 交换器
- Phaser 阶段器
- 生产者-消费者模式
- 实际应用场景
- 性能优化建议
- 常见问题与解决方案
- 参考资源
什么是线程间通信
线程间通信是指多个线程之间协调工作、共享数据、传递信息的过程。在多线程编程中,线程间通信是确保线程安全、实现复杂业务逻辑的重要手段。
线程间通信的核心概念
- 同步:确保多个线程按预期顺序执行
- 互斥:防止多个线程同时访问共享资源
- 协调:让线程之间能够相互等待和通知
- 数据传递:在线程之间安全地传递数据
为什么需要线程间通信
1. 数据共享需求
// 多个线程需要访问共享数据
public class SharedData {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
2. 任务协调需求
// 线程A需要等待线程B完成某个任务
public class TaskCoordinator {
private volatile boolean taskCompleted = false;
public void waitForTask() throws InterruptedException {
while (!taskCompleted) {
Thread.sleep(100);
}
}
public void markTaskCompleted() {
taskCompleted = true;
}
}
3. 资源管理需求
// 多个线程需要共享有限的资源
public class ResourceManager {
private final Semaphore semaphore = new Semaphore(5); // 最多5个资源
public void useResource() throws InterruptedException {
semaphore.acquire();
try {
// 使用资源
} finally {
semaphore.release();
}
}
}
线程间通信的基本原理
1. 共享内存模型
2. 消息传递模型
3. 同步机制分类
| 机制类型 | 主要用途 | 典型实现 |
|---|---|---|
| 互斥锁 | 保护临界区 | synchronized, ReentrantLock |
| 条件变量 | 线程等待/通知 | wait/notify, Condition |
| 信号量 | 控制资源访问 | Semaphore |
| 屏障 | 线程同步点 | CountDownLatch, CyclicBarrier |
| 队列 | 数据传递 | BlockingQueue |
wait/notify 机制
基本概念
wait/notify 是 Java 中最基础的线程间通信机制,基于对象监视器实现。
核心方法
public class WaitNotifyExample {
private final Object lock = new Object();
private boolean condition = false;
public void waitForCondition() throws InterruptedException {
synchronized (lock) {
while (!condition) {
lock.wait(); // 等待条件满足
}
// 条件满足,继续执行
System.out.println("条件满足,继续执行");
}
}
public void signalCondition() {
synchronized (lock) {
condition = true;
lock.notify(); // 通知等待的线程
}
}
}
使用注意事项
- 必须在同步块中使用
- 使用 while 循环检查条件
- 避免虚假唤醒
public class SafeWaitNotify {
private final Object lock = new Object();
private boolean ready = false;
public void waitForReady() throws InterruptedException {
synchronized (lock) {
// 使用 while 而不是 if,防止虚假唤醒
while (!ready) {
lock.wait();
}
// 处理业务逻辑
}
}
public void setReady() {
synchronized (lock) {
ready = true;
lock.notifyAll(); // 使用 notifyAll 更安全
}
}
}
生产者-消费者示例
public class ProducerConsumer {
private final Object lock = new Object();
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 10;
public void produce(int item) throws InterruptedException {
synchronized (lock) {
while (queue.size() == capacity) {
lock.wait(); // 队列满,等待
}
queue.offer(item);
System.out.println("生产: " + item);
lock.notifyAll(); // 通知消费者
}
}
public int consume() throws InterruptedException {
synchronized (lock) {
while (queue.isEmpty()) {
lock.wait(); // 队列空,等待
}
int item = queue.poll();
System.out.println("消费: " + item);
lock.notifyAll(); // 通知生产者
return item;
}
}
}
Condition 条件变量
基本概念
Condition 是 Lock 接口的配套条件变量,提供了比 wait/notify 更灵活的线程间通信机制。
基本使用
public class ConditionExample {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean ready = false;
public void waitForReady() throws InterruptedException {
lock.lock();
try {
while (!ready) {
condition.await(); // 等待条件
}
// 处理业务逻辑
} finally {
lock.unlock();
}
}
public void signalReady() {
lock.lock();
try {
ready = true;
condition.signalAll(); // 通知所有等待的线程
} finally {
lock.unlock();
}
}
}
多条件示例
public class MultiConditionExample {
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 10;
public void put(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待队列不满
}
queue.offer(item);
notEmpty.signal(); // 通知队列非空
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待队列非空
}
int item = queue.poll();
notFull.signal(); // 通知队列不满
return item;
} finally {
lock.unlock();
}
}
}
Condition 的优势
- 更灵活的条件管理:可以为不同的条件创建不同的 Condition
- 更精确的通知:可以只通知等待特定条件的线程
- 更好的性能:避免了不必要的线程唤醒
BlockingQueue 阻塞队列
基本概念
BlockingQueue 是线程安全的队列,提供了阻塞式的插入和删除操作。
常用实现类
// 1. ArrayBlockingQueue - 有界数组队列
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(10);
// 2. LinkedBlockingQueue - 可选有界的链表队列
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(100);
// 3. SynchronousQueue - 同步队列,不存储元素
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// 4. PriorityBlockingQueue - 优先级队列
BlockingQueue<String> priorityQueue = new PriorityBlockingQueue<>();
基本操作
public class BlockingQueueExample {
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者
public void produce() throws InterruptedException {
for (int i = 0; i < 100; i++) {
String item = "item-" + i;
queue.put(item); // 阻塞式插入
System.out.println("生产: " + item);
}
}
// 消费者
public void consume() throws InterruptedException {
while (true) {
String item = queue.take(); // 阻塞式获取
System.out.println("消费: " + item);
Thread.sleep(1000); // 模拟处理时间
}
}
}
高级操作
public class AdvancedBlockingQueue {
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 非阻塞操作
public boolean offerItem(String item) {
return queue.offer(item); // 立即返回,不阻塞
}
// 超时操作
public boolean offerWithTimeout(String item, long timeout, TimeUnit unit) {
try {
return queue.offer(item, timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
// 检查队列状态
public void checkQueueStatus() {
System.out.println("队列大小: " + queue.size());
System.out.println("剩余容量: " + queue.remainingCapacity());
System.out.println("是否为空: " + queue.isEmpty());
}
}
CountDownLatch 倒计时门闩
基本概念
CountDownLatch 是一个同步辅助类,允许一个或多个线程等待,直到一组操作在其他线程中完成。
基本使用
public class CountDownLatchExample {
private final CountDownLatch latch = new CountDownLatch(3);
public void worker() {
try {
// 执行工作
System.out.println("工作线程开始执行");
Thread.sleep(2000);
System.out.println("工作线程执行完成");
} finally {
latch.countDown(); // 计数减1
}
}
public void coordinator() throws InterruptedException {
System.out.println("协调者等待所有工作完成");
latch.await(); // 等待所有工作完成
System.out.println("所有工作已完成,继续执行");
}
}
实际应用场景
public class ParallelTaskProcessor {
private final ExecutorService executor = Executors.newFixedThreadPool(5);
public void processTasks(List<Task> tasks) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(tasks.size());
for (Task task : tasks) {
executor.submit(() -> {
try {
processTask(task);
} finally {
latch.countDown();
}
});
}
// 等待所有任务完成
latch.await();
System.out.println("所有任务处理完成");
}
private void processTask(Task task) {
// 处理单个任务
System.out.println("处理任务: " + task.getId());
}
}
超时等待
public class TimeoutCountDownLatch {
private final CountDownLatch latch = new CountDownLatch(5);
public boolean waitWithTimeout(long timeout, TimeUnit unit) {
try {
return latch.await(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}
CyclicBarrier 循环屏障
基本概念
CyclicBarrier 是一个同步辅助类,允许一组线程相互等待,直到所有线程都到达一个公共的屏障点。
基本使用
public class CyclicBarrierExample {
private final CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程都到达屏障,开始下一阶段");
});
public void worker(int workerId) {
try {
System.out.println("工作线程 " + workerId + " 开始工作");
Thread.sleep(1000 + workerId * 500);
System.out.println("工作线程 " + workerId + " 完成工作,等待其他线程");
barrier.await(); // 等待其他线程
System.out.println("工作线程 " + workerId + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
多阶段处理
public class MultiPhaseProcessor {
private final CyclicBarrier barrier = new CyclicBarrier(3);
public void processPhase(int phase) {
try {
System.out.println("阶段 " + phase + " 开始");
Thread.sleep(1000);
System.out.println("阶段 " + phase + " 完成,等待其他线程");
barrier.await();
System.out.println("所有线程完成阶段 " + phase + ",开始下一阶段");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
与 CountDownLatch 的区别
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 计数方向 | 递减到 0 | 递增到指定值 |
| 可重用性 | 不可重用 | 可重用 |
| 等待方式 | 被动等待 | 主动等待 |
| 适用场景 | 一个线程等待多个线程 | 多个线程相互等待 |
Semaphore 信号量
基本概念
Semaphore 是一个计数信号量,用于控制同时访问特定资源的线程数量。
基本使用
public class SemaphoreExample {
private final Semaphore semaphore = new Semaphore(3); // 最多3个线程同时访问
public void accessResource() throws InterruptedException {
semaphore.acquire(); // 获取许可
try {
System.out.println("线程 " + Thread.currentThread().getName() + " 访问资源");
Thread.sleep(2000); // 模拟资源使用
} finally {
semaphore.release(); // 释放许可
}
}
}
高级用法
public class AdvancedSemaphore {
private final Semaphore semaphore = new Semaphore(5, true); // 公平信号量
public boolean tryAccessResource() {
if (semaphore.tryAcquire()) {
try {
// 访问资源
return true;
} finally {
semaphore.release();
}
}
return false;
}
public boolean accessWithTimeout(long timeout, TimeUnit unit) {
try {
if (semaphore.tryAcquire(timeout, unit)) {
try {
// 访问资源
return true;
} finally {
semaphore.release();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
}
实际应用场景
public class ConnectionPool {
private final Semaphore semaphore;
private final Queue<Connection> connections;
public ConnectionPool(int maxConnections) {
this.semaphore = new Semaphore(maxConnections);
this.connections = new ConcurrentLinkedQueue<>();
// 初始化连接池
for (int i = 0; i < maxConnections; i++) {
connections.offer(createConnection());
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire();
return connections.poll();
}
public void releaseConnection(Connection connection) {
connections.offer(connection);
semaphore.release();
}
private Connection createConnection() {
// 创建数据库连接
return new Connection();
}
}
Exchanger 交换器
基本概念
Exchanger 是一个同步点,用于两个线程之间交换数据。
基本使用
public class ExchangerExample {
private final Exchanger<String> exchanger = new Exchanger<>();
public void producer() {
try {
String data = "生产者数据";
System.out.println("生产者发送: " + data);
String received = exchanger.exchange(data);
System.out.println("生产者接收: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void consumer() {
try {
String data = "消费者数据";
System.out.println("消费者发送: " + data);
String received = exchanger.exchange(data);
System.out.println("消费者接收: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
实际应用场景
public class DataProcessor {
private final Exchanger<List<String>> exchanger = new Exchanger<>();
public void dataProducer() {
try {
List<String> data = generateData();
System.out.println("生产者生成数据: " + data.size() + " 条");
List<String> processedData = exchanger.exchange(data);
System.out.println("生产者接收处理后的数据: " + processedData.size() + " 条");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void dataConsumer() {
try {
List<String> rawData = exchanger.exchange(null);
System.out.println("消费者接收原始数据: " + rawData.size() + " 条");
List<String> processedData = processData(rawData);
System.out.println("消费者处理数据: " + processedData.size() + " 条");
exchanger.exchange(processedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Phaser 阶段器
基本概念
Phaser 是一个可重用的同步屏障,类似于 CyclicBarrier 和 CountDownLatch 的组合。
基本使用
public class PhaserExample {
private final Phaser phaser = new Phaser(3); // 3个参与线程
public void worker(int workerId) {
System.out.println("工作线程 " + workerId + " 开始");
// 阶段1
phaser.arriveAndAwaitAdvance();
System.out.println("工作线程 " + workerId + " 完成阶段1");
// 阶段2
phaser.arriveAndAwaitAdvance();
System.out.println("工作线程 " + workerId + " 完成阶段2");
// 阶段3
phaser.arriveAndAwaitAdvance();
System.out.println("工作线程 " + workerId + " 完成所有阶段");
}
}
动态注册
public class DynamicPhaser {
private final Phaser phaser = new Phaser(1); // 主线程注册
public void dynamicWorker() {
phaser.register(); // 动态注册
try {
System.out.println("动态工作线程开始");
Thread.sleep(1000);
System.out.println("动态工作线程完成");
} finally {
phaser.arriveAndDeregister(); // 完成并注销
}
}
}
生产者-消费者模式
基本实现
public class ProducerConsumerPattern {
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
private volatile boolean running = true;
// 生产者
public void producer() {
try {
for (int i = 0; i < 100; i++) {
String item = "item-" + i;
queue.put(item);
System.out.println("生产: " + item);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 消费者
public void consumer() {
try {
while (running || !queue.isEmpty()) {
String item = queue.take();
System.out.println("消费: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void stop() {
running = false;
}
}
多生产者多消费者
public class MultiProducerConsumer {
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
private final AtomicInteger producerCount = new AtomicInteger(0);
private final AtomicInteger consumerCount = new AtomicInteger(0);
public void start() {
// 启动多个生产者
for (int i = 0; i < 3; i++) {
new Thread(this::producer, "Producer-" + i).start();
}
// 启动多个消费者
for (int i = 0; i < 2; i++) {
new Thread(this::consumer, "Consumer-" + i).start();
}
}
private void producer() {
try {
while (true) {
String item = "item-" + producerCount.incrementAndGet();
queue.put(item);
System.out.println(Thread.currentThread().getName() + " 生产: " + item);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void consumer() {
try {
while (true) {
String item = queue.take();
System.out.println(Thread.currentThread().getName() + " 消费: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
实际应用场景
1. Web 服务器请求处理
@Component
public class RequestProcessor {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000)
);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
public void processRequest(HttpRequest request) {
executor.execute(() -> {
try {
// 处理请求
handleRequest(request);
} catch (Exception e) {
// 处理异常
handleException(e);
}
});
}
public void shutdown() throws InterruptedException {
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
shutdownLatch.countDown();
}
}
2. 批量数据处理
@Service
public class BatchDataProcessor {
private final Semaphore semaphore = new Semaphore(10);
private final CountDownLatch batchLatch = new CountDownLatch(1);
public void processBatch(List<Data> dataList) throws InterruptedException {
for (Data data : dataList) {
semaphore.acquire();
processDataAsync(data);
}
// 等待所有数据处理完成
batchLatch.await();
}
private void processDataAsync(Data data) {
CompletableFuture.runAsync(() -> {
try {
// 处理数据
processData(data);
} finally {
semaphore.release();
}
});
}
}
3. 缓存更新机制
@Component
public class CacheUpdater {
private final BlockingQueue<CacheUpdateTask> updateQueue = new LinkedBlockingQueue<>();
private final AtomicBoolean running = new AtomicBoolean(true);
@PostConstruct
public void start() {
new Thread(this::processUpdates, "CacheUpdater").start();
}
public void scheduleUpdate(CacheUpdateTask task) {
updateQueue.offer(task);
}
private void processUpdates() {
while (running.get()) {
try {
CacheUpdateTask task = updateQueue.take();
updateCache(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
性能优化建议
1. 选择合适的通信机制
// 高频率数据传递 - 使用 BlockingQueue
private final BlockingQueue<Data> dataQueue = new ArrayBlockingQueue<>(1000);
// 低频率同步 - 使用 CountDownLatch
private final CountDownLatch syncLatch = new CountDownLatch(1);
// 资源控制 - 使用 Semaphore
private final Semaphore resourceSemaphore = new Semaphore(10);
2. 避免不必要的阻塞
public class NonBlockingCommunication {
private final AtomicReference<String> sharedData = new AtomicReference<>();
public void updateData(String newData) {
sharedData.set(newData);
}
public String getData() {
return sharedData.get();
}
}
3. 使用无锁数据结构
public class LockFreeCommunication {
private final ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean hasNewMessage = new AtomicBoolean(false);
public void sendMessage(String message) {
messageQueue.offer(message);
hasNewMessage.set(true);
}
public String receiveMessage() {
if (hasNewMessage.compareAndSet(true, false)) {
return messageQueue.poll();
}
return null;
}
}
常见问题与解决方案
1. 死锁问题
问题:多个线程相互等待,导致死锁。
解决方案:
public class DeadlockPrevention {
private final Lock lock1 = new ReentrantLock();
private final Lock lock2 = new ReentrantLock();
public void safeMethod() {
// 按固定顺序获取锁
if (lock1.tryLock()) {
try {
if (lock2.tryLock()) {
try {
// 执行操作
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
}
}
2. 内存泄漏问题
问题:ThreadLocal 变量没有及时清理。
解决方案:
public class SafeThreadLocalUsage {
private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();
public void processRequest() {
try {
threadLocal.set("request-data");
// 处理业务逻辑
} finally {
threadLocal.remove(); // 确保清理
}
}
}
3. 虚假唤醒问题
问题:线程在没有被通知的情况下被唤醒。
解决方案:
public class SpuriousWakeupPrevention {
private final Object lock = new Object();
private boolean condition = false;
public void waitForCondition() throws InterruptedException {
synchronized (lock) {
while (!condition) { // 使用 while 而不是 if
lock.wait();
}
// 处理业务逻辑
}
}
}
参考资源
- Java Concurrency in Practice (Brian Goetz)
- Java 并发编程实战
- Oracle Java Documentation
- Java 线程间通信最佳实践
- 并发编程的艺术
总结
线程间通信是 Java 并发编程的核心内容,正确使用各种通信机制可以确保多线程程序的正确性和性能。在实际应用中,需要根据具体场景选择合适的通信方式,并注意避免常见的问题如死锁、内存泄漏等。通过合理的设计和实现,可以构建出高效、稳定的多线程应用程序。
相关文档:
- Java 线程基础 - 了解线程的基本概念和同步机制
- Java 线程池详解 - 深入了解线程池的使用和配置
- Java ThreadLocal 详解 - 了解线程本地存储机制
