当前位置 : 主页 > 编程语言 > java >

Apache Pulsar——Adaptor适配器

来源:互联网 收集:自由互联 发布时间:2023-02-04
一、Pulsar Adaptor on Kafka 适配器 Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。   在生产者中,如果想不改变原有kafka的代码架构,就切换到Pulsar的平

一、Pulsar Adaptor on Kafka 适配器

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。

 

在生产者中,如果想不改变原有kafka的代码架构,就切换到Pulsar的平台中,那么Pulsar adaptor on kafka就变的非常的有用了,它可以帮助我们在不改变原有kafka的代码基础上,即可接入pulsar,但是需要注意,相关配置信息需要进行一些调整,例如:地址与topic。

1.1 需要导入Pulsar兼容kafka的依赖包

<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>2.8.0</version> </dependency>

1.2 编写生产者

public class KafkaAdaptorProducer { public static void main(String[] args) throws Exception { //1. 创建kafka生产者的核心类对象: KafkaProducer //1.1: 创建生产者配置对象: 设置相关配置 Properties props = new Properties(); props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650"); props.put("acks", "all"); // 消息的确认方案 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key序列化类型 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化类型Producer<String, String> producer = new KafkaProducer<>(props); Producer<String, String> producer = new KafkaProducer<>(props); //2. 发送数据 for (int i = 0; i < 10; i++) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据 ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); } //3. 释放资源 producer.close(); } }

1.3 编写消费者

public class KafkaAdaptorConsumer { public static void main(String[] args) { //1. 创建kafka的消费者的核心对象: KafkaConsumer //1.1: 创建消费者配置对象, 并设置相关的参数: Properties props = new Properties(); props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650"); props.setProperty("group.id", "test"); // 消费者组的 id props.setProperty("enable.auto.commit", "true"); // 是否启动消费者自动提交消费偏移量 props.setProperty("auto.commit.interval.ms","1000");//每间隔多长时间提交一次偏移量:单位 毫秒 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key 反序列化 props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // val 发序列化 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //2. 给消费者设置订阅topic: consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1")); //3. 循环获取相关的消息数据 while (true) { //3.1: 从kafka中获取消息数据: 参数表示等待超时时间 // 注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息 for (ConsumerRecord<String, String> record : records) { String massage = record.value(); System.out.println("消息数据为:"+massage); } } } }

二、Pulsar Adaptor on Spark 适配器

Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从Pulsar接收原始数据。

 

应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理

2.1 导入相关的依赖包

<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-spark</artifactId> <version>2.8.0</version> </dependency>

2.2 编写spark的流式代码

import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.spark.SparkStreamingPulsarReceiver; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import java.util.HashSet; import java.util.Set; /** * @Author: huangyibo * @Date: 2022/6/6 22:40 * @Description: */ public class SparkStreamingAdaptor { public static void main(String[] args) throws InterruptedException { String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; String topic = "persistent://public/default/test_src"; String subs = "test_sub"; //1. 创建Java Spark Streaming 对象 SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Adaptor"); JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10)); //2. 设置数据源: 从Pulsar中读取数据 ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData<>(); Set<String> set = new HashSet<>(); set.add(topic); pulsarConf.setTopicNames(set); pulsarConf.setSubscriptionName(subs); SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(serviceUrl, pulsarConf, new AuthenticationDisabled()); JavaReceiverInputDStream<byte[]> lineStream = streamingContext.receiverStream(pulsarReceiver); //3. 对接收到数据进行处理 JavaDStream<String> stream = lineStream.map((Function<byte[], String>) String::new); //4. 输出操作 stream.print(); //5. 启动 streamingContext.start(); streamingContext.awaitTermination(); } }
上一篇:Apache Pulsar——协议适配器
下一篇:没有了
网友评论