DukeDuke
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
  • 技术文档

    • 网络原理

      • 交换机
      • 路由器
      • TCP/IP协议
      • HTTP 与 HTTPS
    • 软件架构

      • 什么是软件架构
      • 分层架构
      • 微服务架构
      • 事件驱动架构
      • 领域驱动设计(DDD)
      • 架构图
      • 高并发系统
    • Vue3

      • Vue3简介
      • Vue3响应式系统
      • Vue3组合式API
      • Vue3生命周期
      • Vue3模板语法
      • Vue3组件系统
      • Vue3 路由系统
      • Vue3 状态管理
      • Vue3 性能优化
      • Vue3 TypeScript 支持
      • Vue3 项目实战
      • VUE 面试题大全
      • Node.js 安装
    • JAVA

      • JVM

        • 认识JVM
        • JVM类加载器
        • 运行时数据区
        • 执行引擎
        • 本地方法接口
        • 本地方法库
        • JVM垃圾回收
        • JVM性能监控
        • JVM调优
      • 设计模式
        • 单例模式
        • 工厂模式
        • 策略模式
        • 适配器模式
        • 建造者模式
        • 原型模式
        • 装饰器模式
        • 代理模式
        • 外观模式
        • 享元模式
        • 组合模式
        • 桥接模式
      • Java多线程

        • Java 线程基础详解
        • Java 线程池详解
        • Java ThreadLocal 详解
        • Java volatile 详解
        • Java 线程间通信详解
        • Java 线程安全详解
        • Java 线程调度详解
        • Java 线程优先级详解

        • Java 线程中断详解
        • Java 线程死锁详解
      • Java反射
      • Java 面试题

        • Java 基础概念面试题
        • Java 面向对象编程面试题
        • Java 集合框架面试题
        • Java 多线程与并发面试题
        • JVM 与内存管理面试题
        • Java I/O 与 NIO 面试题
        • Java 异常处理面试题
        • Java 反射与注解面试题
        • Java Spring 框架面试题
        • Java 数据库与 JDBC 面试题
        • Java 性能优化面试题
        • Java 实际项目经验面试题
        • Java 高级特性面试题
        • Java 面试准备建议
    • Python

      • Python简介
      • Python安装
      • Python hello world
      • Python基础语法
      • Python数据类型
      • Python数字
      • Python字符串
      • Python列表
      • Python元组
      • Python字典
      • Python日期时间
      • Python文件操作
      • Python异常处理
      • Python函数
      • Python类
      • Python模块
      • Python包
      • Python多线程
      • Python面向对象
      • Python爬虫
      • Django web框架
      • Python 面试题

        • Python 面试题导航
        • Python 基础概念
        • Python 面向对象编程
        • Python 数据结构
        • Python 高级特性
        • Python 框架
        • Python 性能优化
        • Python 项目经验
    • Spring

      • Spring
      • Springboot
      • Spring Security 安全框架
      • SpringBoot 中的事件详解
      • SpringBoot 中的定时任务详解
      • SpringBoot 自动装配原理与源码解释
    • Mybatis

      • Mybatis
      • Mybatis-Plus
    • 数据库

      • Redis

        • Redis简介
        • Redis(单机)安装
        • Redis配置
        • Redis数据结构
        • RDB、AOF 和混合持久化机制
        • Redis内存管理
        • Redis缓存一致性
        • Redis缓存穿透
        • Redis缓存击穿
        • Redis缓存雪崩
        • Redis Lua脚本
        • Redis主从复制
        • Redis哨兵模式
        • Redis集群
        • Redis数据分片
        • Redis CPU使用率过高
        • Redis面试题
      • MySQL

        • MySQL简介
        • MySQL安装
        • MySQL配置
        • MYSQL日常维护
        • MYSQL优化-慢查询
        • MYSQL优化-索引
        • MYSQL数据库设计规范
    • 消息队列

      • RocketMQ
      • Kafka
      • RabbitMQ
      • 消息队列面试题
    • 微服务

      • SpringCloud 微服务
      • Eureka 注册中心
      • Nacos 注册中心
      • Gateway 网关
      • Feign 服务调用
      • Sentinel 限流 与 熔断
      • Seata 分布式事务
      • CAP 理论
      • Redis 分布式锁
      • 高并发系统设计
    • ELK日志分析系统

      • Elasticsearch 搜索引擎
      • Logstash 数据处理
      • Kibana 可视化
      • ELK 实战
    • 开放API

      • 开放API设计
      • 开放API示例项目
    • 人工智能

      • 人工智能简介
      • 机器学习

      • 深度学习

      • 自然语言处理

      • 计算机视觉

        • CUDA与cuDNN详细安装
        • Conda 安装
        • Pytorch 深度学习框架
        • yolo 目标检测
        • TensorRT 深度学习推理优化引擎
        • TensorFlow 机器学习
        • CVAT 图像标注
        • Windows 下安装 CUDA、cuDNN、TensorRT、TensorRT-YOLO 环境
        • Windows10+CUDA+cuDNN+TensorRT+TensorRT-YOLO 部署高性能YOLO11推理
    • 大数据

      • 大数据简介
      • Hadoop 数据存储
      • Flume 数据采集
      • Sqoop 数据导入导出
      • Hive 数据仓库
      • Spark 数据处理
      • Flink 数据处理
      • Kafka 数据采集
      • HBase 数据存储
      • Elasticsearch 搜索引擎
    • 图像处理

      • 图像处理简介
      • 医学图像web呈现
      • 医学图像处理
      • 切片细胞分离问题
    • 服务器&运维

      • Linux 系统

        • Linux 系统管理
        • Linux 网络管理
        • Linux 文件管理
        • Linux 命令大全
      • Nginx Web 服务器

        • Nginx 安装 与 配置
        • Nginx 负载均衡
        • Nginx SSL证书配置
        • Nginx Keepalived 高可用
      • Docker 容器

        • Docker 简介
        • Docker 安装与配置
        • Docker 命令
        • Docker 部署 Nginx
        • Docker 部署 MySQL
        • Docker 部署 Redis
      • 服务器

        • 塔式服务器
        • 机架式服务器
        • 刀片服务器
      • Git 版本控制
      • Jenkins 持续集成
      • Jmeter 性能测试
      • Let's Encrypt 免费SSL证书
    • 简历

      • 项目经理简历
      • 开发工程师简历

