DukeDuke
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
  • 技术文档

    • 网络原理

      • 交换机
      • 路由器
      • TCP/IP协议
      • HTTP 与 HTTPS
    • 软件架构

      • 什么是软件架构
      • 分层架构
      • 微服务架构
      • 事件驱动架构
      • 领域驱动设计(DDD)
      • 架构图
      • 高并发系统
    • Vue3

      • Vue3简介
      • Vue3响应式系统
      • Vue3组合式API
      • Vue3生命周期
      • Vue3模板语法
      • Vue3组件系统
      • Vue3 路由系统
      • Vue3 状态管理
      • Vue3 性能优化
      • Vue3 TypeScript 支持
      • Vue3 项目实战
      • VUE 面试题大全
      • Node.js 安装
    • JAVA

      • JVM

        • 认识JVM
        • JVM类加载器
        • 运行时数据区
        • 执行引擎
        • 本地方法接口
        • 本地方法库
        • JVM垃圾回收
        • JVM性能监控
        • JVM调优
      • 设计模式
        • 单例模式
        • 工厂模式
        • 策略模式
        • 适配器模式
        • 建造者模式
        • 原型模式
        • 装饰器模式
        • 代理模式
        • 外观模式
        • 享元模式
        • 组合模式
        • 桥接模式
      • Java多线程

        • Java 线程基础详解
        • Java 线程池详解
        • Java ThreadLocal 详解
        • Java volatile 详解
        • Java 线程间通信详解
        • Java 线程安全详解
        • Java 线程调度详解
        • Java 线程优先级详解

        • Java 线程中断详解
        • Java 线程死锁详解
      • Java反射
      • Java 面试题

        • Java 基础概念面试题
        • Java 面向对象编程面试题
        • Java 集合框架面试题
        • Java 多线程与并发面试题
        • JVM 与内存管理面试题
        • Java I/O 与 NIO 面试题
        • Java 异常处理面试题
        • Java 反射与注解面试题
        • Java Spring 框架面试题
        • Java 数据库与 JDBC 面试题
        • Java 性能优化面试题
        • Java 实际项目经验面试题
        • Java 高级特性面试题
        • Java 面试准备建议
    • Python

      • Python简介
      • Python安装
      • Python hello world
      • Python基础语法
      • Python数据类型
      • Python数字
      • Python字符串
      • Python列表
      • Python元组
      • Python字典
      • Python日期时间
      • Python文件操作
      • Python异常处理
      • Python函数
      • Python类
      • Python模块
      • Python包
      • Python多线程
      • Python面向对象
      • Python爬虫
      • Django web框架
      • Python 面试题

        • Python 面试题导航
        • Python 基础概念
        • Python 面向对象编程
        • Python 数据结构
        • Python 高级特性
        • Python 框架
        • Python 性能优化
        • Python 项目经验
    • Spring

      • Spring
      • Springboot
      • Spring Security 安全框架
      • SpringBoot 中的事件详解
      • SpringBoot 中的定时任务详解
      • SpringBoot 自动装配原理与源码解释
    • Mybatis

      • Mybatis
      • Mybatis-Plus
    • 数据库

      • Redis

        • Redis简介
        • Redis(单机)安装
        • Redis配置
        • Redis数据结构
        • RDB、AOF 和混合持久化机制
        • Redis内存管理
        • Redis缓存一致性
        • Redis缓存穿透
        • Redis缓存击穿
        • Redis缓存雪崩
        • Redis Lua脚本
        • Redis主从复制
        • Redis哨兵模式
        • Redis集群
        • Redis数据分片
        • Redis CPU使用率过高
        • Redis面试题
      • MySQL

        • MySQL简介
        • MySQL安装
        • MySQL配置
        • MYSQL日常维护
        • MYSQL优化-慢查询
        • MYSQL优化-索引
        • MYSQL数据库设计规范
    • 消息队列

      • RocketMQ
      • Kafka
      • RabbitMQ
      • 消息队列面试题
    • 微服务

      • SpringCloud 微服务
      • Eureka 注册中心
      • Nacos 注册中心
      • Gateway 网关
      • Feign 服务调用
      • Sentinel 限流 与 熔断
      • Seata 分布式事务
      • CAP 理论
      • Redis 分布式锁
      • 高并发系统设计
    • ELK日志分析系统

      • Elasticsearch 搜索引擎
      • Logstash 数据处理
      • Kibana 可视化
      • ELK 实战
    • 开放API

      • 开放API设计
      • 开放API示例项目
    • 人工智能

      • 人工智能简介
      • 机器学习

      • 深度学习

      • 自然语言处理

      • 计算机视觉

        • CUDA与cuDNN详细安装
        • Conda 安装
        • Pytorch 深度学习框架
        • yolo 目标检测
        • TensorRT 深度学习推理优化引擎
        • TensorFlow 机器学习
        • CVAT 图像标注
        • Windows 下安装 CUDA、cuDNN、TensorRT、TensorRT-YOLO 环境
        • Windows10+CUDA+cuDNN+TensorRT+TensorRT-YOLO 部署高性能YOLO11推理
    • 大数据

      • 大数据简介
      • Hadoop 数据存储
      • Flume 数据采集
      • Sqoop 数据导入导出
      • Hive 数据仓库
      • Spark 数据处理
      • Flink 数据处理
      • Kafka 数据采集
      • HBase 数据存储
      • Elasticsearch 搜索引擎
    • 图像处理

      • 图像处理简介
      • 医学图像web呈现
      • 医学图像处理
      • 切片细胞分离问题
    • 服务器&运维

      • Linux 系统

        • Linux 系统管理
        • Linux 网络管理
        • Linux 文件管理
        • Linux 命令大全
      • Nginx Web 服务器

        • Nginx 安装 与 配置
        • Nginx 负载均衡
        • Nginx SSL证书配置
        • Nginx Keepalived 高可用
      • Docker 容器

        • Docker 简介
        • Docker 安装与配置
        • Docker 命令
        • Docker 部署 Nginx
        • Docker 部署 MySQL
        • Docker 部署 Redis
      • 服务器

        • 塔式服务器
        • 机架式服务器
        • 刀片服务器
      • Git 版本控制
      • Jenkins 持续集成
      • Jmeter 性能测试
      • Let's Encrypt 免费SSL证书
    • 简历

      • 项目经理简历
      • 开发工程师简历

