
Spring Cloud Alibaba: The Nacos Server-Side Service Registration Flow

Published: 2023-02-04

I. Nacos Server Service Registration Flow (AP Mode)

  • Nacos mainly runs in AP mode; CP mode is implemented by RaftConsistencyServiceImpl.

1. The register method of the InstanceController class, in the nacos-naming module of Nacos Server, is the entry point for service registration

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    @Autowired
    private ServiceManager serviceManager;

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        // note that Nacos as a registry does not use the group here
        // namespace
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // service name
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // convert the registration request parameters into an Instance
        final Instance instance = parseInstance(request);
        // register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}

2. serviceManager.registerInstance registers the service instance

@Component
public class ServiceManager implements RecordListener<Service> {

    // the Nacos registry: namespace -> (service name -> Service)
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // create an empty service if it does not exist yet
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // look the service up in the Nacos registry
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // add the instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // look the service up in the Nacos registry
        Service service = getService(namespaceId, serviceName);
        // lock: only one registration request may run at a time for the
        // same service in the same namespace
        synchronized (service) {
            // merge and return the full instance list
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            // build a new Instances object holding the list
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // hand the key and the instance list to the consistency service
            consistencyService.put(key, instances);
        }
    }

    // look a service up
    public Service getService(String namespaceId, String serviceName) {
        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        return chooseServiceMap(namespaceId).get(serviceName);
    }
}

3. addIpAddresses updates and returns the full instance list

This method does quite a lot. First it fetches the old data (persistent or ephemeral) from the consistency service; since that data has to be kept in sync, it must be read out and refreshed promptly. It then collects the current instances (persistent or ephemeral) from the service and uses them to refresh the old data. Next it iterates over the instances being added: if a cluster does not exist yet, it creates and initializes it, which starts the heartbeat check. Finally, depending on whether the action is add or remove, it updates the old instance map and returns the resulting full instance list.

@Component
public class ServiceManager implements RecordListener<Service> {

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
            throws NacosException {
        // fetch the old instance list from the dataMap of the DataStore
        Datum datum = consistencyService
                .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        // collect all current instances of the clusters, ephemeral or persistent
        List<Instance> currentIPs = service.allIPs(ephemeral);
        // ip:port -> instance
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        // set of instance ids
        Set<String> currentInstanceIds = Sets.newHashSet();
        // fill both collections
        for (Instance instance : currentIPs) {
            currentInstances.put(instance.toIpAddr(), instance);
            currentInstanceIds.add(instance.getInstanceId());
        }
        // the refreshed old instance map
        Map<String, Instance> instanceMap;
        if (datum != null && null != datum.value) {
            // refresh the old instance data with the current instances'
            // health flags and heartbeat timestamps
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            // no old data, start from an empty map
            instanceMap = new HashMap<>(ips.length);
        }
        // iterate over the new instances
        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                // the cluster does not exist yet, create it
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                // initialize it, which starts the cluster heartbeat check
                cluster.init();
                // add the cluster to the service
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
            }
            // for a remove action, drop the instance from the old map
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                // otherwise add it
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    // an old instance exists, keep its instanceId
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    // otherwise generate a new instanceId
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException(
                    "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                            .toJson(instanceMap.values()));
        }
        // return the full instance list
        return new ArrayList<>(instanceMap.values());
    }
}
  • Service's allIPs collects the instances of a service: it iterates over all clusters and gathers each cluster's instances, either ephemeral or persistent.
public class Service extends com.alibaba.nacos.api.naming.pojo.Service
        implements Record, RecordListener<Instances> {

    private Map<String, Cluster> clusterMap = new HashMap<>();

    public List<Instance> allIPs(boolean ephemeral) {
        List<Instance> result = new ArrayList<>();
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            result.addAll(entry.getValue().allIPs(ephemeral));
        }
        return result;
    }
}
  • ServiceManager's setValid refreshes the old instance list
  • It copies the health status and last heartbeat time from the instances currently held by the service clusters onto the old instances.
@Component
public class ServiceManager implements RecordListener<Service> {

    private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {
        Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());
        // iterate over the old instances and refresh those that still exist
        for (Instance instance : oldInstances) {
            // look up the matching current instance
            Instance instance1 = map.get(instance.toIpAddr());
            // if it exists, copy its state over
            if (instance1 != null) {
                instance.setHealthy(instance1.isHealthy());
                instance.setLastBeat(instance1.getLastBeat());
            }
            // put it into the result map
            instanceMap.put(instance.getDatumKey(), instance);
        }
        return instanceMap;
    }
}
  • Cluster's init initializes the cluster
  • That is, it starts a heartbeat (health) check task.
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    private HealthCheckTask checkTask;

    public void init() {
        if (inited) {
            return;
        }
        checkTask = new HealthCheckTask(this);
        HealthCheckReactor.scheduleCheck(checkTask);
        inited = true;
    }
}
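The body of HealthCheckReactor.scheduleCheck is not shown above. Conceptually it schedules the check task on a shared scheduler. The following is only a hedged sketch of that idea, not the actual Nacos implementation; the class and method names are hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: a recurring health check is just a task submitted to a shared
// scheduled executor with a fixed delay between runs.
class HealthCheckSketch {

    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "health-check");
                t.setDaemon(true);
                return t;
            });

    static void scheduleCheck(Runnable checkTask, long delayMillis) {
        // re-run the check repeatedly, waiting delayMillis between runs
        SCHEDULER.scheduleWithFixedDelay(checkTask, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch ranTwice = new CountDownLatch(2);
        scheduleCheck(ranTwice::countDown, 50);
        // block until the check has fired at least twice
        ranTwice.await();
        System.out.println("health check ran");
    }
}
```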

4. Put the registration request into an ArrayBlockingQueue and store the service instances in the dataMap of DataStore

@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

    @Override
    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
    }
}

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private final DataStore dataStore;

    private volatile Notifier notifier = new Notifier();

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // store the instance list in the dataMap of the DataStore
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        // enqueue a data-change task
        notifier.addTask(key, DataOperation.CHANGE);
    }

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            // add the task to the ArrayBlockingQueue
            tasks.offer(Pair.with(datumKey, action));
        }
    }
}

Note: once DistroConsistencyServiceImpl has been instantiated, it starts an asynchronous thread that listens on the ArrayBlockingQueue and consumes its tasks in real time.

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private volatile Notifier notifier = new Notifier();

    @PostConstruct
    public void init() {
        // once this bean is instantiated, submit the notifier so that it
        // consumes the tasks queued in the ArrayBlockingQueue
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
}

5. After the registration request is put into the ArrayBlockingQueue, the queued tasks are processed by Notifier's run method

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private final DataStore dataStore;

    private volatile Notifier notifier = new Notifier();

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            for (; ; ) {
                try {
                    // take a registration task from the queue
                    Pair<String, DataOperation> pair = tasks.take();
                    // handle it
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                services.remove(datumKey);
                int count = 0;
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        // change action: the registration data has changed
                        if (action == DataOperation.CHANGE) {
                            // fetch the instance list the registration request
                            // stored in the dataStore
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        // delete action
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}

6. Fetch the instance list stored by the registration request from the dataStore and call listener.onChange to complete the registration

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service
        implements Record, RecordListener<Instances> {

    private Map<String, Cluster> clusterMap = new HashMap<>();

    @Override
    public void onChange(String key, Instances value) throws Exception {
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        for (Instance instance : value.getInstanceList()) {
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            // clamp the weight to its upper bound
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            // clamp the weight to its lower bound
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        // update the IP lists
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        recalculateChecksum();
    }

    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        // iterate over the instances being registered
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    // no cluster name given, fall back to the default
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                // unknown cluster name: create and initialize the cluster
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    // init creates the health check task and hands it to
                    // HealthCheckReactor.scheduleCheck, i.e. the heartbeat mechanism
                    cluster.init();
                    // register the cluster under its name
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                // get this cluster's IP list
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    // no list yet, create an empty one so the cluster still gets updated
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                // add the instance
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            // make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // update each cluster's instance list (ephemeral or persistent)
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        // record the last modification time
        setLastModifiedMillis(System.currentTimeMillis());
        // broadcast: notify clients over UDP that the service has changed
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());
    }
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    @JsonIgnore
    private HealthCheckTask checkTask;

    public void init() {
        if (inited) {
            return;
        }
        // create the health check task
        checkTask = new HealthCheckTask(this);
        // schedule it
        HealthCheckReactor.scheduleCheck(checkTask);
        inited = true;
    }
}
  • The core of this is Cluster's updateIps.
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();

    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();

    public void updateIps(List<Instance> ips, boolean ephemeral) {
        // take the cluster's old instance list
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        // updatedIps returns the instances from ips that already exist in oldIpMap
        List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
        if (updatedIPs.size() > 0) {
            for (Instance ip : updatedIPs) {
                Instance oldIP = oldIpMap.get(ip.getDatumKey());
                // do not update the ip validation status of updated ips
                // because the checker has the most precise result
                // Only when ip is not marked, don't we update the health status of IP:
                if (!ip.isMarked()) {
                    ip.setHealthy(oldIP.isHealthy());
                }
                if (ip.isHealthy() != oldIP.isHealthy()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                            (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
                }
                if (ip.getWeight() != oldIP.getWeight()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                            ip.toString());
                }
            }
        }
        // find the newly added instances, i.e. those in ips but not in oldIpMap
        List<Instance> newIPs = subtract(ips, oldIpMap.values());
        if (newIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                            getName(), newIPs.size(), newIPs.toString());
            for (Instance ip : newIPs) {
                // reset the health check state for each new instance
                HealthCheckStatus.reset(ip);
            }
        }
        // find the dead instances, i.e. those in oldIpMap but not in ips
        List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
        if (deadIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                            getName(), deadIPs.size(), deadIPs.toString());
            for (Instance ip : deadIPs) {
                // remove the health check state of instances that are gone
                HealthCheckStatus.remv(ip);
            }
        }
        toUpdateInstances = new HashSet<>(ips);
        // replace the cluster's instance list
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }
}
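The three diffs computed in updateIps (updated, new, and dead instances) rely on the helpers updatedIps and subtract, whose bodies are not shown in the excerpt. The following is a simplified, hypothetical sketch of that diff keyed by an instance key string (not the actual Nacos helpers, which operate on Instance objects):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the diff logic in Cluster.updateIps: split the incoming list into
// "updated" (present in both old and new), then compute "new" (only in the
// new list) and "dead" (only in the old list).
class InstanceDiff {

    // keys present in both a and b: the updated instances
    static List<String> intersect(List<String> a, List<String> b) {
        Set<String> bKeys = new HashSet<>(b);
        List<String> result = new ArrayList<>();
        for (String key : a) {
            if (bKeys.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }

    // keys present in a but not in b: new instances (or dead ones, if the
    // arguments are swapped)
    static List<String> subtract(List<String> a, List<String> b) {
        Set<String> bKeys = new HashSet<>(b);
        List<String> result = new ArrayList<>();
        for (String key : a) {
            if (!bKeys.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> oldIps = List.of("10.0.0.1:8080", "10.0.0.2:8080");
        List<String> newIps = List.of("10.0.0.2:8080", "10.0.0.3:8080");
        System.out.println(intersect(newIps, oldIps)); // updated
        System.out.println(subtract(newIps, oldIps));  // new
        System.out.println(subtract(oldIps, newIps));  // dead
    }
}
```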

To avoid read/write conflicts, the code creates a new HashMap, applies all changes to that copy, and only then swaps it in for the old map: the copy-on-write idea. Finally it publishes a service-changed event.

  • Service registration supports concurrent reads and writes through copy-on-write.
  • Cluster's updateIps method works on a copy of the service's IP list and swaps it in once registration is done, i.e. a copy-on-write operation: no locking is needed and performance is high, at the cost of a short visibility delay.
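The copy-on-write pattern described above can be sketched as follows. This is a minimal illustration of the idea, not Nacos code; the class and field names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Copy-on-write sketch: readers dereference the current map without locking,
// while a writer mutates a private copy and then swaps the reference.
class CopyOnWriteRegistry {

    // volatile so readers always see the most recently swapped-in map
    private volatile Map<String, String> instances = new HashMap<>();

    String lookup(String serviceKey) {
        // read path: no lock, just read the current map
        return instances.get(serviceKey);
    }

    synchronized void register(String serviceKey, String address) {
        // write path: copy, mutate the copy, then replace the reference
        Map<String, String> copy = new HashMap<>(instances);
        copy.put(serviceKey, address);
        instances = copy;
    }

    public static void main(String[] args) {
        CopyOnWriteRegistry registry = new CopyOnWriteRegistry();
        registry.register("order-service", "192.168.0.10:8080");
        System.out.println(registry.lookup("order-service"));
    }
}
```

Readers that grabbed the old map reference keep seeing the old data until their next lookup, which is exactly the short visibility delay mentioned above.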

Eureka, by contrast, prevents read/write conflicts with a multi-level cache that is synchronized periodically, so its clients do not see changes as promptly as Nacos clients do.

7. Sync the instance data to the other nodes of the Nacos Server cluster

Back in the put method shown earlier, distroProtocol.sync() propagates the data to the other cluster nodes. Following the code:

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private final DistroProtocol distroProtocol;

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }
}
  • A ProcessRunnable task is executed periodically via newSingleScheduledExecutorService.scheduleWithFixedDelay(), sending HTTP requests that sync the instance data to the other Nacos Server cluster nodes.
@Component
public class DistroProtocol {

    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        // iterate over every other cluster member and sync the data to it
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            // create a delayed task
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            // which is, in effect, a scheduled task
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }
}

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {

    private final ScheduledExecutorService processingExecutor;

    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

    // NacosDelayTaskExecuteEngine construction
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        // initialize the task map
        tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
        // create the scheduled thread pool
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        // run ProcessRunnable at a fixed interval
        processingExecutor
                .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval,
                        TimeUnit.MILLISECONDS);
    }

    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }

    /**
     * process tasks in execute engine.
     */
    protected void processTasks() {
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                if (!processor.process(task)) {
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                retryFailedTask(taskKey, task);
            }
        }
    }

    private class ProcessRunnable implements Runnable {

        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }
}

public class DistroDelayTaskProcessor implements NacosTaskProcessor {

    @Override
    public boolean process(NacosTask task) {
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
            // yet another task, this time handed to the execute workers
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        }
        return false;
    }
}

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {

    @Override
    public void run() {
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        try {
            String type = getDistroKey().getResourceType();
            DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
            distroData.setType(DataOperation.CHANGE);
            // call the other servers' endpoints to sync the changed data
            boolean result = distroComponentHolder.findTransportAgent(type)
                    .syncData(distroData, getDistroKey().getTargetServer());
            if (!result) {
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
            handleFailedTask();
        }
    }
}

II. The Nacos Server CP Mode: RaftConsistencyServiceImpl

Nacos mainly runs in AP mode; CP mode is implemented by RaftConsistencyServiceImpl. A brief outline of how it works:

  • 1. It is a simplified Raft protocol implemented by Alibaba itself for CP mode.

  • 2. A node only executes the write itself if it is the leader; otherwise it forwards the request to the leader.

  • 3. Instance data is written to disk synchronously, while the in-memory registry is updated asynchronously.

  • 4. Using a CountDownLatch, the write only returns success to the client after more than half of the cluster nodes have written it successfully.

  • 5. On success, the /raft/datum/commit endpoint is called to commit.

  • 6. A ValueChangeEvent is published.

  • 7. PersistentNotifier subscribes to ValueChangeEvent, handles the service change, and calls Service's updateIPs method, which completes registration by replacing the service's original IP list.
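The majority write in step 4 can be sketched with a CountDownLatch. This is a simplified illustration of the pattern, not Nacos code; the peer calls are simulated with local threads and all names are hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// "Majority ack" sketch: the write only counts as successful if more than
// half of the cluster nodes acknowledge it within the timeout.
class MajorityWrite {

    static boolean publish(int clusterSize, long timeoutMillis) throws InterruptedException {
        int majority = clusterSize / 2 + 1;
        CountDownLatch latch = new CountDownLatch(majority);
        // the leader counts itself as one successful write
        latch.countDown();
        // simulate asynchronous replication to the other peers; in Nacos
        // these are async HTTP calls whose callbacks count the latch down
        for (int i = 1; i < clusterSize; i++) {
            new Thread(latch::countDown).start();
        }
        // success only if a majority acknowledged before the timeout
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // all simulated peers ack, so this prints true
        System.out.println(publish(5, 1000));
    }
}
```

Note that the leader counting itself down mirrors the `isLeader(server)` branch in RaftCore.signalPublish below.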

RaftConsistencyServiceImpl's put: the consistency service for persistent instances

  • This is tied to the Raft leader-election protocol.
@Deprecated
@DependsOn("ProtocolManager")
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {

    private final RaftCore raftCore;

    @Override
    public void put(String key, Record value) throws NacosException {
        checkIsStopWork();
        try {
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                    e);
        }
    }
}

raftCore.signalPublish

This involves the Raft election protocol. If the current node is not the leader, it hands the request off to the leader via an HTTP request, and the leader then processes it in its own signalPublish. If the node is the leader, it notifies the local listeners of the instance change and replicates the data to the other nodes under a majority rule. A CountDownLatch is a natural fit here: the replication counts as successful as soon as more than half of the nodes have responded.

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {

    private final RaftProxy raftProxy;

    public void signalPublish(String key, Record value) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        // not the leader
        if (!isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            final RaftPeer leader = getLeader();
            // forward to the leader: /v1/ns/raft/datum
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
            return;
        }
        OPERATE_LOCK.lock();
        // this node is the leader
        try {
            final long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
            // publish the data-change notification
            onPublish(datum, peers.local());
            final String content = json.toString();
            // only a majority of nodes is required
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            // iterate over all nodes
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    // the leader counts itself
                    latch.countDown();
                    continue;
                }
                // /v1/ns/raft/datum/commit
                final String url = buildUrl(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT
                                    .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                            datum.key, server, result.getCode());
                            return;
                        }
                        // count down asynchronously on success
                        latch.countDown();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                    }

                    @Override
                    public void onCancel() {
                    }
                });
            }
            // wait until a majority has responded
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }
}

RaftCore's onPublish(datum, peers.local()) publishes a ValueChangeEvent

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {

    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }
        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT
                    .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                            JacksonUtils.toJson(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
        }
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                    JacksonUtils.toJson(local));
            throw new IllegalStateException(
                    "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
        }
        local.resetLeaderDue();
        // if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            raftStore.write(datum);
        }
        datums.put(datum.key, datum);
        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                // set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        raftStore.updateTerm(local.term.get());
        // publish the ValueChangeEvent
        NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }
}

PersistentNotifier

onPublish fires a ValueChangeEvent, which PersistentNotifier subscribes to and handles.

public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {

    private final Map<String, ConcurrentHashSet<RecordListener>> listenerMap = new ConcurrentHashMap<>(32);

    public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
        if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
            if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
                for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(key, value);
                        }
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(key);
                        }
                    } catch (Throwable e) {
                        Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
                    }
                }
            }
        }
        if (!listenerMap.containsKey(key)) {
            return;
        }
        for (RecordListener listener : listenerMap.get(key)) {
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(key, value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(key);
                }
            } catch (Throwable e) {
                Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
            }
        }
    }

    @Override
    public void onEvent(ValueChangeEvent event) {
        notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
    }
}

Summary:

  • 1. An asynchronous thread consumes the tasks queued in the ArrayBlockingQueue in real time.
  • 2. A fast in-memory queue separates accepting a registration request from processing it.

Benefits:

  • Better performance: the provider's registration call and the registry's processing of it are decoupled.
  • Using an in-memory ArrayBlockingQueue avoids having to handle concurrent writes.
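The write/process decoupling summarized above can be sketched as follows. This is a minimal illustration of the pattern, not Nacos code; the class and field names are hypothetical:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// Sketch: the registration entry point only enqueues the task (fast for the
// caller), while a single background consumer applies the changes, so the
// registry state itself never sees concurrent writers.
class QueueDecoupling {

    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    final CountDownLatch processed = new CountDownLatch(1);
    volatile String lastHandled;

    void start() {
        // single consumer loop, analogous to Notifier.run
        Thread consumer = new Thread(() -> {
            try {
                lastHandled = tasks.take();
                processed.countDown();
            } catch (InterruptedException ignored) {
            }
        });
        consumer.setDaemon(true);
        consumer.start();
    }

    void submit(String serviceKey) {
        // producer side: just enqueue, the caller returns immediately
        tasks.offer(serviceKey);
    }

    public static void main(String[] args) throws InterruptedException {
        QueueDecoupling q = new QueueDecoupling();
        q.start();
        q.submit("com.alibaba.nacos.naming.iplist.demo-service");
        q.processed.await();
        System.out.println(q.lastHandled);
    }
}
```

Because a single thread drains the queue, the handler can mutate shared state without extra locking, which is exactly why Nacos avoids concurrent-write handling in the registry update path.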

References: https://www.cnblogs.com/chz-blogs/p/14325288.html

https://www.cnblogs.com/guoxiaoyu/p/14248226.html

https://blog.csdn.net/wangwei19871103/article/details/105834317

https://blog.csdn.net/wangwei19871103/article/details/105835207