Java 线程间通信详解

目录

  • 什么是线程间通信
  • 为什么需要线程间通信
  • 线程间通信的基本原理
  • wait/notify 机制
  • Condition 条件变量
  • BlockingQueue 阻塞队列
  • CountDownLatch 倒计时门闩
  • CyclicBarrier 循环屏障
  • Semaphore 信号量
  • Exchanger 交换器
  • Phaser 阶段器
  • 生产者-消费者模式
  • 实际应用场景
  • 性能优化建议
  • 常见问题与解决方案
  • 参考资源

什么是线程间通信

线程间通信是指多个线程之间协调工作、共享数据、传递信息的过程。在多线程编程中,线程间通信是确保线程安全、实现复杂业务逻辑的重要手段。

线程间通信的核心概念

  1. 同步:确保多个线程按预期顺序执行
  2. 互斥:防止多个线程同时访问共享资源
  3. 协调:让线程之间能够相互等待和通知
  4. 数据传递:在线程之间安全地传递数据

为什么需要线程间通信

1. 数据共享需求

// 多个线程需要访问共享数据
public class SharedData {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

2. 任务协调需求

// 线程A需要等待线程B完成某个任务
public class TaskCoordinator {
    private volatile boolean taskCompleted = false;

    public void waitForTask() throws InterruptedException {
        while (!taskCompleted) {
            Thread.sleep(100);
        }
    }

    public void markTaskCompleted() {
        taskCompleted = true;
    }
}

3. 资源管理需求

// 多个线程需要共享有限的资源
public class ResourceManager {
    private final Semaphore semaphore = new Semaphore(5); // 最多5个资源

    public void useResource() throws InterruptedException {
        semaphore.acquire();
        try {
            // 使用资源
        } finally {
            semaphore.release();
        }
    }
}

线程间通信的基本原理

1. 共享内存模型

2. 消息传递模型

3. 同步机制分类

机制类型主要用途典型实现
互斥锁保护临界区synchronized, ReentrantLock
条件变量线程等待/通知wait/notify, Condition
信号量控制资源访问Semaphore
屏障线程同步点CountDownLatch, CyclicBarrier
队列数据传递BlockingQueue

wait/notify 机制

基本概念

wait/notify 是 Java 中最基础的线程间通信机制,基于对象监视器实现。

核心方法

public class WaitNotifyExample {
    private final Object lock = new Object();
    private boolean condition = false;

