DukeDuke
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
  • 技术文档

    • 网络原理

      • 交换机
      • 路由器
      • TCP/IP协议
      • HTTP 与 HTTPS
    • 软件架构

      • 什么是软件架构
      • 分层架构
      • 微服务架构
      • 事件驱动架构
      • 领域驱动设计(DDD)
      • 架构图
      • 高并发系统
    • Vue3

      • Vue3简介
      • Vue3响应式系统
      • Vue3组合式API
      • Vue3生命周期
      • Vue3模板语法
      • Vue3组件系统
      • Vue3 路由系统
      • Vue3 状态管理
      • Vue3 性能优化
      • Vue3 TypeScript 支持
      • Vue3 项目实战
      • VUE 面试题大全
      • Node.js 安装
    • JAVA

      • JVM

        • 认识JVM
        • JVM类加载器
        • 运行时数据区
        • 执行引擎
        • 本地方法接口
        • 本地方法库
        • JVM垃圾回收
        • JVM性能监控
        • JVM调优
      • 设计模式
        • 单例模式
        • 工厂模式
        • 策略模式
        • 适配器模式
        • 建造者模式
        • 原型模式
        • 装饰器模式
        • 代理模式
        • 外观模式
        • 享元模式
        • 组合模式
        • 桥接模式
      • Java多线程

        • Java 线程基础详解
        • Java 线程池详解
        • Java ThreadLocal 详解
        • Java volatile 详解
        • Java 线程间通信详解
        • Java 线程安全详解
        • Java 线程调度详解
        • Java 线程优先级详解

        • Java 线程中断详解
        • Java 线程死锁详解
      • Java反射
      • Java 面试题

        • Java 基础概念面试题
        • Java 面向对象编程面试题
        • Java 集合框架面试题
        • Java 多线程与并发面试题
        • JVM 与内存管理面试题
        • Java I/O 与 NIO 面试题
        • Java 异常处理面试题
        • Java 反射与注解面试题
        • Java Spring 框架面试题
        • Java 数据库与 JDBC 面试题
        • Java 性能优化面试题
        • Java 实际项目经验面试题
        • Java 高级特性面试题
        • Java 面试准备建议
    • Python

      • Python简介
      • Python安装
      • Python hello world
      • Python基础语法
      • Python数据类型
      • Python数字
      • Python字符串
      • Python列表
      • Python元组
      • Python字典
      • Python日期时间
      • Python文件操作
      • Python异常处理
      • Python函数
      • Python类
      • Python模块
      • Python包
      • Python多线程
      • Python面向对象
      • Python爬虫
      • Django web框架
      • Python 面试题

        • Python 面试题导航
        • Python 基础概念
        • Python 面向对象编程
        • Python 数据结构
        • Python 高级特性
        • Python 框架
        • Python 性能优化
        • Python 项目经验
    • Spring

      • Spring
      • Springboot
      • Spring Security 安全框架
      • SpringBoot 中的事件详解
      • SpringBoot 中的定时任务详解
      • SpringBoot 自动装配原理与源码解释
    • Mybatis

      • Mybatis
      • Mybatis-Plus
    • 数据库

      • Redis

        • Redis简介
        • Redis(单机)安装
        • Redis配置
        • Redis数据结构
        • RDB、AOF 和混合持久化机制
        • Redis内存管理
        • Redis缓存一致性
        • Redis缓存穿透
        • Redis缓存击穿
        • Redis缓存雪崩
        • Redis Lua脚本
        • Redis主从复制
        • Redis哨兵模式
        • Redis集群
        • Redis数据分片
        • Redis CPU使用率过高
        • Redis面试题
      • MySQL

        • MySQL简介
        • MySQL安装
        • MySQL配置
        • MYSQL日常维护
        • MYSQL优化-慢查询
        • MYSQL优化-索引
        • MYSQL数据库设计规范
    • 消息队列

      • RocketMQ
      • Kafka
      • RabbitMQ
      • 消息队列面试题
    • 微服务

      • SpringCloud 微服务
      • Eureka 注册中心
      • Nacos 注册中心
      • Gateway 网关
      • Feign 服务调用
      • Sentinel 限流 与 熔断
      • Seata 分布式事务
      • CAP 理论
      • Redis 分布式锁
      • 高并发系统设计
    • ELK日志分析系统

      • Elasticsearch 搜索引擎
      • Logstash 数据处理
      • Kibana 可视化
      • ELK 实战
    • 开放API

      • 开放API设计
      • 开放API示例项目
    • 人工智能

      • 人工智能简介
      • 机器学习

      • 深度学习

      • 自然语言处理

      • 计算机视觉

        • CUDA与cuDNN详细安装
        • Conda 安装
        • Pytorch 深度学习框架
        • yolo 目标检测
        • TensorRT 深度学习推理优化引擎
        • TensorFlow 机器学习
        • CVAT 图像标注
        • Windows 下安装 CUDA、cuDNN、TensorRT、TensorRT-YOLO 环境
        • Windows10+CUDA+cuDNN+TensorRT+TensorRT-YOLO 部署高性能YOLO11推理
    • 大数据

      • 大数据简介
      • Hadoop 数据存储
      • Flume 数据采集
      • Sqoop 数据导入导出
      • Hive 数据仓库
      • Spark 数据处理
      • Flink 数据处理
      • Kafka 数据采集
      • HBase 数据存储
      • Elasticsearch 搜索引擎
    • 图像处理

      • 图像处理简介
      • 医学图像web呈现
      • 医学图像处理
      • 切片细胞分离问题
    • 服务器&运维

      • Linux 系统

        • Linux 系统管理
        • Linux 网络管理
        • Linux 文件管理
        • Linux 命令大全
      • Nginx Web 服务器

        • Nginx 安装 与 配置
        • Nginx 负载均衡
        • Nginx SSL证书配置
        • Nginx Keepalived 高可用
      • Docker 容器

        • Docker 简介
        • Docker 安装与配置
        • Docker 命令
        • Docker 部署 Nginx
        • Docker 部署 MySQL
        • Docker 部署 Redis
      • 服务器

        • 塔式服务器
        • 机架式服务器
        • 刀片服务器
      • Git 版本控制
      • Jenkins 持续集成
      • Jmeter 性能测试
      • Let's Encrypt 免费SSL证书
    • 简历

      • 项目经理简历
      • 开发工程师简历

Java 线程池详解

目录

  • 什么是线程池
  • 为什么需要线程池
  • 线程池的核心参数
  • 线程池的工作原理
  • ThreadPoolExecutor 详解
  • 线程池的创建方式
  • 常用的线程池配置
  • 线程池的拒绝策略
  • 线程池任务队列
  • ThreadPoolExecutor 与 Executors 的比较
  • 线程池监控与调优
  • 实际应用场景
  • 最佳实践建议
  • 常见问题与解决方案
  • 参考资源

什么是线程池

线程池是一种线程使用模式,它通过预先创建一定数量的线程,避免了线程创建和销毁的开销,提高了响应速度。线程池维护着一个工作队列,当有任务需要执行时,从队列中取出任务分配给空闲的线程执行。

