Java 线程池详解
目录
- 什么是线程池
- 为什么需要线程池
- 线程池的核心参数
- 线程池的工作原理
- ThreadPoolExecutor 详解
- 线程池的创建方式
- 常用的线程池配置
- 线程池的拒绝策略
- 线程池任务队列
- ThreadPoolExecutor 与 Executors 的比较
- 线程池监控与调优
- 实际应用场景
- 最佳实践建议
- 常见问题与解决方案
- 参考资源
什么是线程池
线程池是一种线程使用模式,它通过预先创建一定数量的线程,避免了线程创建和销毁的开销,提高了响应速度。线程池维护着一个工作队列,当有任务需要执行时,从队列中取出任务分配给空闲的线程执行。
为什么需要线程池
1. 性能优势
- 减少线程创建和销毁的开销:线程的创建和销毁是昂贵的操作
- 提高响应速度:任务到达时不需要等待线程创建
- 控制并发数量:避免创建过多线程导致系统资源耗尽
2. 资源管理
- 统一管理线程:集中管理线程的生命周期
- 合理利用系统资源:根据系统能力调整线程数量
- 提供监控能力:可以监控线程池的运行状态
3. 功能增强
- 任务队列管理:提供多种队列类型
- 拒绝策略:当线程池满时的处理策略
- 定时任务支持:支持延迟和周期性任务
线程池的核心参数
1. 核心线程数(corePoolSize)
- 线程池维护的最小线程数
- 即使线程空闲,也不会被回收
- 当任务数量超过核心线程数时,会创建新线程
2. 最大线程数(maximumPoolSize)
- 线程池允许创建的最大线程数
- 当任务队列满时,会创建新线程直到达到最大线程数
- 超过最大线程数的任务会被拒绝
3. 空闲线程存活时间(keepAliveTime)
- 超过核心线程数的线程,空闲超过该时间将被回收
- 只对超过核心线程数的线程有效
- 核心线程默认不会被回收
4. 时间单位(unit)
- keepAliveTime 的时间单位
- 常用:TimeUnit.SECONDS、TimeUnit.MILLISECONDS
5. 工作队列(workQueue)
- 用于存储等待执行的任务
- 不同的队列类型有不同的特性
6. 线程工厂(threadFactory)
- 用于创建新线程
- 可以自定义线程名称、优先级等
7. 拒绝策略(handler)
- 当线程池和队列都满时的处理策略
- 有四种内置策略,也可以自定义
线程池的工作原理
ThreadPoolExecutor 详解
构造函数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
基本使用示例
// 创建 ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 工作队列
new ThreadFactory() { // 线程工厂
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("CustomThread-" + t.getId());
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
executor.execute(() -> {
// 任务执行代码
});
// 提交有返回值的任务
Future<String> future = executor.submit(() -> {
// 任务执行代码
return "执行结果";
});
// 关闭线程池
executor.shutdown();
线程池的拒绝策略
当线程池无法接受新任务时(线程池已满且队列已满),会触发拒绝策略。ThreadPoolExecutor 提供了四种内置的拒绝策略,每种策略都有其特定的使用场景。
1. AbortPolicy(默认策略)
策略描述:
- 直接抛出
RejectedExecutionException异常 - 这是 ThreadPoolExecutor 的默认拒绝策略
- 会中断调用线程的执行
代码示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy() // 默认策略
);
// 当线程池满时,会抛出异常
try {
executor.execute(() -> {
System.out.println("执行任务");
});
} catch (RejectedExecutionException e) {
System.err.println("任务被拒绝:" + e.getMessage());
}
适用场景:
- 需要明确知道任务被拒绝的情况
- 对任务执行有严格要求,不能容忍任务丢失
- 需要快速失败,避免任务堆积
- 调试和测试阶段
优缺点:
- 优点:能够及时发现线程池过载问题
- 缺点:可能导致调用线程异常中断
2. CallerRunsPolicy
策略描述:
- 由调用线程执行被拒绝的任务
- 调用线程会阻塞,直到任务执行完成
- 这是一种"回退"机制,能够自然地降低任务提交速度
代码示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 当线程池满时,任务会在调用线程中执行
executor.execute(() -> {
System.out.println("在线程池中执行:" + Thread.currentThread().getName());
});
// 如果线程池满,这个任务会在主线程中执行
executor.execute(() -> {
System.out.println("在调用线程中执行:" + Thread.currentThread().getName());
});
适用场景:
- 任务执行时间较短
- 调用线程可以承受执行任务的负载
- 需要保证所有任务都能执行
- 希望自然地控制任务提交速度
优缺点:
- 优点:不会丢失任务,能够自然控制提交速度
- 缺点:可能阻塞调用线程,影响主业务流程
3. DiscardPolicy
策略描述:
- 直接丢弃被拒绝的任务,不做任何处理
- 静默丢弃,不会抛出异常
- 任务会完全丢失,无法恢复
代码示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardPolicy()
);
// 当线程池满时,任务会被静默丢弃
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("执行任务 " + taskId);
});
}
// 部分任务可能被丢弃,不会输出
适用场景:
- 可以容忍任务丢失
- 任务不是关键业务逻辑
- 日志记录、统计等非关键任务
- 需要避免异常影响主流程
优缺点:
- 优点:不会影响主流程,性能开销小
- 缺点:任务丢失,可能导致数据不一致
4. DiscardOldestPolicy
策略描述:
- 丢弃队列中最老的任务
- 然后尝试重新提交当前任务
- 如果重新提交仍然失败,会继续执行拒绝策略
代码示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
// 提交多个任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("执行任务 " + taskId);
});
}
// 较老的任务可能被丢弃,新任务会被执行
适用场景:
- 新任务比老任务更重要
- 可以容忍部分任务丢失
- 需要优先处理最新任务
- 实时性要求较高的场景
优缺点:
- 优点:优先处理新任务,适合实时场景
- 缺点:可能丢失重要任务,需要谨慎使用
5. 自定义拒绝策略
实现自定义拒绝策略:
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
private final AtomicInteger rejectedCount = new AtomicInteger(0);
private final Logger logger = LoggerFactory.getLogger(CustomRejectedExecutionHandler.class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
int count = rejectedCount.incrementAndGet();
// 记录拒绝信息
logger.warn("任务被拒绝,拒绝次数:{},线程池状态:{}/{}",
count, executor.getActiveCount(), executor.getMaximumPoolSize());
// 尝试将任务保存到数据库或消息队列
try {
saveTaskToQueue(r);
} catch (Exception e) {
logger.error("保存任务失败", e);
// 如果保存失败,可以选择其他策略
fallbackStrategy(r, executor);
}
}
private void saveTaskToQueue(Runnable task) {
// 将任务保存到数据库或消息队列,稍后重试
System.out.println("任务已保存到队列,稍后重试");
}
private void fallbackStrategy(Runnable task, ThreadPoolExecutor executor) {
// 降级策略:由调用线程执行
if (!executor.isShutdown()) {
task.run();
}
}
}
使用自定义拒绝策略:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new CustomRejectedExecutionHandler()
);
6. 高级拒绝策略实现
带重试机制的拒绝策略:
public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
private final int maxRetries;
private final long retryDelay;
private final TimeUnit retryTimeUnit;
public RetryRejectedExecutionHandler(int maxRetries, long retryDelay, TimeUnit retryTimeUnit) {
this.maxRetries = maxRetries;
this.retryDelay = retryDelay;
this.retryTimeUnit = retryTimeUnit;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
return;
}
// 尝试重试提交
for (int i = 0; i < maxRetries; i++) {
try {
Thread.sleep(retryTimeUnit.toMillis(retryDelay));
if (executor.getQueue().offer(r)) {
return; // 成功提交
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
// 重试失败,执行降级策略
System.err.println("任务重试失败,执行降级策略");
r.run();
}
}
带监控的拒绝策略:
public class MonitoringRejectedExecutionHandler implements RejectedExecutionHandler {
private final MeterRegistry meterRegistry;
private final Counter rejectedCounter;
private final Gauge queueSizeGauge;
public MonitoringRejectedExecutionHandler(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.rejectedCounter = Counter.builder("threadpool.rejected.tasks")
.description("Number of rejected tasks")
.register(meterRegistry);
this.queueSizeGauge = Gauge.builder("threadpool.queue.size")
.description("Current queue size")
.register(meterRegistry, executor -> executor.getQueue().size());
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录指标
rejectedCounter.increment();
// 记录详细信息
System.err.printf("任务被拒绝 - 活跃线程:%d,队列大小:%d,最大线程数:%d%n",
executor.getActiveCount(),
executor.getQueue().size(),
executor.getMaximumPoolSize());
// 执行降级策略
if (!executor.isShutdown()) {
r.run();
}
}
}
7. 拒绝策略选择指南
根据业务场景选择拒绝策略:
| 业务场景 | 推荐策略 | 原因 |
|---|---|---|
| 关键业务逻辑 | AbortPolicy | 需要明确知道任务被拒绝 |
| 日志记录 | DiscardPolicy | 可以容忍任务丢失 |
| 实时数据处理 | DiscardOldestPolicy | 优先处理新数据 |
| 批量处理 | CallerRunsPolicy | 保证任务不丢失 |
| 高并发场景 | 自定义策略 | 需要复杂的降级机制 |
性能对比:
// 性能测试代码
public class RejectionPolicyPerformanceTest {
public static void main(String[] args) {
testPolicy("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());
testPolicy("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());
testPolicy("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());
testPolicy("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());
}
private static void testPolicy(String name, RejectedExecutionHandler handler) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
handler
);
long startTime = System.currentTimeMillis();
int successCount = 0;
int rejectCount = 0;
for (int i = 0; i < 1000; i++) {
try {
executor.execute(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
successCount++;
} catch (RejectedExecutionException e) {
rejectCount++;
}
}
long endTime = System.currentTimeMillis();
System.out.printf("%s - 成功:%d,拒绝:%d,耗时:%dms%n",
name, successCount, rejectCount, endTime - startTime);
executor.shutdown();
}
}
8. 最佳实践建议
1. 根据业务特点选择策略:
// 关键业务 - 使用 AbortPolicy
ThreadPoolExecutor criticalExecutor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.AbortPolicy()
);
// 日志处理 - 使用 DiscardPolicy
ThreadPoolExecutor logExecutor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50),
new ThreadPoolExecutor.DiscardPolicy()
);
2. 结合监控和告警:
public class MonitoredRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录拒绝事件
logRejectionEvent(executor);
// 发送告警
sendAlert(executor);
// 执行降级策略
executeFallback(r, executor);
}
}
3. 动态调整线程池参数:
public class AdaptiveThreadPool {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService monitor;
public AdaptiveThreadPool() {
this.executor = new ThreadPoolExecutor(...);
this.monitor = Executors.newScheduledThreadPool(1);
// 定期监控和调整
monitor.scheduleAtFixedRate(this::adjustPoolSize, 0, 30, TimeUnit.SECONDS);
}
private void adjustPoolSize() {
double queueUsage = (double) executor.getQueue().size() / executor.getQueue().remainingCapacity();
if (queueUsage > 0.8) {
// 增加线程数
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 2);
}
}
}
线程池任务队列
线程池的任务队列是存储等待执行任务的重要组件,它直接影响线程池的性能和行为。不同类型的队列具有不同的特性,选择合适的队列类型对线程池的性能至关重要。
1. 任务队列的作用
基本功能:
- 缓冲任务:当线程池中的线程都在忙碌时,新提交的任务会进入队列等待
- 控制并发:通过队列大小限制,可以控制系统的并发处理能力
- 内存管理:有界队列可以防止任务无限堆积导致内存溢出
- 优先级处理:支持按优先级处理任务
工作流程:
2. 队列类型概述
Java 线程池支持多种类型的队列,每种都有其特定的使用场景:
| 队列类型 | 特性 | 适用场景 | 优缺点 |
|---|---|---|---|
| ArrayBlockingQueue | 有界、FIFO | 需要控制内存使用 | 优点:内存可控;缺点:容量固定 |
| LinkedBlockingQueue | 无界/有界、FIFO | 任务数量不确定 | 优点:灵活;缺点:可能 OOM |
| SynchronousQueue | 不存储元素 | 任务处理速度快 | 优点:直接传递;缺点:无缓冲 |
| PriorityBlockingQueue | 优先级队列 | 需要按优先级处理 | 优点:支持优先级;缺点:性能较低 |
| DelayQueue | 延迟队列 | 定时任务 | 优点:支持延迟;缺点:复杂度高 |
3. ArrayBlockingQueue(数组阻塞队列)
特性:
- 基于数组实现的有界阻塞队列
- 容量固定,创建时指定
- 采用 FIFO(先进先出)原则
- 使用 ReentrantLock 保证线程安全
代码示例:
// 创建容量为100的数组阻塞队列
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
queue, // 使用数组阻塞队列
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 队列操作示例
try {
// 添加任务到队列(阻塞)
queue.put(() -> System.out.println("任务1"));
// 添加任务到队列(非阻塞,失败返回false)
boolean success = queue.offer(() -> System.out.println("任务2"));
// 从队列取出任务(阻塞)
Runnable task = queue.take();
task.run();
// 从队列取出任务(非阻塞,失败返回null)
Runnable task2 = queue.poll();
if (task2 != null) {
task2.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
适用场景:
- 需要严格控制内存使用
- 任务数量相对稳定
- 对任务执行顺序有要求
- 高并发场景下的任务缓冲
优缺点:
- 优点:内存使用可控,性能稳定,支持公平/非公平模式
- 缺点:容量固定,无法动态调整
4. LinkedBlockingQueue(链表阻塞队列)
特性:
- 基于链表实现的可选有界阻塞队列
- 默认无界(Integer.MAX_VALUE),也可指定容量
- 采用 FIFO 原则
- 使用两把锁(putLock 和 takeLock)提高并发性能
代码示例:
// 创建无界链表阻塞队列
LinkedBlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<>();
// 创建有界链表阻塞队列
LinkedBlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(1000);
// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
boundedQueue,
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 队列操作示例
try {
// 添加任务(阻塞)
queue.put(() -> System.out.println("任务1"));
// 添加任务(非阻塞)
boolean success = queue.offer(() -> System.out.println("任务2"));
// 批量添加
List<Runnable> tasks = Arrays.asList(
() -> System.out.println("任务3"),
() -> System.out.println("任务4")
);
queue.addAll(tasks);
// 取出任务
Runnable task = queue.take();
task.run();
// 批量取出
List<Runnable> drainedTasks = new ArrayList<>();
queue.drainTo(drainedTasks, 10); // 最多取出10个任务
for (Runnable t : drainedTasks) {
t.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
适用场景:
- 任务数量不确定
- 需要较大的缓冲容量
- 对内存使用要求不严格
- 需要批量操作任务
优缺点:
- 优点:容量灵活,支持批量操作,并发性能好
- 缺点:无界队列可能导致内存溢出
5. SynchronousQueue(同步队列)
特性:
- 不存储元素的阻塞队列
- 每个插入操作必须等待另一个线程的移除操作
- 支持公平和非公平模式
- 适合直接传递任务的场景
代码示例:
// 创建同步队列
SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
// 创建公平模式的同步队列
SynchronousQueue<Runnable> fairQueue = new SynchronousQueue<>(true);
// 在线程池中使用(通常用于可缓存线程池)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
queue, // 使用同步队列
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 队列操作示例
try {
// 添加任务(阻塞直到有线程取走)
queue.put(() -> System.out.println("任务1"));
// 添加任务(非阻塞)
boolean success = queue.offer(() -> System.out.println("任务2"));
// 取出任务(阻塞)
Runnable task = queue.take();
task.run();
// 取出任务(非阻塞)
Runnable task2 = queue.poll();
if (task2 != null) {
task2.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
适用场景:
- 任务处理速度很快
- 不需要任务缓冲
- 需要直接传递任务
- 可缓存线程池场景
优缺点:
- 优点:无内存开销,直接传递,性能高
- 缺点:无缓冲能力,任务可能被拒绝
6. PriorityBlockingQueue(优先级阻塞队列)
特性:
- 基于堆实现的优先级阻塞队列
- 支持无界容量
- 元素必须实现 Comparable 接口或提供 Comparator
- 优先级高的任务先执行
代码示例:
// 创建优先级队列
PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();
// 创建带比较器的优先级队列
PriorityBlockingQueue<Task> customQueue = new PriorityBlockingQueue<>(
11, // 初始容量
(t1, t2) -> Integer.compare(t1.getPriority(), t2.getPriority())
);
// 优先级任务类
class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final int priority;
private final String name;
public PriorityTask(int priority, String name) {
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("执行任务: " + name + ", 优先级: " + priority);
}
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(other.priority, this.priority); // 优先级高的先执行
}
}
// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
queue,
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 提交不同优先级的任务
executor.execute(new PriorityTask(1, "低优先级任务"));
executor.execute(new PriorityTask(3, "高优先级任务"));
executor.execute(new PriorityTask(2, "中优先级任务"));
// 任务会按优先级执行:高 -> 中 -> 低
适用场景:
- 需要按优先级处理任务
- 任务重要性不同
- 需要动态调整任务执行顺序
- 实时系统或任务调度系统
优缺点:
- 优点:支持优先级,灵活的任务调度
- 缺点:性能相对较低,内存使用较高
7. DelayQueue(延迟队列)
特性:
- 基于 PriorityQueue 实现的延迟阻塞队列
- 元素必须实现 Delayed 接口
- 只有延迟时间到了才能取出元素
- 支持无界容量
代码示例:
// 创建延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 延迟任务类
class DelayedTask implements Runnable, Delayed {
private final long executeTime;
private final String name;
public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}
@Override
public void run() {
System.out.println("执行延迟任务: " + name +
", 延迟时间: " + (executeTime - System.currentTimeMillis()) + "ms");
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
}
}
// 在线程池中使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
delayQueue,
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 提交延迟任务
executor.execute(new DelayedTask("任务1", 1000)); // 1秒后执行
executor.execute(new DelayedTask("任务2", 500)); // 0.5秒后执行
executor.execute(new DelayedTask("任务3", 2000)); // 2秒后执行
// 任务会按延迟时间顺序执行:任务2 -> 任务1 -> 任务3
适用场景:
- 定时任务执行
- 延迟任务处理
- 任务调度系统
- 缓存过期处理
优缺点:
- 优点:支持延迟执行,精确的时间控制
- 缺点:实现复杂,内存使用较高
8. 队列选择指南
根据业务场景选择队列:
| 业务场景 | 推荐队列 | 原因 |
|---|---|---|
| 高并发 Web 服务 | ArrayBlockingQueue | 内存可控,性能稳定 |
| 批量数据处理 | LinkedBlockingQueue | 容量大,支持批量操作 |
| 实时任务处理 | SynchronousQueue | 直接传递,响应快 |
| 任务调度系统 | PriorityBlockingQueue | 支持优先级调度 |
| 定时任务 | DelayQueue | 支持延迟执行 |
| 内存敏感场景 | ArrayBlockingQueue | 有界队列,防止 OOM |
| 任务数量不确定 | LinkedBlockingQueue | 无界队列,灵活适应 |
根据性能要求选择:
// 性能测试代码
public class QueuePerformanceTest {
public static void main(String[] args) {
testQueue("ArrayBlockingQueue", new ArrayBlockingQueue<>(1000));
testQueue("LinkedBlockingQueue", new LinkedBlockingQueue<>(1000));
testQueue("SynchronousQueue", new SynchronousQueue<>());
testQueue("PriorityBlockingQueue", new PriorityBlockingQueue<>());
}
private static void testQueue(String name, BlockingQueue<Runnable> queue) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60L, TimeUnit.SECONDS, queue,
new ThreadPoolExecutor.CallerRunsPolicy()
);
long startTime = System.currentTimeMillis();
int taskCount = 10000;
// 提交任务
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
executor.execute(() -> {
// 模拟任务执行
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 等待所有任务完成
executor.shutdown();
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
long endTime = System.currentTimeMillis();
System.out.printf("%s - 处理%d个任务,耗时:%dms%n",
name, taskCount, endTime - startTime);
}
}
9. 队列性能对比
性能特征对比:
| 队列类型 | 吞吐量 | 内存使用 | 延迟 | 并发性能 | 适用场景 |
|---|---|---|---|---|---|
| ArrayBlockingQueue | 高 | 低 | 低 | 中等 | 高并发、内存敏感 |
| LinkedBlockingQueue | 高 | 中等 | 低 | 高 | 通用场景 |
| SynchronousQueue | 最高 | 最低 | 最低 | 最高 | 实时处理 |
| PriorityBlockingQueue | 低 | 高 | 高 | 低 | 优先级调度 |
| DelayQueue | 低 | 高 | 高 | 低 | 定时任务 |
内存使用对比:
// 内存使用测试
public class QueueMemoryTest {
public static void main(String[] args) {
testMemoryUsage("ArrayBlockingQueue", new ArrayBlockingQueue<>(1000));
testMemoryUsage("LinkedBlockingQueue", new LinkedBlockingQueue<>(1000));
testMemoryUsage("SynchronousQueue", new SynchronousQueue<>());
testMemoryUsage("PriorityBlockingQueue", new PriorityBlockingQueue<>());
}
private static void testMemoryUsage(String name, BlockingQueue<Runnable> queue) {
// 添加1000个任务
for (int i = 0; i < 1000; i++) {
queue.offer(() -> System.out.println("Task " + i));
}
// 获取内存使用情况
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
System.out.printf("%s - 内存使用:%d KB%n", name, usedMemory / 1024);
}
}
10. 队列配置最佳实践
1. 根据 CPU 核心数配置队列大小:
public class QueueSizeCalculator {
public static int calculateQueueSize(int corePoolSize, int maxPoolSize) {
int cpuCount = Runtime.getRuntime().availableProcessors();
// CPU密集型任务:队列大小为CPU核心数的2-4倍
// I/O密集型任务:队列大小为CPU核心数的10-20倍
if (isCpuIntensive()) {
return cpuCount * 2;
} else {
return cpuCount * 10;
}
}
private static boolean isCpuIntensive() {
// 根据业务特点判断是否为CPU密集型
return true; // 示例
}
}
2. 动态调整队列大小:
public class DynamicQueueSize {
private final BlockingQueue<Runnable> queue;
private final AtomicInteger currentSize;
private final int maxSize;
public DynamicQueueSize(int initialSize, int maxSize) {
this.queue = new LinkedBlockingQueue<>(initialSize);
this.currentSize = new AtomicInteger(initialSize);
this.maxSize = maxSize;
}
public void adjustQueueSize() {
// 根据系统负载动态调整队列大小
double cpuUsage = getCpuUsage();
double memoryUsage = getMemoryUsage();
if (cpuUsage > 0.8 && memoryUsage < 0.7) {
// CPU使用率高但内存充足,增加队列大小
int newSize = Math.min(currentSize.get() + 100, maxSize);
currentSize.set(newSize);
} else if (cpuUsage < 0.5 && memoryUsage > 0.8) {
// CPU使用率低但内存紧张,减少队列大小
int newSize = Math.max(currentSize.get() - 50, 100);
currentSize.set(newSize);
}
}
private double getCpuUsage() {
// 获取CPU使用率
return 0.5; // 示例
}
private double getMemoryUsage() {
// 获取内存使用率
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long maxMemory = runtime.maxMemory();
return (double) usedMemory / maxMemory;
}
}
3. 队列监控和告警:
public class QueueMonitor {
private final BlockingQueue<Runnable> queue;
private final ScheduledExecutorService monitor;
private final AtomicLong totalTasks = new AtomicLong(0);
private final AtomicLong rejectedTasks = new AtomicLong(0);
public QueueMonitor(BlockingQueue<Runnable> queue) {
this.queue = queue;
this.monitor = Executors.newScheduledThreadPool(1);
// 每5秒监控一次
monitor.scheduleAtFixedRate(this::monitorQueue, 0, 5, TimeUnit.SECONDS);
}
private void monitorQueue() {
int queueSize = queue.size();
int queueCapacity = queue.remainingCapacity() + queueSize;
double queueUsage = (double) queueSize / queueCapacity;
System.out.printf("队列监控 - 当前大小:%d,容量:%d,使用率:%.2f%%%n",
queueSize, queueCapacity, queueUsage * 100);
// 队列使用率过高时告警
if (queueUsage > 0.8) {
System.err.println("警告:队列使用率过高!");
// 发送告警通知
sendAlert("队列使用率过高", queueUsage);
}
// 队列为空时记录
if (queueSize == 0) {
System.out.println("信息:队列为空,所有任务已处理完成");
}
}
private void sendAlert(String message, double value) {
// 发送告警通知(邮件、短信、钉钉等)
System.err.printf("告警:%s,当前值:%.2f%n", message, value);
}
public void shutdown() {
monitor.shutdown();
}
}
11. 自定义队列实现
1. 带统计功能的队列:
public class StatisticsBlockingQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> delegate;
private final AtomicLong totalOffers = new AtomicLong(0);
private final AtomicLong totalPolls = new AtomicLong(0);
private final AtomicLong rejectedOffers = new AtomicLong(0);
private final AtomicLong emptyPolls = new AtomicLong(0);
public StatisticsBlockingQueue(BlockingQueue<E> delegate) {
this.delegate = delegate;
}
@Override
public boolean offer(E e) {
totalOffers.incrementAndGet();
boolean result = delegate.offer(e);
if (!result) {
rejectedOffers.incrementAndGet();
}
return result;
}
@Override
public E poll() {
E result = delegate.poll();
totalPolls.incrementAndGet();
if (result == null) {
emptyPolls.incrementAndGet();
}
return result;
}
@Override
public E take() throws InterruptedException {
E result = delegate.take();
totalPolls.incrementAndGet();
return result;
}
// 获取统计信息
public QueueStatistics getStatistics() {
return new QueueStatistics(
totalOffers.get(),
totalPolls.get(),
rejectedOffers.get(),
emptyPolls.get(),
delegate.size()
);
}
// 统计信息类
public static class QueueStatistics {
private final long totalOffers;
private final long totalPolls;
private final long rejectedOffers;
private final long emptyPolls;
private final int currentSize;
public QueueStatistics(long totalOffers, long totalPolls, long rejectedOffers,
long emptyPolls, int currentSize) {
this.totalOffers = totalOffers;
this.totalPolls = totalPolls;
this.rejectedOffers = rejectedOffers;
this.emptyPolls = emptyPolls;
this.currentSize = currentSize;
}
public double getRejectionRate() {
return totalOffers > 0 ? (double) rejectedOffers / totalOffers : 0.0;
}
public double getEmptyPollRate() {
return totalPolls > 0 ? (double) emptyPolls / totalPolls : 0.0;
}
// getter方法...
}
// 实现其他BlockingQueue方法...
@Override
public int size() { return delegate.size(); }
@Override
public boolean isEmpty() { return delegate.isEmpty(); }
@Override
public boolean contains(Object o) { return delegate.contains(o); }
@Override
public Iterator<E> iterator() { return delegate.iterator(); }
@Override
public Object[] toArray() { return delegate.toArray(); }
@Override
public <T> T[] toArray(T[] a) { return delegate.toArray(a); }
@Override
public boolean remove(Object o) { return delegate.remove(o); }
@Override
public boolean containsAll(Collection<?> c) { return delegate.containsAll(c); }
@Override
public boolean addAll(Collection<? extends E> c) { return delegate.addAll(c); }
@Override
public boolean removeAll(Collection<?> c) { return delegate.removeAll(c); }
@Override
public boolean retainAll(Collection<?> c) { return delegate.retainAll(c); }
@Override
public void clear() { delegate.clear(); }
@Override
public boolean add(E e) { return delegate.add(e); }
@Override
public E remove() { return delegate.remove(); }
@Override
public E element() { return delegate.element(); }
@Override
public E peek() { return delegate.peek(); }
@Override
public void put(E e) throws InterruptedException { delegate.put(e); }
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return delegate.offer(e, timeout, unit);
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.poll(timeout, unit);
}
@Override
public int remainingCapacity() { return delegate.remainingCapacity(); }
@Override
public boolean remove(Object o) { return delegate.remove(o); }
@Override
public int drainTo(Collection<? super E> c) { return delegate.drainTo(c); }
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return delegate.drainTo(c, maxElements);
}
}
2. 带优先级的任务队列:
public class PriorityTaskQueue implements BlockingQueue<Runnable> {
private final PriorityBlockingQueue<PriorityTask> queue;
private final AtomicInteger sequence = new AtomicInteger(0);
public PriorityTaskQueue() {
this.queue = new PriorityBlockingQueue<>();
}
public PriorityTaskQueue(int initialCapacity) {
this.queue = new PriorityBlockingQueue<>(initialCapacity);
}
@Override
public boolean offer(Runnable task) {
if (task instanceof PriorityTask) {
return queue.offer((PriorityTask) task);
} else {
// 将普通任务包装为优先级任务
PriorityTask priorityTask = new PriorityTask(task, 0, sequence.getAndIncrement());
return queue.offer(priorityTask);
}
}
@Override
public Runnable poll() {
return queue.poll();
}
@Override
public Runnable take() throws InterruptedException {
return queue.take();
}
// 提交带优先级的任务
public boolean offer(Runnable task, int priority) {
PriorityTask priorityTask = new PriorityTask(task, priority, sequence.getAndIncrement());
return queue.offer(priorityTask);
}
// 优先级任务类
private static class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final Runnable task;
private final int priority;
private final int sequence;
public PriorityTask(Runnable task, int priority, int sequence) {
this.task = task;
this.priority = priority;
this.sequence = sequence;
}
@Override
public void run() {
task.run();
}
@Override
public int compareTo(PriorityTask other) {
// 优先级高的先执行,优先级相同时按提交顺序
int priorityCompare = Integer.compare(other.priority, this.priority);
return priorityCompare != 0 ? priorityCompare : Integer.compare(this.sequence, other.sequence);
}
}
// 实现其他方法...
@Override
public int size() { return queue.size(); }
@Override
public boolean isEmpty() { return queue.isEmpty(); }
@Override
public boolean contains(Object o) { return queue.contains(o); }
@Override
public Iterator<Runnable> iterator() { return queue.iterator(); }
@Override
public Object[] toArray() { return queue.toArray(); }
@Override
public <T> T[] toArray(T[] a) { return queue.toArray(a); }
@Override
public boolean remove(Object o) { return queue.remove(o); }
@Override
public boolean containsAll(Collection<?> c) { return queue.containsAll(c); }
@Override
public boolean addAll(Collection<? extends Runnable> c) { return queue.addAll(c); }
@Override
public boolean removeAll(Collection<?> c) { return queue.removeAll(c); }
@Override
public boolean retainAll(Collection<?> c) { return queue.retainAll(c); }
@Override
public void clear() { queue.clear(); }
@Override
public boolean add(Runnable e) { return queue.add(e); }
@Override
public Runnable remove() { return queue.remove(); }
@Override
public Runnable element() { return queue.element(); }
@Override
public Runnable peek() { return queue.peek(); }
@Override
public void put(Runnable e) throws InterruptedException { queue.put(e); }
@Override
public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
return queue.offer(e, timeout, unit);
}
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
@Override
public int remainingCapacity() { return queue.remainingCapacity(); }
@Override
public int drainTo(Collection<? super Runnable> c) { return queue.drainTo(c); }
@Override
public int drainTo(Collection<? super Runnable> c, int maxElements) {
return queue.drainTo(c, maxElements);
}
}
3. 带限流的队列:
public class RateLimitedQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> delegate;
private final RateLimiter rateLimiter;
private final AtomicLong rejectedCount = new AtomicLong(0);
public RateLimitedQueue(BlockingQueue<E> delegate, double permitsPerSecond) {
this.delegate = delegate;
this.rateLimiter = RateLimiter.create(permitsPerSecond);
}
@Override
public boolean offer(E e) {
if (rateLimiter.tryAcquire()) {
return delegate.offer(e);
} else {
rejectedCount.incrementAndGet();
return false;
}
}
@Override
public void put(E e) throws InterruptedException {
rateLimiter.acquire();
delegate.put(e);
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (rateLimiter.tryAcquire(timeout, unit)) {
return delegate.offer(e, timeout, unit);
} else {
rejectedCount.incrementAndGet();
return false;
}
}
public long getRejectedCount() {
return rejectedCount.get();
}
public double getCurrentRate() {
return rateLimiter.getRate();
}
public void setRate(double permitsPerSecond) {
rateLimiter.setRate(permitsPerSecond);
}
// 实现其他方法...
@Override
public E poll() { return delegate.poll(); }
@Override
public E take() throws InterruptedException { return delegate.take(); }
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.poll(timeout, unit);
}
@Override
public int size() { return delegate.size(); }
@Override
public boolean isEmpty() { return delegate.isEmpty(); }
@Override
public boolean contains(Object o) { return delegate.contains(o); }
@Override
public Iterator<E> iterator() { return delegate.iterator(); }
@Override
public Object[] toArray() { return delegate.toArray(); }
@Override
public <T> T[] toArray(T[] a) { return delegate.toArray(a); }
@Override
public boolean remove(Object o) { return delegate.remove(o); }
@Override
public boolean containsAll(Collection<?> c) { return delegate.containsAll(c); }
@Override
public boolean addAll(Collection<? extends E> c) { return delegate.addAll(c); }
@Override
public boolean removeAll(Collection<?> c) { return delegate.removeAll(c); }
@Override
public boolean retainAll(Collection<?> c) { return delegate.retainAll(c); }
@Override
public void clear() { delegate.clear(); }
@Override
public boolean add(E e) { return delegate.add(e); }
@Override
public E remove() { return delegate.remove(); }
@Override
public E element() { return delegate.element(); }
@Override
public E peek() { return delegate.peek(); }
@Override
public int remainingCapacity() { return delegate.remainingCapacity(); }
@Override
public int drainTo(Collection<? super E> c) { return delegate.drainTo(c); }
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return delegate.drainTo(c, maxElements);
}
}
4. 使用自定义队列的线程池:
public class CustomThreadPoolExample {
public static void main(String[] args) {
// 使用带统计功能的队列
BlockingQueue<Runnable> statisticsQueue = new StatisticsBlockingQueue<>(
new ArrayBlockingQueue<>(100)
);
// 使用带优先级的队列
PriorityTaskQueue priorityQueue = new PriorityTaskQueue(100);
// 使用带限流的队列
RateLimitedQueue<Runnable> rateLimitedQueue = new RateLimitedQueue<>(
new LinkedBlockingQueue<>(1000), 10.0 // 每秒最多10个任务
);
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
statisticsQueue, // 使用自定义队列
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 提交任务
for (int i = 0; i < 100; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("执行任务: " + taskId);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 获取统计信息
if (statisticsQueue instanceof StatisticsBlockingQueue) {
StatisticsBlockingQueue.QueueStatistics stats =
((StatisticsBlockingQueue<Runnable>) statisticsQueue).getStatistics();
System.out.println("队列统计信息: " + stats);
}
executor.shutdown();
}
}
12. 高级队列用法
1. 队列组合使用:
public class CompositeQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> primaryQueue;
private final BlockingQueue<E> secondaryQueue;
private final AtomicBoolean usePrimary = new AtomicBoolean(true);
public CompositeQueue(BlockingQueue<E> primaryQueue, BlockingQueue<E> secondaryQueue) {
this.primaryQueue = primaryQueue;
this.secondaryQueue = secondaryQueue;
}
@Override
public boolean offer(E e) {
if (usePrimary.get()) {
if (primaryQueue.offer(e)) {
return true;
} else {
// 主队列满了,切换到备用队列
usePrimary.set(false);
return secondaryQueue.offer(e);
}
} else {
return secondaryQueue.offer(e);
}
}
@Override
public E poll() {
E result = primaryQueue.poll();
if (result == null) {
result = secondaryQueue.poll();
}
return result;
}
@Override
public E take() throws InterruptedException {
E result = primaryQueue.poll();
if (result == null) {
result = secondaryQueue.take();
}
return result;
}
// 实现其他方法...
@Override
public int size() { return primaryQueue.size() + secondaryQueue.size(); }
@Override
public boolean isEmpty() { return primaryQueue.isEmpty() && secondaryQueue.isEmpty(); }
// 其他方法实现...
}
2. 队列监控和调优:
public class QueueTuner {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService tuner;
private final AtomicReference<BlockingQueue<Runnable>> currentQueue;
public QueueTuner(ThreadPoolExecutor executor) {
this.executor = executor;
this.tuner = Executors.newScheduledThreadPool(1);
this.currentQueue = new AtomicReference<>(executor.getQueue());
// 每30秒调优一次
tuner.scheduleAtFixedRate(this::tuneQueue, 0, 30, TimeUnit.SECONDS);
}
private void tuneQueue() {
BlockingQueue<Runnable> queue = currentQueue.get();
int queueSize = queue.size();
int queueCapacity = queue.remainingCapacity() + queueSize;
double queueUsage = (double) queueSize / queueCapacity;
// 根据队列使用率调整线程池参数
if (queueUsage > 0.8) {
// 队列使用率过高,增加线程数
int currentMax = executor.getMaximumPoolSize();
if (currentMax < 50) { // 限制最大线程数
executor.setMaximumPoolSize(currentMax + 2);
System.out.println("增加最大线程数到: " + (currentMax + 2));
}
} else if (queueUsage < 0.3) {
// 队列使用率过低,减少线程数
int currentMax = executor.getMaximumPoolSize();
int currentCore = executor.getCorePoolSize();
if (currentMax > currentCore + 2) {
executor.setMaximumPoolSize(currentMax - 1);
System.out.println("减少最大线程数到: " + (currentMax - 1));
}
}
// 根据系统负载调整队列类型
adjustQueueType();
}
private void adjustQueueType() {
double cpuUsage = getCpuUsage();
double memoryUsage = getMemoryUsage();
if (cpuUsage > 0.9 && memoryUsage < 0.5) {
// CPU使用率很高但内存充足,可以考虑使用更大的队列
if (currentQueue.get() instanceof ArrayBlockingQueue) {
ArrayBlockingQueue<Runnable> current = (ArrayBlockingQueue<Runnable>) currentQueue.get();
if (current.remainingCapacity() < 100) {
// 切换到更大的队列
BlockingQueue<Runnable> newQueue = new ArrayBlockingQueue<>(1000);
// 注意:ThreadPoolExecutor不支持动态更换队列
System.out.println("建议切换到更大的队列");
}
}
}
}
private double getCpuUsage() {
// 获取CPU使用率
return 0.5; // 示例
}
private double getMemoryUsage() {
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long maxMemory = runtime.maxMemory();
return (double) usedMemory / maxMemory;
}
public void shutdown() {
tuner.shutdown();
}
}
线程池的创建方式
1. 使用 ThreadPoolExecutor(推荐)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("BusinessThread-" + t.getId());
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
2. 使用 Executors 工厂方法
// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 可缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 单线程执行器
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
3. 使用 CompletableFuture(Java 8+)
// 异步执行任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 任务执行代码
return "执行结果";
});
// 使用自定义线程池
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 任务执行代码
return "执行结果";
}, executor);
常用的线程池配置
1. 固定大小的线程池
ThreadPoolExecutor fixedPool = new ThreadPoolExecutor(
5, // 核心线程数
5, // 最大线程数(与核心线程数相同)
0L, // 空闲线程存活时间
TimeUnit.MILLISECONDS, // 时间单位
new LinkedBlockingQueue<>() // 工作队列
);
适用场景:
- 任务数量相对固定
- 需要控制并发数量
- 对响应时间要求不高
2. 可缓存的线程池
ThreadPoolExecutor cachedPool = new ThreadPoolExecutor(
0, // 核心线程数
Integer.MAX_VALUE, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new SynchronousQueue<>() // 工作队列
);
适用场景:
- 任务数量变化较大
- 任务执行时间较短
- 需要快速响应
3. 单线程执行器
ThreadPoolExecutor singlePool = new ThreadPoolExecutor(
1, // 核心线程数
1, // 最大线程数
0L, // 空闲线程存活时间
TimeUnit.MILLISECONDS, // 时间单位
new LinkedBlockingQueue<>() // 工作队列
);
适用场景:
- 需要保证任务顺序执行
- 任务之间有依赖关系
- 避免并发问题
4. 定时任务线程池
ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(
5, // 核心线程数
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 使用示例
scheduledPool.schedule(() -> {
// 延迟执行的任务
}, 5, TimeUnit.SECONDS);
scheduledPool.scheduleAtFixedRate(() -> {
// 定期执行的任务
}, 0, 1, TimeUnit.SECONDS);
适用场景:
- 定时任务
- 周期性任务
- 延迟执行任务
ThreadPoolExecutor 与 Executors 的比较
1. 使用方式对比
Executors 工厂方法:
// 简单但不够灵活
ExecutorService executor = Executors.newFixedThreadPool(5);
ThreadPoolExecutor:
// 更灵活,可以精确控制各个参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("CustomThread-" + t.getId());
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
2. 优缺点分析
Executors 工厂方法:
优点:
- 使用简单,代码简洁
- 适合快速开发和小型应用
- 不需要深入了解线程池参数
缺点:
- 隐藏了线程池的具体实现细节
- 使用无界队列可能导致 OOM
- 无法自定义线程工厂和拒绝策略
- 线程池参数不够灵活
ThreadPoolExecutor:
优点:
- 完全控制线程池的行为
- 可以自定义线程工厂
- 可以自定义拒绝策略
- 可以使用有界队列防止 OOM
- 可以精确控制线程池参数
缺点:
- 使用相对复杂
- 需要了解更多的线程池知识
- 代码量较大
3. 选择建议
使用 ThreadPoolExecutor 的场景:
- 大型企业级应用
- 对性能要求较高的系统
- 需要精确控制线程池行为
- 需要自定义线程工厂或拒绝策略
- 需要防止 OOM 风险
- 需要监控线程池状态
使用 Executors 的场景:
- 小型应用或原型开发
- 简单的并发任务处理
- 对线程池要求不高的场景
- 快速开发阶段
最佳实践建议:
- 生产环境推荐使用 ThreadPoolExecutor
- 明确指定线程池参数
- 使用有界队列
- 自定义线程工厂便于问题排查
- 根据业务场景选择合适的拒绝策略
- 合理设置核心线程数和最大线程数
线程池监控与调优
1. 监控指标
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
// 获取线程池状态
int corePoolSize = executor.getCorePoolSize();
int maximumPoolSize = executor.getMaximumPoolSize();
int activeCount = executor.getActiveCount();
long completedTaskCount = executor.getCompletedTaskCount();
long taskCount = executor.getTaskCount();
int queueSize = executor.getQueue().size();
2. 性能调优建议
// 推荐的线程池创建方式
ThreadPoolExecutor executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // 核心线程数
Runtime.getRuntime().availableProcessors() * 2, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("BusinessThread-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
3. 监控工具
- JMX 监控:通过 JMX 查看线程池状态
- 日志监控:记录线程池运行日志
- 性能分析工具:JVisualVM、JMC、Arthas
实际应用场景
1. Web 服务器处理请求
@Component
public class RequestProcessor {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("RequestProcessor-" + t.getId());
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public void processRequest(HttpRequest request) {
executor.execute(() -> {
// 处理请求
handleRequest(request);
});
}
}
2. 批量数据处理
@Service
public class BatchDataProcessor {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("BatchProcessor-" + t.getId());
return t;
}
},
new ThreadPoolExecutor.DiscardOldestPolicy()
);
public void processBatch(List<Data> dataList) {
for (Data data : dataList) {
executor.execute(() -> {
// 处理单个数据
processData(data);
});
}
}
}
3. 异步任务处理
@Service
public class AsyncTaskService {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
3, 8, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("AsyncTask-" + t.getId());
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public CompletableFuture<String> processAsync(Task task) {
return CompletableFuture.supplyAsync(() -> {
// 异步处理任务
return processTask(task);
}, executor);
}
}
最佳实践建议
1. 线程池参数设置
// CPU 密集型任务
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = Runtime.getRuntime().availableProcessors();
// I/O 密集型任务
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 4;
2. 队列选择
- ArrayBlockingQueue:有界队列,适合需要控制内存使用的场景
- LinkedBlockingQueue:无界队列,适合任务数量不确定的场景
- SynchronousQueue:不存储元素,适合任务处理速度快的场景
- PriorityBlockingQueue:优先级队列,适合需要按优先级处理任务的场景
3. 线程工厂最佳实践
public class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final ThreadGroup group;
public CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
4. 优雅关闭线程池
public void shutdownGracefully(ThreadPoolExecutor executor) {
executor.shutdown(); // 停止接受新任务
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
常见问题与解决方案
1. 内存泄漏问题
问题:线程池中的线程无法正常回收,导致内存泄漏。
解决方案:
- 使用有界队列
- 合理设置线程存活时间
- 及时关闭线程池
2. 任务堆积问题
问题:任务提交速度超过处理速度,导致队列中任务堆积。
解决方案:
- 增加线程数量
- 优化任务处理逻辑
- 使用更合适的拒绝策略
3. 线程池关闭问题
问题:线程池关闭时,正在执行的任务被强制中断。
解决方案:
- 使用优雅关闭方式
- 设置合理的等待时间
- 处理中断异常
参考资源
- Java Concurrency in Practice (Brian Goetz)
- Java 并发编程实战
- Oracle Java Documentation
- Java 线程池最佳实践
总结
线程池是 Java 并发编程中的重要工具,正确使用线程池可以显著提升程序性能。在实际应用中,需要根据具体业务场景选择合适的线程池配置,并注意监控和调优,以确保系统的稳定性和性能。
相关文档:
- Java 线程基础 - 了解线程的基本概念和同步机制
