DukeDuke
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
  • 技术文档

    • 网络原理

      • 交换机
      • 路由器
      • TCP/IP协议
      • HTTP 与 HTTPS
    • 软件架构

      • 什么是软件架构
      • 分层架构
      • 微服务架构
      • 事件驱动架构
      • 领域驱动设计(DDD)
      • 架构图
      • 高并发系统
    • Vue3

      • Vue3简介
      • Vue3响应式系统
      • Vue3组合式API
      • Vue3生命周期
      • Vue3模板语法
      • Vue3组件系统
      • Vue3 路由系统
      • Vue3 状态管理
      • Vue3 性能优化
      • Vue3 TypeScript 支持
      • Vue3 项目实战
      • VUE 面试题大全
      • Node.js 安装
    • JAVA

      • JVM

        • 认识JVM
        • JVM类加载器
        • 运行时数据区
        • 执行引擎
        • 本地方法接口
        • 本地方法库
        • JVM垃圾回收
        • JVM性能监控
        • JVM调优
      • 设计模式
        • 单例模式
        • 工厂模式
        • 策略模式
        • 适配器模式
        • 建造者模式
        • 原型模式
        • 装饰器模式
        • 代理模式
        • 外观模式
        • 享元模式
        • 组合模式
        • 桥接模式
      • Java多线程

        • Java 线程基础详解
        • Java 线程池详解
        • Java ThreadLocal 详解
        • Java volatile 详解
        • Java 线程间通信详解
        • Java 线程安全详解
        • Java 线程调度详解
        • Java 线程优先级详解

        • Java 线程中断详解
        • Java 线程死锁详解
      • Java反射
      • Java 面试题

        • Java 基础概念面试题
        • Java 面向对象编程面试题
        • Java 集合框架面试题
        • Java 多线程与并发面试题
        • JVM 与内存管理面试题
        • Java I/O 与 NIO 面试题
        • Java 异常处理面试题
        • Java 反射与注解面试题
        • Java Spring 框架面试题
        • Java 数据库与 JDBC 面试题
        • Java 性能优化面试题
        • Java 实际项目经验面试题
        • Java 高级特性面试题
        • Java 面试准备建议
    • Python

      • Python简介
      • Python安装
      • Python hello world
      • Python基础语法
      • Python数据类型
      • Python数字
      • Python字符串
      • Python列表
      • Python元组
      • Python字典
      • Python日期时间
      • Python文件操作
      • Python异常处理
      • Python函数
      • Python类
      • Python模块
      • Python包
      • Python多线程
      • Python面向对象
      • Python爬虫
      • Django web框架
      • Python 面试题

        • Python 面试题导航
        • Python 基础概念
        • Python 面向对象编程
        • Python 数据结构
        • Python 高级特性
        • Python 框架
        • Python 性能优化
        • Python 项目经验
    • Spring

      • Spring
      • Springboot
      • Spring Security 安全框架
      • SpringBoot 中的事件详解
      • SpringBoot 中的定时任务详解
      • SpringBoot 自动装配原理与源码解释
    • Mybatis

      • Mybatis
      • Mybatis-Plus
    • 数据库

      • Redis

        • Redis简介
        • Redis(单机)安装
        • Redis配置
        • Redis数据结构
        • RDB、AOF 和混合持久化机制
        • Redis内存管理
        • Redis缓存一致性
        • Redis缓存穿透
        • Redis缓存击穿
        • Redis缓存雪崩
        • Redis Lua脚本
        • Redis主从复制
        • Redis哨兵模式
        • Redis集群
        • Redis数据分片
        • Redis CPU使用率过高
        • Redis面试题
      • MySQL

        • MySQL简介
        • MySQL安装
        • MySQL配置
        • MYSQL日常维护
        • MYSQL优化-慢查询
        • MYSQL优化-索引
        • MYSQL数据库设计规范
    • 消息队列

      • RocketMQ
      • Kafka
      • RabbitMQ
      • 消息队列面试题
    • 微服务

      • SpringCloud 微服务
      • Eureka 注册中心
      • Nacos 注册中心
      • Gateway 网关
      • Feign 服务调用
      • Sentinel 限流 与 熔断
      • Seata 分布式事务
      • CAP 理论
      • Redis 分布式锁
      • 高并发系统设计
    • ELK日志分析系统

      • Elasticsearch 搜索引擎
      • Logstash 数据处理
      • Kibana 可视化
      • ELK 实战
    • 开放API

      • 开放API设计
      • 开放API示例项目
    • 人工智能

      • 人工智能简介
      • 机器学习

      • 深度学习

      • 自然语言处理

      • 计算机视觉

        • CUDA与cuDNN详细安装
        • Conda 安装
        • Pytorch 深度学习框架
        • yolo 目标检测
        • TensorRT 深度学习推理优化引擎
        • TensorFlow 机器学习
        • CVAT 图像标注
        • Windows 下安装 CUDA、cuDNN、TensorRT、TensorRT-YOLO 环境
        • Windows10+CUDA+cuDNN+TensorRT+TensorRT-YOLO 部署高性能YOLO11推理
    • 大数据

      • 大数据简介
      • Hadoop 数据存储
      • Flume 数据采集
      • Sqoop 数据导入导出
      • Hive 数据仓库
      • Spark 数据处理
      • Flink 数据处理
      • Kafka 数据采集
      • HBase 数据存储
      • Elasticsearch 搜索引擎
    • 图像处理

      • 图像处理简介
      • 医学图像web呈现
      • 医学图像处理
      • 切片细胞分离问题
    • 服务器&运维

      • Linux 系统

        • Linux 系统管理
        • Linux 网络管理
        • Linux 文件管理
        • Linux 命令大全
      • Nginx Web 服务器

        • Nginx 安装 与 配置
        • Nginx 负载均衡
        • Nginx SSL证书配置
        • Nginx Keepalived 高可用
      • Docker 容器

        • Docker 简介
        • Docker 安装与配置
        • Docker 命令
        • Docker 部署 Nginx
        • Docker 部署 MySQL
        • Docker 部署 Redis
      • 服务器

        • 塔式服务器
        • 机架式服务器
        • 刀片服务器
      • Git 版本控制
      • Jenkins 持续集成
      • Jmeter 性能测试
      • Let's Encrypt 免费SSL证书
    • 简历

      • 项目经理简历
      • 开发工程师简历

