一、基本操作命令 进入zkCli.sh ZooKeeper -server host:port cmd args stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo
一、基本操作命令
进入zkCli.sh
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
二、项目整合zookeeper
依赖
<!-- zookeeper --><dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
初始化实例
- 配置applicationContext-zookeeper.xml
<!-- Retry policy handed to the ZooKeeper client bean for reconnect attempts -->
<bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
<!-- Wait time between retry attempts (presumably the base sleep time in milliseconds; confirm against ExponentialBackoffRetry) -->
<constructor-arg index="0" value="1000"></constructor-arg>
<!-- Number of reconnect attempts -->
<constructor-arg index="1" value="5"></constructor-arg>
</bean>
<!-- ZooKeeper client: built through the static factory method newClient and started via init-method="start" -->
<bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory"
factory-method="newClient" init-method="start">
<!-- Address (host:port) of the ZooKeeper server -->
<constructor-arg index="0" value="47.107.63.171:2181"></constructor-arg>
<!-- NOTE(review): presumably the session timeout in milliseconds; confirm against the newClient overload -->
<constructor-arg index="1" value="10000"></constructor-arg>
<!-- NOTE(review): presumably the connection timeout in milliseconds; confirm against the newClient overload -->
<constructor-arg index="2" value="10000"></constructor-arg>
<!-- Retry policy bean referenced by id -->
<constructor-arg index="3" ref="retryPolicy"></constructor-arg>
</bean>
<!-- Client wrapper: receives the client bean as its only constructor argument; init-method="init" is invoked after construction -->
<bean id="ZKCurator" class="com.tony.web.util.ZKCurator" init-method="init">
<constructor-arg index="0" ref="client"></constructor-arg>
</bean>
创建zkCurator类
并自动初始化,启动监听
package com.tony.web.util;import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Wrapper around a Curator client that prepares the {@code /bgm} node inside the
 * {@code admin} namespace and publishes BGM operations as child nodes of it.
 *
 * <p>The client is supplied through the constructor; {@link #init()} must be called
 * once before {@link #sendBgmOperator(String, String)} is used.
 */
public class ZKCurator {

    final static Logger log = LoggerFactory.getLogger(ZKCurator.class);

    // Supplied by the constructor; replaced by a namespaced facade in init().
    private CuratorFramework client = null;

    /**
     * @param client the Curator client this wrapper operates on
     */
    public ZKCurator(CuratorFramework client) {
        this.client = client;
    }

    /**
     * Switches the client to the {@code admin} namespace and creates the persistent
     * {@code /bgm} node (i.e. {@code /admin/bgm}) when it does not exist yet.
     *
     * <p>Failures are logged and not propagated.
     */
    public void init() {
        client = client.usingNamespace("admin");
        try {
            // Only create /bgm when it is missing from the admin namespace.
            if (client.checkExists().forPath("/bgm") == null) {
                // ZooKeeper has two node kinds:
                //   persistent - the znode survives a client disconnect,
                //   ephemeral  - the znode is removed when the client disconnects.
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)   // persistent node
                        .withACL(Ids.OPEN_ACL_UNSAFE)      // anonymous, fully open ACL
                        .forPath("/bgm");
                log.info("zookeeper初始化成功");
                log.info("zookeeper服务器状态:{}", client.isStarted());
            }
        } catch (Exception e) {
            // Pass the throwable to the logger so the stack trace lands in the
            // application log instead of stderr.
            log.error("zookeeper客户端连接、初始化错误...", e);
        }
    }

    /**
     * Publishes an add/delete BGM operation by creating the persistent child node
     * {@code /bgm/<bgmId>} whose payload is {@code operaObj} encoded as UTF-8.
     *
     * <p>Failures are logged and not propagated.
     *
     * @param bgmId    id of the BGM; used as the child node name
     * @param operaObj serialized operation description stored as the node data
     */
    public void sendBgmOperator(String bgmId, String operaObj) {
        try {
            client.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(Ids.OPEN_ACL_UNSAFE)
                    .forPath("/bgm/" + bgmId,
                            operaObj.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            log.error("Failed to create zookeeper node /bgm/{}", bgmId, e);
        }
    }
}
三、定义配置类
实现自动依赖注入来自动实例化
/**
 * Spring configuration class that registers the ZooKeeper listener client as a bean.
 *
 * <p>NOTE(review): {@code ZKCuratorClient} also appears to carry {@code @Component};
 * if it is picked up by component scanning as well, two instances may be created -
 * confirm and keep only one registration.
 */
@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter {

    /**
     * Creates the {@link ZKCuratorClient} bean; {@code initMethod = "init"} makes the
     * container call its {@code init()} method once the bean has been set up.
     *
     * @return a new, not yet initialised client
     */
    @Bean(initMethod = "init")
    public ZKCuratorClient zkCuratorClient() {
        return new ZKCuratorClient();
    }
}
通过自动实例化自动开启监听
四、创建客户端监听
package com.tony;import com.tony.config.ResourceConfig;
import com.tony.service.BgmService;
import com.tony.utils.BGMOperatorTypeEnum;
import com.tony.utils.JsonUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.aspectj.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * ZooKeeper listener that watches the children of {@code /bgm} in the {@code admin}
 * namespace and, for every added child, downloads or deletes the corresponding BGM
 * file in the local file space and then removes the processed node.
 */
@Component
public class ZKCuratorClient {

    final static Logger log = LoggerFactory.getLogger(ZKCuratorClient.class);

    // Created lazily by init(); a non-null value means init() already ran.
    private CuratorFramework client = null;

    @Autowired
    private BgmService bgmService;

    // Supplies the ZooKeeper address, the local file space and the BGM server URL.
    @Autowired
    private ResourceConfig resourceConfig;

    /**
     * Builds and starts the Curator client and registers the child watch on
     * {@code /bgm}. Calling it again on the same instance is a no-op.
     *
     * <p>A failure to register the watch is logged and not propagated.
     */
    public void init() {
        if (client != null) {
            return;
        }
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        client = CuratorFrameworkFactory.builder()
                .connectString(resourceConfig.getZookeeperServer())
                .sessionTimeoutMs(10000)
                .retryPolicy(retryPolicy)
                .namespace("admin")
                .build();
        client.start();
        try {
            addChildWatch("/bgm");
        } catch (Exception e) {
            log.error("Failed to register child watch on /bgm", e);
        }
    }

    /**
     * Starts a {@link PathChildrenCache} on {@code nodePath} and reacts to
     * {@code CHILD_ADDED} events.
     *
     * @param nodePath node whose children are watched
     * @throws Exception if the cache cannot be started
     */
    public void addChildWatch(String nodePath) throws Exception {
        final PathChildrenCache cache = new PathChildrenCache(client, nodePath, true);
        // Register the listener before starting the cache so that events produced
        // while the cache loads the existing children are not missed.
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    log.info("监听到事件CHILD_ADDED");
                    handleChildAdded(client, event);
                }
            }
        });
        cache.start();
    }

    /**
     * Processes one added child node: parses its JSON payload, then downloads or
     * deletes the referenced file and removes the node.
     */
    private void handleChildAdded(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
        String path = event.getData().getPath();
        byte[] payload = event.getData().getData();
        if (payload == null) {
            log.warn("Node {} carries no data, ignoring", path);
            return;
        }
        String operatorObjStr = new String(payload, StandardCharsets.UTF_8);
        @SuppressWarnings("unchecked")
        Map<String, Object> map = JsonUtils.jsonToPojo(operatorObjStr, Map.class);
        log.info("operatorObjStr: {}", operatorObjStr);
        log.info("map: {}", map);
        if (map == null) {
            log.warn("Node {} payload could not be parsed, ignoring", path);
            return;
        }

        // Operation type and relative file path both come from the node payload.
        String operatorType = (String) map.get("operaType");
        String songPath = (String) map.get("path");
        if (operatorType == null || songPath == null) {
            log.warn("Node {} payload lacks operaType or path, ignoring", path);
            return;
        }

        // NOTE(review): songPath is taken verbatim from the node payload and is used
        // to build a local path that may be overwritten or deleted; consider checking
        // that the resolved path stays inside the configured file space.
        String filePath = resourceConfig.getFileSpace() + songPath;

        if (operatorType.equals(BGMOperatorTypeEnum.ADD.type)) {
            // Download the file from the BGM server into the local file space.
            URL url = new URL(resourceConfig.getBgmServer() + toUrlPath(songPath));
            FileUtils.copyURLToFile(url, new File(filePath));
            curator.delete().forPath(path);
            log.info("文件 {} 已同步!", filePath);
        } else if (operatorType.equals(BGMOperatorTypeEnum.DELETE.type)) {
            // Log only after the deletion actually succeeded.
            FileUtils.forceDelete(new File(filePath));
            log.info("文件 {} 已删除!", filePath);
            curator.delete().forPath(path);
        }
    }

    /**
     * Converts a backslash-separated relative path into a URL path: blank segments
     * are skipped, every other segment is URL-encoded as UTF-8 and prefixed with
     * {@code /}.
     */
    private static String toUrlPath(String songPath) throws java.io.UnsupportedEncodingException {
        StringBuilder urlPath = new StringBuilder();
        for (String segment : songPath.split("\\\\")) {
            if (StringUtils.isNotBlank(segment)) {
                urlPath.append('/')
                        .append(URLEncoder.encode(segment, StandardCharsets.UTF_8.name()));
            }
        }
        return urlPath.toString();
    }
}
五、遍历所有节点
/**
 * Prints the full path of every descendant of {@code path}, depth first.
 *
 * @param zk   client used to read the children (called with {@code watch = false})
 * @param path node whose subtree is listed; the node itself is not printed
 * @throws Exception if reading the children of any visited node fails
 */