为什么需要线程池

1. 性能优势

  • 减少线程创建和销毁的开销:线程的创建和销毁是昂贵的操作
  • 提高响应速度:任务到达时不需要等待线程创建
  • 控制并发数量:避免创建过多线程导致系统资源耗尽

2. 资源管理

  • 统一管理线程:集中管理线程的生命周期
  • 合理利用系统资源:根据系统能力调整线程数量
  • 提供监控能力:可以监控线程池的运行状态

3. 功能增强

  • 任务队列管理:提供多种队列类型
  • 拒绝策略:当线程池满时的处理策略
  • 定时任务支持:支持延迟和周期性任务

线程池的核心参数

1. 核心线程数(corePoolSize)

  • 线程池维护的最小线程数
  • 即使线程空闲,也不会被回收
  • 当任务数量超过核心线程数时,会创建新线程

2. 最大线程数(maximumPoolSize)

  • 线程池允许创建的最大线程数
  • 当任务队列满时,会创建新线程直到达到最大线程数
  • 超过最大线程数的任务会被拒绝

3. 空闲线程存活时间(keepAliveTime)

  • 超过核心线程数的线程,空闲超过该时间将被回收
  • 只对超过核心线程数的线程有效
  • 核心线程默认不会被回收

4. 时间单位(unit)

  • keepAliveTime 的时间单位
  • 常用:TimeUnit.SECONDS、TimeUnit.MILLISECONDS

5. 工作队列(workQueue)

  • 用于存储等待执行的任务
  • 不同的队列类型有不同的特性

6. 线程工厂(threadFactory)

  • 用于创建新线程
  • 可以自定义线程名称、优先级等

7. 拒绝策略(handler)

  • 当线程池和队列都满时的处理策略
  • 有四种内置策略,也可以自定义

线程池的工作原理

ThreadPoolExecutor 详解

构造函数

public ThreadPoolExecutor(
    int corePoolSize,              // 核心线程数
    int maximumPoolSize,           // 最大线程数
    long keepAliveTime,            // 空闲线程存活时间
    TimeUnit unit,                 // 时间单位
    BlockingQueue<Runnable> workQueue,  // 工作队列
    ThreadFactory threadFactory,   // 线程工厂
    RejectedExecutionHandler handler    // 拒绝策略
)

基本使用示例

// 创建 ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                      // 核心线程数
    10,                     // 最大线程数
    60L,                    // 空闲线程存活时间
    TimeUnit.SECONDS,       // 时间单位
    new LinkedBlockingQueue<>(100),  // 工作队列
    new ThreadFactory() {   // 线程工厂
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("CustomThread-" + t.getId());
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

// 提交任务
executor.execute(() -> {
    // 任务执行代码
});

// 提交有返回值的任务
Future<String> future = executor.submit(() -> {
    // 任务执行代码
    return "执行结果";
});

// 关闭线程池
executor.shutdown();

线程池的拒绝策略

当线程池无法接受新任务时(线程池已满且队列已满),会触发拒绝策略。ThreadPoolExecutor 提供了四种内置的拒绝策略,每种策略都有其特定的使用场景。

1. AbortPolicy(默认策略)

策略描述:

  • 直接抛出 RejectedExecutionException 异常
  • 这是 ThreadPoolExecutor 的默认拒绝策略
  • 会中断调用线程的执行

代码示例:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    new ThreadPoolExecutor.AbortPolicy()  // 默认策略
);

// 当线程池满时,会抛出异常
try {
    executor.execute(() -> {
        System.out.println("执行任务");
    });
} catch (RejectedExecutionException e) {
    System.err.println("任务被拒绝:" + e.getMessage());
}

适用场景:

  • 需要明确知道任务被拒绝的情况
  • 对任务执行有严格要求,不能容忍任务丢失
  • 需要快速失败,避免任务堆积
  • 调试和测试阶段

优缺点:

  • 优点:能够及时发现线程池过载问题
  • 缺点:可能导致调用线程异常中断

2. CallerRunsPolicy

策略描述:

  • 由调用线程执行被拒绝的任务
  • 调用线程会阻塞,直到任务执行完成
  • 这是一种"回退"机制,能够自然地降低任务提交速度

代码示例:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 当线程池满时,任务会在调用线程中执行
executor.execute(() -> {
    System.out.println("在线程池中执行:" + Thread.currentThread().getName());
});

// 如果线程池满,这个任务会在主线程中执行
executor.execute(() -> {
    System.out.println("在调用线程中执行:" + Thread.currentThread().getName());
});

适用场景:

  • 任务执行时间较短
  • 调用线程可以承受执行任务的负载
  • 需要保证所有任务都能执行
  • 希望自然地控制任务提交速度

优缺点:

  • 优点:不会丢失任务,能够自然控制提交速度
  • 缺点:可能阻塞调用线程,影响主业务流程

3. DiscardPolicy

策略描述:

  • 直接丢弃被拒绝的任务,不做任何处理
  • 静默丢弃,不会抛出异常
  • 任务会完全丢失,无法恢复

代码示例:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    new ThreadPoolExecutor.DiscardPolicy()
);

// 当线程池满时,任务会被静默丢弃
for (int i = 0; i < 10; i++) {
    final int taskId = i;
    executor.execute(() -> {
        System.out.println("执行任务 " + taskId);
    });
}
// 部分任务可能被丢弃,不会输出

适用场景:

  • 可以容忍任务丢失
  • 任务不是关键业务逻辑
  • 日志记录、统计等非关键任务
  • 需要避免异常影响主流程

优缺点:

  • 优点:不会影响主流程,性能开销小
  • 缺点:任务丢失,可能导致数据不一致

4. DiscardOldestPolicy

策略描述:

  • 丢弃队列中最老的任务
  • 然后尝试重新提交当前任务
  • 如果重新提交仍然失败,会继续执行拒绝策略

代码示例:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    new ThreadPoolExecutor.DiscardOldestPolicy()
);

// 提交多个任务
for (int i = 0; i < 10; i++) {
    final int taskId = i;
    executor.execute(() -> {
        System.out.println("执行任务 " + taskId);
    });
}
// 较老的任务可能被丢弃,新任务会被执行

适用场景:

  • 新任务比老任务更重要
  • 可以容忍部分任务丢失
  • 需要优先处理最新任务
  • 实时性要求较高的场景

优缺点:

  • 优点:优先处理新任务,适合实时场景
  • 缺点:可能丢失重要任务,需要谨慎使用

5. 自定义拒绝策略

实现自定义拒绝策略:

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejectedCount = new AtomicInteger(0);
    private final Logger logger = LoggerFactory.getLogger(CustomRejectedExecutionHandler.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        int count = rejectedCount.incrementAndGet();

        // 记录拒绝信息
        logger.warn("任务被拒绝,拒绝次数:{},线程池状态:{}/{}",
                   count, executor.getActiveCount(), executor.getMaximumPoolSize());

        // 尝试将任务保存到数据库或消息队列
        try {
            saveTaskToQueue(r);
        } catch (Exception e) {
            logger.error("保存任务失败", e);
            // 如果保存失败,可以选择其他策略
            fallbackStrategy(r, executor);
        }
    }

    private void saveTaskToQueue(Runnable task) {
        // 将任务保存到数据库或消息队列,稍后重试
        System.out.println("任务已保存到队列,稍后重试");
    }

    private void fallbackStrategy(Runnable task, ThreadPoolExecutor executor) {
        // 降级策略:由调用线程执行
        if (!executor.isShutdown()) {
            task.run();
        }
    }
}

