当前位置 : 主页 > 编程语言 > java >

Ⅵ:zookeeper的Watcher事件监听机制

来源:互联网 收集:自由互联 发布时间:2022-07-13
❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️


❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️

文章目录

  • ​​前置:--》把握住Watcher流程《--​​
  • ​​1、watcher的连接状态判断​​
  • ​​2、watcher机制下的exists​​
  • ​​Ⅰ、连接对象的监听器​​
  • ​​Ⅱ、自定义watcher​​
  • ​​Ⅲ、watcher的多次监听​​
  • ​​Ⅳ、多个watcher同时监听一个节点​​
  • ​​3、watcher机制下的getData​​
  • ​​Ⅰ、连接对象的监听器​​
  • ​​Ⅱ、自定义watcher监听器​​
  • ​​Ⅲ、多次watcher监听​​
  • ​​Ⅳ、多个watcher同时监听一个节点​​
  • ​​4、watcher机制下的getChildren​​
  • ​​Ⅰ、连接对象的监视器​​
  • ​​Ⅱ、自定义watcher监听器​​
  • ​​Ⅲ、多次watcher监听​​
  • ​​Ⅳ、多个watcher同时监听一个节点​​

Ⅵ:zookeeper的Watcher事件监听机制_ide

xshell7连接云服务器演示结果,如果未知请看第一章

前置:–》把握住Watcher流程《–

1、连接zookeeper服务器
2、连接时必须使当前线程等待(等待其他线程创建连接zookeeper服务成功,使用计数器实现)
3、执行回调函数process
4、释放当前线程

1、watcher的连接状态判断

package com.zookeeper.watcher;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
* @author:抱着鱼睡觉的喵喵
* @date:2021/5/7
* @description:
*/
public class WatcherConnection implements Watcher {
//计数器,使当前线程等待其他线程完成
static CountDownLatch countDownLatch = new CountDownLatch(1);
static ZooKeeper zooKeeper;
public static void main(String[] args) {
try {
//连接zookeeper服务
zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection());
//使当前线程等待其他线程完成(其他线程也就是连接zookeeper服务的线程)
countDownLatch.await();
Thread.sleep(1000);
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
//回调函数,进性状态的判断
@Override
public void process(WatchedEvent watchedEvent) {
try {
if (watchedEvent.getType() == Event.EventType.None) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功!");
countDownLatch.countDown();
} else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
System.out.println("断开连接");
} else if (watchedEvent.getState() == Event.KeeperState.Expired) {
System.out.println("超时了");
} else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {
System.out.println("认证失败!");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

2、watcher机制下的exists

Ⅰ、连接对象的监听器

public class WatcherExistsTest {
private String IP = "8.140.37.103:2181";
private ZooKeeper zookeeper;
@Before
public void connection() throws IOException, InterruptedException {
//计数器对象,使当前线程等待其他线程的完成
final CountDownLatch downLatch = new CountDownLatch(1);
zookeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//判断是否连接成功
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
//使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了)
downLatch.countDown();
System.out.println("连接成功!");
}
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
});
//主线程进入等待态
downLatch.await();
}
@Test
public void watcherExists() throws KeeperException, InterruptedException {
//第一个参数是节点路径
//第二个参数为Boolean类型,true代表监听path下的节点,false表示不进行监听
zookeeper.exists("/exists", true);
Thread.sleep(10000);
}
@After
public void close() {
try {
zookeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

此时在zookeeper客户端创建/exists节点

Ⅵ:zookeeper的Watcher事件监听机制_自定义_02


IDEA控制台就会出现NodeCreated​​在这里插入代码片​​

当然还有删除节点的NodeDeleted等,不再演示

Ⅵ:zookeeper的Watcher事件监听机制_ide_03

Ⅱ、自定义watcher

public class WatcherExistsTest {
private String IP = "8.140.37.103:2181";
private ZooKeeper zookeeper;
@Before
public void connection() throws IOException, InterruptedException {
//计数器对象,使当前线程等待其他线程的完成
final CountDownLatch downLatch = new CountDownLatch(1);
zookeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
//使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了)
downLatch.countDown();
}
}
});
//主线程进入等待态
downLatch.await();
}

