DukeDuke
主页
关于我们
主页
关于我们
  • 技术文档

    • 网络原理

      • 交换机
      • 路由器
      • TCP/IP协议
      • HTTP 与 HTTPS
    • 软件架构

      • 什么是软件架构
      • 分层架构
      • 微服务架构
      • 事件驱动架构
      • 领域驱动设计(DDD)
      • 架构图
      • 高并发系统
    • Vue3

      • Vue3简介
      • Vue3响应式系统
      • Vue3组合式API
      • Vue3生命周期
      • Vue3模板语法
      • Vue3组件系统
      • Vue3 路由系统
      • Vue3 状态管理
      • Vue3 性能优化
      • Vue3 TypeScript 支持
      • Vue3 项目实战
      • VUE 面试题大全
      • Node.js 安装
    • Python

      • Python简介
      • Python安装
      • Python hello world
      • Python基础语法
      • Python数据类型
      • Python数字
      • Python字符串
      • Python列表
      • Python元组
      • Python字典
      • Python日期时间
      • Python文件操作
      • Python异常处理
      • Python函数
      • Python类
      • Python模块
      • Python包
      • Python多线程
      • Python面向对象
      • Python爬虫
      • Django web框架
      • Python 面试题

        • Python 面试题导航
        • Python 基础概念
        • Python 面向对象编程
        • Python 数据结构
        • Python 高级特性
        • Python 框架
        • Python 性能优化
        • Python 项目经验
    • Spring

      • Spring
      • Springboot
      • Spring Security 安全框架
      • SpringBoot 中的事件详解
      • SpringBoot 中的定时任务详解
      • SpringBoot 自动装配原理与源码解释
    • Mybatis

      • Mybatis
      • Mybatis-Plus
    • 数据库

      • Redis

        • Redis简介
        • Redis(单机)安装
        • Redis配置
        • Redis数据结构
        • RDB、AOF 和混合持久化机制
        • Redis内存管理
        • Redis缓存一致性
        • Redis缓存穿透
        • Redis缓存击穿
        • Redis缓存雪崩
        • Redis Lua脚本
        • Redis主从复制
        • Redis哨兵模式
        • Redis集群
        • Redis数据分片
        • Redis CPU使用率过高
        • Redis面试题
      • MySQL

        • MySQL简介
        • MySQL安装
        • MySQL配置
        • MYSQL日常维护
        • MYSQL优化-慢查询
        • MYSQL优化-索引
        • MYSQL数据库设计规范
    • 消息队列

      • RocketMQ
      • Kafka
      • RabbitMQ
      • 消息队列面试题
    • 微服务

      • SpringCloud 微服务
      • Eureka 注册中心
      • Nacos 注册中心
      • Gateway 网关
      • Feign 服务调用
      • Sentinel 限流 与 熔断
      • Seata 分布式事务
      • CAP 理论
      • Redis 分布式锁
      • 高并发系统设计
    • ELK日志分析系统

      • Elasticsearch 搜索引擎
      • Logstash 数据处理
      • Kibana 可视化
      • ELK 实战
    • 开放API

      • 开放API设计
      • 开放API示例项目
    • 人工智能

      • 人工智能简介
      • 机器学习

      • 深度学习

      • 自然语言处理

      • 计算机视觉

        • CUDA与cuDNN详细安装
        • Conda 安装
        • Pytorch 深度学习框架
        • yolo 目标检测
        • TensorRT 深度学习推理优化引擎
        • TensorFlow 机器学习
        • CVAT 图像标注
        • Windows 下安装 CUDA、cuDNN、TensorRT、TensorRT-YOLO 环境
        • Windows10+CUDA+cuDNN+TensorRT+TensorRT-YOLO 部署高性能YOLO11推理
    • 大数据

      • 大数据简介
      • Hadoop 数据存储
      • Flume 数据采集
      • Sqoop 数据导入导出
      • Hive 数据仓库
      • Spark 数据处理
      • Flink 数据处理
      • Kafka 数据采集
      • HBase 数据存储
      • Elasticsearch 搜索引擎
    • 图像处理

      • 图像处理简介
      • 医学图像web呈现
      • 医学图像处理
      • 切片细胞分离问题
    • 服务器&运维

      • Linux 系统

        • Linux 系统管理
        • Linux 网络管理
        • Linux 文件管理
        • Linux 命令大全
      • Nginx Web 服务器

        • Nginx 安装 与 配置
        • Nginx 负载均衡
        • Nginx SSL证书配置
        • Nginx Keepalived 高可用
      • Docker 容器

        • Docker 简介
        • Docker 安装与配置
        • Docker 命令
        • Docker 部署 Nginx
        • Docker 部署 MySQL
        • Docker 部署 Redis
      • 服务器

        • 塔式服务器
        • 机架式服务器
        • 刀片服务器
      • Git 版本控制
      • Jenkins 持续集成
      • Jmeter 性能测试
      • Let's Encrypt 免费SSL证书
    • 简历

      • 项目经理简历
      • 开发工程师简历