使用自定义拒绝策略:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new CustomRejectedExecutionHandler()
);

6. 高级拒绝策略实现

带重试机制的拒绝策略:

public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
    private final int maxRetries;
    private final long retryDelay;
    private final TimeUnit retryTimeUnit;

    public RetryRejectedExecutionHandler(int maxRetries, long retryDelay, TimeUnit retryTimeUnit) {
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
        this.retryTimeUnit = retryTimeUnit;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            return;
        }

        // 尝试重试提交
        for (int i = 0; i < maxRetries; i++) {
            try {
                Thread.sleep(retryTimeUnit.toMillis(retryDelay));
                if (executor.getQueue().offer(r)) {
                    return; // 成功提交
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        // 重试失败,执行降级策略
        System.err.println("任务重试失败,执行降级策略");
        r.run();
    }
}

带监控的拒绝策略:

public class MonitoringRejectedExecutionHandler implements RejectedExecutionHandler {
    private final MeterRegistry meterRegistry;
    private final Counter rejectedCounter;
    private final Gauge queueSizeGauge;

    public MonitoringRejectedExecutionHandler(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.rejectedCounter = Counter.builder("threadpool.rejected.tasks")
                .description("Number of rejected tasks")
                .register(meterRegistry);
        this.queueSizeGauge = Gauge.builder("threadpool.queue.size")
                .description("Current queue size")
                .register(meterRegistry, executor -> executor.getQueue().size());
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录指标
        rejectedCounter.increment();

        // 记录详细信息
        System.err.printf("任务被拒绝 - 活跃线程:%d,队列大小:%d,最大线程数:%d%n",
                         executor.getActiveCount(),
                         executor.getQueue().size(),
                         executor.getMaximumPoolSize());

        // 执行降级策略
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}

7. 拒绝策略选择指南

根据业务场景选择拒绝策略:

业务场景推荐策略原因
关键业务逻辑AbortPolicy需要明确知道任务被拒绝
日志记录DiscardPolicy可以容忍任务丢失
实时数据处理DiscardOldestPolicy优先处理新数据
批量处理CallerRunsPolicy保证任务不丢失
高并发场景自定义策略需要复杂的降级机制

性能对比:

// 性能测试代码
public class RejectionPolicyPerformanceTest {
    public static void main(String[] args) {
        testPolicy("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());
        testPolicy("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());
        testPolicy("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());
        testPolicy("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    private static void testPolicy(String name, RejectedExecutionHandler handler) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2),
            handler
        );

        long startTime = System.currentTimeMillis();
        int successCount = 0;
        int rejectCount = 0;

        for (int i = 0; i < 1000; i++) {
            try {
                executor.execute(() -> {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                successCount++;
            } catch (RejectedExecutionException e) {
                rejectCount++;
            }
        }

        long endTime = System.currentTimeMillis();
        System.out.printf("%s - 成功:%d,拒绝:%d,耗时:%dms%n",
                         name, successCount, rejectCount, endTime - startTime);

        executor.shutdown();
    }
}

8. 最佳实践建议

1. 根据业务特点选择策略:

// 关键业务 - 使用 AbortPolicy
ThreadPoolExecutor criticalExecutor = new ThreadPoolExecutor(
    5, 10, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.AbortPolicy()
);

// 日志处理 - 使用 DiscardPolicy
ThreadPoolExecutor logExecutor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(50),
    new ThreadPoolExecutor.DiscardPolicy()
);

2. 结合监控和告警:

public class MonitoredRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录拒绝事件
        logRejectionEvent(executor);

        // 发送告警
        sendAlert(executor);

        // 执行降级策略
        executeFallback(r, executor);
    }
}

3. 动态调整线程池参数:

public class AdaptiveThreadPool {
    private final ThreadPoolExecutor executor;
    private final ScheduledExecutorService monitor;

    public AdaptiveThreadPool() {
        this.executor = new ThreadPoolExecutor(...);
        this.monitor = Executors.newScheduledThreadPool(1);

        // 定期监控和调整
        monitor.scheduleAtFixedRate(this::adjustPoolSize, 0, 30, TimeUnit.SECONDS);
    }

    private void adjustPoolSize() {
        double queueUsage = (double) executor.getQueue().size() / executor.getQueue().remainingCapacity();
        if (queueUsage > 0.8) {
            // 增加线程数
            executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 2);
        }
    }
}

线程池任务队列

线程池的任务队列是存储等待执行任务的重要组件,它直接影响线程池的性能和行为。不同类型的队列具有不同的特性,选择合适的队列类型对线程池的性能至关重要。

1. 任务队列的作用

基本功能:

  • 缓冲任务:当线程池中的线程都在忙碌时,新提交的任务会进入队列等待
  • 控制并发:通过队列大小限制,可以控制系统的并发处理能力
  • 内存管理:有界队列可以防止任务无限堆积导致内存溢出
  • 优先级处理:支持按优先级处理任务

工作流程:

2. 队列类型概述

Java 线程池支持多种类型的队列,每种都有其特定的使用场景:

队列类型特性适用场景优缺点
ArrayBlockingQueue有界、FIFO需要控制内存使用优点:内存可控;缺点:容量固定
LinkedBlockingQueue无界/有界、FIFO任务数量不确定优点:灵活;缺点:可能 OOM
SynchronousQueue不存储元素任务处理速度快优点:直接传递;缺点:无缓冲
PriorityBlockingQueue优先级队列需要按优先级处理优点:支持优先级;缺点:性能较低
DelayQueue延迟队列定时任务优点:支持延迟;缺点:复杂度高

3. ArrayBlockingQueue(数组阻塞队列)

特性:

  • 基于数组实现的有界阻塞队列
  • 容量固定,创建时指定
  • 采用 FIFO(先进先出)原则
  • 使用 ReentrantLock 保证线程安全

代码示例:

// 创建容量为100的数组阻塞队列
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);

// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60L, TimeUnit.SECONDS,
    queue,  // 使用数组阻塞队列
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 队列操作示例
try {
    // 添加任务到队列(阻塞)
    queue.put(() -> System.out.println("任务1"));

    // 添加任务到队列(非阻塞,失败返回false)
    boolean success = queue.offer(() -> System.out.println("任务2"));

    // 从队列取出任务(阻塞)
    Runnable task = queue.take();
    task.run();

    // 从队列取出任务(非阻塞,失败返回null)
    Runnable task2 = queue.poll();
    if (task2 != null) {
        task2.run();
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

适用场景:

  • 需要严格控制内存使用
  • 任务数量相对稳定
  • 对任务执行顺序有要求
  • 高并发场景下的任务缓冲

优缺点:

  • 优点:内存使用可控,性能稳定,支持公平/非公平模式
  • 缺点:容量固定,无法动态调整

4. LinkedBlockingQueue(链表阻塞队列)

特性:

  • 基于链表实现的可选有界阻塞队列
  • 默认无界(Integer.MAX_VALUE),也可指定容量
  • 采用 FIFO 原则
  • 使用两把锁(putLock 和 takeLock)提高并发性能

代码示例:

// 创建无界链表阻塞队列
LinkedBlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<>();

// 创建有界链表阻塞队列
LinkedBlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(1000);

// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60L, TimeUnit.SECONDS,
    boundedQueue,
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 队列操作示例
try {
    // 添加任务(阻塞)
    queue.put(() -> System.out.println("任务1"));

    // 添加任务(非阻塞)
    boolean success = queue.offer(() -> System.out.println("任务2"));

    // 批量添加
    List<Runnable> tasks = Arrays.asList(
        () -> System.out.println("任务3"),
        () -> System.out.println("任务4")
    );
    queue.addAll(tasks);

    // 取出任务
    Runnable task = queue.take();
    task.run();

    // 批量取出
    List<Runnable> drainedTasks = new ArrayList<>();
    queue.drainTo(drainedTasks, 10); // 最多取出10个任务
    for (Runnable t : drainedTasks) {
        t.run();
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

适用场景:

  • 任务数量不确定
  • 需要较大的缓冲容量
  • 对内存使用要求不严格
  • 需要批量操作任务

优缺点:

  • 优点:容量灵活,支持批量操作,并发性能好
  • 缺点:无界队列可能导致内存溢出

5. SynchronousQueue(同步队列)

特性:

  • 不存储元素的阻塞队列
  • 每个插入操作必须等待另一个线程的移除操作
  • 支持公平和非公平模式
  • 适合直接传递任务的场景

代码示例:

// 创建同步队列
SynchronousQueue<Runnable> queue = new SynchronousQueue<>();

// 创建公平模式的同步队列
SynchronousQueue<Runnable> fairQueue = new SynchronousQueue<>(true);

// 在线程池中使用(通常用于可缓存线程池)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
    queue,  // 使用同步队列
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 队列操作示例
try {
    // 添加任务(阻塞直到有线程取走)
    queue.put(() -> System.out.println("任务1"));

    // 添加任务(非阻塞)
    boolean success = queue.offer(() -> System.out.println("任务2"));

    // 取出任务(阻塞)
    Runnable task = queue.take();
    task.run();

    // 取出任务(非阻塞)
    Runnable task2 = queue.poll();
    if (task2 != null) {
        task2.run();
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

适用场景:

  • 任务处理速度很快
  • 不需要任务缓冲
  • 需要直接传递任务
  • 可缓存线程池场景

优缺点:

  • 优点:无内存开销,直接传递,性能高
  • 缺点:无缓冲能力,任务可能被拒绝

6. PriorityBlockingQueue(优先级阻塞队列)

特性:

  • 基于堆实现的优先级阻塞队列
  • 支持无界容量
  • 元素必须实现 Comparable 接口或提供 Comparator
  • 优先级高的任务先执行

代码示例:

// 创建优先级队列
PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();

// 创建带比较器的优先级队列
PriorityBlockingQueue<Task> customQueue = new PriorityBlockingQueue<>(
    11, // 初始容量
    (t1, t2) -> Integer.compare(t1.getPriority(), t2.getPriority())
);

// 优先级任务类
class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private final int priority;
    private final String name;

    public PriorityTask(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("执行任务: " + name + ", 优先级: " + priority);
    }

    @Override
    public int compareTo(PriorityTask other) {
        return Integer.compare(other.priority, this.priority); // 优先级高的先执行
    }
}

// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    queue,
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 提交不同优先级的任务
executor.execute(new PriorityTask(1, "低优先级任务"));
executor.execute(new PriorityTask(3, "高优先级任务"));
executor.execute(new PriorityTask(2, "中优先级任务"));

// 任务会按优先级执行:高 -> 中 -> 低

适用场景:

  • 需要按优先级处理任务
  • 任务重要性不同
  • 需要动态调整任务执行顺序
  • 实时系统或任务调度系统

优缺点:

  • 优点:支持优先级,灵活的任务调度
  • 缺点:性能相对较低,内存使用较高

7. DelayQueue(延迟队列)

特性:

  • 基于 PriorityQueue 实现的延迟阻塞队列
  • 元素必须实现 Delayed 接口
  • 只有延迟时间到了才能取出元素
  • 支持无界容量

代码示例:

// 创建延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();

// 延迟任务类
class DelayedTask implements Runnable, Delayed {
    private final long executeTime;
    private final String name;

    public DelayedTask(String name, long delayMs) {
        this.name = name;
        this.executeTime = System.currentTimeMillis() + delayMs;
    }

    @Override
    public void run() {
        System.out.println("执行延迟任务: " + name +
                          ", 延迟时间: " + (executeTime - System.currentTimeMillis()) + "ms");
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
    }
}

// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    delayQueue,
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 提交延迟任务
executor.execute(new DelayedTask("任务1", 1000)); // 1秒后执行
executor.execute(new DelayedTask("任务2", 500));  // 0.5秒后执行
executor.execute(new DelayedTask("任务3", 2000)); // 2秒后执行

// 任务会按延迟时间顺序执行:任务2 -> 任务1 -> 任务3

适用场景:

  • 定时任务执行
  • 延迟任务处理
  • 任务调度系统
  • 缓存过期处理

优缺点:

  • 优点:支持延迟执行,精确的时间控制
  • 缺点:实现复杂,内存使用较高

8. 队列选择指南

根据业务场景选择队列:

业务场景推荐队列原因
高并发 Web 服务ArrayBlockingQueue内存可控,性能稳定
批量数据处理LinkedBlockingQueue容量大,支持批量操作
实时任务处理SynchronousQueue直接传递,响应快
任务调度系统PriorityBlockingQueue支持优先级调度
定时任务DelayQueue支持延迟执行
内存敏感场景ArrayBlockingQueue有界队列,防止 OOM
任务数量不确定LinkedBlockingQueue无界队列,灵活适应

根据性能要求选择:

// 性能测试代码
public class QueuePerformanceTest {
    public static void main(String[] args) {
        testQueue("ArrayBlockingQueue", new ArrayBlockingQueue<>(1000));
        testQueue("LinkedBlockingQueue", new LinkedBlockingQueue<>(1000));
        testQueue("SynchronousQueue", new SynchronousQueue<>());
        testQueue("PriorityBlockingQueue", new PriorityBlockingQueue<>());
    }

    private static void testQueue(String name, BlockingQueue<Runnable> queue) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60L, TimeUnit.SECONDS, queue,
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        long startTime = System.currentTimeMillis();
        int taskCount = 10000;

        // 提交任务
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.execute(() -> {
                // 模拟任务执行
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 等待所有任务完成
        executor.shutdown();
        try {
            executor.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        long endTime = System.currentTimeMillis();
        System.out.printf("%s - 处理%d个任务,耗时:%dms%n",
                         name, taskCount, endTime - startTime);
    }
}

9. 队列性能对比

性能特征对比:

队列类型吞吐量内存使用延迟并发性能适用场景
ArrayBlockingQueue高低低中等高并发、内存敏感
LinkedBlockingQueue高中等低高通用场景
SynchronousQueue最高最低最低最高实时处理
PriorityBlockingQueue低高高低优先级调度
DelayQueue低高高低定时任务

内存使用对比:

// 内存使用测试
public class QueueMemoryTest {
    public static void main(String[] args) {
        testMemoryUsage("ArrayBlockingQueue", new ArrayBlockingQueue<>(1000));
        testMemoryUsage("LinkedBlockingQueue", new LinkedBlockingQueue<>(1000));
        testMemoryUsage("SynchronousQueue", new SynchronousQueue<>());
        testMemoryUsage("PriorityBlockingQueue", new PriorityBlockingQueue<>());
    }

    private static void testMemoryUsage(String name, BlockingQueue<Runnable> queue) {
        // 添加1000个任务
        for (int i = 0; i < 1000; i++) {
            queue.offer(() -> System.out.println("Task " + i));
        }

        // 获取内存使用情况
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        System.out.printf("%s - 内存使用:%d KB%n", name, usedMemory / 1024);
    }
}

10. 队列配置最佳实践

1. 根据 CPU 核心数配置队列大小:

public class QueueSizeCalculator {
    public static int calculateQueueSize(int corePoolSize, int maxPoolSize) {
        int cpuCount = Runtime.getRuntime().availableProcessors();

        // CPU密集型任务:队列大小为CPU核心数的2-4倍
        // I/O密集型任务:队列大小为CPU核心数的10-20倍

        if (isCpuIntensive()) {
            return cpuCount * 2;
        } else {
            return cpuCount * 10;
        }
    }

    private static boolean isCpuIntensive() {
        // 根据业务特点判断是否为CPU密集型
        return true; // 示例
    }
}

2. 动态调整队列大小:

public class DynamicQueueSize {
    private final BlockingQueue<Runnable> queue;
    private final AtomicInteger currentSize;
    private final int maxSize;

    public DynamicQueueSize(int initialSize, int maxSize) {
        this.queue = new LinkedBlockingQueue<>(initialSize);
        this.currentSize = new AtomicInteger(initialSize);
        this.maxSize = maxSize;
    }

    public void adjustQueueSize() {
        // 根据系统负载动态调整队列大小
        double cpuUsage = getCpuUsage();
        double memoryUsage = getMemoryUsage();

        if (cpuUsage > 0.8 && memoryUsage < 0.7) {
            // CPU使用率高但内存充足,增加队列大小
            int newSize = Math.min(currentSize.get() + 100, maxSize);
            currentSize.set(newSize);
        } else if (cpuUsage < 0.5 && memoryUsage > 0.8) {
            // CPU使用率低但内存紧张,减少队列大小
            int newSize = Math.max(currentSize.get() - 50, 100);
            currentSize.set(newSize);
        }
    }

    private double getCpuUsage() {
        // 获取CPU使用率
        return 0.5; // 示例
    }

    private double getMemoryUsage() {
        // 获取内存使用率
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        long maxMemory = runtime.maxMemory();
        return (double) usedMemory / maxMemory;
    }
}

3. 队列监控和告警:

public class QueueMonitor {
    private final BlockingQueue<Runnable> queue;
    private final ScheduledExecutorService monitor;
    private final AtomicLong totalTasks = new AtomicLong(0);
    private final AtomicLong rejectedTasks = new AtomicLong(0);

    public QueueMonitor(BlockingQueue<Runnable> queue) {
        this.queue = queue;
        this.monitor = Executors.newScheduledThreadPool(1);

        // 每5秒监控一次
        monitor.scheduleAtFixedRate(this::monitorQueue, 0, 5, TimeUnit.SECONDS);
    }

    private void monitorQueue() {
        int queueSize = queue.size();
        int queueCapacity = queue.remainingCapacity() + queueSize;
        double queueUsage = (double) queueSize / queueCapacity;

        System.out.printf("队列监控 - 当前大小:%d,容量:%d,使用率:%.2f%%%n",
                         queueSize, queueCapacity, queueUsage * 100);

        // 队列使用率过高时告警
        if (queueUsage > 0.8) {
            System.err.println("警告:队列使用率过高!");
            // 发送告警通知
            sendAlert("队列使用率过高", queueUsage);
        }

        // 队列为空时记录
        if (queueSize == 0) {
            System.out.println("信息:队列为空,所有任务已处理完成");
        }
    }

    private void sendAlert(String message, double value) {
        // 发送告警通知(邮件、短信、钉钉等)
        System.err.printf("告警:%s,当前值:%.2f%n", message, value);
    }

    public void shutdown() {
        monitor.shutdown();
    }
}

11. 自定义队列实现

1. 带统计功能的队列:

public class StatisticsBlockingQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> delegate;
    private final AtomicLong totalOffers = new AtomicLong(0);
    private final AtomicLong totalPolls = new AtomicLong(0);
    private final AtomicLong rejectedOffers = new AtomicLong(0);
    private final AtomicLong emptyPolls = new AtomicLong(0);

    public StatisticsBlockingQueue(BlockingQueue<E> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean offer(E e) {
        totalOffers.incrementAndGet();
        boolean result = delegate.offer(e);
        if (!result) {
            rejectedOffers.incrementAndGet();
        }
        return result;
    }

    @Override
    public E poll() {
        E result = delegate.poll();
        totalPolls.incrementAndGet();
        if (result == null) {
            emptyPolls.incrementAndGet();
        }
        return result;
    }

    @Override
    public E take() throws InterruptedException {
        E result = delegate.take();
        totalPolls.incrementAndGet();
        return result;
    }

    // 获取统计信息
    public QueueStatistics getStatistics() {
        return new QueueStatistics(
            totalOffers.get(),
            totalPolls.get(),
            rejectedOffers.get(),
            emptyPolls.get(),
            delegate.size()
        );
    }

    // 统计信息类
    public static class QueueStatistics {
        private final long totalOffers;
        private final long totalPolls;
        private final long rejectedOffers;
        private final long emptyPolls;
        private final int currentSize;

        public QueueStatistics(long totalOffers, long totalPolls, long rejectedOffers,
                              long emptyPolls, int currentSize) {
            this.totalOffers = totalOffers;
            this.totalPolls = totalPolls;
            this.rejectedOffers = rejectedOffers;
            this.emptyPolls = emptyPolls;
            this.currentSize = currentSize;
        }

        public double getRejectionRate() {
            return totalOffers > 0 ? (double) rejectedOffers / totalOffers : 0.0;
        }

        public double getEmptyPollRate() {
            return totalPolls > 0 ? (double) emptyPolls / totalPolls : 0.0;
        }

        // getter方法...
    }

    // 实现其他BlockingQueue方法...
    @Override
    public int size() { return delegate.size(); }

    @Override
    public boolean isEmpty() { return delegate.isEmpty(); }

    @Override
    public boolean contains(Object o) { return delegate.contains(o); }

    @Override
    public Iterator<E> iterator() { return delegate.iterator(); }

    @Override
    public Object[] toArray() { return delegate.toArray(); }

    @Override
    public <T> T[] toArray(T[] a) { return delegate.toArray(a); }

    @Override
    public boolean remove(Object o) { return delegate.remove(o); }

    @Override
    public boolean containsAll(Collection<?> c) { return delegate.containsAll(c); }

    @Override
    public boolean addAll(Collection<? extends E> c) { return delegate.addAll(c); }

    @Override
    public boolean removeAll(Collection<?> c) { return delegate.removeAll(c); }

    @Override
    public boolean retainAll(Collection<?> c) { return delegate.retainAll(c); }

    @Override
    public void clear() { delegate.clear(); }

    @Override
    public boolean add(E e) { return delegate.add(e); }

    @Override
    public E remove() { return delegate.remove(); }

    @Override
    public E element() { return delegate.element(); }

    @Override
    public E peek() { return delegate.peek(); }

    @Override
    public void put(E e) throws InterruptedException { delegate.put(e); }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.offer(e, timeout, unit);
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.poll(timeout, unit);
    }

    @Override
    public int remainingCapacity() { return delegate.remainingCapacity(); }

    @Override
    public boolean remove(Object o) { return delegate.remove(o); }

    @Override
    public int drainTo(Collection<? super E> c) { return delegate.drainTo(c); }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return delegate.drainTo(c, maxElements);
    }
}

2. 带优先级的任务队列:

public class PriorityTaskQueue implements BlockingQueue<Runnable> {
    private final PriorityBlockingQueue<PriorityTask> queue;
    private final AtomicInteger sequence = new AtomicInteger(0);

    public PriorityTaskQueue() {
        this.queue = new PriorityBlockingQueue<>();
    }

    public PriorityTaskQueue(int initialCapacity) {
        this.queue = new PriorityBlockingQueue<>(initialCapacity);
    }

    @Override
    public boolean offer(Runnable task) {
        if (task instanceof PriorityTask) {
            return queue.offer((PriorityTask) task);
        } else {
            // 将普通任务包装为优先级任务
            PriorityTask priorityTask = new PriorityTask(task, 0, sequence.getAndIncrement());
            return queue.offer(priorityTask);
        }
    }

    @Override
    public Runnable poll() {
        return queue.poll();
    }

    @Override
    public Runnable take() throws InterruptedException {
        return queue.take();
    }

    // 提交带优先级的任务
    public boolean offer(Runnable task, int priority) {
        PriorityTask priorityTask = new PriorityTask(task, priority, sequence.getAndIncrement());
        return queue.offer(priorityTask);
    }

    // 优先级任务类
    private static class PriorityTask implements Runnable, Comparable<PriorityTask> {
        private final Runnable task;
        private final int priority;
        private final int sequence;

        public PriorityTask(Runnable task, int priority, int sequence) {
            this.task = task;
            this.priority = priority;
            this.sequence = sequence;
        }

        @Override
        public void run() {
            task.run();
        }

        @Override
        public int compareTo(PriorityTask other) {
            // 优先级高的先执行,优先级相同时按提交顺序
            int priorityCompare = Integer.compare(other.priority, this.priority);
            return priorityCompare != 0 ? priorityCompare : Integer.compare(this.sequence, other.sequence);
        }
    }

    // 实现其他方法...
    @Override
    public int size() { return queue.size(); }

    @Override
    public boolean isEmpty() { return queue.isEmpty(); }

    @Override
    public boolean contains(Object o) { return queue.contains(o); }

    @Override
    public Iterator<Runnable> iterator() { return queue.iterator(); }

    @Override
    public Object[] toArray() { return queue.toArray(); }

    @Override
    public <T> T[] toArray(T[] a) { return queue.toArray(a); }

    @Override
    public boolean remove(Object o) { return queue.remove(o); }

    @Override
    public boolean containsAll(Collection<?> c) { return queue.containsAll(c); }

    @Override
    public boolean addAll(Collection<? extends Runnable> c) { return queue.addAll(c); }

    @Override
    public boolean removeAll(Collection<?> c) { return queue.removeAll(c); }

    @Override
    public boolean retainAll(Collection<?> c) { return queue.retainAll(c); }

    @Override
    public void clear() { queue.clear(); }

    @Override
    public boolean add(Runnable e) { return queue.add(e); }

    @Override
    public Runnable remove() { return queue.remove(); }

    @Override
    public Runnable element() { return queue.element(); }

    @Override
    public Runnable peek() { return queue.peek(); }

    @Override
    public void put(Runnable e) throws InterruptedException { queue.put(e); }

    @Override
    public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
        return queue.offer(e, timeout, unit);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public int remainingCapacity() { return queue.remainingCapacity(); }

    @Override
    public int drainTo(Collection<? super Runnable> c) { return queue.drainTo(c); }

    @Override
    public int drainTo(Collection<? super Runnable> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }
}

3. 带限流的队列:

public class RateLimitedQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> delegate;
    private final RateLimiter rateLimiter;
    private final AtomicLong rejectedCount = new AtomicLong(0);

    public RateLimitedQueue(BlockingQueue<E> delegate, double permitsPerSecond) {
        this.delegate = delegate;
        this.rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    @Override
    public boolean offer(E e) {
        if (rateLimiter.tryAcquire()) {
            return delegate.offer(e);
        } else {
            rejectedCount.incrementAndGet();
            return false;
        }
    }

    @Override
    public void put(E e) throws InterruptedException {
        rateLimiter.acquire();
        delegate.put(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (rateLimiter.tryAcquire(timeout, unit)) {
            return delegate.offer(e, timeout, unit);
        } else {
            rejectedCount.incrementAndGet();
            return false;
        }
    }

    public long getRejectedCount() {
        return rejectedCount.get();
    }

    public double getCurrentRate() {
        return rateLimiter.getRate();
    }

    public void setRate(double permitsPerSecond) {
        rateLimiter.setRate(permitsPerSecond);
    }

    // 实现其他方法...
    @Override
    public E poll() { return delegate.poll(); }

    @Override
    public E take() throws InterruptedException { return delegate.take(); }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.poll(timeout, unit);
    }

    @Override
    public int size() { return delegate.size(); }

    @Override
    public boolean isEmpty() { return delegate.isEmpty(); }

    @Override
    public boolean contains(Object o) { return delegate.contains(o); }

    @Override
    public Iterator<E> iterator() { return delegate.iterator(); }

    @Override
    public Object[] toArray() { return delegate.toArray(); }

    @Override
    public <T> T[] toArray(T[] a) { return delegate.toArray(a); }

    @Override
    public boolean remove(Object o) { return delegate.remove(o); }

    @Override
    public boolean containsAll(Collection<?> c) { return delegate.containsAll(c); }

    @Override
    public boolean addAll(Collection<? extends E> c) { return delegate.addAll(c); }

    @Override
    public boolean removeAll(Collection<?> c) { return delegate.removeAll(c); }

    @Override
    public boolean retainAll(Collection<?> c) { return delegate.retainAll(c); }

    @Override
    public void clear() { delegate.clear(); }

    @Override
    public boolean add(E e) { return delegate.add(e); }

    @Override
    public E remove() { return delegate.remove(); }

    @Override
    public E element() { return delegate.element(); }

    @Override
    public E peek() { return delegate.peek(); }

    @Override
    public int remainingCapacity() { return delegate.remainingCapacity(); }

    @Override
    public int drainTo(Collection<? super E> c) { return delegate.drainTo(c); }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return delegate.drainTo(c, maxElements);
    }
}

4. 使用自定义队列的线程池:

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 使用带统计功能的队列
        BlockingQueue<Runnable> statisticsQueue = new StatisticsBlockingQueue<>(
            new ArrayBlockingQueue<>(100)
        );

        // 使用带优先级的队列
        PriorityTaskQueue priorityQueue = new PriorityTaskQueue(100);

        // 使用带限流的队列
        RateLimitedQueue<Runnable> rateLimitedQueue = new RateLimitedQueue<>(
            new LinkedBlockingQueue<>(1000), 10.0 // 每秒最多10个任务
        );

        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, 10, 60L, TimeUnit.SECONDS,
            statisticsQueue, // 使用自定义队列
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 提交任务
        for (int i = 0; i < 100; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("执行任务: " + taskId);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 获取统计信息
        if (statisticsQueue instanceof StatisticsBlockingQueue) {
            StatisticsBlockingQueue.QueueStatistics stats =
                ((StatisticsBlockingQueue<Runnable>) statisticsQueue).getStatistics();
            System.out.println("队列统计信息: " + stats);
        }

        executor.shutdown();
    }
}

12. 高级队列用法

1. 队列组合使用:

public class CompositeQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> primaryQueue;
    private final BlockingQueue<E> secondaryQueue;
    private final AtomicBoolean usePrimary = new AtomicBoolean(true);

    public CompositeQueue(BlockingQueue<E> primaryQueue, BlockingQueue<E> secondaryQueue) {
        this.primaryQueue = primaryQueue;
        this.secondaryQueue = secondaryQueue;
    }

    @Override
    public boolean offer(E e) {
        if (usePrimary.get()) {
            if (primaryQueue.offer(e)) {
                return true;
            } else {
                // 主队列满了,切换到备用队列
                usePrimary.set(false);
                return secondaryQueue.offer(e);
            }
        } else {
            return secondaryQueue.offer(e);
        }
    }

    @Override
    public E poll() {
        E result = primaryQueue.poll();
        if (result == null) {
            result = secondaryQueue.poll();
        }
        return result;
    }

    @Override
    public E take() throws InterruptedException {
        E result = primaryQueue.poll();
        if (result == null) {
            result = secondaryQueue.take();
        }
        return result;
    }

    // 实现其他方法...
    @Override
    public int size() { return primaryQueue.size() + secondaryQueue.size(); }

    @Override
    public boolean isEmpty() { return primaryQueue.isEmpty() && secondaryQueue.isEmpty(); }

    // 其他方法实现...
}

2. 队列监控和调优:

public class QueueTuner {
    private final ThreadPoolExecutor executor;
    private final ScheduledExecutorService tuner;
    private final AtomicReference<BlockingQueue<Runnable>> currentQueue;

    public QueueTuner(ThreadPoolExecutor executor) {
        this.executor = executor;
        this.tuner = Executors.newScheduledThreadPool(1);
        this.currentQueue = new AtomicReference<>(executor.getQueue());

        // 每30秒调优一次
        tuner.scheduleAtFixedRate(this::tuneQueue, 0, 30, TimeUnit.SECONDS);
    }

    private void tuneQueue() {
        BlockingQueue<Runnable> queue = currentQueue.get();
        int queueSize = queue.size();
        int queueCapacity = queue.remainingCapacity() + queueSize;
        double queueUsage = (double) queueSize / queueCapacity;

        // 根据队列使用率调整线程池参数
        if (queueUsage > 0.8) {
            // 队列使用率过高,增加线程数
            int currentMax = executor.getMaximumPoolSize();
            if (currentMax < 50) { // 限制最大线程数
                executor.setMaximumPoolSize(currentMax + 2);
                System.out.println("增加最大线程数到: " + (currentMax + 2));
            }
        } else if (queueUsage < 0.3) {
            // 队列使用率过低,减少线程数
            int currentMax = executor.getMaximumPoolSize();
            int currentCore = executor.getCorePoolSize();
            if (currentMax > currentCore + 2) {
                executor.setMaximumPoolSize(currentMax - 1);
                System.out.println("减少最大线程数到: " + (currentMax - 1));
            }
        }

        // 根据系统负载调整队列类型
        adjustQueueType();
    }

    private void adjustQueueType() {
        double cpuUsage = getCpuUsage();
        double memoryUsage = getMemoryUsage();

        if (cpuUsage > 0.9 && memoryUsage < 0.5) {
            // CPU使用率很高但内存充足,可以考虑使用更大的队列
            if (currentQueue.get() instanceof ArrayBlockingQueue) {
                ArrayBlockingQueue<Runnable> current = (ArrayBlockingQueue<Runnable>) currentQueue.get();
                if (current.remainingCapacity() < 100) {
                    // 切换到更大的队列
                    BlockingQueue<Runnable> newQueue = new ArrayBlockingQueue<>(1000);
                    // 注意:ThreadPoolExecutor不支持动态更换队列
                    System.out.println("建议切换到更大的队列");
                }
            }
        }
    }

    private double getCpuUsage() {
        // 获取CPU使用率
        return 0.5; // 示例
    }

    private double getMemoryUsage() {
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        long maxMemory = runtime.maxMemory();
        return (double) usedMemory / maxMemory;
    }

    public void shutdown() {
        tuner.shutdown();
    }
}

线程池的创建方式

1. 使用 ThreadPoolExecutor(推荐)

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),
    new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("BusinessThread-" + t.getId());
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()
);

2. 使用 Executors 工厂方法

// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// 可缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 单线程执行器
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);

3. 使用 CompletableFuture(Java 8+)

// 异步执行任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 任务执行代码
    return "执行结果";
});