Apache Kafka 消息队列

目录

  • 概述
  • 核心概念
  • 架构设计
  • 安装配置
  • 快速开始
  • 生产者
  • 消费者
  • 高级特性
  • 最佳实践
  • 常见问题

概述

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,现在是一个开源的分布式事件流平台。Kafka 被设计为高吞吐量、低延迟、可扩展的分布式消息系统。

主要特性

  • 高吞吐量: 支持每秒百万级消息处理
  • 低延迟: 毫秒级消息投递
  • 持久化: 消息持久化到磁盘,支持数据保留策略
  • 分布式: 支持集群部署,自动分区和复制
  • 流处理: 支持实时流数据处理
  • 容错性: 支持数据复制和故障恢复
  • 可扩展性: 支持水平扩展

核心概念

1. 消息模型

Kafka 的消息模型是一个发布-订阅模式,类似于传统的消息队列系统,但具有更强的分布式特性。整个消息模型围绕以下几个核心组件构建:

  • Producer(生产者): 消息的发送方,负责将业务数据封装成消息并发送到 Kafka 集群。生产者可以选择性地指定消息的键(Key)和分区,以实现消息的有序性和负载均衡。

  • Consumer(消费者): 消息的接收方,负责从 Kafka 集群中拉取并处理消息。消费者可以组成消费者组(Consumer Group)来实现负载均衡和容错。

  • Broker(代理): Kafka 集群中的服务器节点,负责存储消息、处理生产者和消费者的请求。每个 Broker 都有唯一的 ID 标识。

  • Topic(主题): 消息的逻辑分类,类似于数据库中的表。生产者将消息发送到特定的 Topic,消费者从 Topic 中订阅消息。

  • Partition(分区): Topic 的物理分割,每个 Topic 可以包含多个分区。分区是 Kafka 实现水平扩展和高吞吐量的关键机制,消息在分区内是有序的。

  • Offset(偏移量): 消息在分区中的唯一标识符,类似于数组索引。消费者通过维护偏移量来记录消费进度,实现断点续传。

2. 消息结构

Kafka 中的每条消息都包含多个字段,这些字段共同构成了消息的完整信息。消息结构的设计考虑了性能、可扩展性和功能完整性:

消息字段详细说明:

  • Topic(主题): 消息所属的主题名称,用于消息分类和路由
  • Partition(分区): 消息所在的分区编号,决定消息的物理存储位置
  • Offset(偏移量): 消息在分区中的唯一位置标识,用于消息定位和消费进度跟踪
  • Key(键): 可选的消息键,用于分区选择和消息去重,相同 Key 的消息会被路由到同一分区
  • Value(值): 消息的实际内容,可以是任意格式的数据(JSON、二进制等)
  • Timestamp(时间戳): 消息创建时间,用于消息排序和 TTL 管理
  • Headers(消息头): 可选的键值对元数据,用于传递额外的消息属性

3. 分区模型

Kafka 的分区模型是其实现高吞吐量和水平扩展的核心机制。分区(Partition)是 Topic 的物理分割,每个 Topic 可以包含多个分区,这些分区分布在不同的 Broker 上。

分区模型的工作原理:

  1. 水平扩展: 通过增加分区数量,可以并行处理更多消息,提高整体吞吐量
  2. 负载均衡: 消息可以分布到不同的分区,实现负载的均匀分布
  3. 有序性保证: 同一分区内的消息是有序的,但不同分区之间的消息顺序不保证
  4. 并行消费: 消费者可以并行消费不同分区的消息,提高消费效率

分区选择策略:

  • 无 Key 消息: 使用轮询(Round Robin)策略,消息均匀分布到各个分区
  • 有 Key 消息: 使用 Hash 策略,相同 Key 的消息总是路由到同一分区,保证消息的有序性
  • 指定分区: 生产者可以直接指定目标分区,适用于需要精确控制消息路由的场景

架构设计

整体架构

Kafka 采用分布式架构设计,主要由四个核心组件构成:生产者集群、Kafka 集群、消费者组和 Zookeeper 集群。这种架构设计实现了高可用性、高吞吐量和水平扩展能力。

架构组件说明:

  • Producer Cluster(生产者集群): 多个生产者实例,负责向 Kafka 集群发送消息。生产者可以独立运行,也可以集成在业务应用中。

  • Kafka Cluster(Kafka 集群): 由多个 Broker 节点组成的分布式集群,负责消息的存储、路由和分发。每个 Broker 可以处理多个 Topic 的多个分区。

  • Consumer Group(消费者组): 多个消费者实例组成的消费组,实现负载均衡和容错。组内消费者共同消费 Topic 的消息,每个分区只能被组内一个消费者消费。

  • Zookeeper(协调服务): 负责集群的元数据管理、配置管理、Leader 选举和消费者组协调。在 Kafka 2.8+版本中,Kafka 开始支持 KRaft 模式,可以逐步摆脱对 Zookeeper 的依赖。

数据流向:

  1. 消息发送: 生产者将消息发送到 Kafka 集群的指定 Topic
  2. 消息存储: Broker 将消息持久化到本地磁盘,并创建相应的索引文件
  3. 消息消费: 消费者从 Kafka 集群拉取消息进行处理
  4. 元数据同步: 所有组件通过 Zookeeper 进行元数据同步和协调

组件职责

组件职责
Broker消息存储、分区管理、副本同步
Producer消息发送、分区选择、负载均衡
Consumer消息消费、偏移量管理、组协调
Zookeeper集群协调、元数据管理、配置管理

存储架构

Kafka 的存储架构采用了日志分段(Log Segment)的设计,这种设计既保证了高性能的写入,又提供了高效的读取能力。每个分区在物理上被分割成多个段文件,每个段包含日志文件、偏移量索引文件和时间戳索引文件。

存储架构特点:

  1. 顺序写入: 消息以追加的方式写入日志文件,充分利用磁盘的顺序写入性能
  2. 分段管理: 将大的日志文件分割成多个段,便于管理和清理
  3. 索引优化: 通过偏移量索引和时间戳索引,实现快速的消息定位
  4. 批量操作: 支持批量写入和批量读取,提高 I/O 效率

文件类型说明:

  • .log 文件: 存储实际的消息数据,采用二进制格式,包含消息的完整信息
  • .index 文件: 偏移量索引文件,记录偏移量到物理位置的映射,用于快速定位消息
  • .timeindex 文件: 时间戳索引文件,记录时间戳到偏移量的映射,支持按时间范围查询消息

