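I. Nacos Server service registration flow: AP mode
- Nacos mainly runs in AP mode, where ephemeral instances go through DistroConsistencyServiceImpl. CP mode (persistent instances) is handled by RaftConsistencyServiceImpl.
1. In the nacos-naming module of Nacos Server, the register method of InstanceController is the entry point for service registration.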
```java
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    @Autowired
    private ServiceManager serviceManager;

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        // No separate group parameter is read here; the group travels inside serviceName (groupName@@serviceName)
        // Namespace
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // Service name
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // Convert the registration request parameters into an Instance
        final Instance instance = parseInstance(request);
        // Register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}
```
2. serviceManager.registerInstance registers the service instance
```java
@Component
public class ServiceManager implements RecordListener<Service> {

    // Nacos's in-memory registry: namespaceId -> (serviceName -> Service)
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // Create an empty service if it does not exist yet
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // Look the service up in the registry
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // Add the instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // Look the service up in the registry
        Service service = getService(namespaceId, serviceName);
        // Lock on the service: for a given service in a namespace, only one registration request is processed at a time
        synchronized (service) {
            // Merge the new instances and return the complete instance list
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            // Wrap the list in a new Instances object
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // Hand the key and the instance list to the consistency service
            consistencyService.put(key, instances);
        }
    }

    // Get a service from the registry
    public Service getService(String namespaceId, String serviceName) {
        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        return chooseServiceMap(namespaceId).get(serviceName);
    }
}
```
3. addIpAddresses updates and returns the complete list of service instances
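This method does quite a lot. It first fetches the old data (persistent or ephemeral) from the consistency service. That data has to be kept in sync, so it is read out here to be updated. Next it gets the service's current instances (persistent or ephemeral) and uses them to refresh the old data. It then iterates over the incoming instances. If an instance's cluster does not exist yet, the method creates and initializes it, which starts the cluster's health check. Finally it adds or removes entries in the old instance map, depending on the action, and returns the map's values as the latest instance list.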
```java
@Component
public class ServiceManager implements RecordListener<Service> {

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
            throws NacosException {
        // Fetch the old instance list from the dataMap in DataStore
        Datum datum = consistencyService
                .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        // All instances currently in the service's clusters, ephemeral or persistent
        List<Instance> currentIPs = service.allIPs(ephemeral);
        // ip:port -> instance
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        // Set of instance IDs
        Set<String> currentInstanceIds = Sets.newHashSet();
        for (Instance instance : currentIPs) {
            currentInstances.put(instance.toIpAddr(), instance);
            currentInstanceIds.add(instance.getInstanceId());
        }
        // The old instances, after being refreshed
        Map<String, Instance> instanceMap;
        if (datum != null && null != datum.value) {
            // Refresh the old instances' health flag and last heartbeat time from the current instances
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            // No old data: start with an empty map
            instanceMap = new HashMap<>(ips.length);
        }
        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                // The cluster does not exist yet: create it
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                // Initialize it, which starts the cluster's health-check task
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
            }
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                // Remove action: delete the instance from the old map
                instanceMap.remove(instance.getDatumKey());
            } else {
                // Add action
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    // The instance already exists: keep its original instanceId
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    // New instance: generate a fresh instanceId
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException(
                    "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                            .toJson(instanceMap.values()));
        }
        // Return the complete instance list
        return new ArrayList<>(instanceMap.values());
    }
}
```
- Service.allIPs collects the instances from the clusters: it iterates over the service's clusters and gathers each cluster's instances, ephemeral or persistent.
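- ServiceManager.setValid refreshes the old instance list.
- It copies the health status and last heartbeat time of the instances currently held by the service's clusters onto the matching old instances.
- Cluster.init initializes the cluster.
- That is, it starts a health-check (heartbeat check) task.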
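The merge that updateIpAddresses performs can be modeled in plain Java. This is a simplified, hypothetical sketch, not Nacos code. `InstanceMergeSketch`, `Inst`, and `merge` are made-up names. Instances are keyed by `ip:port` only, whereas Nacos's real datum key carries more. The generated instance ID format is invented. Cluster creation and health-check scheduling are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InstanceMergeSketch {

    /** Hypothetical, simplified stand-in for Nacos's Instance. */
    static final class Inst {
        final String ip;
        final int port;
        String instanceId;
        boolean healthy = true;

        Inst(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }

        String key() {
            return ip + ":" + port; // simplified datum key (an assumption)
        }
    }

    /**
     * stored:   the old list kept by the consistency service
     * live:     the instances currently held by the service's clusters
     * incoming: the instances in this request
     */
    static List<Inst> merge(List<Inst> stored, List<Inst> live, List<Inst> incoming, boolean remove) {
        // setValid-like step: index live instances and copy their health onto the stored ones
        Map<String, Inst> liveByKey = new HashMap<>();
        for (Inst i : live) {
            liveByKey.put(i.key(), i);
        }
        Map<String, Inst> merged = new HashMap<>();
        for (Inst old : stored) {
            Inst current = liveByKey.get(old.key());
            if (current != null) {
                old.healthy = current.healthy;
            }
            merged.put(old.key(), old);
        }
        // Apply the action key by key
        for (Inst i : incoming) {
            if (remove) {
                merged.remove(i.key());
                continue;
            }
            Inst old = merged.get(i.key());
            // Same ip:port registered again: reuse the old ID; otherwise make a new one (made-up format)
            i.instanceId = (old != null) ? old.instanceId : i.key() + "#new";
            merged.put(i.key(), i);
        }
        if (merged.isEmpty() && !remove) {
            throw new IllegalArgumentException("ip list can not be empty");
        }
        return new ArrayList<>(merged.values());
    }
}
```

Because the map is keyed by address, a repeated registration from the same ip:port replaces its entry and keeps its ID instead of adding a second instance.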
4. Store the instance list in DataStore's dataMap and put the registration task into an ArrayBlockingQueue
```java
@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

    @Override
    public void put(String key, Record value) throws NacosException {
        // Route to the ephemeral (Distro) or persistent (Raft) implementation based on the key
        mapConsistencyService(key).put(key, value);
    }
}

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private final DataStore dataStore;

    private volatile Notifier notifier = new Notifier();

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // Store the instance list in DataStore's dataMap
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        // Queue a data-change task
        notifier.addTask(key, DataOperation.CHANGE);
    }

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {
            // A CHANGE task for this key is already queued: skip the duplicate
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            // Enqueue the task in the ArrayBlockingQueue
            tasks.offer(Pair.with(datumKey, action));
        }
    }
}
```

Note: once DistroConsistencyServiceImpl has been instantiated, it submits a task to an asynchronous thread pool. That task listens on the ArrayBlockingQueue and consumes tasks as they arrive.
```java
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private volatile Notifier notifier = new Notifier();

    @PostConstruct
    public void init() {
        // After the bean is instantiated, submit the Notifier to a thread pool so it consumes the ArrayBlockingQueue
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
}
```
5. Once the registration task is in the ArrayBlockingQueue, the Notifier's run method takes it out and handles it
```java
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private final DataStore dataStore;

    private volatile Notifier notifier = new Notifier();

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            for (; ; ) {
                try {
                    // Take the next task (blocks while the queue is empty)
                    Pair<String, DataOperation> pair = tasks.take();
                    // Handle it
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                services.remove(datumKey);
                int count = 0;
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        // Data-change action, e.g. a registration
                        if (action == DataOperation.CHANGE) {
                            // Pass along the instance list that the registration stored in dataStore
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        // Data-delete action
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}
```
6. Read the instance list stored in dataStore and call listener.onChange to complete the registration
```java
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    private Map<String, Cluster> clusterMap = new HashMap<>();

    @Override
    public void onChange(String key, Instances value) throws Exception {
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        for (Instance instance : value.getInstanceList()) {
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            // Cap the weight at the upper bound
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            // Raise small positive weights to the lower bound
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        // Update the IP lists
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        recalculateChecksum();
    }

    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        // Build a new cluster name -> instance list map; the live data is not touched yet
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        // Iterate over the registered instances
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    // No cluster name: use the default
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                // Unknown cluster name: create and initialize the cluster
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    // init() creates the health-check task and schedules it through
                    // HealthCheckReactor.scheduleCheck (the heartbeat mechanism)
                    cluster.init();
                    // Register the cluster under its name
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                // Instance list for this cluster
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    // No list for this cluster yet: create an empty one
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                // Add the instance
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // Replace each cluster's instance list (ephemeral or persistent)
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        // Record the last-modified time
        setLastModifiedMillis(System.currentTimeMillis());
        // Signal that the service changed; the push service then notifies subscribed clients over UDP
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());
    }
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    @JsonIgnore
    private HealthCheckTask checkTask;

    public void init() {
        if (inited) {
            return;
        }
        // Create the health-check task
        checkTask = new HealthCheckTask(this);
        // Schedule the health-check task
        HealthCheckReactor.scheduleCheck(checkTask);
        inited = true;
    }
}
```
- The core step is the call to Cluster.updateIps.
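To avoid read/write conflicts, the update builds a new HashMap, applies the changes to that copy, and only then replaces the old data with it. This is the CopyOnWrite idea. Finally, a service-change event is published.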
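- Service registration supports concurrent reads and writes through CopyOnWrite.
- Cluster.updateIps works on a copy of the existing IP list and, once the registration has been applied, simply replaces the original list with it. With this CopyOnWrite approach readers need no lock and performance is high. The cost is a short delay before the new instance becomes visible.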
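Eureka avoids read/write conflicts with a multi-level cache whose levels are synchronized on a timer, so its clients see changes less promptly than Nacos clients do.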
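The CopyOnWrite idea behind Cluster.updateIps can be shown in isolation. Below is a minimal sketch that assumes a single map from cluster name to a list of `ip:port` strings. `CopyOnWriteRegistrySketch` and its methods are hypothetical names. Here, writers are serialized with `synchronized`. In the flow above, writes instead arrive through the single Notifier thread. Readers never lock.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CopyOnWriteRegistrySketch {

    // Readers always see a complete, unmodifiable snapshot through this volatile reference
    private volatile Map<String, List<String>> clusterIps = Collections.emptyMap();

    /** Lock-free read: never observes a half-applied update. */
    public List<String> ips(String cluster) {
        return clusterIps.getOrDefault(cluster, Collections.emptyList());
    }

    /** Write: copy the current map, change the copy, then publish it with one reference assignment. */
    public synchronized void updateIps(String cluster, List<String> newIps) {
        Map<String, List<String>> copy = new HashMap<>(clusterIps);
        copy.put(cluster, List.copyOf(newIps));
        clusterIps = Collections.unmodifiableMap(copy);
    }
}
```

The trade-off is staleness rather than blocking. A reader that already holds the old snapshot keeps using it until its next read.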
7. Sync the instance data to the other nodes of the Nacos Server cluster
Back in the earlier code, the distroProtocol.sync() call in the put method syncs the data to the other cluster nodes. Following that code:
```java
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private final DistroProtocol distroProtocol;

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        // Schedule a delayed sync of this key to the other cluster nodes
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }
}
```
- A ProcessRunnable task, scheduled through newSingleScheduledExecutorService.scheduleWithFixedDelay(), runs periodically and sends HTTP requests that sync the instance data to the other nodes of the Nacos Server cluster.
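The delayed sync that distroProtocol.sync() schedules can be approximated with a plain ScheduledExecutorService. The sketch below makes several assumptions and is not the Nacos task engine. `DelayedSyncSketch`, `sync`, and `processDue` are hypothetical names, and `sender` stands in for the HTTP call to the other nodes. Changes to the same key within the delay window collapse into a single send. A periodic scan driven by scheduleWithFixedDelay sends every task whose delay has elapsed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DelayedSyncSketch implements AutoCloseable {

    // key -> earliest time (ms) at which that key may be sent
    private final Map<String, Long> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scanner = Executors.newSingleThreadScheduledExecutor();
    private final long delayMs;
    private final Consumer<String> sender; // hypothetical stand-in for the HTTP call to the other nodes

    public DelayedSyncSketch(long delayMs, long scanIntervalMs, Consumer<String> sender) {
        this.delayMs = delayMs;
        this.sender = sender;
        scanner.scheduleWithFixedDelay(this::processDue, scanIntervalMs, scanIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Called on every local change; repeated changes to a pending key collapse into one task. */
    public void sync(String key) {
        pending.putIfAbsent(key, System.currentTimeMillis() + delayMs);
    }

    /** Periodic scan: send every task whose delay has elapsed. */
    void processDue() {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            // remove(key, value) succeeds only if this exact task is still pending
            if (e.getValue() <= now && pending.remove(e.getKey(), e.getValue())) {
                try {
                    sender.accept(e.getKey());
                } catch (RuntimeException ex) {
                    pending.putIfAbsent(e.getKey(), now + delayMs); // retry on a later scan
                }
            }
        }
    }

    @Override
    public void close() {
        scanner.shutdownNow();
    }
}
```

The scan catches exceptions for a reason. If a task run by scheduleWithFixedDelay throws, all of its later executions are suppressed.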
II. Nacos server-side CP mode: RaftConsistencyServiceImpl
Nacos runs mainly in AP mode, and CP mode is implemented by RaftConsistencyServiceImpl. A brief outline of how it works:
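1. It is a simplified Raft protocol that Alibaba implemented itself for CP mode.
2. Only the leader executes the write logic; any other node forwards the request to the leader.
3. Instance data is written to disk synchronously, and the in-memory registry is updated asynchronously.
4. A CountDownLatch ensures that success is returned to the client only after more than half of the cluster nodes have written the data.
5. The leader replicates the data by calling each follower's /raft/datum/commit endpoint. The successful responses to these calls are what the CountDownLatch counts.
6. A ValueChangeEvent is published.
7. PersistentNotifier listens for the ValueChangeEvent and handles the service change. It calls the listener's onChange, which ends in Service.updateIPs replacing the existing IP list to complete the registration.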
RaftConsistencyServiceImpl.put: the consistency service for persistent instance lists
- This part is tied to Raft's leader election.
raftCore.signalPublish
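This involves the Raft protocol. If the current node is not the leader, it hands the request over by sending the leader an HTTP request, and the leader handles it in its own signalPublish. If the current node is the leader, it notifies its local listeners of the instance change and replicates the data to the other nodes under a majority rule. A CountDownLatch fits this neatly: the sync counts as successful as soon as more than half of the nodes have responded successfully.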
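The majority rule can be isolated from the Raft details. This minimal sketch assumes that the cluster membership is a list of node names including the leader. It also assumes that `replicate`, a hypothetical stand-in for the asynchronous POST to a follower's /v1/ns/raft/datum/commit, completes with true on success. `MajorityPublishSketch` is a made-up name, and the majority size here is `n / 2 + 1`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class MajorityPublishSketch {

    /**
     * Returns true once a majority of nodes (the leader included) has acknowledged the write,
     * or false if that does not happen within timeoutMs.
     */
    static boolean publish(String leader, List<String> nodes,
                           Function<String, CompletableFuture<Boolean>> replicate, long timeoutMs) {
        int majority = nodes.size() / 2 + 1;
        CountDownLatch latch = new CountDownLatch(majority);
        for (String node : nodes) {
            if (node.equals(leader)) {
                latch.countDown(); // the leader's own write counts once
                continue;
            }
            replicate.apply(node).thenAccept(ok -> {
                if (ok) {
                    latch.countDown(); // only successful acknowledgements count
                }
            });
        }
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Extra countDown calls after the latch reaches zero are harmless, so late acknowledgements need no special handling. Failed or slow followers simply never count.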
```java
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {

    private final RaftProxy raftProxy;

    public void signalPublish(String key, Record value) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        // Not the leader: forward the write
        if (!isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            final RaftPeer leader = getLeader();
            // Hand it to the leader via /v1/ns/raft/datum
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
            return;
        }
        // This node is the leader
        OPERATE_LOCK.lock();
        try {
            final long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
            // Apply locally and publish the data-change notification
            onPublish(datum, peers.local());
            final String content = json.toString();
            // A majority of nodes is enough
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            // Iterate over all nodes
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    // The leader counts itself once
                    latch.countDown();
                    continue;
                }
                // /v1/ns/raft/datum/commit
                final String url = buildUrl(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT
                                    .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                            datum.key, server, result.getCode());
                            return;
                        }
                        // Asynchronous acknowledgement from this follower
                        latch.countDown();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                    }

                    @Override
                    public void onCancel() {
                    }
                });
            }
            // Wait until a majority has acknowledged, or time out
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }
}
```
The call onPublish(datum, peers.local()) in RaftCore publishes a ValueChangeEvent:
```java
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {

    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }
        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT
                    .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                            JacksonUtils.toJson(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
        }
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                    JacksonUtils.toJson(local));
            throw new IllegalStateException(
                    "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
        }
        local.resetLeaderDue();
        // if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            // Synchronous write to disk
            raftStore.write(datum);
        }
        datums.put(datum.key, datum);
        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        raftStore.updateTerm(local.term.get());
        // Publish a ValueChangeEvent
        NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }
}
```
PersistentNotifier
The ValueChangeEvent published by onPublish is received and handled in PersistentNotifier:
```java
public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {

    private final Map<String, ConcurrentHashSet<RecordListener>> listenerMap = new ConcurrentHashMap<>(32);

    public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
        if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
            if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
                for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(key, value);
                        }
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(key);
                        }
                    } catch (Throwable e) {
                        Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
                    }
                }
            }
        }
        if (!listenerMap.containsKey(key)) {
            return;
        }
        for (RecordListener listener : listenerMap.get(key)) {
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(key, value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(key);
                }
            } catch (Throwable e) {
                Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
            }
        }
    }

    @Override
    public void onEvent(ValueChangeEvent event) {
        notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
    }
}
```
Summary:
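- 1. A task submitted to an asynchronous thread pool listens on the ArrayBlockingQueue and consumes tasks as they arrive.
- 2. A high-performance in-memory queue separates accepting a registration request (the write) from processing it.
Benefits:
- Better performance: the provider's registration request is decoupled from the registry's processing of that registration.
- Using the ArrayBlockingQueue in-memory queue sidesteps the problem of handling concurrent writes.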
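The producer/consumer split summarized above fits in a few dozen lines. This is a minimal sketch modeled on the Notifier shown earlier. The names `NotifierSketch`, `addTask`, and `handleNext` are hypothetical, and the listener is a plain `Consumer<String>` instead of Nacos's RecordListener. The queue capacity is arbitrary. A key that already has a pending task is not queued twice. When run() is the only consumer, listeners never execute concurrently.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class NotifierSketch implements Runnable {

    // Keys that already have a task waiting in the queue
    private final Map<String, Boolean> pendingKeys = new ConcurrentHashMap<>();
    // Bounded in-memory queue between request threads (producers) and the consumer thread
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final Consumer<String> listener;

    public NotifierSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    /** Producer side: called on the request thread and returns immediately. */
    public void addTask(String key) {
        if (pendingKeys.putIfAbsent(key, Boolean.TRUE) != null) {
            return; // already queued; the listener reads the latest data when the task runs
        }
        if (!tasks.offer(key)) {
            pendingKeys.remove(key); // queue full: offer() returned false and the task is dropped
        }
    }

    public int pendingCount() {
        return tasks.size();
    }

    /** Handles one queued task if there is one; returns false when the queue is empty. */
    public boolean handleNext() {
        String key = tasks.poll();
        if (key == null) {
            return false;
        }
        dispatch(key);
        return true;
    }

    /** Consumer side: one thread blocks on take() and handles tasks in FIFO order. */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                dispatch(tasks.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
    }

    private void dispatch(String key) {
        pendingKeys.remove(key); // later changes to this key may be queued again
        try {
            listener.accept(key);
        } catch (RuntimeException e) {
            // keep the consumer alive, as the original does by catching Throwable
        }
    }
}
```

In use, `new Thread(notifier).start()` plays the role of GlobalExecutor.submitDistroNotifyTask, and request threads only ever call addTask.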