一、Broker 工作原理
1. ZK 中存储的 Kafka 信息
1 | $ cd /opt/app/zookeeper-3.7.0 |
- 大致目录结构如下:
1 | kafka |
2. Broker 工作流程

- 首先,每台 Broker 节点在启动时向 ZK 注册,表示当前节点可用
- Broker 中的 Controller 模块向 ZK 注册,第一个注册的将负责管理集群 Broker 的上下线,所有 Topic 的分区副本分配和 Leader 选举工作
- 注册的 Controller 监听 Broker 节点的变化,若 Leader 副本所在节点宕机,注册的 Controller 会根据 ZK 中的信息重新选举 Leader,之后更新 ZK 信息
- 注册的 Controller 进行 Leader 选举,选举规则为在 ISR 中存活为前提,按照 AR 中的顺序轮询(如
AR:[1,0,2], ISR:[2,0],则选 0) - 注册的 Controller 在 ZK 中维护 Leader、ISR 等信息
- 未注册的 Controller 从 ZK 中同步信息
1 | # 查看现有分区情况(Replicas即AR) |
3. Broker 重要参数
| 参数名称 | 描述 |
|---|---|
| replica.lag.time.max.ms | 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,就会被移除 ISR。默认 30s |
| auto.leader.rebalance.enable | 自动 Leader 再平衡。默认 true |
| leader.imbalance.per.broker.percentage | 每个 Broker 允许的不平衡的 Leader 的比率,超过该值则控制器会触发 Leader 再平衡。默认 10% |
| leader.imbalance.check.interval.seconds | 检查 Leader 负载是否平衡的间隔时间。默认 300s |
| log.segment.bytes | log 日志划分成 Segment 块的大小,默认 1G |
| log.index.interval.bytes | 每当向 log 文件写入了 4KB 大小的日志,就往 index 文件里面记录一个索引。默认 4KB |
| log.retention.hours | 数据保存的时间,默认 7 天 |
| log.retention.minutes | 数据保存的时间,分钟级别,优先级更高,默认关闭 |
| log.retention.ms | 数据保存的时间,毫秒级别,优先级最高,默认关闭 |
| log.retention.check.interval.ms | 检查数据是否超时的间隔,默认 5min |
| log.retention.bytes | 若超过设置的所有日志总大小,删除最早的 Segment。默认 -1,表示无穷大 |
| log.cleanup.policy | 日志清理策略。默认 delete,表示所有数据启用删除策略;若设为 compact,表示所有数据启用压缩策略 |
| num.io.threads | 负责写磁盘的线程数,参数值要占总核数的 50%。默认 8 |
| num.replica.fetchers | 副本拉取线程数,参数值占总核数 50% 的 1/3 |
| num.network.threads | 数据传输线程数,参数值占总核数 50% 的 2/3。默认 3 |
| log.flush.interval.messages | 强制页缓存刷写到磁盘的条数。默认 long 的最大值,一般不改,交给系统管理 |
| log.flush.interval.ms | 每隔多久,刷数据到磁盘。默认 null,一般不改,交给系统管理 |
二、Kafka 副本
1. 副本概述
- Kafka 默认 1 个副本,生产环境一般配置为 2 个,来保证数据可靠性。配置太多副本会增加磁盘存储空间,增加网络上的数据传输,降低效率
- Kafka 中的副本分为 Leader 和 Follower,生产者和消费者只与 Leader 交互(Hadoop 副本等价),Follower 主动找 Leader 进行同步数据
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas),且
AR = ISR + OSR- ISR:表示和 Leader 保持同步的 Leader 和 Follower 集合。如果 Follower 长时间(
replica.lag.time.max.ms,默认 30s)未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR 到 OSR。Leader 发生故障之后,就会从 ISR 中选举新的 Leader - OSR:表示 Follower 与 Leader 副本同步时,延迟过多的副本
- ISR:表示和 Leader 保持同步的 Leader 和 Follower 集合。如果 Follower 长时间(
2. Leader 和 Follower 故障处理细节
- LEO(Log End Offset):副本的最后一个 offset + 1
- HW(High Watermark):所有副本中最小的 LEO
- 消费者能见到的最大 offect 为
HW - 1
(1) Follower 故障处理细节

- Follower 故障后从 ISR 队列中移除,期间 Leader 节点正常接收数据
- 该 Follower 恢复后,读取本地磁盘记录的 HW,将 log 文件大于等于该 HW 的部分截取掉,从该 HW 开始向 Leader 同步
- 等该 Follower 的 LEO 大于等于现在的 HW 时,重新加入 ISR 队列
(2) Leader 故障处理细节

- Leader 故障后从 ISR 队列中移除,控制器从 ISR 中选出一个新的 Leader
- 为保证多个副本之间的数据一致性(不保证数据不丢失或不重复),其余的 Follower 会先将各自 log 文件高于 HW 的部分截取掉,然后从新的 Leader 同步数据
3. 分区副本分配
- 尽可能错开,保证负载均衡及数据可靠性
1 | $ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 16 --replication-factor 3 --topic test2 |
4. 调整分区副本分配
- 添加新 Broker:
- 克隆 hadoop101,修改 IP 和主机名为 hadoop104
- 删除 Kafka 目录下的 datas 和 logs 目录(有整个集群的唯一标识),并修改 broker.id 配置
- 启动 hadoop104 的 Kafka
- 调整分区副本分配:
1 | ## 之前创建的主题仍保留原有配置(不包含新节点) |
- 退役节点:
- 对旧节点上的主题进行重新分配
- 停止旧节点
5. Leader 再平衡
- 正常情况下,Kafka 会自动把 Leader 均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些 Broker 宕机,会导致 Leader 过于集中在几台 Broker 上,造成集群负载不均衡
- Kafka 默认开启 Leader 负载平衡(
auto.leader.rebalance.enable,默认 true,建议关闭)。每过leader.imbalance.check.interval.seconds(默认 300s)时间间隔进行检查,当 Broker 超过其允许的不平衡 Leader 的比率(leader.imbalance.per.broker.percentage,默认 10%),控制器会触发 Leader 的平衡
1 | $ bin/kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic test3 |
三、Kafka 文件存储
1. 文件存储机制
- Topic 是逻辑上的概念,而 Partition 是物理上的概念。每个 Partition 对应于一个 log 文件(虚拟),该 log 文件中存储的就是 Producer 生产的数据,会被不断追加到该 log 文件末端
- 为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 Partition 分为多个 Segment,块大小由
log.segment.bytes(默认 1G)指定 - 每个 Segment 包括:
.log日志文件、.index偏移量索引文件和.timeindex时间戳索引文件等,文件以当前 Segment 的第一条消息的 offset 命名。这些文件位于一个文件夹下,文件夹命名规则为<topic名称>-<分区号>,如 test-0
1 | $ ll data/ |
2. 稀疏索引
- index 为稀疏索引,大约每往 log 文件写入 4KB 数据(
log.index.interval.bytes),就会往 index 文件写入一条索引 - index 文件中保存的 offset 为相对值,这样能确保 offset 的值所占空间不会过大
- 如何定位到 offset=600 的记录?

3. 文件清理策略
- Kafka 中默认的日志保存时间为 7 天(
log.retention.hours),默认检查周期为 5min(log.retention.check.interval.ms) - 日志清理策略(
log.cleanup.policy)- delete:日志删除,默认
- 基于时间:默认打开,以 Segment 中所有记录中的最大时间戳作为该文件的时间戳,超时删该 Segment
- 基于大小:默认关闭,若超过设置的所有日志总大小(
log.retention.bytes,默认 -1,表示无穷大),则删除最早的 Segment
- compact:日志压缩,对于相同 key 的不同 value 值,只保留最后一个版本
- 压缩后的 offset 可能是不连续的,当从 offset 消费消息时,可能会拿到比这个 offset 大的 offset 对应的消息,并从这个位置开始消费
- 这种策略只适合特殊场景,如消息的 key 是用户 id,value 是用户资料,这样即使不连续也是最新的
- delete:日志删除,默认

四、高效读写数据
- Kafka 本身是分布式集群,并且采用分区技术,对海量数据切割存储,提高了生产者和消费者的并行度
- 读数据采用稀疏索引,可以快速定位到要消费的数据
- 顺序写磁盘:以追加的形式写 Segment
- 页缓存:Kafka 重度依赖底层操作系统提供的页缓存 PageCache 功能。当写操作时,操作系统只是将数据写入 PageCache(落盘由内核决定)。当读操作时,先从 PageCache 中查找,如果找不到,再从磁盘中加载到 PageCache
- 零拷贝:Kafka 的数据加工处理操作交由生产者和消费者处理。Kafka Broker 应用层不关心存储的数据,所以就不用走应用层,传输效率高