public static void listAll(ZooKeeper zk, String path) throws Exception {
    // The root path already ends with a slash; every other path needs one appended.
    final String prefix = "/".equals(path) ? path : path + "/";
    for (String name : zk.getChildren(path, false)) {
        String childPath = prefix + name;
        System.out.println(childPath);
        listAll(zk, childPath);
    }
}
六、创建节点
ZooDefs.Ids.CREATOR_ALL_ACL是需要添加身份才可以创建,否则身份为只读用户,不能创建和删除
/**
 * Demonstrates remote access to ZooKeeper from Java: create a client, send
 * create commands, print the returned paths and release the client.
 *
 * <p>Creates the persistent node {@code /parent}, the ephemeral node
 * {@code /parent/tmp} and a persistent sequential node with prefix
 * {@code /parent/sequence}.
 *
 * @throws IOException          if the client cannot be constructed
 * @throws KeeperException      if the server rejects one of the create requests
 * @throws InterruptedException if the calling thread is interrupted
 */
public static void create() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("watch中的方法执行");
        }
    });
    try {
        String result = zk.create("/parent",
                "parent data".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        System.out.println("创建的parent的结果:" + result);

        // An ephemeral node disappears once zk.close() ends the session.
        String tmpResult = zk.create("/parent/tmp", null,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("创建的/parent/tmp的结果:" + tmpResult);

        String seqResult = zk.create("/parent/sequence", null,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("创建的/parent/sequence的结果:" + seqResult);
    } finally {
        // Always release the client, even when one of the create calls fails.
        zk.close();
    }
}
七、获取数据
/**
 * Reads the data of {@code /parent} and prints it, decoded as UTF-8.
 *
 * @throws Exception if the client cannot be created or the read fails
 */
public static void get() throws Exception {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // Events are not needed for this one-off read.
        }
    });
    try {
        byte[] data = zk.getData("/parent", false, null);
        // A node may hold no data at all; print "null" instead of failing.
        String s = data == null ? null : new String(data, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("/parent获取的数据为:" + s);
    } finally {
        // Release the client; the original never closed it.
        zk.close();
    }
}
八、删除节点
在3.4.11版本的zookeeper客户端中,rmr命令可以递归删除节点,delete只能删除没有子节点的节点;新版本增加了deleteAll命令,rmr被deleteAll取代
/**
 * Deletes the node {@code /parent/sequence0000000001}.
 *
 * <p>The node's {@link Stat} is fetched first through {@code getData} and its data
 * version ({@code Stat.getVersion()}) is passed to {@code delete}. The delete only
 * succeeds when that version still matches, which guarantees that the node removed
 * is the one that was just read.
 *
 * <p>{@code delete} cannot remove a node that still has children.
 *
 * @throws Exception if the client cannot be created, the node does not exist, the
 *                   version no longer matches, or the node has children
 */
public static void delete() throws Exception {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // Events are not needed for this one-off operation.
        }
    });
    try {
        // The Stat is only populated by the getData call below.
        Stat stat = new Stat();
        zk.getData("/parent/sequence0000000001", false, stat);

        // Stat exposes three versions: getVersion() (data), getCversion() (children)
        // and getAversion() (ACL). delete() is checked against the data version, so
        // passing the child version would only work by coincidence.
        int dataVersion = stat.getVersion();
        System.out.println(dataVersion);

        zk.delete("/parent/sequence0000000001", dataVersion);
    } finally {
        // Release the client; the original never closed it.
        zk.close();
    }
}