Kafka安装教程

Kafka网络配置

broker之间是怎么通过zookeeper连接的?

是的,你的理解基本正确!我来详细解释一下 Kafka 中 Zookeeper 的作用以及 Broker 如何通过 advertised.listeners​ 进行连接。


1. Zookeeper 的作用

在 Kafka 2.8 之前,Zookeeper 是 Kafka 的核心依赖,主要用于以下功能:

  1. Broker 注册

    • 每个 Broker 启动时,会向 Zookeeper 注册自己的信息,包括:

      • Broker ID
      • advertised.listeners​ 地址
      • 其他元数据(如支持的协议、端口等)。
    • Zookeeper 会维护一个 Broker 列表,记录集群中所有活跃的 Broker。

  2. 元数据管理

    • Zookeeper 存储 Kafka 集群的元数据,包括:

      • Topic 和 Partition 的信息。
      • 每个 Partition 的 Leader 和 ISR(In-Sync Replicas)。
      • Broker 的状态(如是否存活)。
  3. Leader 选举

    • 当某个 Partition 的 Leader 失效时,Zookeeper 会协调选举新的 Leader。
  4. 配置管理

    • Zookeeper 存储 Kafka 集群的配置信息,如 Topic 配置、Broker 配置等。

2. Broker 注册和连接过程

以下是 Broker 注册和连接的具体过程:

2.1 Broker 启动

  1. Broker 启动时

    • Broker 读取配置文件中的 advertised.listeners​ 和 listener​。
    • listener​ 用于 Broker 监听请求,通常是内部地址(如 0.0.0.0:9092​)。
    • advertised.listeners​ 是 Broker 对外暴露的地址,客户端和其他 Broker 使用这个地址连接。
  2. 向 Zookeeper 注册

    • Broker 启动后,会向 Zookeeper 注册自己的信息,包括:

      • Broker ID
      • advertised.listeners​ 地址
      • 其他元数据(如支持的协议、端口等)。
    • 注意:Broker 不会将 listener​ 的地址注册到 Zookeeper 中,而是注册 advertised.listeners​ 的地址。

2.2 其他 Broker 获取新 Broker 的信息

  1. 从 Zookeeper 获取元数据

    • 当一个新的 Broker 启动并注册到 Zookeeper 后,其他 Broker 会从 Zookeeper 获取更新的元数据。
    • 元数据中包含了新 Broker 的 advertised.listeners​ 地址。
  2. 通过 advertised.listeners连接新 Broker

    • 其他 Broker 使用新 Broker 的 advertised.listeners​ 地址与其建立连接。

    • 例如:

      • 新 Broker 的 advertised.listeners​ 是 kafka3.example.com:9092​。
      • 其他 Broker 会通过 kafka3.example.com:9092​ 连接到新 Broker。

2.3 分区复制和心跳检测

  1. 分区复制

    • 如果新 Broker 是某个 Partition 的 Follower,它会通过 Leader 的 advertised.listeners​ 地址连接到 Leader,并拉取数据。
  2. 心跳检测

    • Broker 之间通过 advertised.listeners​ 地址定期发送心跳消息,检测彼此的健康状态。

3. 为什么 listener的地址不注册到 Zookeeper?

  1. listener是内部地址

    • listener​ 是 Broker 监听的地址,通常是内部地址(如 0.0.0.0​ 或私有 IP)。
    • 这个地址可能对其他 Broker 或客户端不可达(尤其是在跨网络环境时)。
  2. advertised.listeners是对外地址

    • advertised.listeners​ 是 Broker 对外暴露的地址,客户端和其他 Broker 使用这个地址连接。
    • 这个地址通常是外部地址(如域名、公共 IP 或负载均衡器地址),确保所有组件都能访问。
  3. 统一访问入口

    • 通过 advertised.listeners​,Kafka 提供了一个统一的访问入口,屏蔽了内部网络的复杂性。

4. 实际场景示例