RocketMQ 消息队列

目录

  • 概述
  • 核心概念
  • 架构设计
  • 安装配置
  • 快速开始
  • 消息发送
  • 消息消费
  • 高级特性
  • 最佳实践
  • 常见问题

概述

Apache RocketMQ 是一个分布式消息中间件,具有低延迟、高并发、高可用、高可靠的特点。它广泛应用于电商、金融、物流等对消息可靠性要求较高的场景。

主要特性

  • 高并发: 支持单机百万级消息处理
  • 低延迟: 毫秒级消息投递
  • 高可用: 支持集群部署,自动故障转移
  • 消息可靠性: 支持消息持久化,确保消息不丢失
  • 顺序消息: 支持全局顺序和分区顺序
  • 事务消息: 支持分布式事务
  • 消息过滤: 支持 SQL92 语法和 Tag 过滤
  • 消息回溯: 支持按时间回溯消息

核心概念

1. 消息模型

RocketMQ 采用发布-订阅模式的消息模型,整个系统由四个核心组件构成,各司其职,协同工作:

  • Producer(生产者): 消息的发送方,负责将业务消息发送到指定的 Topic 中。生产者可以是一个应用程序、服务或系统,它们通过 RocketMQ 客户端 API 将消息推送到 Broker 集群。

  • Consumer(消费者): 消息的接收方,负责从指定的 Topic 中拉取并处理消息。消费者通过订阅 Topic 来接收消息,支持集群消费和广播消费两种模式。

  • Broker(消息代理): 消息存储和转发的核心组件,负责接收生产者发送的消息、持久化存储消息、以及向消费者推送消息。Broker 集群是 RocketMQ 的核心,承担着消息的存储、路由和负载均衡功能。

  • NameServer(命名服务器): 轻量级的注册中心,负责管理整个集群的路由信息、服务发现和负载均衡。NameServer 不存储消息数据,只维护 Broker 的地址信息和 Topic 的路由表,为生产者和消费者提供路由服务。