// 使用自定义线程池
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // 任务执行代码
    return "执行结果";
}, executor);

常用的线程池配置

1. 固定大小的线程池

ThreadPoolExecutor fixedPool = new ThreadPoolExecutor(
    5,                      // 核心线程数
    5,                      // 最大线程数(与核心线程数相同)
    0L,                     // 空闲线程存活时间
    TimeUnit.MILLISECONDS,  // 时间单位
    new LinkedBlockingQueue<>()  // 工作队列
);

适用场景:

  • 任务数量相对固定
  • 需要控制并发数量
  • 对响应时间要求不高

2. 可缓存的线程池

ThreadPoolExecutor cachedPool = new ThreadPoolExecutor(
    0,                      // 核心线程数
    Integer.MAX_VALUE,      // 最大线程数
    60L,                    // 空闲线程存活时间
    TimeUnit.SECONDS,       // 时间单位
    new SynchronousQueue<>()  // 工作队列
);

适用场景:

  • 任务数量变化较大
  • 任务执行时间较短
  • 需要快速响应

3. 单线程执行器

ThreadPoolExecutor singlePool = new ThreadPoolExecutor(
    1,                      // 核心线程数
    1,                      // 最大线程数
    0L,                     // 空闲线程存活时间
    TimeUnit.MILLISECONDS,  // 时间单位
    new LinkedBlockingQueue<>()  // 工作队列
);