假设我们有一个 Kafka 集群,包含 3 个 Broker,部署在 Kubernetes 中。每个 Broker 的配置如下:

Broker 1

  • listener​:PLAINTEXT://0.0.0.0:9092​(监听所有网络接口)
  • advertised.listeners​:PLAINTEXT://kafka1.example.com:9092​(对外暴露的地址)

Broker 2

  • listener​:PLAINTEXT://0.0.0.0:9092​(监听所有网络接口)
  • advertised.listeners​:PLAINTEXT://kafka2.example.com:9092​(对外暴露的地址)

Broker 3

  • listener​:PLAINTEXT://0.0.0.0:9092​(监听所有网络接口)
  • advertised.listeners​:PLAINTEXT://kafka3.example.com:9092​(对外暴露的地址)

连接过程

  1. Broker 启动

    • Broker 1 启动后,向 Zookeeper 注册自己的 advertised.listeners​ 地址(kafka1.example.com:9092​)。
    • 类似地,Broker 2 和 Broker 3 也会注册自己的 advertised.listeners​ 地址。
  2. 获取元数据

    • Broker 1 从 Zookeeper 获取元数据,发现集群中有 3 个 Broker,地址分别为:

      • kafka1.example.com:9092
      • kafka2.example.com:9092
      • kafka3.example.com:9092
  3. 建立连接

    • Broker 1 通过 kafka2.example.com:9092​ 和 kafka3.example.com:9092​ 连接到 Broker 2 和 Broker 3。
    • 类似地,Broker 2 和 Broker 3 也会通过 advertised.listeners​ 地址连接到其他 Broker。
  4. 分区复制和心跳检测

    • Broker 之间通过 advertised.listeners​ 地址进行分区复制和心跳检测。

5. 总结

  • Zookeeper 的作用

    • 管理 Broker 的注册信息(包括 advertised.listeners​)。
    • 维护集群的元数据和状态。
  • Broker 的连接

    • Broker 之间通过 advertised.listeners​ 地址进行连接,而不是 listener​ 地址。
    • listener​ 是 Broker 监听的内部地址,不会注册到 Zookeeper 中。
    • advertised.listeners​ 是 Broker 对外暴露的地址,客户端和其他 Broker 使用这个地址连接。

通过这种方式,Kafka 能够灵活地适应各种网络环境,并确保 Broker 之间以及客户端与 Broker 之间的高效连接。


Docker部署单节点kafka

version: "2"

services:
  kafka:
    container_name: kafka
    image: 'bitnami/kafka:3.5'
    ports:
      - '19092:9092'
      - '19093:9093'
    environment:
      ### 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      # 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:19092
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
      - KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 不允许自动创建主题
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
      - KAFKA_BROKER_ID=1

Docker部署集群kafka

version: "2"

services:
  kafka1:
    container_name: kafka1
    image: 'bitnami/kafka:3.5'
    user: "root"
    ports:
      - '19092:9092'
      - '19093:9093'
    environment:
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:19092
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
      - KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 不允许自动创建主题
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
      - KAFKA_BROKER_ID=1
    volumes:
      - ./kafka/broker1:/bitnami/kafka:rw

  kafka2:
    container_name: kafka2
    image: 'bitnami/kafka:3.5'
    user: "root"
    ports:
      - '29092:9092'
      - '29093:9093'
    environment:
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=2
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:29092
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
      - KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 不允许自动创建主题
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
      - KAFKA_BROKER_ID=2
    volumes:
      - ./kafka/broker2:/bitnami/kafka:rw

  kafka3:
    container_name: kafka3
    image: 'bitnami/kafka:3.5'
    user: "root"
    ports:
      - '39092:9092'
      - '39093:9093'
    environment:
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=3
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.28:39092
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
      - KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 不允许自动创建主题
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
      - KAFKA_BROKER_ID=3
    volumes:
      - ./kafka/broker3:/bitnami/kafka:rw