2. 消息结构

RocketMQ 中的消息(Message)是一个完整的数据单元,包含多个组成部分,每个部分都有其特定的作用:

消息组成部分详解:

  • Topic(主题): 消息的分类标识,类似于数据库中的表名。生产者将消息发送到特定的 Topic,消费者订阅感兴趣的 Topic 来接收消息。一个 Topic 可以包含多个 Queue,实现消息的分布式存储和并行处理。

  • Tag(标签): 消息的二级分类,用于对同一 Topic 下的消息进行更细粒度的分类。通过 Tag 可以实现消息的过滤,消费者可以只接收特定 Tag 的消息,提高消息处理的精确性。

  • Key(键): 消息的唯一标识符,通常用于消息去重、消息查询和消息追踪。Key 可以是业务相关的唯一 ID,如订单号、用户 ID 等。

  • Body(消息体): 消息的实际内容,是业务数据的载体。Body 是字节数组格式,可以存储任意格式的数据,如 JSON、XML、二进制数据等。

  • Properties(属性): 消息的扩展属性,以键值对的形式存储。Properties 可以用于存储消息的元数据信息,如消息来源、处理优先级、过期时间等。

3. 队列模型

RocketMQ 采用队列模型来实现消息的分布式存储和并行处理。这种设计使得系统能够支持高并发和大规模的消息处理:

队列模型的工作原理:

  • Topic 与 Queue 的关系: 每个 Topic 包含多个 Queue(队列),Queue 是消息存储和分发的基本单位。消息在 Topic 的各个 Queue 之间进行负载均衡分布,实现消息的并行处理。

  • 消息分布策略: 生产者发送消息时,RocketMQ 会根据负载均衡算法将消息分布到不同的 Queue 中。这种设计避免了单点瓶颈,提高了系统的吞吐量和并发处理能力。

  • 消费者负载均衡: 消费者组中的多个消费者实例会平均分配 Topic 下的 Queue,每个消费者负责处理分配给它的 Queue 中的消息。当消费者数量变化时,系统会自动重新分配 Queue,实现动态负载均衡。

  • 顺序保证: 在同一个 Queue 内,消息的消费顺序与发送顺序一致。如果需要全局顺序,可以将所有消息发送到同一个 Queue;如果需要分区顺序,可以根据业务键(如订单 ID)将相关消息发送到同一个 Queue。

架构设计

整体架构

RocketMQ 采用分布式架构设计,通过多个组件的协同工作来实现高可用、高性能的消息处理能力。整个系统采用主从复制模式,确保数据的高可靠性:

架构设计特点:

  • 集群化部署: 所有组件都支持集群部署,NameServer 集群提供高可用的路由服务,Broker 集群提供高可用的消息存储服务。

  • 主从复制: Broker 采用 Master-Slave 架构,Master 负责读写操作,Slave 负责数据备份。当 Master 故障时,Slave 可以自动切换为 Master,保证服务不中断。

  • 无状态设计: NameServer 采用无状态设计,各个节点之间相互独立,任何一个节点故障都不会影响整体服务。

  • 负载均衡: 生产者和消费者通过 NameServer 获取路由信息,实现自动的负载均衡和故障转移。

组件职责

RocketMQ 的各个组件都有明确的职责分工,通过协同工作来实现完整的消息处理流程:

组件职责
NameServer路由信息管理、服务发现、负载均衡
Broker消息存储、消息转发、消息过滤
Producer消息发送、负载均衡、故障转移
Consumer消息消费、负载均衡、消息确认

详细职责说明:

  • NameServer 职责:

    • 路由信息管理: 维护 Topic 与 Broker 的映射关系,记录每个 Topic 分布在哪些 Broker 上
    • 服务发现: 为生产者和消费者提供 Broker 的地址信息,实现动态的服务发现
    • 负载均衡: 根据 Broker 的负载情况,为生产者和消费者提供最优的路由选择
    • 健康检查: 定期检查 Broker 的健康状态,及时剔除故障节点
  • Broker 职责:

    • 消息存储: 将接收到的消息持久化存储到磁盘,支持消息的可靠存储
    • 消息转发: 根据消费者的订阅信息,将消息推送给相应的消费者
    • 消息过滤: 支持 Tag 过滤和 SQL 过滤,减少不必要的网络传输
    • 事务处理: 支持事务消息的处理,确保分布式事务的一致性
  • Producer 职责:

    • 消息发送: 将业务消息发送到指定的 Topic
    • 负载均衡: 根据 Broker 的负载情况选择最优的 Broker 进行消息发送
    • 故障转移: 当某个 Broker 不可用时,自动切换到其他可用的 Broker
    • 消息确认: 支持同步、异步和单向三种发送模式,满足不同的业务需求
  • Consumer 职责:

    • 消息消费: 从订阅的 Topic 中拉取消息并进行业务处理
    • 负载均衡: 在消费者组内进行负载均衡,确保消息的均匀分配
    • 消息确认: 处理完消息后向 Broker 确认,支持消费进度的管理
    • 故障恢复: 当消费者重启时,能够从上次消费的位置继续消费

消息存储架构

RocketMQ 采用高效的消息存储架构,通过三种不同类型的文件来实现消息的存储、索引和查询功能:

存储架构详解:

  • CommitLog(提交日志):

    • 作用: 存储所有消息的实际内容,是消息的物理存储文件
    • 特点: 采用顺序写入的方式,所有 Topic 的消息都写入同一个 CommitLog 文件
    • 优势: 顺序写入性能极高,能够充分利用磁盘的写入性能
    • 结构: 每个消息包含消息长度、消息内容、消息属性等信息
  • ConsumeQueue(消费队列):

    • 作用: 为每个 Topic 的每个 Queue 建立索引,记录消息在 CommitLog 中的位置
    • 特点: 按 Topic 和 Queue 进行分组,每个 ConsumeQueue 对应一个 Queue
    • 优势: 支持随机读取,消费者可以快速定位到需要消费的消息
    • 结构: 每条记录包含消息在 CommitLog 中的偏移量、消息大小、消息 Tag 等信息
  • IndexFile(索引文件):

    • 作用: 为消息的 Key 建立索引,支持通过 Key 快速查找消息
    • 特点: 采用 Hash 索引结构,支持精确匹配和模糊查询
    • 优势: 提供消息的快速检索能力,支持消息的按 Key 查询功能
    • 结构: 包含 Hash 槽、Hash 冲突链表、消息索引等信息

存储流程:

  1. 消息到达 Broker 后,首先写入 CommitLog 文件(顺序写入)
  2. 同时更新对应 Topic 和 Queue 的 ConsumeQueue 索引
  3. 如果消息包含 Key,则更新 IndexFile 索引
  4. 消费者通过 ConsumeQueue 找到消息在 CommitLog 中的位置,然后读取消息内容

安装配置

环境要求

  • JDK 1.8+
  • Maven 3.2+
  • 内存 4GB+
  • 磁盘空间 10GB+

下载安装

# 下载 RocketMQ
wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

# 解压
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-all-4.9.4-bin-release

配置修改

1. 修改 JVM 参数

# 修改 runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"

# 修改 runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

2. 配置 Broker

# broker.conf
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

启动服务