    public void waitForCondition() throws InterruptedException {
        synchronized (lock) {
            while (!condition) {
                lock.wait(); // 等待条件满足
            }
            // 条件满足,继续执行
            System.out.println("条件满足,继续执行");
        }
    }

    public void signalCondition() {
        synchronized (lock) {
            condition = true;
            lock.notify(); // 通知等待的线程
        }
    }
}

使用注意事项

  1. 必须在同步块中使用
  2. 使用 while 循环检查条件
  3. 避免虚假唤醒
public class SafeWaitNotify {
    private final Object lock = new Object();
    private boolean ready = false;

    public void waitForReady() throws InterruptedException {
        synchronized (lock) {
            // 使用 while 而不是 if,防止虚假唤醒
            while (!ready) {
                lock.wait();
            }
            // 处理业务逻辑
        }
    }

    public void setReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll(); // 使用 notifyAll 更安全
        }
    }
}

生产者-消费者示例

public class ProducerConsumer {
    private final Object lock = new Object();
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;

    public void produce(int item) throws InterruptedException {
        synchronized (lock) {
            while (queue.size() == capacity) {
                lock.wait(); // 队列满,等待
            }
            queue.offer(item);
            System.out.println("生产: " + item);
            lock.notifyAll(); // 通知消费者
        }
    }

    public int consume() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                lock.wait(); // 队列空,等待
            }
            int item = queue.poll();
            System.out.println("消费: " + item);
            lock.notifyAll(); // 通知生产者
            return item;
        }
    }
}

Condition 条件变量

基本概念

Condition 是 Lock 接口的配套条件变量,提供了比 wait/notify 更灵活的线程间通信机制。

基本使用

public class ConditionExample {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void waitForReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {
                condition.await(); // 等待条件
            }
            // 处理业务逻辑
        } finally {
            lock.unlock();
        }
    }

    public void signalReady() {
        lock.lock();
        try {
            ready = true;
            condition.signalAll(); // 通知所有等待的线程
        } finally {
            lock.unlock();
        }
    }
}

多条件示例

public class MultiConditionExample {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;

    public void put(int item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // 等待队列不满
            }
            queue.offer(item);
            notEmpty.signal(); // 通知队列非空
        } finally {
            lock.unlock();
        }
    }

    public int take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // 等待队列非空
            }
            int item = queue.poll();
            notFull.signal(); // 通知队列不满
            return item;
        } finally {
            lock.unlock();
        }
    }
}

Condition 的优势

  1. 更灵活的条件管理:可以为不同的条件创建不同的 Condition
  2. 更精确的通知:可以只通知等待特定条件的线程
  3. 更好的性能:避免了不必要的线程唤醒

BlockingQueue 阻塞队列

基本概念

BlockingQueue 是线程安全的队列,提供了阻塞式的插入和删除操作。

常用实现类

// 1. ArrayBlockingQueue - 有界数组队列
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(10);

// 2. LinkedBlockingQueue - 可选有界的链表队列
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(100);

// 3. SynchronousQueue - 同步队列,不存储元素
BlockingQueue<String> syncQueue = new SynchronousQueue<>();

// 4. PriorityBlockingQueue - 优先级队列
BlockingQueue<String> priorityQueue = new PriorityBlockingQueue<>();

基本操作

public class BlockingQueueExample {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    // 生产者
    public void produce() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            String item = "item-" + i;
            queue.put(item); // 阻塞式插入
            System.out.println("生产: " + item);
        }
    }

    // 消费者
    public void consume() throws InterruptedException {
        while (true) {
            String item = queue.take(); // 阻塞式获取
            System.out.println("消费: " + item);
            Thread.sleep(1000); // 模拟处理时间
        }
    }
}

高级操作

public class AdvancedBlockingQueue {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    // 非阻塞操作
    public boolean offerItem(String item) {
        return queue.offer(item); // 立即返回,不阻塞
    }

