RocketMQ 消息队列
目录
概述
Apache RocketMQ 是一个分布式消息中间件,具有低延迟、高并发、高可用、高可靠的特点。它广泛应用于电商、金融、物流等对消息可靠性要求较高的场景。
主要特性
- 高并发: 支持单机百万级消息处理
- 低延迟: 毫秒级消息投递
- 高可用: 支持集群部署,自动故障转移
- 消息可靠性: 支持消息持久化,确保消息不丢失
- 顺序消息: 支持全局顺序和分区顺序
- 事务消息: 支持分布式事务
- 消息过滤: 支持 SQL92 语法和 Tag 过滤
- 消息回溯: 支持按时间回溯消息
核心概念
1. 消息模型
RocketMQ 采用发布-订阅模式的消息模型,整个系统由四个核心组件构成,各司其职,协同工作:
Producer(生产者): 消息的发送方,负责将业务消息发送到指定的 Topic 中。生产者可以是一个应用程序、服务或系统,它们通过 RocketMQ 客户端 API 将消息推送到 Broker 集群。
Consumer(消费者): 消息的接收方,负责从指定的 Topic 中拉取并处理消息。消费者通过订阅 Topic 来接收消息,支持集群消费和广播消费两种模式。
Broker(消息代理): 消息存储和转发的核心组件,负责接收生产者发送的消息、持久化存储消息、以及向消费者推送消息。Broker 集群是 RocketMQ 的核心,承担着消息的存储、路由和负载均衡功能。
NameServer(命名服务器): 轻量级的注册中心,负责管理整个集群的路由信息、服务发现和负载均衡。NameServer 不存储消息数据,只维护 Broker 的地址信息和 Topic 的路由表,为生产者和消费者提供路由服务。
2. 消息结构
RocketMQ 中的消息(Message)是一个完整的数据单元,包含多个组成部分,每个部分都有其特定的作用:
消息组成部分详解:
Topic(主题): 消息的分类标识,类似于数据库中的表名。生产者将消息发送到特定的 Topic,消费者订阅感兴趣的 Topic 来接收消息。一个 Topic 可以包含多个 Queue,实现消息的分布式存储和并行处理。
Tag(标签): 消息的二级分类,用于对同一 Topic 下的消息进行更细粒度的分类。通过 Tag 可以实现消息的过滤,消费者可以只接收特定 Tag 的消息,提高消息处理的精确性。
Key(键): 消息的唯一标识符,通常用于消息去重、消息查询和消息追踪。Key 可以是业务相关的唯一 ID,如订单号、用户 ID 等。
Body(消息体): 消息的实际内容,是业务数据的载体。Body 是字节数组格式,可以存储任意格式的数据,如 JSON、XML、二进制数据等。
Properties(属性): 消息的扩展属性,以键值对的形式存储。Properties 可以用于存储消息的元数据信息,如消息来源、处理优先级、过期时间等。
3. 队列模型
RocketMQ 采用队列模型来实现消息的分布式存储和并行处理。这种设计使得系统能够支持高并发和大规模的消息处理:
队列模型的工作原理:
Topic 与 Queue 的关系: 每个 Topic 包含多个 Queue(队列),Queue 是消息存储和分发的基本单位。消息在 Topic 的各个 Queue 之间进行负载均衡分布,实现消息的并行处理。
消息分布策略: 生产者发送消息时,RocketMQ 会根据负载均衡算法将消息分布到不同的 Queue 中。这种设计避免了单点瓶颈,提高了系统的吞吐量和并发处理能力。
消费者负载均衡: 消费者组中的多个消费者实例会平均分配 Topic 下的 Queue,每个消费者负责处理分配给它的 Queue 中的消息。当消费者数量变化时,系统会自动重新分配 Queue,实现动态负载均衡。
顺序保证: 在同一个 Queue 内,消息的消费顺序与发送顺序一致。如果需要全局顺序,可以将所有消息发送到同一个 Queue;如果需要分区顺序,可以根据业务键(如订单 ID)将相关消息发送到同一个 Queue。
架构设计
整体架构
RocketMQ 采用分布式架构设计,通过多个组件的协同工作来实现高可用、高性能的消息处理能力。整个系统采用主从复制模式,确保数据的高可靠性:
架构设计特点:
集群化部署: 所有组件都支持集群部署,NameServer 集群提供高可用的路由服务,Broker 集群提供高可用的消息存储服务。
主从复制: Broker 采用 Master-Slave 架构,Master 负责读写操作,Slave 负责数据备份。当 Master 故障时,Slave 可以自动切换为 Master,保证服务不中断。
无状态设计: NameServer 采用无状态设计,各个节点之间相互独立,任何一个节点故障都不会影响整体服务。
负载均衡: 生产者和消费者通过 NameServer 获取路由信息,实现自动的负载均衡和故障转移。
组件职责
RocketMQ 的各个组件都有明确的职责分工,通过协同工作来实现完整的消息处理流程:
| 组件 | 职责 |
|---|---|
| NameServer | 路由信息管理、服务发现、负载均衡 |
| Broker | 消息存储、消息转发、消息过滤 |
| Producer | 消息发送、负载均衡、故障转移 |
| Consumer | 消息消费、负载均衡、消息确认 |
详细职责说明:
NameServer 职责:
- 路由信息管理: 维护 Topic 与 Broker 的映射关系,记录每个 Topic 分布在哪些 Broker 上
- 服务发现: 为生产者和消费者提供 Broker 的地址信息,实现动态的服务发现
- 负载均衡: 根据 Broker 的负载情况,为生产者和消费者提供最优的路由选择
- 健康检查: 定期检查 Broker 的健康状态,及时剔除故障节点
Broker 职责:
- 消息存储: 将接收到的消息持久化存储到磁盘,支持消息的可靠存储
- 消息转发: 根据消费者的订阅信息,将消息推送给相应的消费者
- 消息过滤: 支持 Tag 过滤和 SQL 过滤,减少不必要的网络传输
- 事务处理: 支持事务消息的处理,确保分布式事务的一致性
Producer 职责:
- 消息发送: 将业务消息发送到指定的 Topic
- 负载均衡: 根据 Broker 的负载情况选择最优的 Broker 进行消息发送
- 故障转移: 当某个 Broker 不可用时,自动切换到其他可用的 Broker
- 消息确认: 支持同步、异步和单向三种发送模式,满足不同的业务需求
Consumer 职责:
- 消息消费: 从订阅的 Topic 中拉取消息并进行业务处理
- 负载均衡: 在消费者组内进行负载均衡,确保消息的均匀分配
- 消息确认: 处理完消息后向 Broker 确认,支持消费进度的管理
- 故障恢复: 当消费者重启时,能够从上次消费的位置继续消费
消息存储架构
RocketMQ 采用高效的消息存储架构,通过三种不同类型的文件来实现消息的存储、索引和查询功能:
存储架构详解:
CommitLog(提交日志):
- 作用: 存储所有消息的实际内容,是消息的物理存储文件
- 特点: 采用顺序写入的方式,所有 Topic 的消息都写入同一个 CommitLog 文件
- 优势: 顺序写入性能极高,能够充分利用磁盘的写入性能
- 结构: 每个消息包含消息长度、消息内容、消息属性等信息
ConsumeQueue(消费队列):
- 作用: 为每个 Topic 的每个 Queue 建立索引,记录消息在 CommitLog 中的位置
- 特点: 按 Topic 和 Queue 进行分组,每个 ConsumeQueue 对应一个 Queue
- 优势: 支持随机读取,消费者可以快速定位到需要消费的消息
- 结构: 每条记录包含消息在 CommitLog 中的偏移量、消息大小、消息 Tag 等信息
IndexFile(索引文件):
- 作用: 为消息的 Key 建立索引,支持通过 Key 快速查找消息
- 特点: 采用 Hash 索引结构,支持精确匹配和模糊查询
- 优势: 提供消息的快速检索能力,支持消息的按 Key 查询功能
- 结构: 包含 Hash 槽、Hash 冲突链表、消息索引等信息
存储流程:
- 消息到达 Broker 后,首先写入 CommitLog 文件(顺序写入)
- 同时更新对应 Topic 和 Queue 的 ConsumeQueue 索引
- 如果消息包含 Key,则更新 IndexFile 索引
- 消费者通过 ConsumeQueue 找到消息在 CommitLog 中的位置,然后读取消息内容
安装配置
环境要求
- JDK 1.8+
- Maven 3.2+
- 内存 4GB+
- 磁盘空间 10GB+
下载安装
# 下载 RocketMQ
wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
# 解压
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-all-4.9.4-bin-release
配置修改
1. 修改 JVM 参数
# 修改 runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
# 修改 runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
2. 配置 Broker
# broker.conf
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
启动服务
# 启动 NameServer
nohup sh mqnamesrv &
# 启动 Broker
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &
快速开始
Maven 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
生产者示例
public class ProducerExample {
public static void main(String[] args) throws Exception {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 发送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", "TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者示例
public class ConsumerExample {
public static void main(String[] args) throws Exception {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.printf("Receive message: %s%n",
new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消息发送
发送方式
1. 同步发送
public class SyncProducer {
public void sendSyncMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 同步发送,等待发送结果
SendResult sendResult = producer.send(msg);
System.out.printf("Send result: %s%n", sendResult);
producer.shutdown();
}
}
2. 异步发送
public class AsyncProducer {
public void sendAsyncMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送,不等待发送结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("Send success: %s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("Send failed: %s%n", e.getMessage());
}
});
producer.shutdown();
}
}
3. 单向发送
public class OnewayProducer {
public void sendOnewayMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 单向发送,不关心发送结果
producer.sendOneway(msg);
producer.shutdown();
}
}
消息发送流程
消息发送是 RocketMQ 的核心流程之一,涉及多个组件的协同工作。整个流程设计考虑了高可用、负载均衡和故障转移等关键因素:
详细发送流程说明:
路由信息获取:
- 生产者启动时,首先向 NameServer 注册并获取 Topic 的路由信息
- NameServer 返回该 Topic 对应的 Broker 列表和 Queue 分布情况
- 生产者缓存路由信息,定期更新以应对 Broker 的变化
消息路由选择:
- 根据负载均衡策略(如轮询、随机等)选择合适的 Broker
- 根据消息的 Key 或业务规则选择具体的 Queue
- 支持自定义消息队列选择器,实现特定的路由逻辑
消息发送:
- 将消息发送到选定的 Broker
- Broker 接收消息后进行验证和存储
- 根据发送模式(同步/异步/单向)返回相应的结果
结果处理:
- 同步发送:等待 Broker 返回发送结果,确保消息已成功存储
- 异步发送:通过回调函数处理发送结果,不阻塞主线程
- 单向发送:不关心发送结果,适用于对可靠性要求不高的场景
故障处理:
- 如果发送失败,自动重试或切换到其他 Broker
- 更新本地路由缓存,剔除故障的 Broker
- 支持发送失败的回调处理,便于业务逻辑的容错处理
消息消费
消费模式
1. 集群消费 (Clustering)
public class ClusterConsumer {
public void consumeMessage() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cluster_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
// 集群消费:每条消息只会被一个消费者消费
for (MessageExt message : messages) {
System.out.printf("Receive message: %s%n",
new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
2. 广播消费 (Broadcasting)
public class BroadcastConsumer {
public void consumeMessage() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.BROADCASTING); // 设置为广播模式
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
// 广播消费:每条消息会被所有消费者消费
for (MessageExt message : messages) {
System.out.printf("Receive message: %s%n",
new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
消费进度管理
消费进度管理是 RocketMQ 保证消息可靠消费的关键机制。它通过记录每个消费者组的消费位置,确保消息不会丢失,同时支持消息的重试和故障恢复:
消费进度管理机制详解:
消费进度存储:
- 消费进度存储在 Broker 端,以消费者组为单位进行管理
- 每个消费者组对每个 Queue 都有一个消费进度记录
- 消费进度记录消息在 Queue 中的偏移量(Offset)
进度更新策略:
- 自动更新: 消费者处理完消息后,自动向 Broker 报告消费进度
- 批量更新: 支持批量更新消费进度,减少网络开销
- 定时更新: 定期更新消费进度,即使没有处理消息也会更新
重试机制:
- 重试次数: 消息处理失败时,会自动重试,默认最多重试 16 次
- 重试间隔: 重试间隔逐渐增加,避免频繁重试对系统造成压力
- 死信队列: 超过最大重试次数的消息会进入死信队列,需要人工处理
故障恢复:
- 消费者重启: 消费者重启后,会从上次消费的位置继续消费
- 消费者扩容: 新增消费者时,会重新分配 Queue,实现负载均衡
- 消费者缩容: 减少消费者时,剩余的消费者会接管更多的 Queue
进度查询:
- 支持查询指定消费者组的消费进度
- 支持查询消息的堆积情况,便于监控和告警
- 支持重置消费进度,实现消息的重新消费
高级特性
1. 顺序消息
顺序消息是 RocketMQ 提供的重要特性,能够保证消息按照发送顺序被消费。这在很多业务场景中非常重要,比如订单状态变更、库存扣减等需要严格顺序的操作。
顺序消息的实现原理:
- 通过将消息发送到同一个 Queue 来保证顺序
- 消费者按顺序从 Queue 中拉取消息进行处理
- 支持全局顺序和分区顺序两种模式
全局顺序消息
全局顺序消息保证所有消息都按照发送顺序被消费,适用于对顺序要求极高的场景,如金融交易、日志处理等。
public class OrderedProducer {
public void sendOrderedMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 全局顺序消息:所有消息按发送顺序消费
for (int i = 0; i < 10; i++) {
Message msg = new Message("OrderedTopic", "TagA",
("Ordered message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送到同一个队列保证顺序
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 选择第一个队列
return mqs.get(0);
}
}, null);
System.out.printf("Send result: %s%n", sendResult);
}
producer.shutdown();
}
}
分区顺序消息
分区顺序消息保证同一分区内的消息按顺序消费,不同分区之间的消息可以并行处理。这种模式在保证顺序的同时,还能提供更好的并发性能,适用于订单处理、用户行为分析等场景。
public class PartitionOrderedProducer {
public void sendPartitionOrderedMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("partition_ordered_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 分区顺序消息:同一分区的消息按顺序消费
for (int i = 0; i < 10; i++) {
Message msg = new Message("PartitionOrderedTopic", "TagA",
("Partition ordered message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 根据业务ID选择队列
String orderId = "ORDER_" + (i % 3); // 3个分区
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("Send result: %s%n", sendResult);
}
producer.shutdown();
}
}
2. 事务消息
事务消息是 RocketMQ 提供的分布式事务解决方案,能够保证本地事务和消息发送的一致性。它采用两阶段提交的思想,通过消息回查机制来确保事务的最终一致性。
事务消息的应用场景:
- 订单创建后发送支付消息
- 库存扣减后发送通知消息
- 用户注册后发送欢迎邮件
事务消息的执行流程:
- 发送半消息(Prepared 消息)到 Broker
- 执行本地事务
- 根据本地事务结果提交或回滚消息
- 如果长时间未收到确认,Broker 会回查事务状态
public class TransactionProducer {
public void sendTransactionMessage() throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 模拟本地事务处理
System.out.println("执行本地事务: " + new String(msg.getBody()));
// 返回提交状态
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 返回回滚状态
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
// 发送事务消息
Message msg = new Message("TransactionTopic", "TagA",
"Transaction message".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("Transaction send result: %s%n", sendResult);
producer.shutdown();
}
}
3. 消息过滤
消息过滤功能允许消费者只接收感兴趣的消息,减少不必要的网络传输和处理开销。RocketMQ 支持两种过滤方式:Tag 过滤和 SQL 过滤。
消息过滤的优势:
- 减少网络带宽消耗
- 降低消费者处理压力
- 提高消息处理的精确性
- 支持复杂的过滤条件
Tag 过滤
Tag 过滤是 RocketMQ 提供的最简单的过滤方式,通过消息的 Tag 属性进行过滤。支持精确匹配和表达式匹配,适用于简单的消息分类场景。
public class TagFilterConsumer {
public void consumeWithTagFilter() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag_filter_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
// 只消费 TagA 和 TagB 的消息
consumer.subscribe("FilterTopic", "TagA || TagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.printf("Receive message: %s, Tag: %s%n",
new String(message.getBody()), message.getTags());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
SQL 过滤
SQL 过滤支持使用 SQL92 语法对消息的 Properties 属性进行过滤,提供更强大的过滤能力。适用于需要根据消息属性进行复杂过滤的场景。
public class SQLFilterConsumer {
public void consumeWithSQLFilter() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql_filter_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
// 使用 SQL92 语法过滤消息
consumer.subscribe("SQLFilterTopic",
MessageSelector.bySql("a > 5 AND b = 'test'"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.printf("Receive message: %s%n",
new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
4. 消息回溯
消息回溯功能允许消费者从指定时间点开始消费消息,这对于数据恢复、重新处理历史数据等场景非常有用。
消息回溯的应用场景:
- 系统故障后的数据恢复
- 重新处理历史数据
- 数据分析和统计
- 测试和调试
回溯方式:
- 按时间回溯:从指定时间点开始消费
- 按偏移量回溯:从指定偏移量开始消费
- 从最新位置消费:从最新消息开始消费
public class MessageBacktrackConsumer {
public void consumeFromTimestamp() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("backtrack_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("BacktrackTopic", "*");
// 设置消费起始时间(回溯到指定时间)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp("20231201080000"); // 格式:yyyyMMddHHmmss
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.printf("Receive message: %s, StoreTime: %s%n",
new String(message.getBody()),
new Date(message.getStoreTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
最佳实践
1. 性能优化
生产者优化
public class OptimizedProducer {
public void optimizedSend() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("optimized_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 性能优化配置
producer.setCompressMsgBodyOverHowmuch(4096); // 消息压缩阈值
producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小
producer.setSendMsgTimeout(3000); // 发送超时时间
producer.setRetryTimesWhenSendFailed(2); // 发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数
producer.start();
// 批量发送消息
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Message msg = new Message("BatchTopic", "TagA",
("Batch message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
messages.add(msg);
}
// 批量发送
SendResult sendResult = producer.send(messages);
System.out.printf("Batch send result: %s%n", sendResult);
producer.shutdown();
}
}
消费者优化
public class OptimizedConsumer {
public void optimizedConsume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("optimized_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OptimizedTopic", "*");
// 性能优化配置
consumer.setConsumeThreadMin(20); // 最小消费线程数
consumer.setConsumeThreadMax(64); // 最大消费线程数
consumer.setConsumeMessageBatchMaxSize(32); // 批量消费消息数量
consumer.setPullBatchSize(32); // 批量拉取消息数量
consumer.setPullInterval(0); // 拉取间隔,0表示不间隔
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
// 批量处理消息
for (MessageExt message : messages) {
System.out.printf("Receive message: %s%n",
new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
2. 监控告警
关键指标监控
监控配置
public class MonitoringConfig {
public void setupMonitoring() {
// 设置监控日志
System.setProperty("rocketmq.client.logUseSlf4j", "true");
System.setProperty("rocketmq.client.logLevel", "INFO");
System.setProperty("rocketmq.client.logRoot", "/opt/logs/rocketmq");
// 设置统计开关
System.setProperty("rocketmq.client.statistics", "true");
}
}
3. 故障处理
消费失败处理
public class FailureHandlingConsumer {
public void handleConsumeFailure() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("failure_handling_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("FailureTopic", "*");
// 设置重试次数
consumer.setMaxReconsumeTimes(3);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
try {
// 业务处理逻辑
processMessage(message);
System.out.printf("Process message success: %s%n",
new String(message.getBody()));
} catch (Exception e) {
System.err.printf("Process message failed: %s, error: %s%n",
new String(message.getBody()), e.getMessage());
// 根据重试次数决定是否继续重试
if (message.getReconsumeTimes() >= 3) {
// 超过重试次数,记录到死信队列或人工处理
handleDeadLetterMessage(message);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
// 返回重试状态
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
private void processMessage(MessageExt message) throws Exception {
// 模拟业务处理
if (Math.random() < 0.3) {
throw new RuntimeException("模拟处理失败");
}
}
private void handleDeadLetterMessage(MessageExt message) {
// 处理死信消息
System.out.printf("Handle dead letter message: %s%n",
new String(message.getBody()));
}
}
常见问题
1. 消息丢失问题
问题描述: 消息发送成功但消费者没有收到消息
可能原因:
- 消费者消费速度过慢,消息过期被删除
- 消费者组配置错误
- 网络问题导致消息丢失
解决方案:
// 1. 检查消息存储时间
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 2. 增加消息存储时间
// 在 broker.conf 中配置
fileReservedTime=72 // 消息保留72小时
// 3. 使用同步发送确保消息到达
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 发送失败处理
}
2. 消息重复消费
问题描述: 同一条消息被消费多次
可能原因:
- 网络抖动导致消息确认失败
- 消费者重启导致消息重复拉取
- 事务消息回查机制
解决方案:
// 1. 实现幂等性处理
public class IdempotentConsumer {
private Set<String> processedMessages = new ConcurrentHashMap<>();
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
String messageId = message.getMsgId();
// 检查消息是否已处理
if (processedMessages.contains(messageId)) {
System.out.println("Message already processed: " + messageId);
continue;
}
// 处理消息
processMessage(message);
// 记录已处理的消息
processedMessages.add(messageId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
3. 消费积压问题
问题描述: 消费者处理速度跟不上消息生产速度
解决方案:
// 1. 增加消费者实例
// 2. 优化消费逻辑
// 3. 调整消费参数
consumer.setConsumeThreadMin(50);
consumer.setConsumeThreadMax(100);
consumer.setConsumeMessageBatchMaxSize(64);
// 4. 使用批量消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
// 批量处理消息
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (MessageExt message : messages) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
processMessage(message);
});
futures.add(future);
}
// 等待所有消息处理完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
4. 性能调优
JVM 参数调优:
# 生产环境推荐配置
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
JAVA_OPT="${JAVA_OPT} -XX:+DisableExplicitGC"
Broker 配置优化:
# broker.conf
# 异步刷盘,提高性能
flushDiskType=ASYNC_FLUSH
# 增加发送线程数
sendMessageThreadPoolNums=16
# 增加拉取线程数
pullMessageThreadPoolNums=20
# 优化内存映射
mappedFileSizeCommitLog=1073741824
mappedFileSizeConsumeQueue=300000
总结
RocketMQ 是一个功能强大、性能优异的分布式消息中间件。通过合理配置和使用,可以满足各种业务场景的需求。在使用过程中,需要注意:
- 合理设计消息模型: 根据业务需求选择合适的 Topic 和 Queue 数量
- 优化性能参数: 根据实际负载调整各种参数
- 实现幂等性: 确保消息处理的幂等性
- 监控告警: 建立完善的监控体系
- 故障处理: 制定完善的故障处理方案
通过本文的介绍,相信您已经对 RocketMQ 有了全面的了解,可以开始在实际项目中应用了。
