一、添加pom.xml依赖 dependencygroupIdorg.apache.pulsar/groupIdartifactIdpulsar-client/artifactIdversion2.10.0/version/dependency 二、producer 2.1 producer 同步发送 /** * @Author: huangyibo * @Date: 2022/5/27 22:41 * @Description
一、添加pom.xml依赖
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.0</version>
</dependency>
二、producer
2.1 producer 同步发送
/**
* @Author: huangyibo
* @Date: 2022/5/27 22:41
* @Description: Pulsar 生产者 同步发送
*/
public class PulsarProduceSync {
public static void main(String[] args) throws PulsarClientException {
// 1 创建pulsar的客户端对象
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
// 2 基于客户端对象进行构建生产者对象
String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
// 发送字符串,Schema的类型为:Schema.STRING
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(partitionedTopicName).create();
// 3 发送数据生产
producer.send("hello Pulsar");
// 4 释放资源
producer.close();
pulsarClient.close();
}
}
2.2 producer 异步发送
/**
* @Author: huangyibo
* @Date: 2022/5/27 22:43
* @Description: Pulsar 生产者 异步发送
*/
public class PulsarProduceAsync {
public static void main(String[] args) throws PulsarClientException, InterruptedException {
// 1 创建pulsar的客户端对象
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
// 2 基于客户端对象进行构建生产者对象
String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
// 发送字符串,Schema的类型为:Schema.STRING
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(partitionedTopicName).create();
// 3 发送数据生产
producer.sendAsync("hello Async Pulsar");
// 异步发送, 会先将数据发送到客户端缓存中, 当缓存达到一批后才会进行批量发送
// 等待一定时间,等消息发送成功了,再关闭客户端
Thread.sleep(1000);
// 4 释放资源
producer.close();
pulsarClient.close();
}
}
2.3 producer Schema发送
/**
* @Author: huangyibo
* @Date: 2022/5/27 22:58
* @Description: Pulsar 生产者 Schema发送
*/
public class PulsarProduceSchema {
public static void main(String[] args) throws PulsarClientException {
// 1 创建pulsar的客户端对象
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
// 2 基于客户端对象进行构建生产者对象
String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
// 发送字符串,Schema的类型为:Schema.STRING
Producer<User> producer = pulsarClient.newProducer(AvroSchema.of(User.class))
.topic(partitionedTopicName).create();
// 3 发送数据生产
User user = new User();
user.setName("张无忌");
user.setAge(20);
producer.send(user);
// 4 释放资源
producer.close();
pulsarClient.close();
}
}
三、consumer
3.1 consumer 普通消费方式
/**
* @Author: huangyibo
* @Date: 2022/5/28 0:15
* @Description: Pulsar 消费者
*/
public class PulsarConsumer {
public static void main(String[] args) throws PulsarClientException {
// 1 创建pulsar的客户端的对象
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
// 2 基于客户端构建消费者对象
String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
// 可以传入多个topic
.topic(partitionedTopicName)
//可以消费多个topic
//.topics()
.subscriptionName("consumeTest")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
// 3 循环从消费者读取数据
while(true) {
// 接收消息
Message<String> message = consumer.receive();
try {
// 获取消息
String msg = message.getValue();
// 处理数据
System.out.println("获取的数据为: " + msg);
// ack确认操作,下次重启从ack的position开始消费数据
consumer.acknowledge(message);
} catch(PulsarClientException e) {
e.printStackTrace();
//消息消费失败
consumer.negativeAcknowledge(message);
}
}
}
}
3.2 consumer Schema方式
/**
* @Author: huangyibo
* @Date: 2022/5/28 0:23
* @Description: Pulsar 消费者 Schema方式
*/
public class PulsarConsumerSchema {
public static void main(String[] args) throws PulsarClientException {
// 1 创建pulsar的客户端的对象
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
// 2 基于客户端构建消费者对象
String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
Consumer<User> consumer = pulsarClient.newConsumer(AvroSchema.of(User.class))
// 可以传入多个topic
.topic(partitionedTopicName)
.subscriptionName("consumeTest")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
// 3 循环从消费者读取数据
while(true) {
// 接收消息
Message<User> message = consumer.receive();
try {
// 获取消息
User user = message.getValue();
// 处理数据
System.out.println(user);
// ack确认操作,下次重启从ack的position开始消费数据
consumer.acknowledge(message);
} catch(PulsarClientException e) {
e.printStackTrace();
//消息消费失败
consumer.negativeAcknowledge(message);
}
}
}
}
3.3 consumer 批量消费方式
/**
* @Author: huangyibo
* @Date: 2022/5/28 0:27
* @Description: Pulsar 消费者 批量消费方式
*/
public class PulsarConsumerBatch {
public static void main(String[] args) throws PulsarClientException {
// 1 创建pulsar的客户端的对象
String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
// 2 基于客户端构建消费者对象
String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
// 可以传入多个topic
.topic(partitionedTopicName)
//可以消费多个topic
//.topics()
.subscriptionName("consumeTest")
//设置批量读取数据
.batchReceivePolicy(
BatchReceivePolicy.builder()
// 1M
.maxNumBytes(1024 * 1024)
//最大消费消息条数
.maxNumMessages(100)
//等待时间
.timeout(2000, TimeUnit.MILLISECONDS)
.build()
).subscribe();
// 3 循环从消费者读取数据
while(true) {
// 接收消息
Messages<String> messages = consumer.batchReceive();
messages.forEach(message -> {
try {
// 获取消息
String msg = message.getValue();
// 处理数据
System.out.println("获取的数据为: " + msg);
// ack确认操作,下次重启从ack的position开始消费数据
consumer.acknowledge(message);
} catch (PulsarClientException e) {
e.printStackTrace();
//消息消费失败
consumer.negativeAcknowledge(message);
}
});
}
}
}