当前位置 : 主页 > 编程语言 > java >

Zookeeper干货

来源:互联网 收集:自由互联 发布时间:2022-10-26
一、基本操作命令 进入​​zkClli.sh​​ ZooKeeper -server host:port cmd args stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n |-b val path history redo

一、基本操作命令

进入​​zkClli.sh​​

ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port

二、项目整合zookeeper

依赖

<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>


<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>


<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>

初始化实例

  • 配置applicationContext-zookeeper.xml
<description>zookeeper 放入spring容器,项目启动加载的时候就建立和zk的连接</description>


<!--创建重连策略-->
<bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
<!--每次重试连接的等待时间-->
<constructor-arg index="0" value="1000"></constructor-arg>
<!--设置的重连的次数-->
<constructor-arg index="1" value="5"></constructor-arg>
</bean>


<!--创建zookeeper客户端-->
<bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory"
factory-method="newClient" init-method="start">
<constructor-arg index="0" value="47.107.63.171:2181"></constructor-arg>
<constructor-arg index="1" value="10000"></constructor-arg>
<constructor-arg index="2" value="10000"></constructor-arg>
<constructor-arg index="3" ref="retryPolicy"></constructor-arg>
</bean>


<!--客户端配置-->
<bean id="ZKCurator" class="com.tony.web.util.ZKCurator" init-method="init">
<constructor-arg index="0" ref="client"></constructor-arg>
</bean>

创建zkCurator类

并自动初始化,启动监听

package com.tony.web.util;


import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ZKCurator {
//通过bean自动创建实例
private CuratorFramework client = null;


final static Logger log = LoggerFactory.getLogger(ZKCurator.class);


public ZKCurator(CuratorFramework client){
this.client = client;
}


public void init(){
client = client.usingNamespace("admin");


try {
//判断在admin命名空间下是否有bgm节点,/admin/bgm
if (client.checkExists().forPath("/bgm") == null) {
/**
* 对于zk来讲,有两种类型的节点:
* 持久节点:当客户端断开连接时,znode 不会被自动删除,
* 临时节点:znode 将在客户端断开连接时被删除,
*/
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //持久化节点
.withACL(Ids.OPEN_ACL_UNSAFE) //acl:匿名权限,完全开发
.forPath("/bgm");
log.info("zookeeper初始化成功");
log.info("zookeeper服务器状态:{}",client.isStarted());
}


} catch (Exception e) {
log.error("zookeeper客户端连接、初始化错误...");
e.printStackTrace();
}

}


/**
* @Descrption: 增加或刪除bgm,向zookeeper创建子节点,供小程序监听
* @param bgmId
* @param operaObj
*/
public void sendBgmOperator(String bgmId, String operaObj){

try {
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(Ids.OPEN_ACL_UNSAFE)
.forPath("/bgm/" + bgmId, operaObj.getBytes("utf-8"));
} catch (Exception e) {
e.printStackTrace();
}
}

}

三、定义配置类

实现自动依赖注入来自动实例化

@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter {

@Bean(initMethod = "init")
public ZKCuratorClient zkCuratorClient(){
return new ZKCuratorClient();
}
}

通过自动实例化自动开启监听

四、创建客户端监听

package com.tony;


import com.tony.config.ResourceConfig;
import com.tony.service.BgmService;
import com.tony.utils.BGMOperatorTypeEnum;
import com.tony.utils.JsonUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.aspectj.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import java.io.File;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;