    // 超时操作
    public boolean offerWithTimeout(String item, long timeout, TimeUnit unit) {
        try {
            return queue.offer(item, timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // 检查队列状态
    public void checkQueueStatus() {
        System.out.println("队列大小: " + queue.size());
        System.out.println("剩余容量: " + queue.remainingCapacity());
        System.out.println("是否为空: " + queue.isEmpty());
    }
}

CountDownLatch 倒计时门闩

基本概念

CountDownLatch 是一个同步辅助类,允许一个或多个线程等待,直到一组操作在其他线程中完成。

基本使用

public class CountDownLatchExample {
    private final CountDownLatch latch = new CountDownLatch(3);

    public void worker() {
        try {
            // 执行工作
            System.out.println("工作线程开始执行");
            Thread.sleep(2000);
            System.out.println("工作线程执行完成");
        } finally {
            latch.countDown(); // 计数减1
        }
    }

    public void coordinator() throws InterruptedException {
        System.out.println("协调者等待所有工作完成");
        latch.await(); // 等待所有工作完成
        System.out.println("所有工作已完成,继续执行");
    }
}

实际应用场景

public class ParallelTaskProcessor {
    private final ExecutorService executor = Executors.newFixedThreadPool(5);

    public void processTasks(List<Task> tasks) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.size());

        for (Task task : tasks) {
            executor.submit(() -> {
                try {
                    processTask(task);
                } finally {
                    latch.countDown();
                }
            });
        }

        // 等待所有任务完成
        latch.await();
        System.out.println("所有任务处理完成");
    }

    private void processTask(Task task) {
        // 处理单个任务
        System.out.println("处理任务: " + task.getId());
    }
}

超时等待

public class TimeoutCountDownLatch {
    private final CountDownLatch latch = new CountDownLatch(5);

