前言本文翻译自stackoverflow中kafka最佳回答, 我相信这篇文章能够解决kafka初学者关于kafka网络配置的99.999%的问题
当客户机想要从Apache Kafka®发送或接收一个消息; 有两种类型的连接, 必须被建立:
- init 连接到bootstrap。 它将元数据返回给客户端,包括所有集群中的代理(broker)的列表及其连接的端点。
- 然后客户端连接到 1.init中返回的一个(或更多)的代理(broker)的元数据(metadata)。 如果代理(broker)没有被正确配置,连接将失败。
往往人们只关注上面的第1步, 结果被步骤2的网络配置卡住。 步骤1 中返回的代理(broker)详细信息被定义为 advertised.listeners
--->必须是可以从客户端机器解析的。阅读更多关于协议,看到 kafka documentation 。
下面, 我使用一个客户端连接到kafka在各种排列的部署拓扑。 他们使用Python编写librdkafka ( confluent_kafka
), 原则适用于所有客户的语言。 你可以找到的 代码在GitHub上 。
假设我们有两个服务器。 一个是我们的客户,另一方面是我们的kafka集群的单一代理(暂时不考虑, kafka集群通常有至少三个代理(broker))。
- 客户端发起一bootstrap server(s),这是部署在集群上一个(或更多)的代理(broker)。
- 代理(broker)返回元数据,包括集群中所有可以达到的代理(broker)的主机(host)和端口(port)。
- 这个列表是客户机随后用于建立连接来, 生成或使用数据的列表。 初始连接中使用的地址只是为在集群 n 代理(broker)中找到一个引导服务器(bootstrap), 然后这个bootstrap给client返回所有代理(broker)的列表。 这样, 客户端不需要在配置文件中维护所有的代理(broker)的信息列表。 客户端之所以需要知道每一个broker的信息, 是因为客户端之后需要直接连接到一个或多个主题(topic)的代理(broker)上, 并基于分区(partition)来操作消息。
经常出现的错误是: 代理(broker)配置(server.properties
)的返回地址( advertised.listener
)是客户端不能解析的。 在这种情况下,时序图大概是这样的:
- 客户端发起一bootstrap server(s),它是部署在集群中的一个(或更多)的代理(broker).
- 代理(broker)返回一个不正确的主机名(hostname)到客户端(client)
- 客户机试图连接到这个错误的地址,然后失败(因为kafka代理(broker)不是在客户端机器
localhost
上, 而在broker-0
上)
所有这些例子使用的是一个代理(broker), 这对任何实际环境完全无用。 在实践中,你会有一个最少的三个broker cluster。 你的客户会对一个(或更多)这样的引导,代理(broker)将返回的元数据 集群中的每个代理(broker) 给客户端。
场景0:客户端和kafka同样运行在本地机器上对于这个示例,我在 [Confluent Platform], 但是你也可以在本地复现这个kafka运行情况。
$ confluent local status kafka
…
kafka is [UP]
zookeeper is [UP]
我的Python客户机连接与引导服务器设置的 localhost: 9092
.
这工作得很好:
注意:代理(broker)返回元数据 192.168.10.83
,但由于这是我本地机器的IP,它顺利运行。
现在让我们检查连接到kafka代理(broker)运行在另一台机器。 这可能是一个机器在你的本地网络,或者运行在云基础设施如亚马逊网络服务(AWS),微软Azure,或谷歌云平台(GCP)。
在这个例子中,我的client在我的笔记本上运行,连接到在局域网上运行的另一台机器 asgard03
上的kafka:
初始连接成功。 但请注意 BrokerMetadata
; 我们得到的返回值显示有一个代理(broker)和一个主机名 localhost
。 这意味着我们的客户使用 localhost
试图连接到一个代理,来生产和消费信息。 这是坏消息,因为在客户的机器上,没有kafka代理(broker) localhost
. 所以这时候无论是生产还是消费都是没办法通过这个``asgard'03`机器上的kafka服务来传递的.
因此会发生上面视频里的事情:
那么我们如何解决呢? 我们去找可爱的kafka管理员(很可能是我们自己)并修复在broker(s)上错误配置的的server.properties
. 只有在配置中advertised.listeners
正确提供的主机名和端口; 代理(broker)才可以联系到客户(client)。 上面我们看到返回 localhost
, 而这显然无法在lan上建立client和broker上的联系。 让我们去解决这个问题。
在我代理(broker)的 server.properties
, 原先错误的代理(broker)长这样:
advertised.listeners=PLAINTEXT://localhost:9092
listeners=PLAINTEXT://0.0.0.0:9092
和修改之后正确的 advertised.listeners
配置:
advertised.listeners=PLAINTEXT://asgard03.moffatt.me:9092
listeners=PLAINTEXT://0.0.0.0:9092
listener
本身保持不变(它与所有可用的网卡绑定在端口9092上)。 唯一的区别在于,此listeber
会告诉客户到达 asgard03.moffatt.me
而不是 localhost
。
所以在应用这些更改 advertised.listener
在每个代理(broker)和重新启动每个其中之一,生产者和消费者正常工作:
代理(broker)现在显示正确的主机名解析元数据从客户端。
场景2:卡夫卡和客户机运行在Docker现在我们要进入docker的精彩世界。 docker网络本身是一个野兽,我不打算在这里让读者详细了解它, 在本文中只需要搞懂kafka的listener
相关配置就可以了, 志于docker network config那是另外的话题。
如果你还记得一件事: 当您运行在docker,它执行一个容器在自己的小世界。 它似乎成为自己的主机名,自己的网络地址,它自己的文件系统。 比如,当你问代码在一个docker容器连接 localhost
,它将连接到 本身 和 不是 您正在运行的主机。 这是令人费解的,因为人们习惯在使用电脑的 localhost
时是默认连接到本地的主机的。 但是记住, 不是你的电脑在运行的代码。 而是你的电脑上运行的某个docker容器在运行这些代码, 这些容器拥有独立的网络地址空间。
我们将从最简单的排列方式, 在docker在同一个docker网络运行kafka和我们的客户。 首先,创建一个Dockerfile包括Python客户机到docker的容器:
FROM python:3
# We'll add netcat cos it's a really useful
# network troubleshooting tool
RUN apt-get update
RUN apt-get install -y netcat
# Install the Confluent Kafka python library
RUN pip install confluent_kafka
# Add our script
ADD python_kafka_test_client.py /
ENTRYPOINT [ "python", "/python_kafka_test_client.py"]
构建docker形象:
docker build -t python_kafka_test_client .
然后提供一个kafka代理(broker):
docker network create rmoff_kafka
docker run --network=rmoff_kafka --rm --detach --name zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:5.5.0
docker run --network=rmoff_kafka --rm --detach --name broker \
-p 9092:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.5.0
确认你有两个容器运行:一个ApacheZooKeeper 和一个 Kafka broker:
$ docker ps
IMAGE STATUS PORTS NAMES
confluentinc/cp-kafka:5.5.0 Up 32 seconds 0.0.0.0:9092->9092/tcp broker
confluentinc/cp-zookeeper:5.5.0 Up 33 seconds 2181/tcp, 2888/tcp, 3888/tcp zookeeper
请注意,我们正在创造自己的docker的网络运行这些容器,这样zookeeper和kafka就可以交流。
让我们转到客户端,看看会发生什么:
$ docker run --network=rmoff_kafka --rm --name python_kafka_test_client \
--tty python_kafka_test_client broker:9092
在返回的元数据中可以看到,即使我们成功地连接到代理(broker)最初,它给了我们 localhost
代理(broker)主机。
试着修复它? 由于 Kafka 代理(broker)在网络上的名称是 broker(继承自其容器名称),我们需要将advertised_listener
设置为broker
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
修改后
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker:9092 \
现在我们的代理(broker)是这样的:
docker stop broker
docker run --network=rmoff_kafka --rm --detach --name broker \
-p 9092:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.5.0
和client的交互就完美:
场景3:卡夫卡在docker-compose使用docker compose可以减少输入docker 命令行的次数特别是当需要配置的参数很多的时候。
关闭场景二中的容器docker rm -f broker; docker rm -f zookeeper
; 然后创建 docker-compose.yml
在本地使用这个 例子 。
---
version: '3.5'
networks:
rmoff_kafka:
name: rmoff_kafka
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.0
container_name: zookeeper
networks:
- rmoff_kafka
environment:
ZOOKEEPER_CLIENT_PORT: 2181
broker:
image: confluentinc/cp-kafka:5.5.0
container_name: broker
networks:
- rmoff_kafka
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
client:
image: python_kafka_test_client
container_name: python_kafka_test_client
depends_on:
- broker
networks:
- rmoff_kafka
entrypoint:
- bash
- -c
- |
echo 'Giving Kafka a bit of time to start up…'
sleep 30
# Run the client code
python /python_kafka_test_client.py broker:9092
确保你在与上述相同的文件夹中 docker-compose.yml
运行:
docker-compose up
你会看到zookeeper和kafka代理(broker); 然后Python测试客户端:
很好,嗯