段文件管理策略:

  • 段大小限制: 当段文件达到配置的大小(默认 1GB)时,会创建新的段文件
  • 时间限制: 当段文件超过配置的时间(默认 7 天)时,会创建新的段文件
  • 清理策略: 支持基于时间和基于大小的日志清理策略,自动删除过期数据

安装配置

环境要求

  • JDK 1.8+
  • Zookeeper 3.4.6+
  • 内存 4GB+
  • 磁盘空间 20GB+

下载安装

# 下载 Kafka
wget https://downloads.apache.org/kafka/2.8.1/kafka_2.13-2.8.1.tgz

# 解压
tar -xzf kafka_2.13-2.8.1.tgz
cd kafka_2.13-2.8.1

配置修改

1. 配置 Zookeeper

# config/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

2. 配置 Kafka

# config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

启动服务

# 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &

# 启动 Kafka
bin/kafka-server-start.sh config/server.properties &

快速开始

Maven 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.1</version>
</dependency>

生产者示例

以下是一个完整的 Kafka 生产者示例,展示了如何创建生产者、配置序列化器、发送消息等基本操作:

public class ProducerExample {
    public static void main(String[] args) {
        // 1. 配置生产者属性
        Properties props = new Properties();
        // 指定Kafka集群的地址,可以是多个地址用逗号分隔
        props.put("bootstrap.servers", "localhost:9092");
        // 配置键的序列化器,用于将Java对象转换为字节数组
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 配置值的序列化器,用于将消息内容转换为字节数组
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建Kafka生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 3. 发送消息
            for (int i = 0; i < 10; i++) {
                // 创建生产者记录,包含主题、键和值
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    "test-topic",     // 主题名称
                    "key-" + i,       // 消息键,用于分区选择
                    "Hello Kafka " + i // 消息内容
                );

                // 发送消息(异步发送)
                producer.send(record);
                System.out.println("发送消息: " + record);
            }
        } finally {
            // 4. 关闭生产者,确保所有消息都被发送
            producer.close();
        }
    }
}

代码说明:

  1. 配置属性: 通过 Properties 对象配置生产者的各种参数,包括服务器地址、序列化器等
  2. 创建生产者: 使用配置的属性创建 KafkaProducer 实例
  3. 创建消息记录: ProducerRecord 封装了要发送的消息,包含主题、键、值等信息
  4. 发送消息: 调用 send()方法发送消息,默认是异步发送
  5. 资源清理: 使用完毕后关闭生产者,确保资源得到释放

消费者示例

以下是一个完整的 Kafka 消费者示例,展示了如何创建消费者、订阅主题、拉取和处理消息:

public class ConsumerExample {
    public static void main(String[] args) {
        // 1. 配置消费者属性
        Properties props = new Properties();
        // 指定Kafka集群的地址
        props.put("bootstrap.servers", "localhost:9092");
        // 设置消费者组ID,同一组内的消费者会协调消费消息
        props.put("group.id", "test-group");
        // 配置键的反序列化器,用于将字节数组转换为Java对象
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置值的反序列化器,用于将消息内容转换为Java对象
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. 创建Kafka消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        try {
            // 3. 订阅主题,消费者会从这些主题中拉取消息
            consumer.subscribe(Arrays.asList("test-topic"));

            // 4. 消费消息的循环
            while (true) {
                // 拉取消息,超时时间为100毫秒
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // 处理拉取到的消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("收到消息 - offset: %d, key: %s, value: %s%n",
                        record.offset(),    // 消息在分区中的偏移量
                        record.key(),       // 消息键
                        record.value()      // 消息内容
                    );

                    // 这里可以添加具体的业务处理逻辑
                    processMessage(record);
                }
            }
        } finally {
            // 5. 关闭消费者
            consumer.close();
        }
    }

    // 业务消息处理方法
    private static void processMessage(ConsumerRecord<String, String> record) {
        // 实现具体的业务逻辑
        // 例如:数据转换、存储到数据库、调用其他服务等
        System.out.println("处理消息: " + record.value());
    }
}

代码说明:

  1. 配置属性: 配置消费者的各种参数,包括服务器地址、消费者组 ID、反序列化器等
  2. 创建消费者: 使用配置的属性创建 KafkaConsumer 实例
  3. 订阅主题: 通过 subscribe()方法订阅一个或多个主题
  4. 拉取消息: 使用 poll()方法从 Kafka 拉取消息,这是一个阻塞操作
  5. 处理消息: 遍历拉取到的消息记录,进行业务处理
  6. 资源清理: 使用完毕后关闭消费者,释放相关资源

