当前位置 : 主页 > 编程语言 > 其它开发 >

我的 Python/Java/Spring/Go/Whatever Client Won’t Connect to My Apache Kafka Cluster in Docker/AWS

来源:互联网 收集:自由互联 发布时间:2022-06-13
My Python/Java/Spring/Go/Whatever Client Won’t Connect to My Apache Kafka Cluster in Docker/AWS/My Brother’s Laptop. Please Help! 本文翻译自stackoverflow中kafka最佳回答, 我相信这篇文章能够解决kafka初学者关于kaf
My Python/Java/Spring/Go/Whatever Client Won’t Connect to My Apache Kafka Cluster in Docker/AWS/My Brother’s Laptop. Please Help!

本文翻译自stackoverflow中kafka最佳回答, 我相信这篇文章能够解决kafka初学者关于kafka网络配置的99.999%的问题

前言

当客户机想要从Apache Kafka®发送或接收一个消息; 有两种类型的连接, 必须被建立:

  1. init 连接到bootstrap。 它将元数据返回给客户端,包括所有集群中的代理(broker)的列表及其连接的端点。
  2. 然后客户端连接到 1.init中返回的一个(或更多)的代理(broker)的元数据(metadata)。 如果代理(broker)没有被正确配置,连接将失败。

往往人们只关注上面的第1步, 结果被步骤2的网络配置卡住。 步骤1 中返回的代理(broker)详细信息被定义为 advertised.listeners --->必须是可以从客户端机器解析的。阅读更多关于协议,看到 kafka documentation 。

下面, 我使用一个客户端连接到kafka在各种排列的部署拓扑。 他们使用Python编写librdkafka ( confluent_kafka ), 原则适用于所有客户的语言。 你可以找到的 代码在GitHub上 。

kafka的插图示例客户机连接到代理(Broker)

假设我们有两个服务器。 一个是我们的客户,另一方面是我们的kafka集群的单一代理(暂时不考虑, kafka集群通常有至少三个代理(broker))。

  1. 客户端发起一bootstrap server(s),这是部署在集群上一个(或更多)的代理(broker)。
    Client ➝ Kafka Broker: Hi! I'm going to read some messages!
  2. 代理(broker)返回元数据,包括集群中所有可以达到的代理(broker)的主机(host)和端口(port)。Kafka Broker ➝ Client: Sounds good. There's just one broker here. You can reach me at
  3. 这个列表是客户机随后用于建立连接来, 生成或使用数据的列表。 初始连接中使用的地址只是为在集群 n 代理(broker)中找到一个引导服务器(bootstrap), 然后这个bootstrap给client返回所有代理(broker)的列表。 这样, 客户端不需要在配置文件中维护所有的代理(broker)的信息列表。 客户端之所以需要知道每一个broker的信息, 是因为客户端之后需要直接连接到一个或多个主题(topic)的代理(broker)上, 并基于分区(partition)来操作消息。
    Client ➝ Kafka Broker: Gimme messages for topic

经常出现的错误是: 代理(broker)配置(server.properties)的返回地址( advertised.listener )是客户端不能解析的。 在这种情况下,时序图大概是这样的:

  1. 客户端发起一bootstrap server(s),它是部署在集群中的一个(或更多)的代理(broker).
    Client ➝ Kafka Broker: Hi! I'm going to read some messages!
  2. 代理(broker)返回一个不正确的主机名(hostname)到客户端(client)
    Kafka Broker ➝ Client: Sounds good. There's just one broker here. You can reach me at
  3. 客户机试图连接到这个错误的地址,然后失败(因为kafka代理(broker)不是在客户端机器localhost上, 而在broker-0上)
    Client: Gimme messages for topic  | Kafka Broker: ???
只是一个代理(broker)?

所有这些例子使用的是一个代理(broker), 这对任何实际环境完全无用。 在实践中,你会有一个最少的三个broker cluster。 你的客户会对一个(或更多)这样的引导,代理(broker)将返回的元数据 集群中的每个代理(broker) 给客户端。

场景0:客户端和kafka同样运行在本地机器上

对于这个示例,我在 [Confluent Platform], 但是你也可以在本地复现这个kafka运行情况。

$ confluent local status kafka
…
kafka is [UP]
zookeeper is [UP]

我的Python客户机连接与引导服务器设置的 localhost: 9092

Single machine

这工作得很好:

注意:代理(broker)返回元数据 192.168.10.83 ,但由于这是我本地机器的IP,它顺利运行。

场景1:客户端和kafka运行在不同的机器上

现在让我们检查连接到kafka代理(broker)运行在另一台机器。 这可能是一个机器在你的本地网络,或者运行在云基础设施如亚马逊网络服务(AWS),微软Azure,或谷歌云平台(GCP)。

Client machine | Broker machine (asgard03)

在这个例子中,我的client在我的笔记本上运行,连接到在局域网上运行的另一台机器 asgard03 上的kafka:

初始连接成功。 但请注意 BrokerMetadata ; 我们得到的返回值显示有一个代理(broker)和一个主机名 localhost 这意味着我们的客户使用 localhost 试图连接到一个代理,来生产和消费信息。 这是坏消息,因为在客户的机器上,没有kafka代理(broker) localhost . 所以这时候无论是生产还是消费都是没办法通过这个``asgard'03`机器上的kafka服务来传递的.

Client machine | Broker machine (asgard03)

因此会发生上面视频里的事情:

