当前位置 : 主页 > 编程语言 > java >

Dubbo源码学习--Redis注册中心(三)

来源:互联网 收集:自由互联 发布时间:2022-09-29
基于 Redis1​实现的注册中心2。 使用 Redis 的 Key/Map 结构存储数据结构: 主 Key 为服务名和类型 Map 中的 Key 为 URL 地址 Map 中的 Value 为过期时间,用于判断脏数据,脏数据由监控中心删除


基于 Redis 1​ 实现的注册中心 2。

使用 Redis 的 Key/Map 结构存储数据结构:

  • 主 Key 为服务名和类型
  • Map 中的 Key 为 URL 地址
  • Map 中的 Value 为过期时间,用于判断脏数据,脏数据由监控中心删除3

使用 Redis 的 Publish/Subscribe 事件通知数据变更:

  • 通过事件的值区分事件类型:

​​register​​

  • ,

​​unregister​​

  • ,

​​subscribe​​

  • ,

​​unsubscribe​​

  • 普通消费者直接订阅指定服务提供者的 Key,只会收到指定服务的

​​register​​

  • ,

​​unregister​​

  • 监控中心通过

​​psubscribe​​

  • 功能订阅

​​/dubbo/*​​

  • ,会收到所有服务的所有变更事件

调用过程:

  • 服务提供方启动时,向
  • ​​Key:/dubbo/com.foo.BarService/providers​​

  • 并向
  • ​​Channel:/dubbo/com.foo.BarService/providers​​

  • 发送
  • ​​register​​

  • 服务消费方启动时,从
  • ​​Channel:/dubbo/com.foo.BarService/providers​​

  • 订阅
  • ​​register​​

  • ​​unregister​​

  • 并向
  • ​​Key:/dubbo/com.foo.BarService/providers​​

  • 服务消费方收到
  • ​​register​​

  • ​​unregister​​

  • 事件后,从
  • ​​Key:/dubbo/com.foo.BarService/providers​​

  • 服务监控中心启动时,从
  • ​​Channel:/dubbo/*​​

  • 订阅
  • ​​register​​

  • ​​unregister​​

  • ,以及
  • ​​subscribe​​

  • ​​unsubsribe​​

  • 事件
  • 服务监控中心收到
  • ​​register​​

  • ​​unregister​​

  • 事件后,从
  • ​​Key:/dubbo/com.foo.BarService/providers​​

  • 下获取提供者地址列表
  • 服务监控中心收到
  • ​​subscribe​​

  • ​​unsubsribe​​

  • 事件后,从
  • ​​Key:/dubbo/com.foo.BarService/consumers​​

    配置

    <dubbo:registry address="redis://10.20.153.10:6379"

    <dubbo:registry address="redis://10.20.153.10:6379?backup=10.20.153.11:6379,10.20.153.12:6379"

    <dubbo:registry protocol="redis" address="10.20.153.10:6379"

    <dubbo:registry protocol="redis" address="10.20.153.10:6379,10.20.153.11:6379,10.20.153.12:6379"

    选项

    • 可通过

    ​​<dubbo:registry group="dubbo" />​​

    • 设置 redis 中 key 的前缀,缺省为

    ​​dubbo​​

    • 可通过

    ​​<dubbo:registry cluster="replicate" />​​

    • 设置 redis 集群策略,缺省为

    ​​failover​​

    ​​failover​​

    • : 只写入和读取任意一台,失败时重试另一台,需要服务器端自行配置数据同步

    ​​replicate​​

    • : 在客户端同时写入所有服务器,只读取单台,服务器端不需要同步,注册中心集群增大,性能压力也会更大

    可靠性声明

    阿里内部并没有采用 Redis 做为注册中心,而是使用自己实现的基于数据库的注册中心,即:Redis 注册中心并没有在阿里内部长时间运行的可靠性保障,此 Redis 桥接实现只为开源版本提供,其可靠性依赖于 Redis 本身的可靠性。

    Redis注册中心实现了RedisRegistryFactory工厂,用来生成Registry的实现类RedisRegistry。

    public class RedisRegistryFactory extends AbstractRegistryFactory {

    @Override
    protected Registry createRegistry(URL url) {
    return new RedisRegistry(url);
    }
    }

    RedisRegistry中主要实现了如下接口:


    (1)doRegister(URL url);//向注册中心注册服务


    (2)doUnregister(URL url);//取消服务注册


    (3)doSubscribe(URL url, NotifyListener listener);//向注册中心监听服务


    (4)doUnsubscribe(URL url, NotifyListener listener);//取消监听注册中心服务

    public class RedisRegistry extends FailbackRegistry {

    private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);

    private static final int DEFAULT_REDIS_PORT = 6379;

    private final static String DEFAULT_ROOT = "dubbo";

    private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));

    private final ScheduledFuture<?> expireFuture;

    private final String root;

    private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>();

    private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();

    private final int reconnectPeriod;

    private final int expirePeriod;

    private volatile boolean admin = false;

    private boolean replicate;

    //构造函数中主要完成了redis连接相关的操作
    public RedisRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
    throw new IllegalStateException("registry address == null");
    }
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
    config.setTestOnReturn(url.getParameter("test.on.return", false));
    config.setTestWhileIdle(url.getParameter("test.while.idle", false));
    if (url.getParameter("max.idle", 0) > 0)
    config.setMaxIdle(url.getParameter("max.idle", 0));
    if (url.getParameter("min.idle", 0) > 0)
    config.setMinIdle(url.getParameter("min.idle", 0));
    if (url.getParameter("max.active", 0) > 0)
    config.setMaxTotal(url.getParameter("max.active", 0));
    if (url.getParameter("max.total", 0) > 0)
    config.setMaxTotal(url.getParameter("max.total", 0));
    if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0)
    config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
    if (url.getParameter("num.tests.per.eviction.run", 0) > 0)
    config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
    if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)
    config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
    if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
    config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));

    String cluster = url.getParameter("cluster", "failover");
    if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
    throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
    }
    replicate = "replicate".equals(cluster);

    List<String> addresses = new ArrayList<String>();
    addresses.add(url.getAddress());
    String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
    if (backups != null && backups.length > 0) {
    addresses.addAll(Arrays.asList(backups));
    }

    String password = url.getPassword();
    for (String address : addresses) {
    int i = address.indexOf(':');
    String host;
    int port;
    if (i > 0) {
    host = address.substring(0, i);
    port = Integer.parseInt(address.substring(i + 1));
    } else {
    host = address;
    port = DEFAULT_REDIS_PORT;
    }
    if (StringUtils.isEmpty(password)) {
    this.jedisPools.put(address, new JedisPool(config, host, port,
    url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)));
    } else {
    this.jedisPools.put(address, new JedisPool(config, host, port,
    url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), password));
    }
    }

    this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
    group = Constants.PATH_SEPARATOR + group;
    }
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {
    group = group + Constants.PATH_SEPARATOR;
    }
    this.root = group;

    this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {
    public void run() {
    try {
    deferExpired(); // Extend the expiration time
    } catch (Throwable t) { // Defensive fault tolerance
    logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
    }
    }
    }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
    }

    private void deferExpired() {
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    for (URL url : new HashSet<URL>(getRegistered())) {
    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
    String key = toCategoryPath(url);
    if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
    jedis.publish(key, Constants.REGISTER);
    }
    }
    }
    if (admin) {
    clean(jedis);
    }
    if (!replicate) {
    break;// If the server side has synchronized data, just write a single machine
    }
    } finally {
    jedis.close();
    }
    } catch (Throwable t) {
    logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
    }
    }
    }

    // The monitoring center is responsible for deleting outdated dirty data
    private void clean(Jedis jedis) {
    Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
    if (keys != null && !keys.isEmpty()) {
    for (String key : keys) {
    Map<String, String> values = jedis.hgetAll(key);
    if (values != null && values.size() > 0) {
    boolean delete = false;
    long now = System.currentTimeMillis();
    for (Map.Entry<String, String> entry : values.entrySet()) {
    URL url = URL.valueOf(entry.getKey());
    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
    long expire = Long.parseLong(entry.getValue());
    if (expire < now) {
    jedis.hdel(key, entry.getKey());
    delete = true;
    if (logger.isWarnEnabled()) {
    logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
    }
    }
    }
    }
    if (delete) {
    jedis.publish(key, Constants.UNREGISTER);
    }
    }
    }
    }
    }

    public boolean isAvailable() {
    for (JedisPool jedisPool : jedisPools.values()) {
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    if (jedis.isConnected()) {
    return true; // At least one single machine is available.
    }
    } finally {
    jedis.close();
    }
    } catch (Throwable t) {
    }
    }
    return false;
    }

    @Override
    public void destroy() {
    super.destroy();
    try {
    expireFuture.cancel(true);
    } catch (Throwable t) {
    logger.warn(t.getMessage(), t);
    }
    try {
    for (Notifier notifier : notifiers.values()) {
    notifier.shutdown();
    }
    } catch (Throwable t) {
    logger.warn(t.getMessage(), t);
    }
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    jedisPool.destroy();
    } catch (Throwable t) {
    logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
    }
    }
    }

    //注册地址到redis中主 Key 为服务名和类型
    //Map 中的 Key 为 URL 地址,Map 中的 Value 为过期时间,用于判断脏数据,脏数据由监控中心删除
    @Override
    public void doRegister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    jedis.hset(key, value, expire);
    jedis.publish(key, Constants.REGISTER);
    success = true;
    if (!replicate) {
    break; // If the server side has synchronized data, just write a single machine
    }
    } finally {
    jedis.close();
    }
    } catch (Throwable t) {
    exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
    }
    if (exception != null) {
    if (success) {
    logger.warn(exception.getMessage(), exception);
    } else {
    throw exception;
    }
    }
    }

    //取消注册,删除key值并发送通知
    @Override
    public void doUnregister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    RpcException exception = null;
    boolean success = false;
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    jedis.hdel(key, value);
    jedis.publish(key, Constants.UNREGISTER);
    success = true;
    if (!replicate) {
    break; // If the server side has synchronized data, just write a single machine
    }
    } finally {
    jedis.close();
    }
    } catch (Throwable t) {
    exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
    }
    if (exception != null) {
    if (success) {
    logger.warn(exception.getMessage(), exception);
    } else {
    throw exception;
    }
    }
    }

    //订阅
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
    String service = toServicePath(url);
    Notifier notifier = notifiers.get(service);
    if (notifier == null) {
    //开启线程来订阅通知数据
    Notifier newNotifier = new Notifier(service);
    notifiers.putIfAbsent(service, newNotifier);
    notifier = notifiers.get(service);
    if (notifier == newNotifier) {
    notifier.start();
    }
    }
    boolean success = false;
    RpcException exception = null;
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    if (service.endsWith(Constants.ANY_VALUE)) {
    admin = true;
    Set<String> keys = jedis.keys(service);
    if (keys != null && !keys.isEmpty()) {
    Map<String, Set<String>> serviceKeys = new HashMap<String, Set<String>>();
    for (String key : keys) {
    String serviceKey = toServicePath(key);
    Set<String> sk = serviceKeys.get(serviceKey);
    if (sk == null) {
    sk = new HashSet<String>();
    serviceKeys.put(serviceKey, sk);
    }
    sk.add(key);
    }
    for (Set<String> sk : serviceKeys.values()) {
    doNotify(jedis, sk, url, Arrays.asList(listener));
    }
    }
    } else {
    doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener));
    }
    success = true;
    break; // Just read one server's data
    } finally {
    jedis.close();
    }
    } catch (Throwable t) { // Try the next server
    exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
    }
    if (exception != null) {
    if (success) {
    logger.warn(exception.getMessage(), exception);
    } else {
    throw exception;
    }
    }
    }

    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
    }

    private void doNotify(Jedis jedis, String key) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(getSubscribed()).entrySet()) {
    doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet<NotifyListener>(entry.getValue()));
    }
    }

    private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
    if (keys == null || keys.isEmpty()
    || listeners == null || listeners.isEmpty()) {
    return;
    }
    long now = System.currentTimeMillis();
    List<URL> result = new ArrayList<URL>();
    List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
    String consumerService = url.getServiceInterface();
    for (String key : keys) {
    if (!Constants.ANY_VALUE.equals(consumerService)) {
    String prvoiderService = toServiceName(key);
    if (!prvoiderService.equals(consumerService)) {
    continue;
    }
    }
    String category = toCategoryName(key);
    if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
    continue;
    }
    List<URL> urls = new ArrayList<URL>();
    Map<String, String> values = jedis.hgetAll(key);
    if (values != null && values.size() > 0) {
    for (Map.Entry<String, String> entry : values.entrySet()) {
    URL u = URL.valueOf(entry.getKey());
    if (!u.getParameter(Constants.DYNAMIC_KEY, true)
    || Long.parseLong(entry.getValue()) >= now) {
    if (UrlUtils.isMatch(url, u)) {
    urls.add(u);
    }
    }
    }
    }
    if (urls.isEmpty()) {
    urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
    .setAddress(Constants.ANYHOST_VALUE)
    .setPath(toServiceName(key))
    .addParameter(Constants.CATEGORY_KEY, category));
    }
    result.addAll(urls);
    if (logger.isInfoEnabled()) {
    logger.info("redis notify: " + key + " = " + urls);
    }
    }
    if (result == null || result.isEmpty()) {
    return;
    }
    for (NotifyListener listener : listeners) {
    notify(url, listener, result);
    }
    }

    private String toServiceName(String categoryPath) {
    String servicePath = toServicePath(categoryPath);
    return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
    }

    private String toCategoryName(String categoryPath) {
    int i = categoryPath.lastIndexOf(Constants.PATH_SEPARATOR);
    return i > 0 ? categoryPath.substring(i + 1) : categoryPath;
    }

    private String toServicePath(String categoryPath) {
    int i;
    if (categoryPath.startsWith(root)) {
    i = categoryPath.indexOf(Constants.PATH_SEPARATOR, root.length());
    } else {
    i = categoryPath.indexOf(Constants.PATH_SEPARATOR);
    }
    return i > 0 ? categoryPath.substring(0, i) : categoryPath;
    }

    private String toServicePath(URL url) {
    return root + url.getServiceInterface();
    }

    private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
    }

    //创建消息订阅线程,当服务未多个的时候创建的线程也是多个,比较消耗资源
    private class NotifySub extends JedisPubSub {

    private final JedisPool jedisPool;

    public NotifySub(JedisPool jedisPool) {
    this.jedisPool = jedisPool;
    }

    //调用回调函数
    @Override
    public void onMessage(String key, String msg) {
    if (logger.isInfoEnabled()) {
    logger.info("redis event: " + key + " = " + msg);
    }
    if (msg.equals(Constants.REGISTER)
    || msg.equals(Constants.UNREGISTER)) {
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    //更新本地内存中的通知数据
    doNotify(jedis, key);
    } finally {
    jedis.close();
    }
    } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
    logger.error(t.getMessage(), t);
    }
    }
    }

    @Override
    public void onPMessage(String pattern, String key, String msg) {
    onMessage(key, msg);
    }

    @Override
    public void onSubscribe(String key, int num) {
    }

    @Override
    public void onPSubscribe(String pattern, int num) {
    }

    @Override
    public void onUnsubscribe(String key, int num) {
    }

    @Override
    public void onPUnsubscribe(String pattern, int num) {
    }

    }
    //接收通知线程
    private class Notifier extends Thread {

    private final String service;
    private final AtomicInteger connectSkip = new AtomicInteger();
    private final AtomicInteger connectSkiped = new AtomicInteger();
    private final Random random = new Random();
    private volatile Jedis jedis;
    private volatile boolean first = true;
    private volatile boolean running = true;
    private volatile int connectRandom;

    public Notifier(String service) {
    super.setDaemon(true);
    super.setName("DubboRedisSubscribe");
    this.service = service;
    }

    private void resetSkip() {
    connectSkip.set(0);
    connectSkiped.set(0);
    connectRandom = 0;
    }

    private boolean isSkip() {
    int skip = connectSkip.get(); // Growth of skipping times
    if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
    if (connectRandom == 0) {
    connectRandom = random.nextInt(10);
    }
    skip = 10 + connectRandom;
    }
    if (connectSkiped.getAndIncrement() < skip) { // Check the number of skipping times
    return true;
    }
    connectSkip.incrementAndGet();
    connectSkiped.set(0);
    connectRandom = 0;
    return false;
    }
    //开启线程监听通知数据
    @Override
    public void run() {
    while (running) {
    try {
    if (!isSkip()) {
    try {
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    jedis = jedisPool.getResource();
    try {
    if (service.endsWith(Constants.ANY_VALUE)) {
    if (!first) {
    first = false;
    Set<String> keys = jedis.keys(service);
    if (keys != null && !keys.isEmpty()) {
    for (String s : keys) {
    doNotify(jedis, s);
    }
    }
    resetSkip();
    }
    jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
    } else {
    if (!first) {
    first = false;
    doNotify(jedis, service);
    resetSkip();
    }
    jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
    }
    break;
    } finally {
    jedis.close();
    }
    } catch (Throwable t) { // Retry another server
    logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
    // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
    sleep(reconnectPeriod);
    }
    }
    } catch (Throwable t) {
    logger.error(t.getMessage(), t);
    sleep(reconnectPeriod);
    }
    }
    } catch (Throwable t) {
    logger.error(t.getMessage(), t);
    }
    }
    }

    public void shutdown() {
    try {
    running = false;
    jedis.disconnect();
    } catch (Throwable t) {
    logger.warn(t.getMessage(), t);
    }
    }

    }

    }


    上一篇:设计模式和七大设计原则不难的
    下一篇:没有了
    网友评论