重要注意事项:

  • 消费者组: 同一消费者组内的消费者会协调消费消息,每个分区只能被组内一个消费者消费
  • 偏移量管理: 消费者会自动管理偏移量,记录消费进度
  • 消息处理: 在实际应用中,应该将消息处理逻辑放在 try-catch 块中,确保异常不会影响消费进度

生产者

发送方式

1. 同步发送

public class SyncProducer {
    public void sendSyncMessage() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(
            "sync-topic", "key", "Hello Kafka");

        try {
            // 同步发送,等待发送结果
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("Sent record to topic %s partition %d with offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }

        producer.close();
    }
}

2. 异步发送

public class AsyncProducer {
    public void sendAsyncMessage() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(
            "async-topic", "key", "Hello Kafka");

        // 异步发送,不等待发送结果
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("Sent record to topic %s partition %d with offset %d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });

        producer.close();
    }
}

分区策略

public class PartitionStrategyProducer {
    public void sendWithPartitionStrategy() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 1. 指定分区发送
        ProducerRecord<String, String> record1 = new ProducerRecord<>(
            "partition-topic", 0, "key1", "Message to partition 0");
        producer.send(record1);

        // 2. 使用键进行分区
        ProducerRecord<String, String> record2 = new ProducerRecord<>(
            "partition-topic", "user-123", "Message for user 123");
        producer.send(record2);

        // 3. 自定义分区器
        props.put("partitioner.class", "com.example.CustomPartitioner");
        KafkaProducer<String, String> customProducer = new KafkaProducer<>(props);

        producer.close();
        customProducer.close();
    }
}

消息发送流程

消费者

消费模式

1. 消费者组模式

public class ConsumerGroupExample {
    public void consumeWithGroup() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("group-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            }
        }
    }
}

2. 独立消费者模式

public class IndependentConsumerExample {
    public void consumeIndependently() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 指定分区和偏移量
        TopicPartition partition = new TopicPartition("independent-topic", 0);
        consumer.assign(Arrays.asList(partition));
        consumer.seek(partition, 0); // 从开始位置消费

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            }
        }
    }
}

偏移量管理

public class OffsetManagementExample {
    public void manageOffsets() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "offset-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 偏移量提交策略
        props.put("enable.auto.commit", "false"); // 手动提交偏移量

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("offset-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            }

            // 手动提交偏移量
            consumer.commitSync();
        }
    }
}

消费进度管理

高级特性

1. 事务消息

public class TransactionalProducer {
    public void sendTransactionalMessage() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "my-transactional-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();

        try {
            producer.beginTransaction();

            // 发送多条消息
            producer.send(new ProducerRecord<>("tx-topic", "key1", "value1"));
            producer.send(new ProducerRecord<>("tx-topic", "key2", "value2"));

            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            // 回滚事务
            producer.abortTransaction();
            e.printStackTrace();
        }

        producer.close();
    }
}

2. 流处理

public class StreamProcessingExample {
    public void processStream() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 创建输入流
        KStream<String, String> source = builder.stream("input-topic");

        // 处理流数据
        KStream<String, String> processed = source
            .filter((key, value) -> value.length() > 5)
            .mapValues(value -> value.toUpperCase())
            .selectKey((key, value) -> value.substring(0, 3));

        // 输出到目标Topic
        processed.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

3. 连接器

public class ConnectorExample {
    public void setupFileConnector() {
        // 文件连接器配置
        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "FileStreamSource");
        config.put("file", "/path/to/input/file.txt");
        config.put("topic", "file-topic");

        // 创建连接器
        Connector connector = new FileStreamSourceConnector();
        connector.start(config);
    }
}

4. 监控指标

最佳实践

1. 性能优化

Kafka 的性能优化主要从生产者、消费者和 Broker 三个层面进行。通过合理的参数配置和代码优化,可以显著提升 Kafka 的吞吐量和降低延迟。

生产者优化

生产者性能优化的核心思想是减少网络请求次数、提高批量处理能力和合理配置确认机制。以下是优化后的生产者示例:

public class OptimizedProducer {
    public void optimizedSend() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 性能优化配置详解
        props.put("batch.size", 16384);        // 批量大小:16KB,当消息累积到16KB时发送
        props.put("linger.ms", 5);             // 等待时间:5ms,等待更多消息加入批次
        props.put("compression.type", "snappy"); // 压缩类型:使用snappy压缩算法
        props.put("acks", "1");                // 确认级别:只需Leader确认,平衡性能和可靠性
        props.put("retries", 3);               // 重试次数:失败时重试3次
        props.put("buffer.memory", 33554432);  // 缓冲区大小:32MB,用于缓存待发送消息
        props.put("max.in.flight.requests.per.connection", 5); // 最大并发请求数

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 批量发送示例
        List<ProducerRecord<String, String>> records = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            records.add(new ProducerRecord<>("batch-topic", "key-" + i, "value-" + i));
        }

        // 批量发送消息
        for (ProducerRecord<String, String> record : records) {
            producer.send(record);
        }

        producer.close();
    }
}

生产者优化参数说明:

  • batch.size: 控制批量发送的大小,较大的批次可以减少网络请求次数,但会增加延迟
  • linger.ms: 控制消息在发送前的等待时间,允许更多消息加入同一批次
  • compression.type: 压缩算法可以减少网络传输量,snappy 在压缩率和性能间取得良好平衡
  • acks: 确认级别,0 表示不等待确认(最快),1 表示等待 Leader 确认,all 表示等待所有副本确认
  • buffer.memory: 生产者缓冲区大小,用于缓存待发送的消息
  • max.in.flight.requests.per.connection: 控制并发请求数,避免消息乱序

消费者优化

消费者性能优化的关键在于提高拉取效率、优化处理逻辑和合理配置超时参数。以下是优化后的消费者示例:

public class OptimizedConsumer {
    public void optimizedConsume() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "optimized-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 性能优化配置详解
        props.put("fetch.min.bytes", 1);              // 最小拉取字节数:至少拉取1字节
        props.put("fetch.max.wait.ms", 500);          // 最大等待时间:最多等待500ms
        props.put("max.partition.fetch.bytes", 1048576); // 分区最大拉取字节数:1MB
        props.put("session.timeout.ms", 30000);       // 会话超时时间:30秒
        props.put("heartbeat.interval.ms", 3000);     // 心跳间隔:3秒
        props.put("enable.auto.commit", "false");     // 禁用自动提交,手动控制偏移量提交

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("optimized-topic"));

        while (true) {
            // 拉取消息,使用较短的超时时间提高响应性
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            if (!records.isEmpty()) {
                // 批量处理消息,使用异步处理提高吞吐量
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        processMessage(record);
                    });
                    futures.add(future);
                }

                // 等待所有消息处理完成
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

                // 手动提交偏移量,确保消息处理完成后再提交
                consumer.commitSync();
            }
        }
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        // 模拟消息处理逻辑
        System.out.printf("Processing: offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());

        // 这里可以添加具体的业务处理逻辑
        // 例如:数据转换、存储到数据库、调用其他服务等
    }
}

消费者优化参数说明:

  • fetch.min.bytes: 控制每次拉取的最小字节数,减少小批量拉取的网络开销
  • fetch.max.wait.ms: 控制拉取的最大等待时间,平衡延迟和吞吐量
  • max.partition.fetch.bytes: 控制每个分区单次拉取的最大字节数,避免内存溢出
  • session.timeout.ms: 会话超时时间,影响消费者组的重平衡速度
  • heartbeat.interval.ms: 心跳间隔,用于维持消费者与协调者的连接
  • enable.auto.commit: 是否启用自动提交偏移量,手动提交可以更好地控制消费进度

优化策略:

  1. 批量处理: 使用 CompletableFuture 实现消息的异步批量处理
  2. 手动提交: 禁用自动提交,在处理完成后手动提交偏移量
  3. 合理配置: 根据业务需求调整拉取参数,平衡延迟和吞吐量
  4. 错误处理: 添加适当的异常处理,确保消费的稳定性

2. 监控告警

public class MonitoringConfig {
    public void setupMonitoring() {
        // JMX 监控配置
        System.setProperty("com.sun.management.jmxremote", "true");
        System.setProperty("com.sun.management.jmxremote.port", "9999");
        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
        System.setProperty("com.sun.management.jmxremote.ssl", "false");

        // 指标收集
        Metrics metrics = new Metrics();
        metrics.addReporter(new JmxReporter());
    }
}