# 启动 NameServer
nohup sh mqnamesrv &

# 启动 Broker
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

快速开始

Maven 依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>

生产者示例

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 发送消息
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagA",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

消费者示例

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.printf("Receive message: %s%n",
                        new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消息发送

发送方式

1. 同步发送

public class SyncProducer {
    public void sendSyncMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA",
            "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 同步发送,等待发送结果
        SendResult sendResult = producer.send(msg);
        System.out.printf("Send result: %s%n", sendResult);

        producer.shutdown();
    }
}

2. 异步发送

public class AsyncProducer {
    public void sendAsyncMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA",
            "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 异步发送,不等待发送结果
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("Send success: %s%n", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.printf("Send failed: %s%n", e.getMessage());
            }
        });

        producer.shutdown();
    }
}

3. 单向发送

public class OnewayProducer {
    public void sendOnewayMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA",
            "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 单向发送,不关心发送结果
        producer.sendOneway(msg);

        producer.shutdown();
    }
}

消息发送流程

消息发送是 RocketMQ 的核心流程之一,涉及多个组件的协同工作。整个流程设计考虑了高可用、负载均衡和故障转移等关键因素:

详细发送流程说明:

  1. 路由信息获取:

    • 生产者启动时,首先向 NameServer 注册并获取 Topic 的路由信息
    • NameServer 返回该 Topic 对应的 Broker 列表和 Queue 分布情况
    • 生产者缓存路由信息,定期更新以应对 Broker 的变化
  2. 消息路由选择:

    • 根据负载均衡策略(如轮询、随机等)选择合适的 Broker
    • 根据消息的 Key 或业务规则选择具体的 Queue
    • 支持自定义消息队列选择器,实现特定的路由逻辑
  3. 消息发送:

    • 将消息发送到选定的 Broker
    • Broker 接收消息后进行验证和存储
    • 根据发送模式(同步/异步/单向)返回相应的结果
  4. 结果处理:

    • 同步发送:等待 Broker 返回发送结果,确保消息已成功存储
    • 异步发送:通过回调函数处理发送结果,不阻塞主线程
    • 单向发送:不关心发送结果,适用于对可靠性要求不高的场景
  5. 故障处理:

    • 如果发送失败,自动重试或切换到其他 Broker
    • 更新本地路由缓存,剔除故障的 Broker
    • 支持发送失败的回调处理,便于业务逻辑的容错处理

消息消费

消费模式

1. 集群消费 (Clustering)