    public boolean waitWithTimeout(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

CyclicBarrier 循环屏障

基本概念

CyclicBarrier 是一个同步辅助类,允许一组线程相互等待,直到所有线程都到达一个公共的屏障点。

基本使用

public class CyclicBarrierExample {
    private final CyclicBarrier barrier = new CyclicBarrier(3, () -> {
        System.out.println("所有线程都到达屏障,开始下一阶段");
    });

    public void worker(int workerId) {
        try {
            System.out.println("工作线程 " + workerId + " 开始工作");
            Thread.sleep(1000 + workerId * 500);
            System.out.println("工作线程 " + workerId + " 完成工作,等待其他线程");

            barrier.await(); // 等待其他线程

            System.out.println("工作线程 " + workerId + " 继续执行");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

多阶段处理

public class MultiPhaseProcessor {
    private final CyclicBarrier barrier = new CyclicBarrier(3);

    public void processPhase(int phase) {
        try {
            System.out.println("阶段 " + phase + " 开始");
            Thread.sleep(1000);
            System.out.println("阶段 " + phase + " 完成,等待其他线程");

            barrier.await();

            System.out.println("所有线程完成阶段 " + phase + ",开始下一阶段");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

与 CountDownLatch 的区别

特性CountDownLatchCyclicBarrier
计数方向递减到 0递增到指定值
可重用性不可重用可重用
等待方式被动等待主动等待
适用场景一个线程等待多个线程多个线程相互等待

Semaphore 信号量

基本概念

Semaphore 是一个计数信号量,用于控制同时访问特定资源的线程数量。

基本使用

public class SemaphoreExample {
    private final Semaphore semaphore = new Semaphore(3); // 最多3个线程同时访问

    public void accessResource() throws InterruptedException {
        semaphore.acquire(); // 获取许可
        try {
            System.out.println("线程 " + Thread.currentThread().getName() + " 访问资源");
            Thread.sleep(2000); // 模拟资源使用
        } finally {
            semaphore.release(); // 释放许可
        }
    }
}

高级用法

public class AdvancedSemaphore {
    private final Semaphore semaphore = new Semaphore(5, true); // 公平信号量

    public boolean tryAccessResource() {
        if (semaphore.tryAcquire()) {
            try {
                // 访问资源
                return true;
            } finally {
                semaphore.release();
            }
        }
        return false;
    }

    public boolean accessWithTimeout(long timeout, TimeUnit unit) {
        try {
            if (semaphore.tryAcquire(timeout, unit)) {
                try {
                    // 访问资源
                    return true;
                } finally {
                    semaphore.release();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
}

实际应用场景

public class ConnectionPool {
    private final Semaphore semaphore;
    private final Queue<Connection> connections;

    public ConnectionPool(int maxConnections) {
        this.semaphore = new Semaphore(maxConnections);
        this.connections = new ConcurrentLinkedQueue<>();

        // 初始化连接池
        for (int i = 0; i < maxConnections; i++) {
            connections.offer(createConnection());
        }
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return connections.poll();
    }

    public void releaseConnection(Connection connection) {
        connections.offer(connection);
        semaphore.release();
    }

    private Connection createConnection() {
        // 创建数据库连接
        return new Connection();
    }
}

Exchanger 交换器

基本概念

Exchanger 是一个同步点,用于两个线程之间交换数据。

基本使用

public class ExchangerExample {
    private final Exchanger<String> exchanger = new Exchanger<>();

    public void producer() {
        try {
            String data = "生产者数据";
            System.out.println("生产者发送: " + data);
            String received = exchanger.exchange(data);
            System.out.println("生产者接收: " + received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void consumer() {
        try {
            String data = "消费者数据";
            System.out.println("消费者发送: " + data);
            String received = exchanger.exchange(data);
            System.out.println("消费者接收: " + received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

实际应用场景

public class DataProcessor {
    private final Exchanger<List<String>> exchanger = new Exchanger<>();

    public void dataProducer() {
        try {
            List<String> data = generateData();
            System.out.println("生产者生成数据: " + data.size() + " 条");

            List<String> processedData = exchanger.exchange(data);
            System.out.println("生产者接收处理后的数据: " + processedData.size() + " 条");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void dataConsumer() {
        try {
            List<String> rawData = exchanger.exchange(null);
            System.out.println("消费者接收原始数据: " + rawData.size() + " 条");

            List<String> processedData = processData(rawData);
            System.out.println("消费者处理数据: " + processedData.size() + " 条");

            exchanger.exchange(processedData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Phaser 阶段器

基本概念

Phaser 是一个可重用的同步屏障,类似于 CyclicBarrier 和 CountDownLatch 的组合。

基本使用

public class PhaserExample {
    private final Phaser phaser = new Phaser(3); // 3个参与线程

    public void worker(int workerId) {
        System.out.println("工作线程 " + workerId + " 开始");

        // 阶段1
        phaser.arriveAndAwaitAdvance();
        System.out.println("工作线程 " + workerId + " 完成阶段1");

        // 阶段2
        phaser.arriveAndAwaitAdvance();
        System.out.println("工作线程 " + workerId + " 完成阶段2");

        // 阶段3
        phaser.arriveAndAwaitAdvance();
        System.out.println("工作线程 " + workerId + " 完成所有阶段");
    }
}

动态注册

public class DynamicPhaser {
    private final Phaser phaser = new Phaser(1); // 主线程注册

    public void dynamicWorker() {
        phaser.register(); // 动态注册

        try {
            System.out.println("动态工作线程开始");
            Thread.sleep(1000);
            System.out.println("动态工作线程完成");
        } finally {
            phaser.arriveAndDeregister(); // 完成并注销
        }
    }
}

生产者-消费者模式

基本实现

public class ProducerConsumerPattern {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    private volatile boolean running = true;

    // 生产者
    public void producer() {
        try {
            for (int i = 0; i < 100; i++) {
                String item = "item-" + i;
                queue.put(item);
                System.out.println("生产: " + item);
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // 消费者
    public void consumer() {
        try {
            while (running || !queue.isEmpty()) {
                String item = queue.take();
                System.out.println("消费: " + item);
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        running = false;
    }
}

多生产者多消费者

public class MultiProducerConsumer {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
    private final AtomicInteger producerCount = new AtomicInteger(0);
    private final AtomicInteger consumerCount = new AtomicInteger(0);

    public void start() {
        // 启动多个生产者
        for (int i = 0; i < 3; i++) {
            new Thread(this::producer, "Producer-" + i).start();
        }

        // 启动多个消费者
        for (int i = 0; i < 2; i++) {
            new Thread(this::consumer, "Consumer-" + i).start();
        }
    }

    private void producer() {
        try {
            while (true) {
                String item = "item-" + producerCount.incrementAndGet();
                queue.put(item);
                System.out.println(Thread.currentThread().getName() + " 生产: " + item);
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void consumer() {
        try {
            while (true) {
                String item = queue.take();
                System.out.println(Thread.currentThread().getName() + " 消费: " + item);
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

实际应用场景

1. Web 服务器请求处理

@Component
public class RequestProcessor {
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10, 50, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000)
    );

    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    public void processRequest(HttpRequest request) {
        executor.execute(() -> {
            try {
                // 处理请求
                handleRequest(request);
            } catch (Exception e) {
                // 处理异常
                handleException(e);
            }
        });
    }

    public void shutdown() throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
        shutdownLatch.countDown();
    }
}

2. 批量数据处理

@Service
public class BatchDataProcessor {
    private final Semaphore semaphore = new Semaphore(10);
    private final CountDownLatch batchLatch = new CountDownLatch(1);

    public void processBatch(List<Data> dataList) throws InterruptedException {
        for (Data data : dataList) {
            semaphore.acquire();
            processDataAsync(data);
        }

        // 等待所有数据处理完成
        batchLatch.await();
    }

    private void processDataAsync(Data data) {
        CompletableFuture.runAsync(() -> {
            try {
                // 处理数据
                processData(data);
            } finally {
                semaphore.release();
            }
        });
    }
}

3. 缓存更新机制

@Component
public class CacheUpdater {
    private final BlockingQueue<CacheUpdateTask> updateQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    @PostConstruct
    public void start() {
        new Thread(this::processUpdates, "CacheUpdater").start();
    }

    public void scheduleUpdate(CacheUpdateTask task) {
        updateQueue.offer(task);
    }

    private void processUpdates() {
        while (running.get()) {
            try {
                CacheUpdateTask task = updateQueue.take();
                updateCache(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

性能优化建议

1. 选择合适的通信机制

// 高频率数据传递 - 使用 BlockingQueue
private final BlockingQueue<Data> dataQueue = new ArrayBlockingQueue<>(1000);

// 低频率同步 - 使用 CountDownLatch
private final CountDownLatch syncLatch = new CountDownLatch(1);

// 资源控制 - 使用 Semaphore
private final Semaphore resourceSemaphore = new Semaphore(10);

2. 避免不必要的阻塞

public class NonBlockingCommunication {
    private final AtomicReference<String> sharedData = new AtomicReference<>();

    public void updateData(String newData) {
        sharedData.set(newData);
    }

    public String getData() {
        return sharedData.get();
    }
}

3. 使用无锁数据结构

public class LockFreeCommunication {
    private final ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean hasNewMessage = new AtomicBoolean(false);

    public void sendMessage(String message) {
        messageQueue.offer(message);
        hasNewMessage.set(true);
    }

    public String receiveMessage() {
        if (hasNewMessage.compareAndSet(true, false)) {
            return messageQueue.poll();
        }
        return null;
    }
}

常见问题与解决方案

1. 死锁问题

问题:多个线程相互等待,导致死锁。

解决方案:

public class DeadlockPrevention {
    private final Lock lock1 = new ReentrantLock();
    private final Lock lock2 = new ReentrantLock();

    public void safeMethod() {
        // 按固定顺序获取锁
        if (lock1.tryLock()) {
            try {
                if (lock2.tryLock()) {
                    try {
                        // 执行操作
                    } finally {
                        lock2.unlock();
                    }
                }
            } finally {
                lock1.unlock();
            }
        }
    }
}

2. 内存泄漏问题

问题:ThreadLocal 变量没有及时清理。

解决方案:

public class SafeThreadLocalUsage {
    private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public void processRequest() {
        try {
            threadLocal.set("request-data");
            // 处理业务逻辑
        } finally {
            threadLocal.remove(); // 确保清理
        }
    }
}

3. 虚假唤醒问题

问题:线程在没有被通知的情况下被唤醒。

解决方案:

public class SpuriousWakeupPrevention {
    private final Object lock = new Object();
    private boolean condition = false;

    public void waitForCondition() throws InterruptedException {
        synchronized (lock) {
            while (!condition) { // 使用 while 而不是 if
                lock.wait();
            }
            // 处理业务逻辑
        }
    }
}

参考资源

  1. Java Concurrency in Practice (Brian Goetz)
  2. Java 并发编程实战
  3. Oracle Java Documentation
  4. Java 线程间通信最佳实践
  5. 并发编程的艺术

总结

线程间通信是 Java 并发编程的核心内容,正确使用各种通信机制可以确保多线程程序的正确性和性能。在实际应用中,需要根据具体场景选择合适的通信方式,并注意避免常见的问题如死锁、内存泄漏等。通过合理的设计和实现,可以构建出高效、稳定的多线程应用程序。


相关文档:

  • Java 线程基础 - 了解线程的基本概念和同步机制
  • Java 线程池详解 - 深入了解线程池的使用和配置
  • Java ThreadLocal 详解 - 了解线程本地存储机制
最近更新:: 2025/10/9 17:20
Contributors: Duke
Prev
Java volatile 详解
Next
Java 线程安全详解