适用场景:

  • 需要保证任务顺序执行
  • 任务之间有依赖关系
  • 避免并发问题

4. 定时任务线程池

ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(
    5,                      // 核心线程数
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

// 使用示例
scheduledPool.schedule(() -> {
    // 延迟执行的任务
}, 5, TimeUnit.SECONDS);

scheduledPool.scheduleAtFixedRate(() -> {
    // 定期执行的任务
}, 0, 1, TimeUnit.SECONDS);

适用场景:

  • 定时任务
  • 周期性任务
  • 延迟执行任务

ThreadPoolExecutor 与 Executors 的比较

1. 使用方式对比

Executors 工厂方法:

// 简单但不够灵活
ExecutorService executor = Executors.newFixedThreadPool(5);

ThreadPoolExecutor:

// 更灵活,可以精确控制各个参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),
    new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("CustomThread-" + t.getId());
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()
);

2. 优缺点分析

Executors 工厂方法:

优点:

  • 使用简单,代码简洁
  • 适合快速开发和小型应用
  • 不需要深入了解线程池参数

缺点:

  • 隐藏了线程池的具体实现细节
  • 使用无界队列可能导致 OOM
  • 无法自定义线程工厂和拒绝策略
  • 线程池参数不够灵活

ThreadPoolExecutor:

优点:

  • 完全控制线程池的行为
  • 可以自定义线程工厂
  • 可以自定义拒绝策略
  • 可以使用有界队列防止 OOM
  • 可以精确控制线程池参数

缺点:

  • 使用相对复杂
  • 需要了解更多的线程池知识
  • 代码量较大

3. 选择建议

  1. 使用 ThreadPoolExecutor 的场景:

    • 大型企业级应用
    • 对性能要求较高的系统
    • 需要精确控制线程池行为
    • 需要自定义线程工厂或拒绝策略
    • 需要防止 OOM 风险
    • 需要监控线程池状态
  2. 使用 Executors 的场景:

    • 小型应用或原型开发
    • 简单的并发任务处理
    • 对线程池要求不高的场景
    • 快速开发阶段
  3. 最佳实践建议:

    • 生产环境推荐使用 ThreadPoolExecutor
    • 明确指定线程池参数
    • 使用有界队列
    • 自定义线程工厂便于问题排查
    • 根据业务场景选择合适的拒绝策略
    • 合理设置核心线程数和最大线程数

线程池监控与调优

1. 监控指标

ThreadPoolExecutor executor = new ThreadPoolExecutor(...);

// 获取线程池状态
int corePoolSize = executor.getCorePoolSize();
int maximumPoolSize = executor.getMaximumPoolSize();
int activeCount = executor.getActiveCount();
long completedTaskCount = executor.getCompletedTaskCount();
long taskCount = executor.getTaskCount();
int queueSize = executor.getQueue().size();