Apache Kafka 技术文档

1. Kafka 简介

1.1 什么是 Kafka

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,现在是一个开源的分布式事件流平台。Kafka 被设计用于处理实时数据流,具有高吞吐量、低延迟、可扩展性和容错性等特点。

Kafka 的发展背景: 在传统的数据处理架构中,系统之间通常通过直接调用或数据库共享来实现数据交换,这种方式存在以下问题:

  • 紧耦合:系统之间依赖关系复杂,难以维护
  • 性能瓶颈:数据库成为系统瓶颈,影响整体性能
  • 数据一致性:难以保证分布式环境下的数据一致性
  • 扩展性差:难以应对高并发和大数据量的场景

Kafka 通过消息队列的方式解决了这些问题,提供了:

  • 解耦:生产者和消费者通过消息队列解耦
  • 异步处理:支持异步消息处理,提高系统性能
  • 高吞吐量:支持每秒百万级消息处理
  • 持久化:消息持久化存储,支持重放

1.2 Kafka 的核心特性

  • 高吞吐量: 支持每秒百万级消息处理

    • 基于磁盘的顺序写入,性能优异
    • 零拷贝技术,减少数据复制开销
    • 批量处理,提高网络利用率
  • 低延迟: 毫秒级消息传递延迟

    • 内存映射文件,快速读写
    • 异步处理,减少阻塞
    • 优化的网络协议
  • 可扩展性: 支持水平扩展

    • 分布式架构,支持集群部署
    • 动态添加节点,无需停机
    • 自动负载均衡
  • 持久化: 消息持久化存储

    • 可配置的保留策略
    • 支持消息重放
    • 数据压缩,节省存储空间
  • 容错性: 高可用性保证

    • 多副本机制
    • 自动故障转移
    • 数据一致性保证

1.3 Kafka vs 传统消息队列

为了更好地理解 Kafka 的优势,我们来看看它与传统消息队列的对比:

特性KafkaRabbitMQActiveMQ详细说明
吞吐量极高中等中等Kafka 基于磁盘顺序写入,性能优异
延迟低低低都支持低延迟消息传递
持久化强中等中等Kafka 专为持久化设计
扩展性优秀一般一般Kafka 天然支持分布式
复杂度中等低低Kafka 概念较多,学习成本高
生态丰富丰富一般Kafka 与大数据生态集成好
适用场景大数据流处理传统应用传统应用Kafka 更适合大数据场景

为什么选择 Kafka?

  1. 性能卓越:在相同硬件条件下,Kafka 的吞吐量通常比其他消息队列高 10-100 倍
  2. 生态丰富:与 Hadoop、Spark、Flink 等大数据组件深度集成
  3. 持久化设计:专为大数据场景设计,支持海量数据存储
  4. 流处理能力:不仅支持消息传递,还支持流处理
  5. 企业级特性:支持多租户、安全认证、监控等企业级功能

2. Kafka 架构

2.1 整体架构

Kafka 采用分布式架构,整个系统由多个组件协同工作。为了更好地理解 Kafka 的架构,我们先来看一个完整的架构图:

┌─────────────────────────────────────────────────────────────────┐
│                        Kafka Cluster                           │
├─────────────────────────────────────────────────────────────────┤
│  Producer 1  │  Producer 2  │  Producer N  │  Consumer Group   │
├─────────────────────────────────────────────────────────────────┤
│                    Kafka Broker 1                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │ Partition 0 │  │ Partition 1 │  │ Partition 2 │            │
│  │   Topic A   │  │   Topic A   │  │   Topic A   │            │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
├─────────────────────────────────────────────────────────────────┤
│                    Kafka Broker 2                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │ Partition 0 │  │ Partition 1 │  │ Partition 2 │            │
│  │   Topic B   │  │   Topic B   │  │   Topic B   │            │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
├─────────────────────────────────────────────────────────────────┤
│                    Kafka Broker 3                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │ Partition 0 │  │ Partition 1 │  │ Partition 2 │            │
│  │   Topic C   │  │   Topic C   │  │   Topic C   │            │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
├─────────────────────────────────────────────────────────────────┤
│                    Zookeeper Cluster                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │ ZK Node 1   │  │ ZK Node 2   │  │ ZK Node 3   │            │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
└─────────────────────────────────────────────────────────────────┘

架构详细说明:

  1. Producer(生产者):

    • 负责向 Kafka 发送消息
    • 支持批量发送和异步发送
    • 可以指定分区策略
    • 支持消息确认机制
  2. Consumer(消费者):

    • 从 Kafka 读取消息
    • 支持消费者组模式
    • 自动分区分配
    • 支持消息偏移量管理
  3. Broker(代理):

    • Kafka 集群中的服务器节点
    • 负责存储和转发消息
    • 管理分区和副本
    • 处理客户端请求
  4. Topic(主题):

    • 消息的分类标签
    • 支持多个分区
    • 可以配置保留策略
    • 支持压缩和清理
  5. Partition(分区):

    • Topic 的分片,提高并行度
    • 每个分区有序
    • 支持副本机制
    • 分区键决定消息路由
  6. Zookeeper:

    • 管理集群元数据
    • 选举 Leader
    • 监控 Broker 状态
    • 存储配置信息

2.2 核心组件详解

2.2.1 Producer(生产者)

Producer 是消息的生产者,负责向 Kafka 发送消息。它支持多种发送模式和配置选项。

主要功能:

  • 消息发送:向指定的 Topic 发送消息
  • 分区选择:根据分区策略选择目标分区
  • 批量发送:支持批量发送提高性能
  • 异步发送:支持异步发送提高吞吐量

发送模式:

// 1. 同步发送
producer.send(record).get();

// 2. 异步发送
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("消息发送成功: " + metadata.offset());
        }
    }
});

// 3. 批量发送
producer.send(record1);
producer.send(record2);
producer.flush(); // 确保所有消息发送完成

分区策略:

// 1. 轮询策略(默认)
// 消息会轮询分配到各个分区

// 2. 随机策略
// 随机选择分区

// 3. 按键分区策略
// 根据消息键的哈希值选择分区
producer.send(new ProducerRecord<>("my-topic", "key", "value"));

// 4. 自定义分区策略
public class CustomPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区逻辑
        return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
    }
}

2.2.2 Consumer(消费者)

Consumer 是消息的消费者,负责从 Kafka 读取消息。它支持消费者组模式和多种消费模式。

消费者组模式:

  • 多个消费者组成一个消费者组
  • 每个分区只能被组内的一个消费者消费
  • 支持水平扩展和负载均衡
  • 自动处理消费者加入和离开

消费模式:

// 1. 自动提交偏移量
Properties props = new Properties();
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

// 2. 手动提交偏移量
props.put("enable.auto.commit", "false");
// 在消息处理完成后手动提交
consumer.commitSync();

// 3. 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback() {
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                          Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
});

消费控制:

// 1. 指定偏移量开始消费
consumer.seek(partition, offset);

// 2. 从最早位置开始消费
consumer.seekToBeginning(partitions);

// 3. 从最新位置开始消费
consumer.seekToEnd(partitions);

// 4. 暂停和恢复消费
consumer.pause(partitions);
consumer.resume(partitions);

2.2.3 Broker(代理)

Broker 是 Kafka 集群中的服务器节点,负责存储和转发消息。每个 Broker 可以管理多个 Topic 的分区。

主要职责:

  • 消息存储:将消息持久化到磁盘
  • 消息转发:将消息转发给消费者
  • 分区管理:管理分区的 Leader 和 Follower
  • 元数据管理:维护 Topic 和分区的元数据

存储机制:

Kafka 数据目录结构:
├── topic1-0/
│   ├── 00000000000000000000.log    # 消息日志文件
│   ├── 00000000000000000000.index  # 偏移量索引文件
│   └── 00000000000000000000.timeindex # 时间索引文件
├── topic1-1/
│   ├── 00000000000000000000.log
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.timeindex
└── __consumer_offsets/             # 消费者偏移量存储
    ├── 0/
    ├── 1/
    └── 2/