@Component
public class ZKCuratorClient {


private CuratorFramework client = null;
final static Logger log = LoggerFactory.getLogger(ZKCuratorClient.class);


@Autowired
private BgmService bgmService;


@Autowired
private ResourceConfig resourceConfig;


//public static final String ZOOKEEPER_SERVER = "47.107.63.171:2181";
//优化:使用resource.properties来依赖注入

public void init(){
if(client != null){
return;
}

//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry( 1000, 5);
//创建zk客户端
client = CuratorFrameworkFactory.builder().connectString(resourceConfig.getZookeeperServer())
.sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("admin").build();
//启动客户端
client.start();

try {
//String testNodeData = new String(client.getData().forPath("/bgm/210816FNTPDX97HH"));
//log.info("测试节点数据为:{}" + testNodeData);
addChildWatch("/bgm");
} catch (Exception e) {
e.printStackTrace();
}
}


public void addChildWatch(String nodePath) throws Exception{
final PathChildrenCache cache = new PathChildrenCache(client, nodePath, true);


cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
log.info("监听到事件CHILD_ADDED");


//1. 从数据库查询bgm对象,获取路径path
String path = event.getData().getPath();
String operatotObjStr = new String(event.getData().getData(), "utf-8");
Map<String, Object> map = JsonUtils.jsonToPojo(operatotObjStr, Map.class);


System.out.println("operatorObjStr: " + operatotObjStr);
System.out.println("map: " + map);


//从zookeeper获取bgmPath
String operatorType = (String) map.get("operaType");
String songPath = (String) map.get("path");


//String[] arr = path.split("/");
//String bgmId = arr[arr.length - 1];


//从数据库获取bgmPath
//Bgm bgm = bgmService.queryBgmById(bgmId);
//if(bgm == null){
// return;
//}
//bgm所在的相对路径
//String songPath = bgm.getPath();


//2. 定义保存到本地的bgm路径
// String filePath = "D:/tony_videos_dev" + songPath;
/*优化*/ String filePath = resourceConfig.getFileSpace() + songPath;
//3. 定义下载的路径(播放url)
String arrPath[] = songPath.split("\\\\");
String finalPath = "";
//3.1 处理url的斜杠以及编码
for(int i = 0; i < arrPath.length; i++){
if(StringUtils.isNotBlank(arrPath[i])){
finalPath += "/";
finalPath += URLEncoder.encode(arrPath[i], StandardCharsets.UTF_8.toString()) ;
}
}
// String bgmUrl = "http://192.168.56.1:8080/mvc" + finalPath;
/*优化*/ String bgmUrl = resourceConfig.getBgmServer() + finalPath;
if(operatorType.equals(BGMOperatorTypeEnum.ADD.type)){
//下载bgm到springboot服务器
URL url = new URL(bgmUrl);
File file = new File(filePath);
FileUtils.copyURLToFile(url, file);
client.delete().forPath(path);
log.info("文件 {} 已同步!",filePath);
}else if (operatorType.equals(BGMOperatorTypeEnum.DELETE.type)){
File file = new File(filePath);
log.info("文件 {} 已删除!",filePath);
FileUtils.forceDelete(file);
client.delete().forPath(path);
}

}
}
});


}
}

五、遍历所有节点

public static void listAll(ZooKeeper zk, String path) throws Exception {
List<String> children = zk.getChildren(path, false);
for(String child : children){
String currentNodeName = "/".equals(path) ? path + child : path + "/" + child;
System.out.println(currentNodeName);
listAll(zk, currentNodeName);
}
}

六、创建节点

​​ZooDefs.Ids.CREATOR_ALL_ACL​​是需要添加身份才可以创建,否则身份为只读用户,不能创建和删除

/**
* 使用java远程访问zookeeper
* 创建客户端
* 使用客户端发送命令处理返回结果
* 回收资源
*/
public static void create() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("watch中的方法执行");
}
});




String result = zk.create("/parent", "parent data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("创建的parent的结果:" + result);

//创建临时节点后,在zk.close()后会失效
String tmpResult = zk.create("/parent/tmp",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建的/parent/tmp的结果:" + tmpResult);


String seqResult = zk.create("/parent/sequence", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建的/parent/sequence的结果:" + seqResult);


//关闭客户端
zk.close();


}

七、获取数据

public static void get() throws Exception {
ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {


}
});
byte[] data = zk.getData("/parent", false, null);
String s = new String(data);
System.out.println("/parent获取的数据为:" + s);
}

八、删除节点

在​​3.4.11​​​版本的​​zookeeper​​​客户端,rmr命令可以递归删除,​​delete​​​只能删除子节点为空的节点,新版本​​delte​​​增加了​​deleteAll​​​,​​rmr​​​被替换成了​​deleteAll​​

Zookeeper干货_zookeeper

/**
* 删除节点
* 删除节点前,需要先查询节点的状态(cversion),通过getData来查询这个版本
* 设计是为了保证删除的节点是你想删除的那个
* @throws Exception
*/
public static void delete() throws Exception {
ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {


}
});


Stat stat = new Stat();
System.out.println(stat);


zk.getData("/parent/sequence0000000001",false, stat);
//System.out.println(stat.getCversion());//cversion
//System.out.println(stat.getVersion());//即dataVersion
//System.out.println(stat.getAversion());//aclVersion
System.out.println(stat.getCversion());


/*delete不能删除非空节点,即存在子节点时
stat.getCversion()是要在getDtata后才能获取到它的版本,因为每次查询后,
stat才能获取到版本号,这样才能删除,如果获取不到那说明没有这个版本号
也就是适合在高并发的时候使用,如果不用stat,而直接用版本号,那么可能就会冲突
*/
zk.delete("/parent/sequence0000000001", stat.getCversion());
}


上一篇:Java多线程(3):ThreadPool(中)
下一篇:没有了
网友评论