目录
- 1、添加依赖
- 2、创建会话
- 3、创建节点
- 4、删除节点
- 5、获取数据
- 6、更新数据
从编码风格上来讲,curator提供了基于Fluent的编程风格支持
1、添加依赖
在pom.xml文件中添加如下内容:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency>
2、创建会话
Curator的创建会话方式与原生的API和ZkClient的创建方式区别很⼤。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。具体如下:
1. 使用CuratorFramework这个工厂类的两个静态方法来创建⼀个客户端
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
其中参数RetryPolicy提供重试策略的接口,可以让⽤户实现⾃定义的重试策略,默认提供了以下实现, 分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)
2. 通过调用CuratorFramework中的start()方法来启动会话
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy); client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000,1000,retryPolicy); client.start();
其实进⼀步查看源代码可以得知,其实这两种方法内部实现⼀样,只是对外包装成不同的方法。它们的底层都是通过第三个⽅法builder来实现的。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); private static CuratorFramework Client = CuratorFrameworkFactory.builder() .connectString("server1:2181,server2:2181,server3:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(retryPolicy) .build(); client.start();
参数:
connectString:zk的server地址,多个server之间使⽤英⽂逗号分隔开
connectionTimeoutMs: 连 接 超 时 时 间 , 如 上 是 30s, 默 认 是 15
ssessionTimeoutMs: 会 话 超 时 时 间 , 如 上 是 50s, 默 认 是 60s
retryPolicy:失败重试策略
ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间
计 算 公 式 : 当 前 sleep 时 间 =baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
maxRetries:最大重试次数
maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒。
其他,查看org.apache.curator.RetryPolicy接⼝的实现类
start():完成会话的创建
package com.lagou.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; public class CreateSession { // 创建会话 public static void main(String[] args) { // 不使用fluent编程风格 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("8.142.8.105:2181", retryPolicy); curatorFramework.start(); System.out.println("会话被建立了"); // 使用fluent编程风格 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("8.142.8.105:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(retryPolicy) .namespace("base") // 独立的命名空间 /base .build(); client.start(); System.out.println("会话2被创建了"); } }
需要注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离
3、创建节点
curator提供了⼀系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。
下面简单介绍⼀下常用的几个节点创建场景。
(1)创建⼀个初始内容为空的节点
client.create().forPath(path);
Curator默认创建的是持久节点,内容为空。
(2)创建⼀个包含内容的节点
client.create().forPath(path,"我是内容".getBytes());
Curator和ZkClient不同的是依旧采用Zookeeper原生API的⻛格,内容使用byte[]作为方法参数。
(3)递归创建父节点,并选择节点类型
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
creatingParentsIfNeeded这个接口非常有用,在使用ZooKeeper 的过程中,开发人员经常会碰到NoNodeException 异常,其中⼀个可能的原因就是试图对⼀个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断⼀下该父节点是否存在——这个处理通常比较麻烦。在使用Curator 之后,通过调用creatingParentsIfNeeded接口,Curator就能够自动地递归创建所有需要的⽗节点。
下面通过一个实际例子来演示如何在代码中使用这些API。
package com.lagou.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class CreateNote_curator { // 创建会话 public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // 使用fluent编程风格 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("8.142.8.105:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(retryPolicy) .namespace("base") // 独立的命名空间 /base .build(); client.start(); System.out.println("会话2被创建了"); // 创建节点 String path = "/lg-curator/c1"; String s = client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes()); System.out.println("节点递归创建成功,该节点路径:" + s); } }
4、删除节点
删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用新增不同的方法调用即可。
(1)删除一个子节点
client.delete().forPath(path);
(2)删除节点并递归删除其子节点
client.delete().deletingChildrenIfNeeded().forPath(path);
(3)指定版本进行删除
client.delete().withVersion(1).forPath(path);
如果此版本已经不存在,则删除异常,异常信息如下。
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =BadVersion for
(4)强制保证删除一个节点
client.delete().guaranteed().forPath(path);
只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到⼀些网络异常的情况,此guaranteed的强制删除就会很有效果。
演示实例:
package com.lagou.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; public class DeleteNote_curator { // 创建会话 public static void main(String[] args) throws Exception { // 不使用fluent编程风格 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // 使用fluent编程风格 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("8.142.8.105:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(retryPolicy) .namespace("base") // 独立的命名空间 /base .build(); client.start(); System.out.println("会话2被创建了"); // 删除节点 String path = "/lg-curator"; client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path); System.out.println("删除成功,删除的节点:" + path); } }
5、获取数据
获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器端返回的最新的节点状态信息
// 普通查询 client.getData().forPath(path); // 包含状态查询 Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path);
演示:
package com.lagou.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class GetNote_curator { // 创建会话 public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // 使用fluent编程风格 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("8.142.8.105:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(retryPolicy) .namespace("base") // 独立的命名空间 /base .build(); client.start(); System.out.println("会话2被创建了"); // 创建节点 String path = "/lg-curator/c1"; String s = client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes()); System.out.println("节点递归创建成功,该节点路径:" + s); // 获取节点的数据内容以及状态信息 // 数据内容 byte[] bytes = client.getData().forPath(path); System.out.println("获取到的节点数据内容:" + new String(bytes)); // 状态信息 Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path); System.out.println("获取节点的状态信息:" + stat); } }
6、更新数据
更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如 果version已经变更,则抛出异常。
// 普通更新 client.setData().forPath(path,"新内容".getBytes()); // 指定版本更新 client.setData().withVersion(1).forPath(path);
版本不一致异常信息:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
案例演示:
package com.lagou.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class UpdateNote_curator { // 创建会话 public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // 使用fluent编程风格 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("8.142.8.105:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(retryPolicy) .namespace("base") // 独立的命名空间 /base .build(); client.start(); System.out.println("会话2被创建了"); String path = "/lg-curator/c1"; // 获取节点的数据内容以及状态信息 // 数据内容 byte[] bytes = client.getData().forPath(path); System.out.println("获取到的节点数据内容:" + new String(bytes)); // 状态信息 Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path); System.out.println("获取节点的状态信息:" + stat); // 更新节点内容 int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion(); System.out.println("当前的最新版本是"+version); byte[] byte2 = client.getData().forPath(path); System.out.println("修改后的节点数据内容:" + new String(byte2)); } }
到此这篇关于Zookeeper Curator使用介绍的文章就介绍到这了,更多相关Zookeeper Curator内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!