那么我们如何解决呢? 我们去找可爱的kafka管理员(很可能是我们自己)并修复在broker(s)上错误配置的的server.properties. 只有在配置中advertised.listeners 正确提供的主机名和端口; 代理(broker)才可以联系到客户(client)。 上面我们看到返回 localhost , 而这显然无法在lan上建立client和broker上的联系。 让我们去解决这个问题。

在我代理(broker)的 server.properties , 原先错误的代理(broker)长这样:

advertised.listeners=PLAINTEXT://localhost:9092
listeners=PLAINTEXT://0.0.0.0:9092

和修改之后正确的 advertised.listeners 配置:

advertised.listeners=PLAINTEXT://asgard03.moffatt.me:9092
listeners=PLAINTEXT://0.0.0.0:9092

Client machine | Broker machine (asgard03)

listener本身保持不变(它与所有可用的网卡绑定在端口9092上)。 唯一的区别在于,此listeber会告诉客户到达 asgard03.moffatt.me 而不是 localhost

所以在应用这些更改 advertised.listener 在每个代理(broker)和重新启动每个其中之一,生产者和消费者正常工作:

代理(broker)现在显示正确的主机名解析元数据从客户端。

场景2:卡夫卡和客户机运行在Docker

现在我们要进入docker的精彩世界。 docker网络本身是一个野兽,我不打算在这里让读者详细了解它, 在本文中只需要搞懂kafka的listener相关配置就可以了, 志于docker network config那是另外的话题。

如果你还记得一件事: 当您运行在docker,它执行一个容器在自己的小世界。 它似乎成为自己的主机名,自己的网络地址,它自己的文件系统。 比如,当你问代码在一个docker容器连接 localhost ,它将连接到 本身不是 您正在运行的主机。 这是令人费解的,因为人们习惯在使用电脑的 localhost 时是默认连接到本地的主机的。 但是记住, 不是你的电脑在运行的代码。 而是你的电脑上运行的某个docker容器在运行这些代码, 这些容器拥有独立的网络地址空间。

Docker host (e.g., your laptop) – Container: Client | Container: Kafka broker

我们将从最简单的排列方式, 在docker在同一个docker网络运行kafka和我们的客户。 首先,创建一个Dockerfile包括Python客户机到docker的容器:

FROM python:3

# We'll add netcat cos it's a really useful
# network troubleshooting tool
RUN apt-get update
RUN apt-get install -y netcat

# Install the Confluent Kafka python library
RUN pip install confluent_kafka

# Add our script
ADD python_kafka_test_client.py /
ENTRYPOINT [ "python", "/python_kafka_test_client.py"]

构建docker形象:

docker build -t python_kafka_test_client .

然后提供一个kafka代理(broker):

docker network create rmoff_kafka
docker run --network=rmoff_kafka --rm --detach --name zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:5.5.0
docker run --network=rmoff_kafka --rm --detach --name broker \
           -p 9092:9092 \
           -e KAFKA_BROKER_ID=1 \
           -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
           -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
           -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
           confluentinc/cp-kafka:5.5.0

确认你有两个容器运行:一个ApacheZooKeeper 和一个 Kafka broker:

$ docker ps
IMAGE                              STATUS              PORTS                          NAMES
confluentinc/cp-kafka:5.5.0        Up 32 seconds       0.0.0.0:9092->9092/tcp         broker
confluentinc/cp-zookeeper:5.5.0    Up 33 seconds       2181/tcp, 2888/tcp, 3888/tcp   zookeeper

请注意,我们正在创造自己的docker的网络运行这些容器,这样zookeeper和kafka就可以交流。

让我们转到客户端,看看会发生什么:

$ docker run --network=rmoff_kafka --rm --name python_kafka_test_client \
        --tty python_kafka_test_client broker:9092

在返回的元数据中可以看到,即使我们成功地连接到代理(broker)最初,它给了我们 localhost 代理(broker)主机。

Docker host (e.g., your laptop) – Container: Client | Container: Broker

试着修复它? 由于 Kafka 代理(broker)在网络上的名称是 broker(继承自其容器名称),我们需要将advertised_listener设置为broker

    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \

修改后

    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker:9092 \

Docker host (e.g., your laptop) – Container: Client | Container: Kafka broker
现在我们的代理(broker)是这样的:

docker stop broker
docker run --network=rmoff_kafka --rm --detach --name broker \
           -p 9092:9092 \
           -e KAFKA_BROKER_ID=1 \
           -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
           -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker:9092 \
           -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
           confluentinc/cp-kafka:5.5.0

和client的交互就完美:

场景3:卡夫卡在docker-compose

使用docker compose可以减少输入docker 命令行的次数特别是当需要配置的参数很多的时候。
关闭场景二中的容器docker rm -f broker; docker rm -f zookeeper; 然后创建 docker-compose.yml 在本地使用这个 例子 。

---
version: '3.5'

networks: 
  rmoff_kafka:
    name: rmoff_kafka

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    container_name: zookeeper
    networks: 
      - rmoff_kafka
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  broker:
    image: confluentinc/cp-kafka:5.5.0
    container_name: broker
    networks: 
      - rmoff_kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  client:
    image: python_kafka_test_client
    container_name: python_kafka_test_client
    depends_on: 
      - broker
    networks: 
      - rmoff_kafka
    entrypoint: 
      - bash 
      - -c 
      - |
        echo 'Giving Kafka a bit of time to start up…'
        sleep 30
        # Run the client code
        python /python_kafka_test_client.py broker:9092

确保你在与上述相同的文件夹中 docker-compose.yml 运行:

docker-compose up

你会看到zookeeper和kafka代理(broker); 然后Python测试客户端:

很好,嗯

网友评论