2. 性能调优建议

// 推荐的线程池创建方式
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),     // 核心线程数
    Runtime.getRuntime().availableProcessors() * 2, // 最大线程数
    60L,                                           // 空闲线程存活时间
    TimeUnit.SECONDS,                              // 时间单位
    new ArrayBlockingQueue<>(1000),                // 有界队列
    new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("BusinessThread-" + threadNumber.getAndIncrement());
            t.setDaemon(false);
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()      // 拒绝策略
);

3. 监控工具

  • JMX 监控:通过 JMX 查看线程池状态
  • 日志监控:记录线程池运行日志
  • 性能分析工具:JVisualVM、JMC、Arthas

实际应用场景

1. Web 服务器处理请求

@Component
public class RequestProcessor {
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10, 50, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("RequestProcessor-" + t.getId());
                return t;
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public void processRequest(HttpRequest request) {
        executor.execute(() -> {
            // 处理请求
            handleRequest(request);
        });
    }
}

2. 批量数据处理

@Service
public class BatchDataProcessor {
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5, 10, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(100),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("BatchProcessor-" + t.getId());
                return t;
            }
        },
        new ThreadPoolExecutor.DiscardOldestPolicy()
    );

    public void processBatch(List<Data> dataList) {
        for (Data data : dataList) {
            executor.execute(() -> {
                // 处理单个数据
                processData(data);
            });
        }
    }
}

