下面我将为您详细讲解“Linux下Kafka单机安装配置方法(图文)”。 1. 准备工作 从Kafka官网上下载Kafka二进制包,并解压到本地目录。 2. 修改配置文件 进入Kafka的安装目录,找到config/serv
下面我将为您详细讲解“Linux下Kafka单机安装配置方法(图文)”。
1. 准备工作从Kafka官网上下载Kafka二进制包,并解压到本地目录。
2. 修改配置文件- 进入Kafka的安装目录,找到config/server.properties文件。
- 修改下列配置项:
# 监听端口号
listeners=PLAINTEXT://localhost:9092
# 数据日志目录
log.dirs=/tmp/kafka-logs
# zookeeper的地址和端口
zookeeper.connect=localhost:2181
3. 启动Kafka服务器
- 进入Kafka安装目录,执行以下命令启动Kafka服务器:
$ bin/kafka-server-start.sh config/server.properties
- 如果一切正常,Kafka服务器将启动并监听9092端口。
- 启动Kafka的消费者进程,消费者绑定到相同的Zookeeper节点:
$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
- 启动一个Kafka的生产者进程,将一些消息发送到test主题:
$ bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
- 在生产者终端输入消息并回车发送。在消费者终端,您应该能够看到已经接收到您刚才发送的消息。
假设您要将一个Spring应用程序与Kafka集成,请按照以下步骤进行:
- 添加以下依赖关系:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
- 添加以下配置项到您的Spring配置文件中:
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka服务器的地址和端口
template:
default-topic: test # 默认主题名
- 创建一个消息生产者:
import org.springframework.kafka.core.KafkaTemplate;
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
- 创建一个消息消费者:
import org.springframework.kafka.annotation.KafkaListener;
public class KafkaConsumer {
@KafkaListener(topics = "test")
public void handleMessage(String message) {
System.out.println("Received message: " + message);
}
}
示例说明2
假设您要使用Kafka Streams API编写一个数据管道,请按照以下步骤进行:
自由互联热门推荐:PDF电子发票识别软件,一键识别电子发票并导入到Excel中!10大顶级数据挖掘软件!人工智能的十大作用!- 添加以下依赖关系:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
- 编写管道处理程序:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Processor {
@KafkaListener(topics = "input-topic")
public void process(KStream<String, String> input) {
KStream<String, String> output = input
.filter((key, value) -> value.contains("foobar"))
.mapValues(value -> value.toUpperCase());
output.to("output-topic");
}
}
- 在应用程序入口点中,创建和启动Kafka Streams管道:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.springframework.context.annotation.Bean;
public class KafkaStreamsApp {
@Bean
public KafkaStreams kafkaStreams(StreamsBuilder streamsBuilder) {
Topology topology = streamsBuilder.build();
KafkaStreams streams = new KafkaStreams(topology, configuration());
streams.start();
return streams;
}
}
以上就是“Linux下Kafka单机安装配置方法(图文)”的完整攻略。希望对您有所帮助。