public class ClusterConsumer {
    public void consumeMessage() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cluster_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                // 集群消费:每条消息只会被一个消费者消费
                for (MessageExt message : messages) {
                    System.out.printf("Receive message: %s%n",
                        new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

2. 广播消费 (Broadcasting)

public class BroadcastConsumer {
    public void consumeMessage() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING); // 设置为广播模式
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                // 广播消费:每条消息会被所有消费者消费
                for (MessageExt message : messages) {
                    System.out.printf("Receive message: %s%n",
                        new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

消费进度管理

消费进度管理是 RocketMQ 保证消息可靠消费的关键机制。它通过记录每个消费者组的消费位置,确保消息不会丢失,同时支持消息的重试和故障恢复:

消费进度管理机制详解:

  • 消费进度存储:

    • 消费进度存储在 Broker 端,以消费者组为单位进行管理
    • 每个消费者组对每个 Queue 都有一个消费进度记录
    • 消费进度记录消息在 Queue 中的偏移量(Offset)
  • 进度更新策略:

    • 自动更新: 消费者处理完消息后,自动向 Broker 报告消费进度
    • 批量更新: 支持批量更新消费进度,减少网络开销
    • 定时更新: 定期更新消费进度,即使没有处理消息也会更新
  • 重试机制:

    • 重试次数: 消息处理失败时,会自动重试,默认最多重试 16 次
    • 重试间隔: 重试间隔逐渐增加,避免频繁重试对系统造成压力
    • 死信队列: 超过最大重试次数的消息会进入死信队列,需要人工处理
  • 故障恢复:

    • 消费者重启: 消费者重启后,会从上次消费的位置继续消费
    • 消费者扩容: 新增消费者时,会重新分配 Queue,实现负载均衡
    • 消费者缩容: 减少消费者时,剩余的消费者会接管更多的 Queue
  • 进度查询:

    • 支持查询指定消费者组的消费进度
    • 支持查询消息的堆积情况,便于监控和告警
    • 支持重置消费进度,实现消息的重新消费

高级特性

1. 顺序消息

顺序消息是 RocketMQ 提供的重要特性,能够保证消息按照发送顺序被消费。这在很多业务场景中非常重要,比如订单状态变更、库存扣减等需要严格顺序的操作。

顺序消息的实现原理:

  • 通过将消息发送到同一个 Queue 来保证顺序
  • 消费者按顺序从 Queue 中拉取消息进行处理
  • 支持全局顺序和分区顺序两种模式

全局顺序消息

全局顺序消息保证所有消息都按照发送顺序被消费,适用于对顺序要求极高的场景,如金融交易、日志处理等。

public class OrderedProducer {
    public void sendOrderedMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 全局顺序消息:所有消息按发送顺序消费
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("OrderedTopic", "TagA",
                ("Ordered message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 发送到同一个队列保证顺序
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 选择第一个队列
                    return mqs.get(0);
                }
            }, null);

            System.out.printf("Send result: %s%n", sendResult);
        }

        producer.shutdown();
    }
}

分区顺序消息

分区顺序消息保证同一分区内的消息按顺序消费,不同分区之间的消息可以并行处理。这种模式在保证顺序的同时,还能提供更好的并发性能,适用于订单处理、用户行为分析等场景。

public class PartitionOrderedProducer {
    public void sendPartitionOrderedMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("partition_ordered_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 分区顺序消息:同一分区的消息按顺序消费
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("PartitionOrderedTopic", "TagA",
                ("Partition ordered message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 根据业务ID选择队列
            String orderId = "ORDER_" + (i % 3); // 3个分区
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    String orderId = (String) arg;
                    int index = Math.abs(orderId.hashCode()) % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("Send result: %s%n", sendResult);
        }

        producer.shutdown();
    }
}

2. 事务消息

事务消息是 RocketMQ 提供的分布式事务解决方案,能够保证本地事务和消息发送的一致性。它采用两阶段提交的思想,通过消息回查机制来确保事务的最终一致性。

事务消息的应用场景:

  • 订单创建后发送支付消息
  • 库存扣减后发送通知消息
  • 用户注册后发送欢迎邮件

事务消息的执行流程:

  1. 发送半消息(Prepared 消息)到 Broker
  2. 执行本地事务
  3. 根据本地事务结果提交或回滚消息
  4. 如果长时间未收到确认,Broker 会回查事务状态
public class TransactionProducer {
    public void sendTransactionMessage() throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 设置事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务
                try {
                    // 模拟本地事务处理
                    System.out.println("执行本地事务: " + new String(msg.getBody()));
                    // 返回提交状态
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    // 返回回滚状态
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 检查本地事务状态
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        // 发送事务消息
        Message msg = new Message("TransactionTopic", "TagA",
            "Transaction message".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("Transaction send result: %s%n", sendResult);

        producer.shutdown();
    }
}

3. 消息过滤

消息过滤功能允许消费者只接收感兴趣的消息,减少不必要的网络传输和处理开销。RocketMQ 支持两种过滤方式:Tag 过滤和 SQL 过滤。

消息过滤的优势:

  • 减少网络带宽消耗
  • 降低消费者处理压力
  • 提高消息处理的精确性
  • 支持复杂的过滤条件

Tag 过滤

Tag 过滤是 RocketMQ 提供的最简单的过滤方式,通过消息的 Tag 属性进行过滤。支持精确匹配和表达式匹配,适用于简单的消息分类场景。

public class TagFilterConsumer {
    public void consumeWithTagFilter() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag_filter_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");

        // 只消费 TagA 和 TagB 的消息
        consumer.subscribe("FilterTopic", "TagA || TagB");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.printf("Receive message: %s, Tag: %s%n",
                        new String(message.getBody()), message.getTags());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

SQL 过滤

SQL 过滤支持使用 SQL92 语法对消息的 Properties 属性进行过滤,提供更强大的过滤能力。适用于需要根据消息属性进行复杂过滤的场景。

public class SQLFilterConsumer {
    public void consumeWithSQLFilter() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql_filter_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");

        // 使用 SQL92 语法过滤消息
        consumer.subscribe("SQLFilterTopic",
            MessageSelector.bySql("a > 5 AND b = 'test'"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.printf("Receive message: %s%n",
                        new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

4. 消息回溯

消息回溯功能允许消费者从指定时间点开始消费消息,这对于数据恢复、重新处理历史数据等场景非常有用。

消息回溯的应用场景:

  • 系统故障后的数据恢复
  • 重新处理历史数据
  • 数据分析和统计
  • 测试和调试

回溯方式:

  • 按时间回溯:从指定时间点开始消费
  • 按偏移量回溯:从指定偏移量开始消费
  • 从最新位置消费:从最新消息开始消费
public class MessageBacktrackConsumer {
    public void consumeFromTimestamp() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("backtrack_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("BacktrackTopic", "*");

        // 设置消费起始时间(回溯到指定时间)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeTimestamp("20231201080000"); // 格式:yyyyMMddHHmmss

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.printf("Receive message: %s, StoreTime: %s%n",
                        new String(message.getBody()),
                        new Date(message.getStoreTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

最佳实践

1. 性能优化

生产者优化

public class OptimizedProducer {
    public void optimizedSend() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("optimized_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 性能优化配置
        producer.setCompressMsgBodyOverHowmuch(4096); // 消息压缩阈值
        producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小
        producer.setSendMsgTimeout(3000); // 发送超时时间
        producer.setRetryTimesWhenSendFailed(2); // 发送失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数

        producer.start();

        // 批量发送消息
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("BatchTopic", "TagA",
                ("Batch message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            messages.add(msg);
        }

        // 批量发送
        SendResult sendResult = producer.send(messages);
        System.out.printf("Batch send result: %s%n", sendResult);

        producer.shutdown();
    }
}

消费者优化

public class OptimizedConsumer {
    public void optimizedConsume() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("optimized_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("OptimizedTopic", "*");

        // 性能优化配置
        consumer.setConsumeThreadMin(20); // 最小消费线程数
        consumer.setConsumeThreadMax(64); // 最大消费线程数
        consumer.setConsumeMessageBatchMaxSize(32); // 批量消费消息数量
        consumer.setPullBatchSize(32); // 批量拉取消息数量
        consumer.setPullInterval(0); // 拉取间隔,0表示不间隔

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                // 批量处理消息
                for (MessageExt message : messages) {
                    System.out.printf("Receive message: %s%n",
                        new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

2. 监控告警

关键指标监控

监控配置

public class MonitoringConfig {
    public void setupMonitoring() {
        // 设置监控日志
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
        System.setProperty("rocketmq.client.logLevel", "INFO");
        System.setProperty("rocketmq.client.logRoot", "/opt/logs/rocketmq");

        // 设置统计开关
        System.setProperty("rocketmq.client.statistics", "true");
    }
}

3. 故障处理

消费失败处理

public class FailureHandlingConsumer {
    public void handleConsumeFailure() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("failure_handling_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("FailureTopic", "*");

        // 设置重试次数
        consumer.setMaxReconsumeTimes(3);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    try {
                        // 业务处理逻辑
                        processMessage(message);
                        System.out.printf("Process message success: %s%n",
                            new String(message.getBody()));
                    } catch (Exception e) {
                        System.err.printf("Process message failed: %s, error: %s%n",
                            new String(message.getBody()), e.getMessage());

                        // 根据重试次数决定是否继续重试
                        if (message.getReconsumeTimes() >= 3) {
                            // 超过重试次数,记录到死信队列或人工处理
                            handleDeadLetterMessage(message);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        } else {
                            // 返回重试状态
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }

    private void processMessage(MessageExt message) throws Exception {
        // 模拟业务处理
        if (Math.random() < 0.3) {
            throw new RuntimeException("模拟处理失败");
        }
    }

    private void handleDeadLetterMessage(MessageExt message) {
        // 处理死信消息
        System.out.printf("Handle dead letter message: %s%n",
            new String(message.getBody()));
    }
}

常见问题

1. 消息丢失问题

问题描述: 消息发送成功但消费者没有收到消息

可能原因:

  • 消费者消费速度过慢,消息过期被删除
  • 消费者组配置错误
  • 网络问题导致消息丢失

解决方案:

// 1. 检查消息存储时间
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// 2. 增加消息存储时间
// 在 broker.conf 中配置
fileReservedTime=72  // 消息保留72小时

// 3. 使用同步发送确保消息到达
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
    // 发送失败处理
}

2. 消息重复消费

问题描述: 同一条消息被消费多次

可能原因:

  • 网络抖动导致消息确认失败
  • 消费者重启导致消息重复拉取
  • 事务消息回查机制

解决方案:

// 1. 实现幂等性处理
public class IdempotentConsumer {
    private Set<String> processedMessages = new ConcurrentHashMap<>();

    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> messages,
            ConsumeConcurrentlyContext context) {
        for (MessageExt message : messages) {
            String messageId = message.getMsgId();

            // 检查消息是否已处理
            if (processedMessages.contains(messageId)) {
                System.out.println("Message already processed: " + messageId);
                continue;
            }

            // 处理消息
            processMessage(message);

            // 记录已处理的消息
            processedMessages.add(messageId);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

3. 消费积压问题

问题描述: 消费者处理速度跟不上消息生产速度

解决方案:

// 1. 增加消费者实例
// 2. 优化消费逻辑
// 3. 调整消费参数
consumer.setConsumeThreadMin(50);
consumer.setConsumeThreadMax(100);
consumer.setConsumeMessageBatchMaxSize(64);

// 4. 使用批量消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> messages,
            ConsumeConcurrentlyContext context) {
        // 批量处理消息
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (MessageExt message : messages) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                processMessage(message);
            });
            futures.add(future);
        }

        // 等待所有消息处理完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

4. 性能调优

JVM 参数调优:

# 生产环境推荐配置
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
JAVA_OPT="${JAVA_OPT} -XX:+DisableExplicitGC"

Broker 配置优化:

# broker.conf
# 异步刷盘,提高性能
flushDiskType=ASYNC_FLUSH
# 增加发送线程数
sendMessageThreadPoolNums=16
# 增加拉取线程数
pullMessageThreadPoolNums=20
# 优化内存映射
mappedFileSizeCommitLog=1073741824
mappedFileSizeConsumeQueue=300000

总结

RocketMQ 是一个功能强大、性能优异的分布式消息中间件。通过合理配置和使用,可以满足各种业务场景的需求。在使用过程中,需要注意:

  1. 合理设计消息模型: 根据业务需求选择合适的 Topic 和 Queue 数量
  2. 优化性能参数: 根据实际负载调整各种参数
  3. 实现幂等性: 确保消息处理的幂等性
  4. 监控告警: 建立完善的监控体系
  5. 故障处理: 制定完善的故障处理方案

通过本文的介绍,相信您已经对 RocketMQ 有了全面的了解,可以开始在实际项目中应用了。

最近更新:: 2025/9/4 17:20
Contributors: Duke
Next
Kafka