Apache Kafka 技术文档
1. Kafka 简介
1.1 什么是 Kafka
Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,现在是一个开源的分布式事件流平台。Kafka 被设计用于处理实时数据流,具有高吞吐量、低延迟、可扩展性和容错性等特点。
Kafka 的发展背景: 在传统的数据处理架构中,系统之间通常通过直接调用或数据库共享来实现数据交换,这种方式存在以下问题:
- 紧耦合:系统之间依赖关系复杂,难以维护
- 性能瓶颈:数据库成为系统瓶颈,影响整体性能
- 数据一致性:难以保证分布式环境下的数据一致性
- 扩展性差:难以应对高并发和大数据量的场景
Kafka 通过消息队列的方式解决了这些问题,提供了:
- 解耦:生产者和消费者通过消息队列解耦
- 异步处理:支持异步消息处理,提高系统性能
- 高吞吐量:支持每秒百万级消息处理
- 持久化:消息持久化存储,支持重放
1.2 Kafka 的核心特性
高吞吐量: 支持每秒百万级消息处理
- 基于磁盘的顺序写入,性能优异
- 零拷贝技术,减少数据复制开销
- 批量处理,提高网络利用率
低延迟: 毫秒级消息传递延迟
- 内存映射文件,快速读写
- 异步处理,减少阻塞
- 优化的网络协议
可扩展性: 支持水平扩展
- 分布式架构,支持集群部署
- 动态添加节点,无需停机
- 自动负载均衡
持久化: 消息持久化存储
- 可配置的保留策略
- 支持消息重放
- 数据压缩,节省存储空间
容错性: 高可用性保证
- 多副本机制
- 自动故障转移
- 数据一致性保证
1.3 Kafka vs 传统消息队列
为了更好地理解 Kafka 的优势,我们来看看它与传统消息队列的对比:
| 特性 | Kafka | RabbitMQ | ActiveMQ | 详细说明 |
|---|---|---|---|---|
| 吞吐量 | 极高 | 中等 | 中等 | Kafka 基于磁盘顺序写入,性能优异 |
| 延迟 | 低 | 低 | 低 | 都支持低延迟消息传递 |
| 持久化 | 强 | 中等 | 中等 | Kafka 专为持久化设计 |
| 扩展性 | 优秀 | 一般 | 一般 | Kafka 天然支持分布式 |
| 复杂度 | 中等 | 低 | 低 | Kafka 概念较多,学习成本高 |
| 生态 | 丰富 | 丰富 | 一般 | Kafka 与大数据生态集成好 |
| 适用场景 | 大数据流处理 | 传统应用 | 传统应用 | Kafka 更适合大数据场景 |
为什么选择 Kafka?
- 性能卓越:在相同硬件条件下,Kafka 的吞吐量通常比其他消息队列高 10-100 倍
- 生态丰富:与 Hadoop、Spark、Flink 等大数据组件深度集成
- 持久化设计:专为大数据场景设计,支持海量数据存储
- 流处理能力:不仅支持消息传递,还支持流处理
- 企业级特性:支持多租户、安全认证、监控等企业级功能
2. Kafka 架构
2.1 整体架构
Kafka 采用分布式架构,整个系统由多个组件协同工作。为了更好地理解 Kafka 的架构,我们先来看一个完整的架构图:
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
├─────────────────────────────────────────────────────────────────┤
│ Producer 1 │ Producer 2 │ Producer N │ Consumer Group │
├─────────────────────────────────────────────────────────────────┤
│ Kafka Broker 1 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │
│ │ Topic A │ │ Topic A │ │ Topic A │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ Kafka Broker 2 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │
│ │ Topic B │ │ Topic B │ │ Topic B │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ Kafka Broker 3 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │
│ │ Topic C │ │ Topic C │ │ Topic C │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ Zookeeper Cluster │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ ZK Node 1 │ │ ZK Node 2 │ │ ZK Node 3 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
架构详细说明:
Producer(生产者):
- 负责向 Kafka 发送消息
- 支持批量发送和异步发送
- 可以指定分区策略
- 支持消息确认机制
Consumer(消费者):
- 从 Kafka 读取消息
- 支持消费者组模式
- 自动分区分配
- 支持消息偏移量管理
Broker(代理):
- Kafka 集群中的服务器节点
- 负责存储和转发消息
- 管理分区和副本
- 处理客户端请求
Topic(主题):
- 消息的分类标签
- 支持多个分区
- 可以配置保留策略
- 支持压缩和清理
Partition(分区):
- Topic 的分片,提高并行度
- 每个分区有序
- 支持副本机制
- 分区键决定消息路由
Zookeeper:
- 管理集群元数据
- 选举 Leader
- 监控 Broker 状态
- 存储配置信息
2.2 核心组件详解
2.2.1 Producer(生产者)
Producer 是消息的生产者,负责向 Kafka 发送消息。它支持多种发送模式和配置选项。
主要功能:
- 消息发送:向指定的 Topic 发送消息
- 分区选择:根据分区策略选择目标分区
- 批量发送:支持批量发送提高性能
- 异步发送:支持异步发送提高吞吐量
发送模式:
// 1. 同步发送
producer.send(record).get();
// 2. 异步发送
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("消息发送成功: " + metadata.offset());
}
}
});
// 3. 批量发送
producer.send(record1);
producer.send(record2);
producer.flush(); // 确保所有消息发送完成
分区策略:
// 1. 轮询策略(默认)
// 消息会轮询分配到各个分区
// 2. 随机策略
// 随机选择分区
// 3. 按键分区策略
// 根据消息键的哈希值选择分区
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
// 4. 自定义分区策略
public class CustomPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区逻辑
return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
}
}
2.2.2 Consumer(消费者)
Consumer 是消息的消费者,负责从 Kafka 读取消息。它支持消费者组模式和多种消费模式。
消费者组模式:
- 多个消费者组成一个消费者组
- 每个分区只能被组内的一个消费者消费
- 支持水平扩展和负载均衡
- 自动处理消费者加入和离开
消费模式:
// 1. 自动提交偏移量
Properties props = new Properties();
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// 2. 手动提交偏移量
props.put("enable.auto.commit", "false");
// 在消息处理完成后手动提交
consumer.commitSync();
// 3. 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception != null) {
exception.printStackTrace();
}
}
});
消费控制:
// 1. 指定偏移量开始消费
consumer.seek(partition, offset);
// 2. 从最早位置开始消费
consumer.seekToBeginning(partitions);
// 3. 从最新位置开始消费
consumer.seekToEnd(partitions);
// 4. 暂停和恢复消费
consumer.pause(partitions);
consumer.resume(partitions);
2.2.3 Broker(代理)
Broker 是 Kafka 集群中的服务器节点,负责存储和转发消息。每个 Broker 可以管理多个 Topic 的分区。
主要职责:
- 消息存储:将消息持久化到磁盘
- 消息转发:将消息转发给消费者
- 分区管理:管理分区的 Leader 和 Follower
- 元数据管理:维护 Topic 和分区的元数据
存储机制:
Kafka 数据目录结构:
├── topic1-0/
│ ├── 00000000000000000000.log # 消息日志文件
│ ├── 00000000000000000000.index # 偏移量索引文件
│ └── 00000000000000000000.timeindex # 时间索引文件
├── topic1-1/
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.index
│ └── 00000000000000000000.timeindex
└── __consumer_offsets/ # 消费者偏移量存储
├── 0/
├── 1/
└── 2/
日志分段:
- 每个分区由多个日志段组成
- 每个日志段包含多个消息
- 支持日志压缩和清理
- 可配置的保留策略
2.3 分区和副本机制
2.3.1 分区机制
分区是 Kafka 实现并行处理的核心机制,每个分区都是有序的消息序列。
分区的优势:
- 并行处理:多个分区可以并行处理
- 负载均衡:消息分布到多个分区
- 扩展性:可以动态增加分区数量
- 容错性:单个分区故障不影响其他分区
分区策略:
// 1. 轮询分区(默认)
// 消息会轮询分配到各个分区
// 2. 按键分区
// 相同键的消息会发送到同一分区
producer.send(new ProducerRecord<>("topic", "key", "value"));
// 3. 指定分区
// 直接指定目标分区
producer.send(new ProducerRecord<>("topic", 0, "key", "value"));
2.3.2 副本机制
副本机制是 Kafka 实现高可用性的关键,每个分区可以有多个副本。
副本类型:
- Leader 副本:处理读写请求的主副本
- Follower 副本:同步 Leader 数据的从副本
- ISR(In-Sync Replicas):与 Leader 同步的副本集合
副本分配策略:
# 副本分配策略配置
replica.assignment.strategy=org.apache.kafka.clients.admin.StickyAssignor
# 最小同步副本数
min.insync.replicas=2
# 副本同步超时时间
replica.lag.time.max.ms=10000
Leader 选举:
- 当 Leader 副本故障时,从 ISR 中选择新的 Leader
- 优先选择 ISR 中的第一个副本
- 如果 ISR 为空,则从所有副本中选择
- 确保数据一致性
3. Kafka 安装和配置
3.1 环境要求
- Java 8 或更高版本
- Zookeeper 3.4.13 或更高版本
- 内存: 至少 4GB
- 磁盘: 至少 10GB 可用空间
- 网络: 稳定的网络连接
3.2 下载和安装
# 1. 下载 Kafka
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
# 2. 解压
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
# 3. 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 4. 启动 Kafka
bin/kafka-server-start.sh config/server.properties
3.3 配置文件详解
3.3.1 server.properties(Kafka 配置)
# Broker 唯一标识
broker.id=0
# 监听地址和端口
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
# 日志目录
log.dirs=/tmp/kafka-logs
# Zookeeper 连接信息
zookeeper.connect=localhost:2181
# 网络配置
num.network.threads=3
num.io.threads=8
# 日志配置
num.partitions=3
num.recovery.threads.per.data.dir=1
# 日志保留策略
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# 压缩配置
log.cleanup.policy=delete
log.cleaner.enable=true
# 副本配置
default.replication.factor=3
min.insync.replicas=2
# 网络超时配置
replica.socket.timeout.ms=30000
replica.fetch.wait.max.ms=500
3.3.2 zookeeper.properties(Zookeeper 配置)
# 数据目录
dataDir=/tmp/zookeeper
# 客户端端口
clientPort=2181
# 最大客户端连接数
maxClientCnxns=0
# 会话超时时间
tickTime=2000
initLimit=5
syncLimit=2
3.4 集群部署
3.4.1 单机多节点部署
# 1. 创建多个配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
# 2. 修改配置文件
# server-1.properties
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-1
# server-2.properties
broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs-2
# server-3.properties
broker.id=3
listeners=PLAINTEXT://localhost:9095
log.dirs=/tmp/kafka-logs-3
# 3. 启动集群
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &
3.4.2 分布式集群部署
# 1. 在每台服务器上安装 Kafka
# 2. 修改配置文件
broker.id=1 # 每台服务器使用不同的 ID
listeners=PLAINTEXT://server1:9092
advertised.listeners=PLAINTEXT://server1:9092
zookeeper.connect=server1:2181,server2:2181,server3:2181
# 3. 启动集群
# 在每台服务器上启动 Kafka
bin/kafka-server-start.sh config/server.properties
4. Kafka 核心概念
4.1 Topic 和 Partition
4.1.1 Topic 概念
Topic 是消息的分类标签,类似于数据库中的表。每个 Topic 可以有多个分区,分区是消息的物理存储单位。
Topic 的特点:
- 持久化:消息持久化存储
- 分区化:支持多个分区并行处理
- 有序性:每个分区内消息有序
- 可配置:支持多种配置选项
创建 Topic:
# 创建 Topic
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3
# 查看 Topic 列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 Topic 详情
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
4.1.2 Partition 概念
Partition 是 Topic 的分片,每个分区都是有序的消息序列。分区是 Kafka 实现并行处理的核心机制。
分区的特点:
- 有序性:分区内消息有序
- 并行性:多个分区可以并行处理
- 负载均衡:消息分布到多个分区
- 容错性:单个分区故障不影响其他分区
分区策略:
// 1. 轮询分区(默认)
// 消息会轮询分配到各个分区
// 2. 按键分区
// 相同键的消息会发送到同一分区
producer.send(new ProducerRecord<>("topic", "key", "value"));
// 3. 指定分区
// 直接指定目标分区
producer.send(new ProducerRecord<>("topic", 0, "key", "value"));
4.2 Offset 和 Consumer Group
4.2.1 Offset 概念
Offset 是消息在分区中的位置标识,类似于数组的索引。每个消费者都会维护自己的偏移量,用于记录消费进度。
Offset 的特点:
- 唯一性:每个消息都有唯一的偏移量
- 有序性:偏移量按顺序递增
- 持久化:偏移量持久化存储
- 可配置:支持多种提交策略
Offset 管理:
// 1. 自动提交偏移量
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// 2. 手动提交偏移量
props.put("enable.auto.commit", "false");
// 在消息处理完成后手动提交
consumer.commitSync();
// 3. 指定偏移量开始消费
consumer.seek(partition, offset);
4.2.2 Consumer Group 概念
Consumer Group 是消费者的逻辑分组,每个消费者组可以消费一个或多个 Topic 的所有分区。
消费者组的特点:
- 负载均衡:分区在消费者组内负载均衡
- 容错性:消费者故障时自动重新分配分区
- 扩展性:可以动态添加或删除消费者
- 隔离性:不同消费者组独立消费
消费者组管理:
# 查看消费者组列表
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
# 重置消费者组偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --topic my-topic --execute
4.3 消息格式和序列化
4.3.1 消息格式
Kafka 消息由键值对组成,支持多种序列化格式。
消息结构:
┌─────────────────────────────────────────────────────────────┐
│ Kafka Message │
├─────────────────────────────────────────────────────────────┤
│ Offset │ Timestamp │ Key Length │ Key │ Value Length │
│ (8字节) │ (8字节) │ (4字节) │ │ (4字节) │
├─────────────────────────────────────────────────────────────┤
│ Value │ Headers │ CRC32 │ Magic Byte │
│ │ │ (4字节) │ (1字节) │
└─────────────────────────────────────────────────────────────┘
消息属性:
- Offset:消息在分区中的位置
- Timestamp:消息时间戳
- Key:消息键,用于分区选择
- Value:消息内容
- Headers:消息头信息
4.3.2 序列化配置
// 1. 字符串序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. JSON 序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 3. Avro 序列化
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
5. Kafka 核心 API
5.1 Producer API
5.1.1 基本使用
// 1. 创建 Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 2. 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
// 3. 关闭 Producer
producer.close();
5.1.2 高级配置
// 1. 异步发送回调
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("消息发送成功: " + metadata.offset());
}
}
});
// 2. 批量发送
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key" + i, "value" + i);
producer.send(record);
}
producer.flush(); // 确保所有消息发送完成
// 3. 事务发送
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
5.2 Consumer API
5.2.1 基本使用
// 1. 创建 Consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 2. 订阅 Topic
consumer.subscribe(Arrays.asList("my-topic"));
// 3. 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("消费消息: " + record.value());
}
}
5.2.2 高级配置
// 1. 手动提交偏移量
props.put("enable.auto.commit", "false");
consumer.commitSync();
// 2. 指定