3. 故障处理

public class FailureHandlingConsumer {
    public void handleConsumeFailure() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "failure-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("failure-topic"));

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // 业务处理逻辑
                        processMessage(record);
                        System.out.printf("Processed: offset = %d, key = %s%n",
                            record.offset(), record.key());
                    } catch (Exception e) {
                        System.err.printf("Failed to process: offset = %d, error = %s%n",
                            record.offset(), e.getMessage());

                        // 记录失败消息到死信队列
                        sendToDeadLetterQueue(record);
                    }
                }

                // 提交偏移量
                consumer.commitSync();

            } catch (Exception e) {
                System.err.println("Consumer error: " + e.getMessage());
                // 等待一段时间后重试
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    private void processMessage(ConsumerRecord<String, String> record) throws Exception {
        // 模拟业务处理
        if (Math.random() < 0.1) {
            throw new RuntimeException("模拟处理失败");
        }
    }

    private void sendToDeadLetterQueue(ConsumerRecord<String, String> record) {
        // 发送到死信队列
        System.out.printf("Sent to dead letter queue: offset = %d%n", record.offset());
    }
}

常见问题

1. 消息丢失问题

问题描述: 消息发送成功但消费者没有收到消息

可能原因:

  • 生产者确认级别设置不当
  • 消费者偏移量提交策略问题
  • 网络分区导致消息丢失

解决方案:

// 1. 设置合适的确认级别
props.put("acks", "all"); // 等待所有副本确认

// 2. 设置重试机制
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);

// 3. 消费者手动提交偏移量
props.put("enable.auto.commit", "false");
consumer.commitSync();

2. 消息重复消费

问题描述: 同一条消息被消费多次

可能原因:

  • 消费者重启导致偏移量回退
  • 网络问题导致偏移量提交失败
  • 事务回滚导致消息重复

解决方案:

// 1. 实现幂等性处理
public class IdempotentConsumer {
    private Set<Long> processedOffsets = new ConcurrentHashMap<>();

    public void consumeMessage(ConsumerRecord<String, String> record) {
        long offset = record.offset();

        // 检查消息是否已处理
        if (processedOffsets.contains(offset)) {
            System.out.println("Message already processed: " + offset);
            return;
        }

        // 处理消息
        processMessage(record);

        // 记录已处理的消息
        processedOffsets.add(offset);
    }
}

3. 消费积压问题

问题描述: 消费者处理速度跟不上消息生产速度

解决方案:

// 1. 增加消费者实例
// 2. 优化消费逻辑
// 3. 调整消费参数
props.put("fetch.min.bytes", 1);
props.put("fetch.max.wait.ms", 100);
props.put("max.partition.fetch.bytes", 1048576);

// 4. 使用批量消费
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<CompletableFuture<Void>> futures = new ArrayList<>();

for (ConsumerRecord<String, String> record : records) {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        processMessage(record);
    });
    futures.add(future);
}

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

4. 性能调优

JVM 参数调优:

# 生产环境推荐配置
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"

Broker 配置优化:

# server.properties
# 网络线程数
num.network.threads=8
# IO线程数
num.io.threads=16
# 日志段大小
log.segment.bytes=1073741824
# 日志保留时间
log.retention.hours=168
# 压缩类型
compression.type=snappy

总结

Apache Kafka 是一个功能强大的分布式流处理平台,具有以下特点:

  1. 高吞吐量: 支持每秒百万级消息处理
  2. 低延迟: 毫秒级消息投递
  3. 持久化: 消息持久化到磁盘
  4. 分布式: 支持集群部署和自动分区
  5. 流处理: 支持实时流数据处理

在使用过程中,需要注意:

  1. 合理设计分区策略: 根据业务需求选择合适的分区数量
  2. 优化性能参数: 根据实际负载调整各种参数
  3. 实现幂等性: 确保消息处理的幂等性
  4. 监控告警: 建立完善的监控体系
  5. 故障处理: 制定完善的故障处理方案

通过本文的介绍,相信您已经对 Apache Kafka 有了全面的了解,可以开始在实际项目中应用了。

最近更新:: 2025/9/4 17:20
Contributors: Duke
Prev
RocketMQ
Next
RabbitMQ