一、添加pom.xml依赖 dependencygroupIdorg.apache.pulsar/groupIdartifactIdpulsar-client/artifactIdversion2.10.0/version/dependency 二、producer 2.1 producer 同步发送 /** * @Author: huangyibo * @Date: 2022/5/27 22:41 * @Description
           
        
        
                    
                一、添加pom.xml依赖
<dependency>
	<groupId>org.apache.pulsar</groupId>
	<artifactId>pulsar-client</artifactId>
	<version>2.10.0</version>
</dependency>
二、producer
2.1 producer 同步发送
/**
 * @Author: huangyibo
 * @Date: 2022/5/27 22:41
 * @Description: Pulsar 生产者 同步发送
 */
public class PulsarProduceSync {
    public static void main(String[] args) throws PulsarClientException {
        // 1 创建pulsar的客户端对象
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
        // 2 基于客户端对象进行构建生产者对象
        String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
        // 发送字符串,Schema的类型为:Schema.STRING
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(partitionedTopicName).create();
        // 3 发送数据生产
        producer.send("hello Pulsar");
        // 4 释放资源
        producer.close();
        pulsarClient.close();
    }
}
2.2 producer 异步发送
/**
 * @Author: huangyibo
 * @Date: 2022/5/27 22:43
 * @Description: Pulsar 生产者 异步发送
 */
public class PulsarProduceAsync {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // 1 创建pulsar的客户端对象
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
        // 2 基于客户端对象进行构建生产者对象
        String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
        // 发送字符串,Schema的类型为:Schema.STRING
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(partitionedTopicName).create();
        // 3 发送数据生产
        producer.sendAsync("hello Async Pulsar");
        // 异步发送, 会先将数据发送到客户端缓存中, 当缓存达到一批后才会进行批量发送
        // 等待一定时间,等消息发送成功了,再关闭客户端
        Thread.sleep(1000);
        // 4 释放资源
        producer.close();
        pulsarClient.close();
    }
}
2.3 producer Schema发送
/**
 * @Author: huangyibo
 * @Date: 2022/5/27 22:58
 * @Description: Pulsar 生产者 Schema发送
 */
public class PulsarProduceSchema {
    public static void main(String[] args) throws PulsarClientException {
        // 1 创建pulsar的客户端对象
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
        // 2 基于客户端对象进行构建生产者对象
        String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
        // 发送字符串,Schema的类型为:Schema.STRING
        Producer<User> producer = pulsarClient.newProducer(AvroSchema.of(User.class))
                .topic(partitionedTopicName).create();
        // 3 发送数据生产
        User user = new User();
        user.setName("张无忌");
        user.setAge(20);
        producer.send(user);
        // 4 释放资源
        producer.close();
        pulsarClient.close();
    }
}
三、consumer
3.1 consumer 普通消费方式
/**
 * @Author: huangyibo
 * @Date: 2022/5/28 0:15
 * @Description: Pulsar 消费者
 */
public class PulsarConsumer {
    public static void main(String[] args) throws PulsarClientException {
        // 1 创建pulsar的客户端的对象
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
        // 2 基于客户端构建消费者对象
        String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                // 可以传入多个topic
                .topic(partitionedTopicName)
                //可以消费多个topic
                //.topics()
                .subscriptionName("consumeTest")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        // 3 循环从消费者读取数据
        while(true) {
            // 接收消息
            Message<String> message = consumer.receive();
            try {
                // 获取消息
                String msg = message.getValue();
                // 处理数据
                System.out.println("获取的数据为: " + msg);
                // ack确认操作,下次重启从ack的position开始消费数据
                consumer.acknowledge(message);
            } catch(PulsarClientException e) {
                e.printStackTrace();
                //消息消费失败
                consumer.negativeAcknowledge(message);
            }
        }
    }
}
3.2 consumer Schema方式
/**
 * @Author: huangyibo
 * @Date: 2022/5/28 0:23
 * @Description: Pulsar 消费者 Schema方式
 */
public class PulsarConsumerSchema {
    public static void main(String[] args) throws PulsarClientException {
        // 1 创建pulsar的客户端的对象
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
        // 2 基于客户端构建消费者对象
        String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
        Consumer<User> consumer = pulsarClient.newConsumer(AvroSchema.of(User.class))
                // 可以传入多个topic
                .topic(partitionedTopicName)
                .subscriptionName("consumeTest")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        // 3 循环从消费者读取数据
        while(true) {
            // 接收消息
            Message<User> message = consumer.receive();
            try {
                // 获取消息
                User user = message.getValue();
                // 处理数据
                System.out.println(user);
                // ack确认操作,下次重启从ack的position开始消费数据
                consumer.acknowledge(message);
            } catch(PulsarClientException e) {
                e.printStackTrace();
                //消息消费失败
                consumer.negativeAcknowledge(message);
            }
        }
    }
}
3.3 consumer 批量消费方式
/**
 * @Author: huangyibo
 * @Date: 2022/5/28 0:27
 * @Description: Pulsar 消费者 批量消费方式
 */
public class PulsarConsumerBatch {
    public static void main(String[] args) throws PulsarClientException {
        // 1 创建pulsar的客户端的对象
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
        // 2 基于客户端构建消费者对象
        String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                // 可以传入多个topic
                .topic(partitionedTopicName)
                //可以消费多个topic
                //.topics()
                .subscriptionName("consumeTest")
                //设置批量读取数据
                .batchReceivePolicy(
                    BatchReceivePolicy.builder()
                        // 1M
                        .maxNumBytes(1024 * 1024)
                        //最大消费消息条数
                        .maxNumMessages(100)
                        //等待时间
                        .timeout(2000, TimeUnit.MILLISECONDS)
                        .build()
                ).subscribe();
        // 3 循环从消费者读取数据
        while(true) {
            // 接收消息
            Messages<String> messages = consumer.batchReceive();
            messages.forEach(message -> {
                try {
                    // 获取消息
                    String msg = message.getValue();
                    // 处理数据
                    System.out.println("获取的数据为: " + msg);
                    // ack确认操作,下次重启从ack的position开始消费数据
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                    //消息消费失败
                    consumer.negativeAcknowledge(message);
                }
            });
        }
    }
}