
Spring Cloud: Eureka Service Registration Flow

Source: Internet | Collected by: 自由互联 | Published: 2023-02-04

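Preface

When an application acting as a Eureka client starts, the initScheduledTasks method of com.netflix.discovery.DiscoveryClient sets up the following:

  • periodic refresh of the service list;
  • periodic lease renewal (heartbeat);
  • the service registration logic.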

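Overview

The following diagram comes from Netflix's official documentation. It shows that a Eureka client sends a Register request to add itself to the registry, after which other Eureka clients can obtain the newly registered application's information through Get Registry requests:

Service registration on the Eureka client

A Eureka client proactively sends its registration information to the server in two cases:

  • 1. When the client's instance information changes, so that the client and the server no longer agree.
  • 2. When the client has just started.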

Source code analysis

com.netflix.discovery.DiscoveryClient is constructed through @Inject (Google Guice injection, which follows the JSR-330 specification).

 

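First, recall the initScheduledTasks method of com.netflix.discovery.DiscoveryClient. Every Eureka client runs it at startup. In the listing below, the code for periodically refreshing the service list has been omitted: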

    @Singleton
    public class DiscoveryClient implements EurekaClient {

        private void initScheduledTasks() {
            // omitted: the timer that refreshes the local registry cache

            // from EurekaClientConfigBean; defaults to true
            if (clientConfig.shouldRegisterWithEureka()) {
                // lease renewal interval
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                // when a periodic run times out, the delay before the next run is doubled,
                // but it can never exceed the limit derived from expBackOffBound
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

                // start the periodic lease renewal task after the given delay
                heartbeatTask = new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                );
                scheduler.schedule(
                        heartbeatTask,
                        renewalIntervalInSecs, TimeUnit.SECONDS);

                // reporting our own information to Eureka server is delegated to InstanceInfoReplicator;
                // every scenario that needs to report goes through it for scheduling,
                // and it also rate-limits requests so the server is not called too often
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize

                // listen and react to application status changes,
                // including entering or recovering from the DOWN state
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }

                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        // report our status to Eureka server (rate-limited to avoid frequent reports)
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };

                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    // register the status change listener
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }

                // refresh the instance information and register with Eureka server
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
    }
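
The comment about doubling the delay after a timeout can be made concrete with a small sketch. This is not the TimedSupervisorTask source: the class and method names (BackoffSketch, nextDelayMillis) are hypothetical, the 30-second interval and the bound of 10 are assumed values, and the sketch assumes the cap equals the base interval multiplied by expBackOffBound.

```java
// Hypothetical sketch of a capped exponential back-off; not Eureka's actual code.
class BackoffSketch {

    /**
     * Returns the delay to use after a timed-out run: double the current
     * delay, but never exceed baseIntervalMillis * expBackOffBound.
     */
    static long nextDelayMillis(long currentDelayMillis, long baseIntervalMillis, int expBackOffBound) {
        long maxDelayMillis = baseIntervalMillis * expBackOffBound;
        return Math.min(maxDelayMillis, currentDelayMillis * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L; // assumed 30 s renewal interval
        int bound = 10;      // assumed back-off bound
        long delay = base;
        for (int i = 1; i <= 5; i++) {
            delay = nextDelayMillis(delay, base, bound);
            System.out.println("delay after timeout " + i + ": " + delay + " ms");
        }
    }
}
```

With these assumed values the delay grows 60 s, 120 s, 240 s and then stays at the 300 s cap.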

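As the code above shows, both the proactive update at startup and the updates triggered by status changes are delegated to the member variable instanceInfoReplicator. InstanceInfoReplicator is a helper class.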

 

initScheduledTasks() is called while the DiscoveryClient constructor runs. Its main jobs are:

  • 1. Start the cache refresh timer.
  • 2. Start the heartbeat timer.
  • 3. Register the listener for instance status changes.
  • 4. Start the instance info replicator: a scheduled task that first runs after 40 seconds by default and then every 30 seconds by default, checks whether the instance information has changed, and re-registers if it has.

    class InstanceInfoReplicator implements Runnable {

        public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                // mark the instance as dirty on the first call
                instanceInfo.setIsDirty();  // for initial register
                // schedule a one-shot run of this task after the initial delay
                // (the value is passed in seconds despite the parameter name);
                // service registration is also performed by this task
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }

        // called mainly from the status change listener shown earlier
        public boolean onDemandUpdate() {
            if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
                if (!scheduler.isShutdown()) {
                    scheduler.submit(new Runnable() {
                        @Override
                        public void run() {
                            logger.debug("Executing on-demand update of local InstanceInfo");
                            // get the Future of the pending scheduled run;
                            // if it has not finished yet, cancel it to free resources,
                            // because run() is invoked right below and reschedules itself
                            Future latestPeriodic = scheduledPeriodicRef.get();
                            if (latestPeriodic != null && !latestPeriodic.isDone()) {
                                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                                latestPeriodic.cancel(false);
                            }
                            InstanceInfoReplicator.this.run();
                        }
                    });
                    return true;
                } else {
                    logger.warn("Ignoring onDemand update due to stopped scheduler");
                    return false;
                }
            } else {
                logger.warn("Ignoring onDemand update due to rate limiter");
                return false;
            }
        }

        public void run() {
            try {
                // refresh the instance information
                discoveryClient.refreshInstanceInfo();
                // check whether the instance information is out of sync with the server
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    // register this instance
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    }

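As shown above, InstanceInfoReplicator is a Runnable task responsible for service registration. It can be triggered in two ways:

  • 1. By the scheduled thread: the first run happens after the initial delay (40 seconds by default), and each run schedules the next one after replicationIntervalSeconds (30 seconds by default).
  • 2. By a status change of the instance: statusChangeListener is notified and calls onDemandUpdate(). This applies to every status change, including those involving DOWN; the only difference for DOWN is that the event is logged at warn level.

Note:

  • In the finally block above, a new one-shot delayed task is submitted after each run, which effectively makes run() periodic. Does that mean the client re-registers periodically?
  • It does not. The call to discoveryClient.register() is guarded by the condition if (dirtyTimestamp != null): as long as instanceInfo.isDirtyWithTime() returns null, no registration is sent.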
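
The dirty-flag guard described in the note can be sketched in isolation. The class below is a hypothetical stand-in (DirtyFlagSketch and its members are not Eureka classes); the counter replaces the real discoveryClient.register() call, and the scheduling itself is left out so that only the guard is shown.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "register only when dirty"; not Eureka's classes.
class DirtyFlagSketch {
    private Long dirtyTimestamp;                           // null means nothing to report
    final AtomicInteger registerCalls = new AtomicInteger();

    synchronized void setIsDirty() {
        dirtyTimestamp = System.currentTimeMillis();
    }

    synchronized Long isDirtyWithTime() {
        return dirtyTimestamp;
    }

    // Clear the flag only if it was not set again after the given timestamp.
    synchronized void unsetIsDirty(long unsetTimestamp) {
        if (dirtyTimestamp != null && dirtyTimestamp <= unsetTimestamp) {
            dirtyTimestamp = null;
        }
    }

    // One pass of the periodic task: register only when the flag is set.
    void runOnce() {
        Long ts = isDirtyWithTime();
        if (ts != null) {
            registerCalls.incrementAndGet();               // stands in for discoveryClient.register()
            unsetIsDirty(ts);
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch sketch = new DirtyFlagSketch();
        sketch.setIsDirty();  // as start() does for the initial registration
        sketch.runOnce();     // flag is set, so this pass registers
        sketch.runOnce();     // flag is clear, so nothing is sent
        System.out.println("register calls: " + sketch.registerCalls.get());
    }
}
```

However often runOnce() is invoked, the counter only grows after setIsDirty() has been called again, which is why the self-rescheduling task does not cause periodic re-registration.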

Start with discoveryClient.refreshInstanceInfo() to see how the information about to be reported to Eureka server is refreshed. As the code below shows, the refresh is delegated to the ApplicationInfoManager instance:

    @Singleton
    public class DiscoveryClient implements EurekaClient {

        void refreshInstanceInfo() {
            // refresh the data center information
            applicationInfoManager.refreshDataCenterInfoIfRequired();
            // pick up lease settings promptly if they have changed
            applicationInfoManager.refreshLeaseInfoIfRequired();

            InstanceStatus status;
            try {
                status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
            } catch (Exception e) {
                logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
                // if the health check throws, set the current status to DOWN
                status = InstanceStatus.DOWN;
            }

            if (null != status) {
                applicationInfoManager.setInstanceStatus(status);
            }
        }
    }

DiscoveryClient.register()

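Next comes the registration code itself, the register method of DiscoveryClient, shown below. The Javadoc states that registration is done through a REST call, and the method returns true when Eureka server answers with status code 204. The caller shown earlier, InstanceInfoReplicator.run(), does not actually check this return value: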

    @Singleton
    public class DiscoveryClient implements EurekaClient {

        /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                // the actual registration call
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }
    }

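Following eurekaTransport.registrationClient.register(instanceInfo) through several layers of calls, the registration is finally carried out by JerseyApplicationClient. The code lives in its parent class AbstractJerseyEurekaHttpClient, shown below; it uses the REST API of the Jersey library to POST the instance's own information to Eureka server: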

    public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {

        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    }

In short, the client sends a POST request to the apps/${APP_NAME} endpoint of Eureka server with the InstanceInfo as the request body, which registers the instance.
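
To make the shape of that request tangible, here is a minimal sketch that only builds (and does not send) an equivalent request with the JDK's java.net.http API instead of Jersey. The service URL http://localhost:8761/eureka/, the application name DEMO-SERVICE and the JSON body are assumptions for illustration; the body is a placeholder, not the full InstanceInfo payload that Eureka serializes.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch: builds a registration-style POST request without sending it.
class RegisterRequestSketch {

    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        // make sure the base URL ends with a slash before appending the path
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    public static void main(String[] args) {
        // assumed values; a real payload carries the complete instance information
        String placeholderBody = "{\"instance\":{\"app\":\"DEMO-SERVICE\"}}";
        HttpRequest request = build("http://localhost:8761/eureka/", "DEMO-SERVICE", placeholderBody);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending such a request with HttpClient and comparing the status code with 204 would mirror the check at the end of DiscoveryClient.register().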

Eureka server: receiving the registration

ApplicationResource

Next, look at how the server handles the request once it arrives. The entry point is com.netflix.eureka.resources.ApplicationResource.addInstance():

    @Produces({"application/xml", "application/json"})
    public class ApplicationResource {

        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
            // requests that fail validation get a 400 response; not discussed in detail here
            if (isBlank(info.getId())) {
                return Response.status(400).entity("Missing instanceId").build();
            } else if (isBlank(info.getHostName())) {
                return Response.status(400).entity("Missing hostname").build();
            } else if (isBlank(info.getIPAddr())) {
                return Response.status(400).entity("Missing ip address").build();
            } else if (isBlank(info.getAppName())) {
                return Response.status(400).entity("Missing appName").build();
            } else if (!appName.equals(info.getAppName())) {
                return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
            } else if (info.getDataCenterInfo() == null) {
                return Response.status(400).entity("Missing dataCenterInfo").build();
            } else if (info.getDataCenterInfo().getName() == null) {
                return Response.status(400).entity("Missing dataCenterInfo Name").build();
            }

            // handle cases where clients may be registering with bad DataCenterInfo with missing data
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
                if (isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    } else if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                        String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }

            // this is the key call
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }
    }
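
The chain of checks at the top of addInstance() can be condensed into a small stand-alone sketch. RegistrationValidationSketch and firstError are hypothetical names, the instance fields are passed as plain strings rather than an InstanceInfo, and the DataCenterInfo checks are deliberately left out.

```java
import java.util.Optional;

// Hypothetical sketch of the field checks; returns the first error message, if any.
class RegistrationValidationSketch {

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /**
     * pathAppName is the application name taken from the request path
     * (apps/{appName}); the other arguments come from the request body.
     */
    static Optional<String> firstError(String pathAppName, String id, String hostName,
                                       String ipAddr, String appName) {
        if (isBlank(id)) {
            return Optional.of("Missing instanceId");
        } else if (isBlank(hostName)) {
            return Optional.of("Missing hostname");
        } else if (isBlank(ipAddr)) {
            return Optional.of("Missing ip address");
        } else if (isBlank(appName)) {
            return Optional.of("Missing appName");
        } else if (!pathAppName.equals(appName)) {
            return Optional.of("Mismatched appName, expecting " + pathAppName + " but was " + appName);
        }
        return Optional.empty(); // every check passed; the server would go on to register
    }

    public static void main(String[] args) {
        System.out.println(firstError("DEMO", "i-1", "host-1", "10.0.0.1", "OTHER"));
        System.out.println(firstError("DEMO", "i-1", "host-1", "10.0.0.1", "DEMO"));
    }
}
```

A non-empty result corresponds to the 400 responses in the listing; an empty result corresponds to the path that reaches registry.register(...).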

PeerAwareInstanceRegistryImpl

The register call above ends up in the register method of PeerAwareInstanceRegistryImpl:

    @Singleton
    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            // lease duration, 90 seconds by default: if the server receives no heartbeat
            // from the client for longer than this, it evicts the instance
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                // a value configured by the client takes precedence
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            // register the instance locally
            super.register(info, leaseDuration, isReplication);
            // replicate the registration to the peer Eureka nodes
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
    }

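As shown above, the work is passed on to the register method of the parent class, AbstractInstanceRegistry. Before looking at that method in detail, it helps to understand the Lease object, because Eureka server wraps every registration in a Lease when it processes it.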

Lease

    public class Lease<T> {

        enum Action {
            Register, Cancel, Renew
        };

        public static final int DEFAULT_DURATION_IN_SECS = 90;

        private T holder;
        private long evictionTimestamp;
        private long registrationTimestamp;
        private long serviceUpTimestamp;
        private volatile long lastUpdateTimestamp;
        private long duration;

        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            // durationInSecs is in seconds; convert it to milliseconds
            duration = (durationInSecs * 1000);
        }

        // on renewal, set the last update time to the current time plus the lease duration
        public void renew() {
            lastUpdateTimestamp = System.currentTimeMillis() + duration;
        }

        // when the service goes offline, record the eviction time
        public void cancel() {
            if (evictionTimestamp <= 0) {
                evictionTimestamp = System.currentTimeMillis();
            }
        }

        public void serviceUp() {
            if (serviceUpTimestamp == 0) {
                serviceUpTimestamp = System.currentTimeMillis();
            }
        }

        public void setServiceUpTimestamp(long serviceUpTimestamp) {
            this.serviceUpTimestamp = serviceUpTimestamp;
        }

        public boolean isExpired() {
            return isExpired(0l);
        }

        public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }
    }

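  • DEFAULT_DURATION_IN_SECS: the default lease duration constant, 90 seconds. If no heartbeat arrives within the lease duration, the lease expires and the server evicts the instance.
  • holder: the owner of the lease. At present this field holds the InstanceInfo, i.e. the client's instance information.
  • evictionTimestamp: the time at which the lease was cancelled. It is set when the service goes offline.
  • registrationTimestamp: the time at which the lease was registered.
  • serviceUpTimestamp: the time at which the service came up. It is set when the client registers and the status of its InstanceInfo is UP.
  • lastUpdateTimestamp: the time of the last update. It is refreshed on every renewal and is used to decide whether the instance has expired.
  • duration: the lease duration, in milliseconds.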
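
The interplay of renew() and isExpired() in the listing is easier to follow with explicit timestamps. The sketch below is a simplified, hypothetical LeaseSketch that takes the current time as a parameter instead of calling System.currentTimeMillis(); it mirrors the arithmetic of the listing as quoted above and nothing more.

```java
// Hypothetical, simplified lease with an injected clock; mirrors the arithmetic quoted above.
class LeaseSketch {
    private final long durationMillis;
    private long lastUpdateTimestamp;
    private long evictionTimestamp;

    LeaseSketch(int durationInSecs, long nowMillis) {
        this.durationMillis = durationInSecs * 1000L;
        this.lastUpdateTimestamp = nowMillis;
    }

    // same formula as in the listing: current time plus the duration
    void renew(long nowMillis) {
        lastUpdateTimestamp = nowMillis + durationMillis;
    }

    void cancel(long nowMillis) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMillis;
        }
    }

    // same formula as in the listing: the duration is added once more here
    boolean isExpired(long nowMillis) {
        return evictionTimestamp > 0 || nowMillis > lastUpdateTimestamp + durationMillis;
    }

    public static void main(String[] args) {
        long t0 = 1_000_000L;                       // arbitrary start time in milliseconds
        LeaseSketch lease = new LeaseSketch(90, t0);
        System.out.println("never renewed, t0+90.001s: " + lease.isExpired(t0 + 90_001));
        lease.renew(t0);
        System.out.println("renewed at t0, t0+90.001s: " + lease.isExpired(t0 + 90_001));
        System.out.println("renewed at t0, t0+180.001s: " + lease.isExpired(t0 + 180_001));
    }
}
```

Because renew() already adds the duration and isExpired() adds it again, a renewed lease in this arithmetic only counts as expired about two durations after the last renewal, whereas a lease that was never renewed expires after one.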

AbstractInstanceRegistry

    public abstract class AbstractInstanceRegistry implements InstanceRegistry {

        private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
                = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
        protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
        protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
                .newBuilder().initialCapacity(500)
                .expireAfterAccess(1, TimeUnit.HOURS)
                .<String, InstanceStatus>build().asMap();

        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                // acquire the read lock
                read.lock();
                // look up the instance map of this application in the local registry
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                // count this registration in the monitoring statistics
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    // first registration for this application: gMap is null,
                    // so create a ConcurrentHashMap and put it into the registry
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    // putIfAbsent first checks whether the key is already present.
                    // If it is absent, the new entry is added and null is returned.
                    // If it is present, the existing value is kept and returned.
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        // the key really was absent, so use the map that was just created
                        gMap = gNewMap;
                    }
                }
                // look up an existing lease for this instance (for example on a second registration)
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                // Retain the last dirty timestamp without overwriting it, if there is already a lease
                if (existingLease != null && (existingLease.getHolder() != null)) {
                    // the instance already exists: compare it with the one sent by the client
                    // and treat the one with the newer dirty timestamp as the valid one
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                    // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                    // InstanceInfo instead of the server local copy.
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();
                    }
                } else {
                    // reached only when there is no existing lease; cases such as a resumed
                    // heartbeat or an expired record do not end up here.
                    // Bookkeeping for the self-preservation mechanism: increase the expected number
                    // of renewing clients by 1 and recalculate the renewals-per-minute threshold
                    synchronized (lock) {
                        if (this.expectedNumberOfClientsSendingRenews > 0) {
                            // Since the client wants to register it, increase the number of clients sending renews
                            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                            updateRenewsPerMinThreshold();
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                // build a fresh lease
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    // carry over serviceUpTimestamp from the old lease so that
                    // the service-up time always stays that of the first registration
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                // store it in the local map
                gMap.put(registrant.getId(), lease);
                // add to the recent registrations queue (timestamp as key, name as value),
                // mainly for the statistics shown in the operations UI
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
                // This is where the initial state transfer of overridden status happens
                if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                    if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                        overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                    }
                }
                // work out the instance status
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    registrant.setOverriddenStatus(overriddenStatusFromMap);
                }

                // Set the status based on the overridden status rules
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
                registrant.setStatusWithoutDirty(overriddenInstanceStatus);

                // if the resulting status is UP, record the service-up time
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                    lease.serviceUp();
                }
                // mark the action type as ADDED
                registrant.setActionType(ActionType.ADDED);
                // queue of recent lease changes; it records every change of an instance
                // and is used for incremental (delta) fetches of the registry
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                registrant.setLastUpdatedTimestamp();
                // invalidate the caches; the arguments are the cache keys
                invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }
    }
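
Two ideas from the listing are worth isolating: the two-level map (application name, then instance id) filled through putIfAbsent, and the rule that an existing entry wins only when its last dirty timestamp is strictly greater. The sketch below is a hypothetical reduction (RegistrySketch and its Instance class are not Eureka types) that stores plain instances instead of leases and omits locking, status overrides and cache invalidation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reduction of the registry bookkeeping; not Eureka's classes.
class RegistrySketch {

    // Stand-in for InstanceInfo: application name, instance id and last dirty timestamp.
    static final class Instance {
        final String appName;
        final String id;
        final long lastDirtyTimestamp;

        Instance(String appName, String id, long lastDirtyTimestamp) {
            this.appName = appName;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    private final ConcurrentHashMap<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    // Stores the registrant and returns the instance that ended up in the registry.
    Instance register(Instance registrant) {
        Map<String, Instance> gMap = registry.get(registrant.appName);
        if (gMap == null) {
            ConcurrentHashMap<String, Instance> gNewMap = new ConcurrentHashMap<>();
            // putIfAbsent returns null when our new map was inserted,
            // or the map another thread inserted first
            gMap = registry.putIfAbsent(registrant.appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Instance existing = gMap.get(registrant.id);
        // strictly greater: on equal timestamps the incoming copy is used
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing;
        }
        gMap.put(registrant.id, registrant);
        return registrant;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register(new Instance("DEMO", "i-1", 200L));
        Instance stored = registry.register(new Instance("DEMO", "i-1", 100L));
        System.out.println("timestamp kept: " + stored.lastDirtyTimestamp);
    }
}
```

In the example the second, older registration does not replace the stored one, so the timestamp that is kept is the one from the first call.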

This completes the source code analysis of how a Eureka client registers itself with Eureka server.

 

References: https://xinchen.blog.csdn.net/article/details/82861618

https://www.jianshu.com/p/99621c342d06

https://www.cnblogs.com/zooking/p/13795257.html
