当前位置 : 主页 > 编程语言 > java >

Apache Pulsar——Java API操作tenant、namespace、topic

来源:互联网 收集:自由互联 发布时间:2023-02-04
一、添加pom.xml依赖 dependencygroupIdorg.apache.pulsar/groupIdartifactIdpulsar-client/artifactIdversion2.10.0/version/dependency 二、tenant租户的Java API public class PulsarTenant { public static void main(String[] args) throws P

一、添加pom.xml依赖

<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.10.0</version> </dependency>

二、tenant租户的Java API

public class PulsarTenant { public static void main(String[] args) throws PulsarClientException, PulsarAdminException { String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080"; // 1.创建pulsar的Admin管理对象 PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build(); // 2.基于pulsar的Admin对象进行相关的操作 // 2.1 创建租户 Set<String> allowedClusters = new HashSet<>(); allowedClusters.add("pulsar-cluster"); TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build(); pulsarAdmin.tenants().createTenant("my-tenant", tenantInfo); // 2.2 查看当前有那些租户 List<String> tenants = pulsarAdmin.tenants().getTenants(); tenants.forEach(System.out::println); // 2.3 删除租户操作 pulsarAdmin.tenants().deleteTenant("my-tenant"); // 3.关闭管理对象 pulsarAdmin.close(); } }

三、namespace命令空间的Java API

public class PulsarNamespace { public static void main(String[] args) throws PulsarAdminException, PulsarClientException { String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080"; // 1.创建pulsar的Admin管理对象 PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build(); // 2.基于pulsar的Admin对象进行相关的操作 // 2.1 创建名称空间 String tenant = "my-tenant"; String namespace = tenant + "/my-ns"; pulsarAdmin.namespaces().createNamespace(namespace); // 2.2 获取租户下的名称空间列表 List<String> namespaces = pulsarAdmin.namespaces().getNamespaces(tenant); namespaces.forEach(System.out::println); // 2.3 删除名称空间 pulsarAdmin.namespaces().deleteNamespace(namespace); // 3.关闭管理对象 pulsarAdmin.close(); } }

四、topic的Java API

public class PulsarTopic { public static void main(String[] args) throws PulsarClientException, PulsarAdminException { String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080"; // 1.创建pulsar的Admin管理对象 PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build(); // 2.基于pulsar的Admin对象进行相关的操作 // 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化 String tenant = "my-tenant"; String namespace = tenant + "/my-ns"; String nonPartitionedTopicName = "non-persistent://my-tenant/my-ns/my-non-partitioned-topic"; String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"; // 2.2 创建无分区的topic pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName); // 2.3 创建有分区的topic pulsarAdmin.topics().createPartitionedTopic(partitionedTopicName,3); // 2.4 修改有分区的Topic的分区数量 pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName,6); // 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topic List<String> topics = pulsarAdmin.topics().getList(namespace); topics.forEach(System.out::println); // 2.6 查询当前有分区的topic列表 List<String> partitionedTopicList = pulsarAdmin.topics().getPartitionedTopicList(namespace); partitionedTopicList.forEach(System.out::println); // 2.7 查询有分区的Topic,有多少个分区 int partitions = pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions; System.out.println(partitions); // 2.8 删除无分区的Topic pulsarAdmin.topics().delete(nonPartitionedTopicName); // 2.9 删除有分区的Topic pulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName); // 3.关闭管理对象 pulsarAdmin.close(); } }
上一篇:Apache Pulsar——producer consumer的Java API
下一篇:没有了
网友评论