@Test
public void watcherExists2() throws KeeperException, InterruptedException {
zookeeper.exists("/exists2", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("自定义watcher!");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
});
Thread.sleep(10000);
System.out.println("--------------");
}
@After
public void close() {
try {
zookeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

执行@Test注解方法-》客户端创建/exists2节点-》IDEA控制台查看结果

Ⅵ:zookeeper的Watcher事件监听机制_自定义_04


Ⅵ:zookeeper的Watcher事件监听机制_ide_05

当我修改/exists2节点的数据时,控制台出现了NodeDataChanged

Ⅵ:zookeeper的Watcher事件监听机制_ide_06


Ⅵ:zookeeper的Watcher事件监听机制_自定义_07

Ⅲ、watcher的多次监听

本质上只能进性一次注册,一次监听;当然可以利用循环调用进行生命周期内的多次监听

@Test
public void watcherExists2() throws KeeperException, InterruptedException {

zookeeper.exists("/exists2", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println("自定义watcher!");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
zookeeper.exists("/exists2", this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread.sleep(10000);
System.out.println("--------------");
}

Ⅵ:zookeeper的Watcher事件监听机制_ide_08


Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_09

Ⅳ、多个watcher同时监听一个节点

一般来说这种多个监听对象才比较符合发布-订阅模式,当节点中的数据发生变化时,会通知所有的监听对象。

@Test
public void watcherExists3() throws KeeperException, InterruptedException {
System.out.println("============================");
zookeeper.exists("/exists3", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象1");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
zookeeper.exists("/exists3", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象2");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
zookeeper.exists("/exists3", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象3");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
Thread.sleep(10000);
System.out.println("==========================");
}

Ⅵ:zookeeper的Watcher事件监听机制_ide_10


Ⅵ:zookeeper的Watcher事件监听机制_ide_11


3、watcher机制下的getData

getData(String path, boolean b, Stat stat)连接对象的监听器
getData(String path, watcher watcher, Stat stat) 自定义的监听器

Ⅰ、连接对象的监听器

public class WatcherGetDataTest {
static CountDownLatch countDownLatch = new CountDownLatch(1);
static ZooKeeper zooKeeper;
final String IP = "8.140.37.103:2181";
@Before
public void before() throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("=================");
countDownLatch.countDown();
}
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
});
countDownLatch.await();
}

@Test
public void test() throws KeeperException, InterruptedException {
zooKeeper.getData("/data",true, null);
Thread.sleep(10000);
System.out.println("=======================");
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}

}

启动测试-》修改data节点的数据-》查看idea控制台结果

Ⅵ:zookeeper的Watcher事件监听机制_自定义_12


Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_13

Ⅱ、自定义watcher监听器

@Test
public void test2() throws KeeperException, InterruptedException {
System.out.println("========================");
zooKeeper.getData("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
}, null);
Thread.sleep(10000);
System.out.println("============================");
}

Ⅵ:zookeeper的Watcher事件监听机制_ide_14


Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_15

Ⅲ、多次watcher监听

@Test
public void test3() throws KeeperException, InterruptedException {
System.out.println("=========================");
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {

try {
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
zooKeeper.getData("/data", this, null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
zooKeeper.getData("/data", watcher, null);
Thread.sleep(5000);
System.out.println("=======================");
}

Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_16


Ⅵ:zookeeper的Watcher事件监听机制_自定义_17

Ⅳ、多个watcher同时监听一个节点

@Test
public void test4() throws KeeperException, InterruptedException {
System.out.println("=======================");
zooKeeper.getData("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象1");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
}, null);
zooKeeper.getData("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象2");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
}, null);
zooKeeper.getData("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象3");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
}, null);
Thread.sleep(5000);
System.out.println("========================");
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}

Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_18


Ⅵ:zookeeper的Watcher事件监听机制_自定义_19

4、watcher机制下的getChildren

getChildren(String path, boolean b) //使用连接对象的监视器
getChildren(String path, watcher w) //自定义监视器
子节点的修改不会被监测到

Ⅰ、连接对象的监视器

public class WatcherGetChildrenTest {
static CountDownLatch countDownLatch = new CountDownLatch(1);
static ZooKeeper zooKeeper;
final String IP = "8.140.37.103:2181";
@Before
public void before() throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("=================");
countDownLatch.countDown();
}
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
});
countDownLatch.await();
}

@Test
public void test() throws KeeperException, InterruptedException {
zooKeeper.getChildren("/data", true);
Thread.sleep(5000);
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
}

Ⅵ:zookeeper的Watcher事件监听机制_ide_20


Ⅵ:zookeeper的Watcher事件监听机制_自定义_21

Ⅱ、自定义watcher监听器

@Test
public void test2() throws KeeperException, InterruptedException {
zooKeeper.getChildren("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("==================");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
Thread.sleep(10000);
System.out.println("====================");
}

Ⅵ:zookeeper的Watcher事件监听机制_自定义_22


Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_23

Ⅲ、多次watcher监听

@Test
public void test3() throws KeeperException, InterruptedException {
zooKeeper.getChildren("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("================");
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
try {
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
zooKeeper.getChildren("/data", this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
Thread.sleep(5000);
}

Ⅵ:zookeeper的Watcher事件监听机制_ide_24


Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_25

Ⅳ、多个watcher同时监听一个节点

@Test
public void test4() throws KeeperException, InterruptedException {
System.out.println("==================================");
zooKeeper.getChildren("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象1");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getType());
}
});
zooKeeper.getChildren("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象2");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
zooKeeper.getChildren("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象3");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
Thread.sleep(5000);
System.out.println("================================");
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}

Ⅵ:zookeeper的Watcher事件监听机制_zookeeper_26


Ⅵ:zookeeper的Watcher事件监听机制_自定义_27


网友评论