Kafka安装教程
Kafka网络配置
broker之间是怎么通过zookeeper连接的?
是的,你的理解基本正确!我来详细解释一下 Kafka 中 Zookeeper 的作用以及 Broker 如何通过 advertised.listeners
进行连接。
1. Zookeeper 的作用
在 Kafka 2.8 之前,Zookeeper 是 Kafka 的核心依赖,主要用于以下功能:
-
Broker 注册:
-
每个 Broker 启动时,会向 Zookeeper 注册自己的信息,包括:
- Broker ID
advertised.listeners
地址- 其他元数据(如支持的协议、端口等)。
-
Zookeeper 会维护一个 Broker 列表,记录集群中所有活跃的 Broker。
-
-
元数据管理:
-
Zookeeper 存储 Kafka 集群的元数据,包括:
- Topic 和 Partition 的信息。
- 每个 Partition 的 Leader 和 ISR(In-Sync Replicas)。
- Broker 的状态(如是否存活)。
-
-
Leader 选举:
- 当某个 Partition 的 Leader 失效时,Zookeeper 会协调选举新的 Leader。
-
配置管理:
- Zookeeper 存储 Kafka 集群的配置信息,如 Topic 配置、Broker 配置等。
2. Broker 注册和连接过程
以下是 Broker 注册和连接的具体过程:
2.1 Broker 启动
-
Broker 启动时:
- Broker 读取配置文件中的
advertised.listeners
和listener
。 listener
用于 Broker 监听请求,通常是内部地址(如0.0.0.0:9092
)。advertised.listeners
是 Broker 对外暴露的地址,客户端和其他 Broker 使用这个地址连接。
- Broker 读取配置文件中的
-
向 Zookeeper 注册:
-
Broker 启动后,会向 Zookeeper 注册自己的信息,包括:
- Broker ID
advertised.listeners
地址- 其他元数据(如支持的协议、端口等)。
-
注意:Broker 不会将
listener
的地址注册到 Zookeeper 中,而是注册advertised.listeners
的地址。
-
2.2 其他 Broker 获取新 Broker 的信息
-
从 Zookeeper 获取元数据:
- 当一个新的 Broker 启动并注册到 Zookeeper 后,其他 Broker 会从 Zookeeper 获取更新的元数据。
- 元数据中包含了新 Broker 的
advertised.listeners
地址。
-
通过
advertised.listeners
连接新 Broker:-
其他 Broker 使用新 Broker 的
advertised.listeners
地址与其建立连接。 -
例如:
- 新 Broker 的
advertised.listeners
是kafka3.example.com:9092
。 - 其他 Broker 会通过
kafka3.example.com:9092
连接到新 Broker。
- 新 Broker 的
-
2.3 分区复制和心跳检测
-
分区复制:
- 如果新 Broker 是某个 Partition 的 Follower,它会通过 Leader 的
advertised.listeners
地址连接到 Leader,并拉取数据。
- 如果新 Broker 是某个 Partition 的 Follower,它会通过 Leader 的
-
心跳检测:
- Broker 之间通过
advertised.listeners
地址定期发送心跳消息,检测彼此的健康状态。
- Broker 之间通过
3. 为什么 listener
的地址不注册到 Zookeeper?
-
listener
是内部地址:listener
是 Broker 监听的地址,通常是内部地址(如0.0.0.0
或私有 IP)。- 这个地址可能对其他 Broker 或客户端不可达(尤其是在跨网络环境时)。
-
advertised.listeners
是对外地址:advertised.listeners
是 Broker 对外暴露的地址,客户端和其他 Broker 使用这个地址连接。- 这个地址通常是外部地址(如域名、公共 IP 或负载均衡器地址),确保所有组件都能访问。
-
统一访问入口:
- 通过
advertised.listeners
,Kafka 提供了一个统一的访问入口,屏蔽了内部网络的复杂性。
- 通过
4. 实际场景示例
假设我们有一个 Kafka 集群,包含 3 个 Broker,部署在 Kubernetes 中。每个 Broker 的配置如下:
Broker 1
listener
:PLAINTEXT://0.0.0.0:9092
(监听所有网络接口)advertised.listeners
:PLAINTEXT://kafka1.example.com:9092
(对外暴露的地址)
Broker 2
listener
:PLAINTEXT://0.0.0.0:9092
(监听所有网络接口)advertised.listeners
:PLAINTEXT://kafka2.example.com:9092
(对外暴露的地址)
Broker 3
listener
:PLAINTEXT://0.0.0.0:9092
(监听所有网络接口)advertised.listeners
:PLAINTEXT://kafka3.example.com:9092
(对外暴露的地址)
连接过程
-
Broker 启动:
- Broker 1 启动后,向 Zookeeper 注册自己的
advertised.listeners
地址(kafka1.example.com:9092
)。 - 类似地,Broker 2 和 Broker 3 也会注册自己的
advertised.listeners
地址。
- Broker 1 启动后,向 Zookeeper 注册自己的
-
获取元数据:
-
Broker 1 从 Zookeeper 获取元数据,发现集群中有 3 个 Broker,地址分别为:
kafka1.example.com:9092
kafka2.example.com:9092
kafka3.example.com:9092
-
-
建立连接:
- Broker 1 通过
kafka2.example.com:9092
和kafka3.example.com:9092
连接到 Broker 2 和 Broker 3。 - 类似地,Broker 2 和 Broker 3 也会通过
advertised.listeners
地址连接到其他 Broker。
- Broker 1 通过
-
分区复制和心跳检测:
- Broker 之间通过
advertised.listeners
地址进行分区复制和心跳检测。
- Broker 之间通过
5. 总结
-
Zookeeper 的作用:
- 管理 Broker 的注册信息(包括
advertised.listeners
)。 - 维护集群的元数据和状态。
- 管理 Broker 的注册信息(包括
-
Broker 的连接:
- Broker 之间通过
advertised.listeners
地址进行连接,而不是listener
地址。 listener
是 Broker 监听的内部地址,不会注册到 Zookeeper 中。advertised.listeners
是 Broker 对外暴露的地址,客户端和其他 Broker 使用这个地址连接。
- Broker 之间通过
通过这种方式,Kafka 能够灵活地适应各种网络环境,并确保 Broker 之间以及客户端与 Broker 之间的高效连接。
Docker部署单节点kafka
version: "2"
services:
kafka:
container_name: kafka
image: 'bitnami/kafka:3.5'
ports:
- '19092:9092'
- '19093:9093'
environment:
### 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=1
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
# 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:19092
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
- KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
- KAFKA_BROKER_ID=1
Docker部署集群kafka
version: "2"
services:
kafka1:
container_name: kafka1
image: 'bitnami/kafka:3.5'
user: "root"
ports:
- '19092:9092'
- '19093:9093'
environment:
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=1
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:19092
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
- KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
- KAFKA_BROKER_ID=1
volumes:
- ./kafka/broker1:/bitnami/kafka:rw
kafka2:
container_name: kafka2
image: 'bitnami/kafka:3.5'
user: "root"
ports:
- '29092:9092'
- '29093:9093'
environment:
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=2
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:29092
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
- KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
- KAFKA_BROKER_ID=2
volumes:
- ./kafka/broker2:/bitnami/kafka:rw
kafka3:
container_name: kafka3
image: 'bitnami/kafka:3.5'
user: "root"
ports:
- '39092:9092'
- '39093:9093'
environment:
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=3
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:39092
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
- KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
- KAFKA_BROKER_ID=3
volumes:
- ./kafka/broker3:/bitnami/kafka:rw