3. 异步任务处理

@Service
public class AsyncTaskService {
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        3, 8, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(200),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("AsyncTask-" + t.getId());
                return t;
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public CompletableFuture<String> processAsync(Task task) {
        return CompletableFuture.supplyAsync(() -> {
            // 异步处理任务
            return processTask(task);
        }, executor);
    }
}

最佳实践建议

1. 线程池参数设置

// CPU 密集型任务
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = Runtime.getRuntime().availableProcessors();

// I/O 密集型任务
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 4;

2. 队列选择

  • ArrayBlockingQueue:有界队列,适合需要控制内存使用的场景
  • LinkedBlockingQueue:无界队列,适合任务数量不确定的场景
  • SynchronousQueue:不存储元素,适合任务处理速度快的场景
  • PriorityBlockingQueue:优先级队列,适合需要按优先级处理任务的场景

3. 线程工厂最佳实践

public class CustomThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final ThreadGroup group;

    public CustomThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}

4. 优雅关闭线程池

public void shutdownGracefully(ThreadPoolExecutor executor) {
    executor.shutdown(); // 停止接受新任务
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow(); // 强制关闭
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("线程池未能正常关闭");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

常见问题与解决方案

1. 内存泄漏问题

问题:线程池中的线程无法正常回收,导致内存泄漏。

解决方案:

  • 使用有界队列
  • 合理设置线程存活时间
  • 及时关闭线程池

2. 任务堆积问题

问题:任务提交速度超过处理速度,导致队列中任务堆积。

解决方案:

  • 增加线程数量
  • 优化任务处理逻辑
  • 使用更合适的拒绝策略

3. 线程池关闭问题

问题:线程池关闭时,正在执行的任务被强制中断。

解决方案:

  • 使用优雅关闭方式
  • 设置合理的等待时间
  • 处理中断异常

参考资源

  1. Java Concurrency in Practice (Brian Goetz)
  2. Java 并发编程实战
  3. Oracle Java Documentation
  4. Java 线程池最佳实践

总结

线程池是 Java 并发编程中的重要工具,正确使用线程池可以显著提升程序性能。在实际应用中,需要根据具体业务场景选择合适的线程池配置,并注意监控和调优,以确保系统的稳定性和性能。


相关文档:

  • Java 线程基础 - 了解线程的基本概念和同步机制
最近更新:: 2025/10/11 11:41
Contributors: Duke
Prev
Java 线程基础详解
Next
Java ThreadLocal 详解