服务同步
服务同步是Server节点之间的数据同步。分为启动时同步,运行时同步。
启动同步
在EurekaServerBootstrap类中的初始化上下文方法initEurekaServerContext()方法中会调用PeerAwareInstanceRegistry.syncUp()方法从邻近的eureka节点复制注册表。
启动同步时,会先遍历Applications中获取的服务信息,并将服务信息注册到registry中。可以参考PeerAwareInstanceRegistryImpl类中的syncUp方法:
@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { @Override public int syncUp() { // 获取到的注册节点数量 int count = 0; // 如果count==0 , 那么默认重试5次(前提是开启了register-with-eureka = true,否则为0) for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { // 从第二次开始,每次默认沉睡30秒 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } // 从本地内存里面获取注册实例信息 Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { // 判断是否可以注册 if (isRegisterable(instance)) { // 注册到当前Eureka Server里面 register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; } }参数说明:
-
regirstrySyncRetries:当eureka服务器启动时尝试去获取集群里其他服务器上的注册信息的次数,默认为5,只有当 eureka.client.register-with-eureka = true 的时候才会是5,如果是false ,则为0。
-
registrySyncRetryWaitMs:当eureka服务器启动时获取其他服务器的注册信息失败时,会再次尝试获取,期间需要等待的时间,默认为30 * 1000毫秒。
-
count:获取到的注册实例数量,如果为0 则根据重试次数进行重试,每次重试前沉默 30秒。
讲过Eureka Client启动的时候默认会自动从Eureka Server获取注册信息, 要想Eureka Server在启动的时候可以同步其他集群节点的注册信息,那么必须开启客户端配置。
# 是否作为一个Eureka Client 注册到Eureka Server上去 eureka.client.register-with-eureka = true # 是否需要从Eureka Server上拉取注册信息到本地。 eureka.client.fetch-registry = true只有开启了上面两个配置,那么集群节点在启动的时候,会初始化Eureka Client端的配置 ,会从其他Eureka Server拉取注册信息到本地,同时在初始化Eureka Server的时候,会从本地内存里面读取 注册信息,自动注册到本身的服务上。
集群同步类型
@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { public enum Action { Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride; private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name()); public com.netflix.servo.monitor.Timer getTimer() { return this.timer; } } } Heartbeat : 心跳续约 Register : 注册 Cancel : 下线 StatusUpdate : 添加覆盖状态 DeleteStatusOverride : 删除覆盖状态运行时同步
server端当有reigster、renew、cancel请求进来时,会将这些请求封装到一个task中,然后放到一个队列当中,然后经过一系列的处理后,在放到另一个队列中。 可以查看PeerAwareInstanceRegistryImpl类中的BatchWorkerRunnable类。
发起同步
这里以注册的代码为例
@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { @Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { //默认租约90s,如果用户更改了心跳周期等,使用用户自定义的租约 leaseDuration = info.getLeaseInfo().getDurationInSecs(); } // 发起注册 调用父类的注册,注册到本地双层Map中 super.register(info, leaseDuration, isReplication); // 注册完成后,在这里发起同步,同步类型为Register //本地注册完成后,向其他节点复制,注意isReplication这个属性 //用来判断是client发来的注册,还是其他Eureka Server临节点复制过来的注册 //如果是复制过来的注册信息,那么就不再向其他Eureka Server节点进行传播复制 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { // 判断是否是集群同步请求,如果是,则记录最后一分钟的同步次数 if (isReplication) { //记录每分钟收到的复制次数 numberOfReplicationsLastMin.increment(); } // 集群节点为空,或者这是一个Eureka Server 同步请求,直接return if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { //如果没有Eureka Server临节点,或者是别的Eureka Server复制过来的信息 //那么就不再向其他临节点进行复制, //也就是说既然收到了复制过来的信息,那么其他eureka server节点也会收到 //所以就没必要再去发送一遍复制了,return。 return; } // 循环相邻的Eureka Server Node, 分别发起请求同步 // 遍历所有的Eureka Server邻节点,向它们复制register、cancel等信息 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // 判断是否是自身的URL,过滤掉 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } // 发起同步请求 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } }步骤说明:
-
1、判断集群节点是否为空,为空则返回
-
2、isReplication 代表是否是一个复制请求, isReplication = true 表示是其他Eureka Server发过来的同步请求,这个时候是不需要继续往下同步的。否则会陷入同步死循环。
-
3、循环集群节点,过滤掉自身的节点。
-
4、发起同步请求 ,调用replicateInstanceActionsToPeers。
一个Eureka Server在收到了Client的注册等信息时,会挨个通知其他Eureka Server临节点,复制的流程图也就是下面这个样子
发起同步请求 ,调用replicateInstanceActionsToPeers
@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: // 下线 node.cancel(appName, id); break; case Heartbeat: // 心跳 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); // 获取本地最新的实例信息 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: // 注册 node.register(info); break; case StatusUpdate: // 设置覆盖状态 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: //删除覆盖状态 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { //由于此方法是循环复制操作,如果发生异常不进行处理,直接抛出,那么就不会向后面的节点复制了 //比如有10个Eureka Server节点,再向第2个复制的时候抛出异常,后面8个节点就收不到复制信息 //这个地方,只是做log,并没有抛出异常 logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } finally { CurrentRequestVersion.remove(); } } }PeerEurekaNode的register方法如下:
public class PeerEurekaNode { public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); // 默认采用的是批处理 batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); } }默认采用的是批量任务处理器,就是将task放入任务队列中,然后通过线程获取任务队列里面的任务,模仿ThreadExecutorPool的方式,生成线程,从队列里面抓取任务处理,统一批量执行,Eureka Server 那边也是统一接收,这样提高了同步效率。
批量处理的任务执行器是com.netflix.eureka.cluster.ReplicationTaskProcessor
class ReplicationTaskProcessor implements TaskProcessor<ReplicationTask> { @Override public ProcessingResult process(List<ReplicationTask> tasks) { // 构建ReplicationInstance放入ReplicationList ReplicationList list = createReplicationListOf(tasks); try { // 发起批量处理请求 EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list); int statusCode = response.getStatusCode(); if (!isSuccess(statusCode)) { if (statusCode == 503) { logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId); return ProcessingResult.Congestion; } else { // Unexpected error returned from the server. This should ideally never happen. logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size()); return ProcessingResult.PermanentError; } } else { // 处理执行结果,成功则调用handleSuccess ,失败则调用handleFailure。 handleBatchResponse(tasks, response.getEntity().getResponseList()); } } catch (Throwable e) { if (maybeReadTimeOut(e)) { logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e); //read timeout exception is more Congestion then TransientError, return Congestion for longer delay return ProcessingResult.Congestion; } else if (isNetworkConnectException(e)) { logNetworkErrorSample(null, e); return ProcessingResult.TransientError; } else { logger.error("Not re-trying this exception because it does not seem to be a network exception", e); return ProcessingResult.PermanentError; } } return ProcessingResult.Success; } }请求批量处理的接口地址 : peerreplication/batch/
handleBatchResponse(tasks, response.getEntity().getResponseList()) , 循环调用处理结果,成功则调用handleSuccess. , 失败则调用handleFailure , 比如hearbeat的时候,调用返回码为404的时候,会重新发起注册。
public class PeerEurekaNode { public void heartbeat(final String appName, final String id, final InstanceInfo info, final InstanceStatus overriddenStatus, boolean primeConnection) throws Throwable { if (primeConnection) { // We do not care about the result for priming request. replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); return; } ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) { @Override public EurekaHttpResponse<InstanceInfo> execute() throws Throwable { return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); } @Override public void handleFailure(int statusCode, Object responseEntity) throws Throwable { super.handleFailure(statusCode, responseEntity); if (statusCode == 404) { logger.warn("{}: missing entry.", getTaskName()); if (info != null) { logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}", getTaskName(), info.getId(), info.getStatus()); // 重新发起注册。 register(info); } } else if (config.shouldSyncWhenTimestampDiffers()) { InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity; if (peerInstanceInfo != null) { syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo); } } } }; long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime); } }Eureka Server接收同步
程序入口 : com.netflix.eureka.resources.PeerReplicationResource
@Path("/{version}/peerreplication") @Produces({"application/xml", "application/json"}) public class PeerReplicationResource { @Path("batch") @POST public Response batchReplication(ReplicationList replicationList) { try { ReplicationListResponse batchResponse = new ReplicationListResponse(); // 循环请求的任务 for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) { try { // 分发任务,同时将处理结果收集起来,等会统一返回 batchResponse.addResponse(dispatch(instanceInfo)); } catch (Exception e) { batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null)); logger.error("{} request processing failed for batch item {}/{}", instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e); } } return Response.ok(batchResponse).build(); } catch (Throwable e) { logger.error("Cannot execute batch Request", e); return Response.status(Status.INTERNAL_SERVER_ERROR).build(); } } private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) { // 创建实例 ApplicationResource applicationResource = createApplicationResource(instanceInfo); // 创建实例 InstanceResource resource = createInstanceResource(instanceInfo, applicationResource); //获取客户端instance的lastDirtyTimestamp ,有点类似于版本号的概念。 String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); // 获取覆盖状态 String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); // 获取instance的状态 String instanceStatus = toString(instanceInfo.getStatus()); Builder singleResponseBuilder = new Builder(); switch (instanceInfo.getAction()) { case Register: // 注册 singleResponseBuilder = handleRegister(instanceInfo, applicationResource); break; case Heartbeat: // 心跳续约 singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); break; case Cancel: // 下线 singleResponseBuilder = handleCancel(resource); break; case StatusUpdate: // 修改覆盖状态 singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); break; case DeleteStatusOverride: // 删除覆盖状态 singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); break; } return singleResponseBuilder.build(); } }以上五个场景,这里就不一一说了,就说一下注册吧:
@Path("/{version}/peerreplication") @Produces({"application/xml", "application/json"}) public class PeerReplicationResource { private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) { applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION); return new Builder().setStatusCode(Status.OK.getStatusCode()); } @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible } }EPLICATION = “true” ,此次请求为true,表示是一个服务端的复制请求。
由上面可以知道,集群同步走的和客户端注册的后续流程是一样的,只不过isReplication=true , 表明这是一个集群同步的请求。
服务剔除
服务剔除其实是一个兜底的方案,目的就是解决非正常情况下的服务宕机或其他因素导致不能发送cancel请求的服务信息清理的策略。
服务剔除分为:
- 判断剔除条件
- 找出过期服务
- 清理过期服务
剔除条件:
-
关闭自我保护
-
自我保护如果开启,会先判断是server还是client出现问题,如果是client的问题就会进行删除
-
自我保护机制:Eureka的自我保护机制是为了防止误杀服务提供的一种保护机制。Eureka的自我保护机制认为如果有大量的服务都续约失败,则认为自己出现了问题(例如:自己断网了),也就不剔除了。反之,则是它人的问题,就进行剔除。
自我保护的阈值分为server和client,如果超出阈值就是表示大量服务可用,部分服务不可用,这判定为client端出现问题。如果未超出阈值就是表示大量服务不可用,则判定是自己出现了问题。
阈值的计算:
- 自我保护阈值 = 服务总数 每分钟续约数 自我保护阈值因子;
- 每分钟续约数 = (60s / 客户端续约时间);
过期服务:
找出过期服务会遍历所有的服务,判断上次续约时间距离当前时间大于阈值就标记为过期,同时会将这些过期的服务保存的过期的服务集合中。
剔除服务:
剔除服务之前会先计算要是剔除的服务数量,然后遍历过期服务,通过洗牌算法确保每次都公平的选择出要剔除的服务,然后进行剔除。
执行剔除服务后:
- 从register中删除服务信息;
- 更新队列;
- 清空二级缓存,保证数据的一致性;
开启服务剔除
在EurekaServerBootstrap类中的初始化上下文方法initEurekaServerContext()方法中会调用PeerAwareInstanceRegistry.openForTraffic()方法开启服务剔除。
public class EurekaServerBootstrap { protected void initEurekaServerContext() throws Exception { //省略。。。。。。 // 从邻近的eureka节点复制注册表 int registryCount = this.registry.syncUp(); // 默认每30秒发送心跳,1分钟就是2次 // 修改eureka状态为up // 同时,这里面会开启一个定时任务,用于清理 60秒没有心跳的客户端。自动下线 this.registry.openForTraffic(this.applicationInfoManager, registryCount); // 注册所有监控统计信息 EurekaMonitors.registerAllStats(); } }registry.openForTraffic
修改eureka状态为up,同时开启一个定时任务,用于清理 60秒没有心跳的客户端
@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { @Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. // 计算每分钟最大续约数 this.expectedNumberOfClientsSendingRenews = count; // 每分钟最小续约数 updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); // 设置实例的状态为UP applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 开启定时任务,默认60秒执行一次,用于清理60秒之内没有续约的实例 super.postInit(); } // 每分钟最小续约数 protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()); } }Eureka Server服务剔除
从上面代码可知Eureka Server 的服务剔除,它是通过定时任务完成的,在EurekaBootStrap启动引导的initEurekaServerContext上下文初始化方法中,调用了这么一行代码registry.openForTraffic(applicationInfoManager, registryCount);在该方法中又调用了com.netflix.eureka.registry.AbstractInstanceRegistry#postInit方法来初始化服务剔除的定时任务。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { //如果服务剔除任务不为空,就执行cancel方法,该方法把任务的状态修改为了cancel任务取消 evictionTaskRef.get().cancel(); } //创建新的服务剔除任务 evictionTaskRef.set(new EvictionTask()); //交给调度器去执行,延迟60s,每60s执行一次驱逐任务 evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), //60s 逐出间隔计时器 serverConfig.getEvictionIntervalTimerInMs()); } }EvictionTask定时任务
public abstract class AbstractInstanceRegistry implements InstanceRegistry { class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { //计算任务执行的时间偏差:补偿时间 long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); //执行驱逐 evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } } }在驱逐任务中,计算了任务执行的时间偏差即补偿时间,然后调用com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)执行服务的剔除逻辑
public abstract class AbstractInstanceRegistry implements InstanceRegistry { public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { //如果没启用租约到期,直接返回 logger.debug("DS: lease expiration is currently disabled."); return; } //首先收集所有过期的服务,以随机顺序将其逐出 List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); //循环注册表中的所有的服务 for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { //获取到租约 for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); //如果服务过期,就把服务添加到expiredLeases map中 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } //为了补偿GC暂停或本地时间差异导致的剔除任务执行时间差异,使用当前注册表大小作为触发自我保存的基础 //否则,将清除完整的注册表。 //注册表大小 int registrySize = (int) getLocalRegistrySize(); //注册表中服务的续约阈值 = 注册大小 * 0.85 int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); //驱逐极限 = 注册表大小 - 注册表续约阈值 int evictionLimit = registrySize - registrySizeThreshold; //过期的服务数 和 evictionLimit 取最小 ,如果大于 0 说明需要有服务要剔除 int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { //剔除 toEvict 个 logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); //取随机值 Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { //选择一个随机项目(Knuth随机算法),随机剔除 int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); //获取剔除服务的Lease Lease<InstanceInfo> lease = expiredLeases.get(i); //应用名 String appName = lease.getHolder().getAppName(); //实例ID String id = lease.getHolder().getId(); //expired Counter 过期计数增加 EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); internalCancel(appName, id, false); } } } }这里做了如下事情
- 1、判断是否开启过期驱逐
- 2、获取到所有的过期的服务,通过Lease.isExpired判断过期
- 3、计算一个驱逐极限值 :min( 过期数 ,注册表服务数 - 注册表服务数 * 0.85(续约阈值百分比) )
- 4、如果驱逐极限值 > 0 ,那就从过期的服务中随机驱逐 “驱逐极限”个服务
- 5、调用internalCancel方法消息服务
我们可以看下Lease.isExpired是如何判断实例过期的
public class Lease<T> { public boolean isExpired(long additionalLeaseMs) { return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); } }这里给的过期计算方式是: evictionTimestamp (剔除时间戳) > 0 || 最后更新时间戳 + 租期(90s) + 补偿时间 。
但是有意思的是这个方法上的注释说了一个问题:它说由于renew()做了“错误”的事情,将lastUpdateTimestamp设置为+duration,超过了它应该的值,因此到期实际上是2 * duration,这个是个小问题,没有什么影响就没做修改。意思是renew方法中的lastUpdateTimestamp时间 不应该 + duration租期时间,这超过了它应该的值,因此到期实际上是2 * duration。
public class Lease<T> { public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } }当然这个问题没有太大影响,所以没做改正,注释上也说明白了这个问题,这个方法我们就看到这。
继续看一下internalCancel内部取消服务的方法
public abstract class AbstractInstanceRegistry implements InstanceRegistry { protected boolean internalCancel(String appName, String id, boolean isReplication) { try { //上锁 read.lock(); //服务取消数增加 CANCEL.increment(isReplication); //获取当前剔除的服务 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { //从服务注册的map中移除掉当前服务 leaseToCancel = gMap.remove(id); } //添加到最近取消队列 recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); //overriddenInstanceStatusMap 服务状态map中移除当前服务 InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); if (instanceStatus != null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } if (leaseToCancel == null) { //没找到服务 CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { //调用Lease.cancel方法 leaseToCancel.cancel(); //获取服务实例信息 InstanceInfo instanceInfo = leaseToCancel.getHolder(); String vip = null; String svip = null; if (instanceInfo != null) { //实例状态修改为删除 instanceInfo.setActionType(ActionType.DELETED); //添加最近修改队列 recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); //实例信息对象修改最后修改时间 instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } //使缓存无效,调用responseCache.invalidate让服务在缓存中失效 invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); } } finally { read.unlock(); } synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } return true; } }这里主要是根据当前要取消的服务名从registry中查询出服务之后做了这些事情
- 1、从registry中移除服务,
- 2、从overriddenInstanceStatusMap状态map中移除服务状态
- 3、添加到最近取消队列
- 4、调用Lease.cancel方法,将租约对象中的逐出时间修改为当前时间
- 5、修改服务的InstanceInfo的状态为DELETE
- 6、添加到最近修改队列
- 7、更新服务最后修改时间
- 8、使ReponseCache缓存无效
服务剔除总结
参考: https://segmentfault.com/a/1190000021284890
https://blog.csdn.net/qq_36960211/article/details/85278254
https://blog.csdn.net/u014494148/article/details/108900884
https://www.jianshu.com/p/b8c614c442e0