
Spring Cloud Alibaba: Nacos Service Discovery Source Code Analysis

Source: the Internet. Collected by: 自由互联. Published: 2023-02-04

Domain model of Nacos service discovery

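  • Namespace: isolates environments (e.g. development, testing, staging, production); defaults to public
  • Group: different services can be placed in the same group; defaults to DEFAULT_GROUP
  • Service: a microservice
  • Cluster: a virtual subdivision of a given microservice; defaults to DEFAULT
  • Instance: an instance of a microservice
  • persistentInstances: the set of persistent instances (held by a Cluster)
  • ephemeralInstances: the set of ephemeral instances (held by a Cluster)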
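The hierarchy above can be pictured as maps nested inside maps. The following is a minimal Java sketch. `NamingModel` and its methods are invented for illustration and are not Nacos classes. The one detail borrowed from Nacos is that services inside a namespace are keyed by `group@@serviceName`, the format produced by `NamingUtils.getGroupedName`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the Nacos naming hierarchy (not the real Nacos classes).
final class NamingModel {

    static final class Instance {
        final String ip;
        final int port;
        final boolean ephemeral;

        Instance(String ip, int port, boolean ephemeral) {
            this.ip = ip;
            this.port = port;
            this.ephemeral = ephemeral;
        }
    }

    // A cluster keeps persistent and ephemeral instances in separate sets.
    static final class Cluster {
        final Set<Instance> persistentInstances = new HashSet<>();
        final Set<Instance> ephemeralInstances = new HashSet<>();
    }

    // A service is divided into named clusters (DEFAULT by default).
    static final class Service {
        final Map<String, Cluster> clusterMap = new HashMap<>();
    }

    // namespace -> (group@@serviceName -> Service)
    private final Map<String, Map<String, Service>> serviceMap = new HashMap<>();

    static String groupedName(String group, String serviceName) {
        return group + "@@" + serviceName;
    }

    void register(String namespace, String group, String serviceName, String cluster, Instance instance) {
        Cluster c = serviceMap
                .computeIfAbsent(namespace, ns -> new HashMap<>())
                .computeIfAbsent(groupedName(group, serviceName), k -> new Service())
                .clusterMap.computeIfAbsent(cluster, k -> new Cluster());
        if (instance.ephemeral) {
            c.ephemeralInstances.add(instance);
        } else {
            c.persistentInstances.add(instance);
        }
    }

    // Walks namespace -> service -> cluster and returns persistent plus ephemeral instances.
    List<Instance> instances(String namespace, String group, String serviceName, String cluster) {
        Map<String, Service> services = serviceMap.get(namespace);
        if (services == null) {
            return new ArrayList<>();
        }
        Service service = services.get(groupedName(group, serviceName));
        Cluster c = service == null ? null : service.clusterMap.get(cluster);
        if (c == null) {
            return new ArrayList<>();
        }
        List<Instance> all = new ArrayList<>(c.persistentInstances);
        all.addAll(c.ephemeralInstances);
        return all;
    }
}
```

Because the namespace and the grouped name are separate lookup levels, the same service name in another namespace or group resolves to an unrelated entry.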

1. Before service discovery

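In Spring Cloud today, RPC calls between services are mostly made with Feign. Feign is built on top of Ribbon. Its main job is to wrap the usual process of calling a service over HTTP, so that a single annotation is enough to invoke a remote service. For a source-code analysis of Ribbon, see: https://www.jianshu.com/p/f3db11f045cc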

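At the bottom layer, Ribbon implements interfaces from the spring-cloud-commons package:

  • org.springframework.cloud.client.loadbalancer.ServiceInstanceChooser: the interface for choosing a service instance. LoadBalancerClient extends it.

  • org.springframework.cloud.client.loadbalancer.LoadBalancerClient: the parent interface through which Ribbon implements load balancing. The chain of implementations below it eventually comes down to one question: how is the server list obtained? The answer is the interface com.netflix.loadbalancer.ServerList.

From here on, the work belongs to the service discovery component:

  • Eureka implements this interface with DiscoveryEnabledNIWSServerList.

  • Nacos implements it with org.springframework.cloud.alibaba.nacos.ribbon.NacosServerList, which is the focus of this article.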
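To make the role of `ServerList` concrete, here is a dependency-free sketch. The nested `ServerList` interface mirrors the two methods of `com.netflix.loadbalancer.ServerList`: `getInitialListOfServers` and `getUpdatedListOfServers`. The other types are hypothetical simplifications, not Ribbon or Nacos code. These are `Server`, `RegistryServerList`, and the `Supplier` that stands in for a registry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ServerListSketch {

    // Hypothetical stand-in for com.netflix.loadbalancer.Server.
    static final class Server {
        final String host;
        final int port;

        Server(String host, int port) {
            this.host = host;
            this.port = port;
        }

        String hostPort() {
            return host + ":" + port;
        }
    }

    // Mirrors the two methods of com.netflix.loadbalancer.ServerList<T extends Server>.
    interface ServerList<T extends Server> {
        List<T> getInitialListOfServers();

        List<T> getUpdatedListOfServers();
    }

    // A registry-backed implementation: every call asks the "registry" (here a Supplier) again
    // and returns a copy, so callers cannot mutate the registry's list.
    static final class RegistryServerList implements ServerList<Server> {
        private final Supplier<List<Server>> registry;

        RegistryServerList(Supplier<List<Server>> registry) {
            this.registry = registry;
        }

        @Override
        public List<Server> getInitialListOfServers() {
            return getUpdatedListOfServers();
        }

        @Override
        public List<Server> getUpdatedListOfServers() {
            return new ArrayList<>(registry.get());
        }
    }
}
```

A discovery component plugs into Ribbon by supplying such an implementation. `NacosServerList` plays this role by querying Nacos on each call.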

2. Service discovery

When a Nacos client makes an RPC call, RibbonLoadBalancerClient obtains a load balancer through its getLoadBalancer method. Spring Cloud uses ZoneAwareLoadBalancer by default, and ZoneAwareLoadBalancer's constructor initializes its parent class, DynamicServerListLoadBalancer.

DynamicServerListLoadBalancer

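DynamicServerListLoadBalancer extends BaseLoadBalancer and adds two capabilities to the basic load balancer:

  • Dynamic updates of the service instance list at runtime.
  • Filtering of the server instance list: a filter can select a subset of the instances.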

When DynamicServerListLoadBalancer is constructed, it calls the restOfInit method:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        // start periodically refreshing the service instance list
        enableAndInitLearnNewServersFeature();
        // fetch the full service instance list once, right away
        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
}

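enableAndInitLearnNewServersFeature() starts the serverListUpdater. With the default PollingServerListUpdater, the Nacos instance list is refreshed every 30 seconds, after an initial delay of 1 second.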

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }
}

public class PollingServerListUpdater implements ServerListUpdater {

    // the first update runs 1 second after initialization
    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
    // subsequent updates repeat every 30 seconds
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

    // refresh the server list with a scheduled task
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            // wrap the update in a Runnable
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        // the actual update of the service instance list
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            // schedule wrapperRunnable with a fixed delay
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,    // 1 second by default
                    refreshIntervalMs, // 30 seconds by default
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
}
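The polling in `PollingServerListUpdater` boils down to two pieces: an idempotent `start()` and `ScheduledExecutorService.scheduleWithFixedDelay`. The sketch below is a simplified imitation built on that assumption. `PollingUpdaterSketch` is a hypothetical name, and the update action is a plain `Runnable` instead of Ribbon's `UpdateAction`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified imitation of Ribbon's PollingServerListUpdater.
final class PollingUpdaterSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile ScheduledFuture<?> scheduledFuture;
    private volatile long lastUpdated;

    PollingUpdaterSketch(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;       // Ribbon's default: 1000 ms
        this.refreshIntervalMs = refreshIntervalMs; // Ribbon's default: 30000 ms
    }

    // Returns false (no-op) if already started, like the compareAndSet guard in start().
    synchronized boolean start(Runnable updateAction) {
        if (!isActive.compareAndSet(false, true)) {
            return false;
        }
        Runnable wrapper = () -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // log and continue: an exception escaping a periodic task would cancel all later runs
                System.err.println("Failed one update cycle: " + e);
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow();
    }

    long lastUpdated() {
        return lastUpdated;
    }
}
```

`scheduleWithFixedDelay` measures the interval from the end of one run to the start of the next, so a slow registry call never causes overlapping refreshes. Catching the exception inside the wrapper matters because an exception that escapes a periodic task suppresses all of its later executions.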

DynamicServerListLoadBalancer#updateListOfServers()

  • Fetches all instances of the service from Nacos, applies the optional filter, and updates the load balancer's server list

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList();
        if (this.serverListImpl != null) {
            // fetch the available service instances from the Nacos server
            servers = this.serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            if (this.filter != null) {
                servers = this.filter.getFilteredListOfServers((List) servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            }
        }
        // replace the load balancer's service instance list
        this.updateAllServerList((List) servers);
    }
}
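The fetch, filter, replace sequence of `updateListOfServers()` can be sketched generically. The class `ServerListRefresher` is hypothetical, and each of its parts stands in for something in the real code:

- a `Supplier` stands in for `serverListImpl`
- a `UnaryOperator` stands in for Ribbon's `ServerListFilter`
- the volatile field assignment stands in for `updateAllServerList`

As in the original, both collaborators may be null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the fetch -> filter -> replace steps in updateListOfServers().
final class ServerListRefresher<T> {
    private final Supplier<List<T>> serverListImpl; // stands in for ServerList; may be null
    private final UnaryOperator<List<T>> filter;     // stands in for ServerListFilter; may be null
    private volatile List<T> allServers = Collections.emptyList();

    ServerListRefresher(Supplier<List<T>> serverListImpl, UnaryOperator<List<T>> filter) {
        this.serverListImpl = serverListImpl;
        this.filter = filter;
    }

    void updateListOfServers() {
        List<T> servers = new ArrayList<>();
        if (serverListImpl != null) {
            servers = serverListImpl.get();          // fetch from the registry
            if (filter != null) {
                servers = filter.apply(servers);     // optional filtering
            }
        }
        // publish an immutable snapshot in one assignment (stand-in for updateAllServerList)
        allServers = Collections.unmodifiableList(new ArrayList<>(servers));
    }

    List<T> getAllServers() {
        return allServers;
    }

    // Builds a filter that keeps the servers matching a predicate.
    static <T> UnaryOperator<List<T>> predicateFilter(Predicate<T> keep) {
        return list -> {
            List<T> kept = new ArrayList<>();
            for (T server : list) {
                if (keep.test(server)) {
                    kept.add(server);
                }
            }
            return kept;
        };
    }
}
```

Swapping in a whole new list, rather than editing the old one in place, means readers on other threads see either the old snapshot or the new one, never a half-updated list.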

Fetching the available instances from the Nacos server ultimately calls into the NacosServerList class:

public class NacosServerList extends AbstractServerList<NacosServer> {

    private NacosDiscoveryProperties discoveryProperties;
    private String serviceId;

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            String group = discoveryProperties.getGroup();
            // fetch the healthy instances of the service (healthy = true)
            List<Instance> instances = discoveryProperties.namingServiceInstance()
                    .selectInstances(serviceId, group, true);
            // convert List<Instance> to List<NacosServer> and return it
            return instancesToServerList(instances);
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }
}

NacosNamingService.selectInstances(serviceName, new ArrayList<String>(), healthyOnly)

  • HostReactor.getServiceInfo(): obtains the ServiceInfo object
  • Takes the List<Instance> hosts property out of the ServiceInfo object and returns it.

public class NacosNamingService implements NamingService {

    private HostReactor hostReactor;

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters,
            boolean healthy, boolean subscribe) throws NacosException {
        // ServiceInfo holds the instances of the service's clusters
        ServiceInfo serviceInfo;
        if (subscribe) {
            // get the ServiceInfo object (local cache first)
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                    StringUtils.join(clusters, ","));
        } else {
            // fetch the service information directly from the Nacos server
            serviceInfo = hostReactor
                    .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                            StringUtils.join(clusters, ","));
        }
        // take the List<Instance> hosts out of ServiceInfo and return it
        return selectInstances(serviceInfo, healthy);
    }
}
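The private `selectInstances(serviceInfo, healthy)` overload is not shown above. As far as we know from the Nacos 1.x client, it drops an instance if any of the following holds:

- its health flag differs from the requested one
- it is disabled
- its weight is not positive

A minimal sketch of that rule, using a hypothetical stand-in `Instance` class:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class InstanceSelection {

    // Hypothetical stand-in for com.alibaba.nacos.api.naming.pojo.Instance.
    static final class Instance {
        final String ip;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Instance(String ip, boolean healthy, boolean enabled, double weight) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    // Keeps only instances whose health matches the request, that are enabled, and have weight > 0.
    static List<Instance> selectInstances(List<Instance> hosts, boolean healthy) {
        if (hosts == null || hosts.isEmpty()) {
            return new ArrayList<>();
        }
        List<Instance> result = new ArrayList<>(hosts); // work on a copy, not the cached list
        Iterator<Instance> iterator = result.iterator();
        while (iterator.hasNext()) {
            Instance instance = iterator.next();
            if (healthy != instance.healthy || !instance.enabled || instance.weight <= 0) {
                iterator.remove();
            }
        }
        return result;
    }
}
```

The rule compares `healthy != instance.isHealthy()`, so passing `false` returns only the unhealthy instances, not all of them.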

hostReactor.getServiceInfoDirectlyFromServer

  • Fetches the service information directly from the Nacos server, bypassing the local cache

public class HostReactor implements Closeable {

    private final NamingProxy serverProxy;

    public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
            throws NacosException {
        String result = serverProxy.queryList(serviceName, clusters, 0, false);
        if (StringUtils.isNotEmpty(result)) {
            return JacksonUtils.toObj(result, ServiceInfo.class);
        }
        return null;
    }
}

public class NamingProxy implements Closeable {

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
            throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
}
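`queryList` sends the parameters above as a GET request to `/instance/list`. The sketch below only shows what such a request line could look like, and it rests on these assumptions:

- `UtilAndComs.nacosUrlBase` resolves to `/nacos/v1/ns` (the default context path).
- `CommonParams.NAMESPACE_ID` and `CommonParams.SERVICE_NAME` are `namespaceId` and `serviceName`.
- `URLEncoder.encode(String, Charset)` is available, which needs Java 10 or later.

`InstanceListQuery` is a hypothetical helper, not the client's actual HTTP code.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper showing the shape of the /instance/list request built by queryList().
final class InstanceListQuery {
    // Assumption: default value of UtilAndComs.nacosUrlBase ("/nacos" context path + "/v1/ns").
    static final String URL_BASE = "/nacos/v1/ns";

    static String build(String namespaceId, String groupedServiceName, String clusters,
            int udpPort, String clientIp, boolean healthyOnly) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order for readability
        params.put("namespaceId", namespaceId);
        params.put("serviceName", groupedServiceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort)); // 0 = do not register for UDP push
        params.put("clientIP", clientIp);
        params.put("healthyOnly", String.valueOf(healthyOnly));

        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            // URL-encode values; '@' in "group@@service" becomes %40
            query.add(e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return URL_BASE + "/instance/list?" + query;
    }
}
```

For example, `build("public", "DEFAULT_GROUP@@order-service", "", 0, "10.0.0.5", false)` produces `/nacos/v1/ns/instance/list?namespaceId=public&serviceName=DEFAULT_GROUP%40%40order-service&clusters=&udpPort=0&clientIP=10.0.0.5&healthyOnly=false`.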

hostReactor.getServiceInfo

  • Obtains the ServiceInfo object, preferring the local cache

public class HostReactor implements Closeable {

    private final Map<String, ServiceInfo> serviceInfoMap;
    private final Map<String, Object> updatingMap;
    private static final long UPDATE_HOLD_INTERVAL = 5000L;
    private final FailoverReactor failoverReactor;

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        // in failover mode (e.g. when the registry has lost contact with the provider),
        // serve the service from the failover cache
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // look up the service in the local cache serviceInfoMap
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            // not cached yet: fetch it from the registry now and update the local cache
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // schedule a periodic update task for this service (first run after 1 s)
        scheduleUpdateIfAbsent(serviceName, clusters);
        return serviceInfoMap.get(serviceObj.getKey());
    }

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        // a task is already scheduled for this key: nothing to do
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            // schedule the task with a 1 s delay; the task reschedules itself after every run
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
}
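`scheduleUpdateIfAbsent` uses double-checked locking so that each service/cluster key gets at most one `UpdateTask`. It first does an unlocked fast-path check, then re-checks inside `synchronized (futureMap)`. Below is a minimal sketch of the same idea. `UpdateScheduler`, its `key` format (modeled on `ServiceInfo.getKey`), and the `scheduledCount` counter are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of scheduleUpdateIfAbsent(): at most one scheduled task per key.
final class UpdateScheduler {
    static final long DEFAULT_DELAY = 1000L;

    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger scheduledCount = new AtomicInteger(); // for illustration only

    // Assumed key format, modeled on ServiceInfo.getKey(serviceName, clusters).
    static String key(String serviceName, String clusters) {
        return clusters.isEmpty() ? serviceName : serviceName + "@@" + clusters;
    }

    void scheduleUpdateIfAbsent(String serviceName, String clusters, Runnable task) {
        String key = key(serviceName, clusters);
        if (futureMap.get(key) != null) {
            return; // fast path: already scheduled, no locking
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {
                return; // another thread won the race while we waited for the lock
            }
            ScheduledFuture<?> future = executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
            scheduledCount.incrementAndGet();
            futureMap.put(key, future);
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

With a `ConcurrentHashMap`, `futureMap.computeIfAbsent(key, k -> executor.schedule(...))` would give the same at-most-once guarantee without an explicit lock.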

HostReactor#updateServiceNow

  • Fetches from the registry and updates the local cache

public class HostReactor implements Closeable {

    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

    public void updateService(String serviceName, String clusters) throws NacosException {
        // read the current entry from the local cache serviceInfoMap
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            // pull the instance list from the registry: /instance/list
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                // parse the response and update serviceInfoMap
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                // wake up threads waiting in getServiceInfo()
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
}

UpdateTask

  • The periodic update task

public class HostReactor implements Closeable {

    private static final long DEFAULT_DELAY = 1000L;

    public class UpdateTask implements Runnable {

        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                // the currently cached service information
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                // null means nothing is cached locally: fetch it from the server
                if (serviceObj == null) {
                    // pull the latest instance list and update the cache
                    updateService(serviceName, clusters);
                    return;
                }
                // the cached service has not been refreshed since this task last ran:
                // its last refresh time <= the refresh time this task recorded last time
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    // call updateService to pull the address list from the server and update the cache
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // the service was already updated by a push, so don't overwrite the local copy;
                    // pushed data differs from pulled data, so only send a refresh request here
                    refreshOnly(serviceName, clusters);
                }
                // record the latest refresh time of the service
                lastRefTime = serviceObj.getLastRefTime();
                // subscription cancelled: stop if the service is neither subscribed nor in futureMap
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                // schedule the next poll: delayTime doubled per consecutive failure, capped at 60 s
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}
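The delay passed to `executor.schedule` in the `finally` block is an exponential backoff, and the sketch below isolates that arithmetic. `UpdateBackoff` is a hypothetical class. The cap of 6 on `failCount` is an assumption based on `incFailCount()` in the Nacos 1.x `UpdateTask`, which is not shown above.

```java
// Hypothetical extraction of the rescheduling arithmetic in HostReactor.UpdateTask.
final class UpdateBackoff {
    static final long DEFAULT_DELAY = 1000L;
    // Assumption: mirrors incFailCount() in the Nacos 1.x UpdateTask, which stops counting at 6.
    static final int FAIL_COUNT_LIMIT = 6;

    private int failCount = 0;

    void incFailCount() {
        if (failCount < FAIL_COUNT_LIMIT) {
            failCount++;
        }
    }

    void resetFailCount() {
        failCount = 0;
    }

    // Same formula as the finally block: delayTime doubled per consecutive failure, capped at 60 s.
    long nextDelay(long delayTime) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }
}
```

On the failure paths (an exception, or an empty host list), `delayTime` is still `DEFAULT_DELAY`. Consecutive failures therefore give delays of 2 s, 4 s, 8 s, 16 s, 32 s, and then 60 s. A successful round resets `failCount` and waits for the `cacheMillis` returned by the server.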

Server-side handling of the service discovery request

  • The /instance/list endpoint

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
    }

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters,
            String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly)
            throws Exception {
        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        // look up the service
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();

        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {
                pushService.addClient(namespaceId, serviceName, clusters, agent,
                        new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }

        // service not found: return an empty host list
        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }

        checkIfDisabled(service);

        List<Instance> srvedIPs;
        // collect the instances of the requested clusters from clusterMap
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

        // filter ips using selector:
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }

        if (CollectionUtils.isEmpty(srvedIPs)) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }
            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.set("hosts", JacksonUtils.createEmptyArrayNode());
            result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
            return result;
        }

        // split the instances into healthy (TRUE) and unhealthy (FALSE) buckets
        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());
        for (Instance ip : srvedIPs) {
            ipMap.get(ip.isHealthy()).add(ip);
        }

        if (isCheck) {
            result.put("reachProtectThreshold", false);
        }

        double threshold = service.getProtectThreshold();

        // protection threshold: if the share of healthy instances is at or below it, return all instances
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                result.put("reachProtectThreshold", true);
            }
            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }

        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);
            return JacksonUtils.createEmptyJsonNode();
        }

        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();
            if (healthyOnly && !entry.getKey()) {
                continue;
            }
            for (Instance instance : ips) {
                // remove disabled instance:
                if (!instance.isEnabled()) {
                    continue;
                }
                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA
                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }
                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);
            }
        }

        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }
}
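The protection-threshold branch is the least obvious part of `doSrvIpxt`. When the share of healthy instances drops to the threshold or below, the server stops filtering and returns every instance, so that the few remaining healthy ones are not overwhelmed. The sketch reproduces just that bucket logic with a hypothetical `Instance` class, and it ignores selectors, disabled instances, and JSON output.

```java
import java.util.ArrayList;
import java.util.List;

final class ProtectThreshold {

    // Hypothetical stand-in for Instance: only the health flag matters here.
    static final class Instance {
        final String ip;
        final boolean healthy;

        Instance(String ip, boolean healthy) {
            this.ip = ip;
            this.healthy = healthy;
        }
    }

    // Mimics the TRUE/FALSE buckets of ipMap in doSrvIpxt().
    static List<Instance> select(List<Instance> all, double threshold, boolean healthyOnly) {
        List<Instance> healthy = new ArrayList<>();
        List<Instance> unhealthy = new ArrayList<>();
        for (Instance instance : all) {
            if (instance.healthy) {
                healthy.add(instance);
            } else {
                unhealthy.add(instance);
            }
        }
        // threshold reached: merge the unhealthy instances into the healthy bucket
        if (!all.isEmpty() && (float) healthy.size() / all.size() <= threshold) {
            healthy.addAll(unhealthy);
            unhealthy.clear();
        }
        List<Instance> result = new ArrayList<>(healthy);
        if (!healthyOnly) {
            result.addAll(unhealthy);
        }
        return result;
    }
}
```

The unhealthy instances are moved into the `TRUE` bucket. As a result, they are returned even when `healthyOnly=true`, and the response reports them with `"healthy": true`.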

ServiceManager#getService

Looking up the service

@Component
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    public Service getService(String namespaceId, String serviceName) {
        // the namespaceId does not exist in the registry: return null
        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        // look up the service by namespaceId and serviceName
        return chooseServiceMap(namespaceId).get(serviceName);
    }

    public Map<String, Service> chooseServiceMap(String namespaceId) {
        return serviceMap.get(namespaceId);
    }
}

Service.srvIPs

  • Collects the instances of the requested clusters from clusterMap

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    private Map<String, Cluster> clusterMap = new HashMap<>();

    public List<Instance> srvIPs(List<String> clusters) {
        // no cluster specified: use all clusters of the service
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        return allIPs(clusters);
    }

    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> result = new ArrayList<>();
        for (String cluster : clusters) {
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            result.addAll(clusterObj.allIPs());
        }
        return result;
    }
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();

    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();

    // all instances: persistent plus ephemeral
    public List<Instance> allIPs() {
        List<Instance> allInstances = new ArrayList<>();
        allInstances.addAll(persistentInstances);
        allInstances.addAll(ephemeralInstances);
        return allInstances;
    }
}
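`srvIPs` treats an empty cluster list as "all clusters", and it silently skips unknown cluster names. The controller passes an empty list when the `clusters` parameter is empty. Below is a small sketch of that selection using a hypothetical `ClusterSelection` class. Strings stand in for `Instance` objects, and lists replace the original sets so the order is predictable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ClusterSelection {

    // Hypothetical stand-in for Cluster; strings replace Instance objects.
    static final class Cluster {
        final List<String> persistentInstances = new ArrayList<>();
        final List<String> ephemeralInstances = new ArrayList<>();

        // Like Cluster.allIPs(): persistent instances first, then ephemeral ones.
        List<String> allIPs() {
            List<String> all = new ArrayList<>(persistentInstances);
            all.addAll(ephemeralInstances);
            return all;
        }
    }

    // Like Service.srvIPs(): an empty cluster list selects every cluster; unknown names are skipped.
    static List<String> srvIPs(Map<String, Cluster> clusterMap, List<String> clusters) {
        List<String> names = clusters.isEmpty() ? new ArrayList<>(clusterMap.keySet()) : clusters;
        List<String> result = new ArrayList<>();
        for (String name : names) {
            Cluster cluster = clusterMap.get(name);
            if (cluster != null) {
                result.addAll(cluster.allIPs());
            }
        }
        return result;
    }
}
```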

That completes the walkthrough of the Nacos service discovery source code.

References: https://blog.csdn.net/C18298182575/article/details/101549822

https://blog.csdn.net/liyanan21/article/details/89090482/