日志分段:

  • 每个分区由多个日志段组成
  • 每个日志段包含多个消息
  • 支持日志压缩和清理
  • 可配置的保留策略

2.3 分区和副本机制

2.3.1 分区机制

分区是 Kafka 实现并行处理的核心机制,每个分区都是有序的消息序列。

分区的优势:

  • 并行处理:多个分区可以并行处理
  • 负载均衡:消息分布到多个分区
  • 扩展性:可以动态增加分区数量
  • 容错性:单个分区故障不影响其他分区

分区策略:

// 1. 轮询分区(默认)
// 消息会轮询分配到各个分区

// 2. 按键分区
// 相同键的消息会发送到同一分区
producer.send(new ProducerRecord<>("topic", "key", "value"));

// 3. 指定分区
// 直接指定目标分区
producer.send(new ProducerRecord<>("topic", 0, "key", "value"));

2.3.2 副本机制

副本机制是 Kafka 实现高可用性的关键,每个分区可以有多个副本。

副本类型:

  • Leader 副本:处理读写请求的主副本
  • Follower 副本:同步 Leader 数据的从副本
  • ISR(In-Sync Replicas):与 Leader 同步的副本集合

副本分配策略:

# 副本分配策略配置
replica.assignment.strategy=org.apache.kafka.clients.admin.StickyAssignor

# 最小同步副本数
min.insync.replicas=2

# 副本同步超时时间
replica.lag.time.max.ms=10000

Leader 选举:

  • 当 Leader 副本故障时,从 ISR 中选择新的 Leader
  • 优先选择 ISR 中的第一个副本
  • 如果 ISR 为空,则从所有副本中选择
  • 确保数据一致性

3. Kafka 安装和配置

3.1 环境要求

  • Java 8 或更高版本
  • Zookeeper 3.4.13 或更高版本
  • 内存: 至少 4GB
  • 磁盘: 至少 10GB 可用空间
  • 网络: 稳定的网络连接

3.2 下载和安装

# 1. 下载 Kafka
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz

# 2. 解压
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

# 3. 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 4. 启动 Kafka
bin/kafka-server-start.sh config/server.properties

3.3 配置文件详解

3.3.1 server.properties(Kafka 配置)

# Broker 唯一标识
broker.id=0

# 监听地址和端口
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092

# 日志目录
log.dirs=/tmp/kafka-logs

# Zookeeper 连接信息
zookeeper.connect=localhost:2181

# 网络配置
num.network.threads=3
num.io.threads=8

# 日志配置
num.partitions=3
num.recovery.threads.per.data.dir=1

# 日志保留策略
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# 压缩配置
log.cleanup.policy=delete
log.cleaner.enable=true

# 副本配置
default.replication.factor=3
min.insync.replicas=2

# 网络超时配置
replica.socket.timeout.ms=30000
replica.fetch.wait.max.ms=500

3.3.2 zookeeper.properties(Zookeeper 配置)

# 数据目录
dataDir=/tmp/zookeeper

# 客户端端口
clientPort=2181

# 最大客户端连接数
maxClientCnxns=0

# 会话超时时间
tickTime=2000
initLimit=5
syncLimit=2

3.4 集群部署

3.4.1 单机多节点部署

# 1. 创建多个配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties

# 2. 修改配置文件
# server-1.properties
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-1

# server-2.properties
broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs-2

# server-3.properties
broker.id=3
listeners=PLAINTEXT://localhost:9095
log.dirs=/tmp/kafka-logs-3

# 3. 启动集群
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &

3.4.2 分布式集群部署

# 1. 在每台服务器上安装 Kafka
# 2. 修改配置文件
broker.id=1  # 每台服务器使用不同的 ID
listeners=PLAINTEXT://server1:9092
advertised.listeners=PLAINTEXT://server1:9092
zookeeper.connect=server1:2181,server2:2181,server3:2181

# 3. 启动集群
# 在每台服务器上启动 Kafka
bin/kafka-server-start.sh config/server.properties

4. Kafka 核心概念

4.1 Topic 和 Partition

4.1.1 Topic 概念

