Kafka Java示例 简介 Kafka是一个高性能、分布式流处理平台,广泛用于构建实时数据流应用程序。它由Apache Software Foundation开发和维护,以高吞吐量、可扩展性和持久性为特点。 Kafka提供了
Kafka Java示例
简介
Kafka是一个高性能、分布式流处理平台,广泛用于构建实时数据流应用程序。它由Apache Software Foundation开发和维护,以高吞吐量、可扩展性和持久性为特点。
Kafka提供了一种发布/订阅模型,通过主题(topic)进行数据的发布和消费。生产者将数据发布到特定的主题,而消费者则订阅该主题并消费数据。这种模型使得Kafka非常适合处理实时数据流,如日志收集、流式处理和事件驱动的架构等。
本文将介绍如何使用Java编写Kafka生产者和消费者,并提供了详细的代码示例。
环境设置
在开始之前,需要确保已经安装了以下环境:
- Apache Kafka:可以从官方网站(
- Java开发环境:确保已经安装了Java开发环境。
生产者示例
首先,我们来看一个简单的Kafka生产者示例。生产者负责将消息发布到Kafka集群中的主题。
以下是示例代码:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "Hello, Kafka!");
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent successfully: " + metadata.topic() + " - " +
metadata.partition() + " - " + metadata.offset());
}
}
});
// 关闭Kafka生产者
producer.close();
}
}
在上述示例中,我们首先配置了Kafka生产者的属性,包括Kafka集群的地址、消息的键值序列化器等。然后,我们创建了一个KafkaProducer对象,并通过send方法发送了一条消息到名为"test-topic"的主题中。最后,我们关闭了KafkaProducer。
消费者示例
接下来,我们来看一个简单的Kafka消费者示例。消费者负责从Kafka集群中的主题中订阅并消费消息。
以下是示例代码:
import org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置Kafka消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test-group");
// 创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
在上述示例中,我们首先配置了Kafka消费者的属性,包括Kafka集群的地址、消息的键值反序列化器等。然后,我们创建了一个KafkaConsumer对象,并通过subscribe方法订阅了名为"test-topic"的主题。最后,我们通过poll方法循环消费消息,并打印出接收到的消息内容。
序列图
以下是Kafka生产者和消费者之间的交互序列图:
sequenceDiagram
participant Producer
participant Kafka
participant Consumer
Producer->>Kafka: 发送消息
Kafka-->>Consumer: 转