当前位置 : 主页 > 编程语言 > java >

Apache Pulsar——producer consumer的Java API

来源:互联网 收集:自由互联 发布时间:2023-02-04
一、添加pom.xml依赖 dependencygroupIdorg.apache.pulsar/groupIdartifactIdpulsar-client/artifactIdversion2.10.0/version/dependency 二、producer 2.1 producer 同步发送 /** * @Author: huangyibo * @Date: 2022/5/27 22:41 * @Description

一、添加pom.xml依赖

<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.10.0</version> </dependency>

二、producer

2.1 producer 同步发送

/** * @Author: huangyibo * @Date: 2022/5/27 22:41 * @Description: Pulsar 生产者 同步发送 */ public class PulsarProduceSync { public static void main(String[] args) throws PulsarClientException { // 1 创建pulsar的客户端对象 String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); // 2 基于客户端对象进行构建生产者对象 String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"; // 发送字符串,Schema的类型为:Schema.STRING Producer<String> producer = pulsarClient.newProducer(Schema.STRING) .topic(partitionedTopicName).create(); // 3 发送数据生产 producer.send("hello Pulsar"); // 4 释放资源 producer.close(); pulsarClient.close(); } }

2.2 producer 异步发送

/** * @Author: huangyibo * @Date: 2022/5/27 22:43 * @Description: Pulsar 生产者 异步发送 */ public class PulsarProduceAsync { public static void main(String[] args) throws PulsarClientException, InterruptedException { // 1 创建pulsar的客户端对象 String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); // 2 基于客户端对象进行构建生产者对象 String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"; // 发送字符串,Schema的类型为:Schema.STRING Producer<String> producer = pulsarClient.newProducer(Schema.STRING) .topic(partitionedTopicName).create(); // 3 发送数据生产 producer.sendAsync("hello Async Pulsar"); // 异步发送, 会先将数据发送到客户端缓存中, 当缓存达到一批后才会进行批量发送 // 等待一定时间,等消息发送成功了,再关闭客户端 Thread.sleep(1000); // 4 释放资源 producer.close(); pulsarClient.close(); } }

2.3 producer Schema发送

/** * @Author: huangyibo * @Date: 2022/5/27 22:58 * @Description: Pulsar 生产者 Schema发送 */ public class PulsarProduceSchema { public static void main(String[] args) throws PulsarClientException { // 1 创建pulsar的客户端对象 String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); // 2 基于客户端对象进行构建生产者对象 String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"; // 发送字符串,Schema的类型为:Schema.STRING Producer<User> producer = pulsarClient.newProducer(AvroSchema.of(User.class)) .topic(partitionedTopicName).create(); // 3 发送数据生产 User user = new User(); user.setName("张无忌"); user.setAge(20); producer.send(user); // 4 释放资源 producer.close(); pulsarClient.close(); } }

三、consumer

3.1 consumer 普通消费方式

/** * @Author: huangyibo * @Date: 2022/5/28 0:15 * @Description: Pulsar 消费者 */ public class PulsarConsumer { public static void main(String[] args) throws PulsarClientException { // 1 创建pulsar的客户端的对象 String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); // 2 基于客户端构建消费者对象 String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"; Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) // 可以传入多个topic .topic(partitionedTopicName) //可以消费多个topic //.topics() .subscriptionName("consumeTest") .subscriptionType(SubscriptionType.Exclusive) .subscribe(); // 3 循环从消费者读取数据 while(true) { // 接收消息 Message<String> message = consumer.receive(); try { // 获取消息 String msg = message.getValue(); // 处理数据 System.out.println("获取的数据为: " + msg); // ack确认操作,下次重启从ack的position开始消费数据 consumer.acknowledge(message); } catch(PulsarClientException e) { e.printStackTrace(); //消息消费失败 consumer.negativeAcknowledge(message); } } } }

3.2 consumer Schema方式

/** * @Author: huangyibo * @Date: 2022/5/28 0:23 * @Description: Pulsar 消费者 Schema方式 */ public class PulsarConsumerSchema { public static void main(String[] args) throws PulsarClientException { // 1 创建pulsar的客户端的对象 String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); // 2 基于客户端构建消费者对象 String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"; Consumer<User> consumer = pulsarClient.newConsumer(AvroSchema.of(User.class)) // 可以传入多个topic .topic(partitionedTopicName) .subscriptionName("consumeTest") .subscriptionType(SubscriptionType.Shared) .subscribe(); // 3 循环从消费者读取数据 while(true) { // 接收消息 Message<User> message = consumer.receive(); try { // 获取消息 User user = message.getValue(); // 处理数据 System.out.println(user); // ack确认操作,下次重启从ack的position开始消费数据 consumer.acknowledge(message); } catch(PulsarClientException e) { e.printStackTrace(); //消息消费失败 consumer.negativeAcknowledge(message); } } } }

3.3 consumer 批量消费方式

/** * @Author: huangyibo * @Date: 2022/5/28 0:27 * @Description: Pulsar 消费者 批量消费方式 */ public class PulsarConsumerBatch { public static void main(String[] args) throws PulsarClientException { // 1 创建pulsar的客户端的对象 String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); // 2 基于客户端构建消费者对象 String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"; Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) // 可以传入多个topic .topic(partitionedTopicName) //可以消费多个topic //.topics() .subscriptionName("consumeTest") //设置批量读取数据 .batchReceivePolicy( BatchReceivePolicy.builder() // 1M .maxNumBytes(1024 * 1024) //最大消费消息条数 .maxNumMessages(100) //等待时间 .timeout(2000, TimeUnit.MILLISECONDS) .build() ).subscribe(); // 3 循环从消费者读取数据 while(true) { // 接收消息 Messages<String> messages = consumer.batchReceive(); messages.forEach(message -> { try { // 获取消息 String msg = message.getValue(); // 处理数据 System.out.println("获取的数据为: " + msg); // ack确认操作,下次重启从ack的position开始消费数据 consumer.acknowledge(message); } catch (PulsarClientException e) { e.printStackTrace(); //消息消费失败 consumer.negativeAcknowledge(message); } }); } } }
上一篇:Apache Pulsar——SpringBoot整合Pulsar
下一篇:没有了
网友评论