Topic 是消息的分类标签,类似于数据库中的表。每个 Topic 可以有多个分区,分区是消息的物理存储单位。

Topic 的特点:

  • 持久化:消息持久化存储
  • 分区化:支持多个分区并行处理
  • 有序性:每个分区内消息有序
  • 可配置:支持多种配置选项

创建 Topic:

# 创建 Topic
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3

# 查看 Topic 列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看 Topic 详情
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

4.1.2 Partition 概念

Partition 是 Topic 的分片,每个分区都是有序的消息序列。分区是 Kafka 实现并行处理的核心机制。

分区的特点:

  • 有序性:分区内消息有序
  • 并行性:多个分区可以并行处理
  • 负载均衡:消息分布到多个分区
  • 容错性:单个分区故障不影响其他分区

分区策略:

// 1. 轮询分区(默认)
// 消息会轮询分配到各个分区

// 2. 按键分区
// 相同键的消息会发送到同一分区
producer.send(new ProducerRecord<>("topic", "key", "value"));

// 3. 指定分区
// 直接指定目标分区
producer.send(new ProducerRecord<>("topic", 0, "key", "value"));

4.2 Offset 和 Consumer Group

4.2.1 Offset 概念

Offset 是消息在分区中的位置标识,类似于数组的索引。每个消费者都会维护自己的偏移量,用于记录消费进度。

Offset 的特点:

  • 唯一性:每个消息都有唯一的偏移量
  • 有序性:偏移量按顺序递增
  • 持久化:偏移量持久化存储
  • 可配置:支持多种提交策略

Offset 管理:

// 1. 自动提交偏移量
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

// 2. 手动提交偏移量
props.put("enable.auto.commit", "false");
// 在消息处理完成后手动提交
consumer.commitSync();

// 3. 指定偏移量开始消费
consumer.seek(partition, offset);

4.2.2 Consumer Group 概念

Consumer Group 是消费者的逻辑分组,每个消费者组可以消费一个或多个 Topic 的所有分区。

消费者组的特点:

  • 负载均衡:分区在消费者组内负载均衡
  • 容错性:消费者故障时自动重新分配分区
  • 扩展性:可以动态添加或删除消费者
  • 隔离性:不同消费者组独立消费

消费者组管理:

# 查看消费者组列表
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 查看消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

# 重置消费者组偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --topic my-topic --execute

4.3 消息格式和序列化

4.3.1 消息格式

Kafka 消息由键值对组成,支持多种序列化格式。

消息结构:

┌─────────────────────────────────────────────────────────────┐
│                    Kafka Message                          │
├─────────────────────────────────────────────────────────────┤
│  Offset  │  Timestamp  │  Key Length  │  Key  │  Value Length │
│  (8字节)  │  (8字节)   │   (4字节)   │       │    (4字节)    │
├─────────────────────────────────────────────────────────────┤
│  Value   │  Headers   │  CRC32      │  Magic Byte          │
│          │            │  (4字节)    │  (1字节)             │
└─────────────────────────────────────────────────────────────┘

消息属性:

  • Offset:消息在分区中的位置
  • Timestamp:消息时间戳
  • Key:消息键,用于分区选择
  • Value:消息内容
  • Headers:消息头信息

4.3.2 序列化配置

// 1. 字符串序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 2. JSON 序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 3. Avro 序列化
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

5. Kafka 核心 API

5.1 Producer API

5.1.1 基本使用

// 1. 创建 Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 2. 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);

// 3. 关闭 Producer
producer.close();

5.1.2 高级配置

// 1. 异步发送回调
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("消息发送成功: " + metadata.offset());
        }
    }
});

// 2. 批量发送
for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key" + i, "value" + i);
    producer.send(record);
}
producer.flush(); // 确保所有消息发送完成

// 3. 事务发送
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

5.2 Consumer API

5.2.1 基本使用

// 1. 创建 Consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 2. 订阅 Topic
consumer.subscribe(Arrays.asList("my-topic"));

// 3. 消费消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("消费消息: " + record.value());
    }
}

5.2.2 高级配置

// 1. 手动提交偏移量
props.put("enable.auto.commit", "false");
consumer.commitSync();

// 2. 指定
最近更新:: 2025/10/11 10:54
Contributors: Duke
Prev
Flink 数据处理
Next
HBase 数据存储