当前位置 : 主页 > 编程语言 > java >

Spring Cloud——Eureka服务续约(心跳机制)

来源:互联网 收集:自由互联 发布时间:2023-02-04
前言 Eureka Client的应用启动时,在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,会做以下几件事: 1、周期性更新服务列表; 3、周期性服务续约; 3、服务注册逻辑; 概览 以

前言

Eureka Client的应用启动时,在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,会做以下几件事:

  • 1、周期性更新服务列表;
  • 3、周期性服务续约;
  • 3、服务注册逻辑;

概览

以下图片来自Netflix官方,图中显示Eureka Client会发起Renew向注册中心做周期性续约,这样其他Eureka client通过Get Registry请求就能获取到新注册应用的相关信息:

来自官方文档的指导信息

最准确的说明信息来自Netflix的官方文档,地址: https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew

关于续约的理解:

  • 1、Eureka client每隔三十秒发送一次心跳到Eureka server,这就是续约;
  • 2、Eureka client续约的目的是告诉Eureka server自己还活着;
  • 3、Eureka server若90秒内未收到心跳,就从自己的服务列表中剔除该Eureka client;
  • 4、建议不要改变心跳间隔,因为Eureka server是通过心跳来判断Eureka client是否正常;

服务续约执行简要流程图

下面这张图大致描述了服务续约从Client端到Server端的大致流程,详情如下:

Eureka 续约源码分析

1、Eureka Client发起续约

Eureka Client向Eureka Server发起注册应用实例成功后获得租约,Eureka Client固定间隔向Eureka Server发起续约(renew),避免租约过期。

 

默认情况下,租约有效期为90秒,续约频率为30秒。两者比例为1:3,保证在网络异常等情况下,有三次重试的机会。

1)、初始化定时任务

Eureka Client在初始化过程中,创建心跳线程,固定间隔向Eureka Server发起续约。实现代码如下

@Singleton public class DiscoveryClient implements EurekaClient { /** * 初始化所有计划的任务 */ private void initScheduledTasks() { //获取注册信息的定时任务 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); //心跳定时任务 // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); //服务实例同步定时任务 // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; // 注册应用实例状态变更监听器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //初始化定时服务注册任务 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } }

2)、发起续约

@Singleton public class DiscoveryClient implements EurekaClient { //最后成功向Eureka Server心跳时间戳 private volatile long lastSuccessfulHeartbeatTimestamp = -1; private class HeartbeatThread implements Runnable { public void run() { // 调用续约方法 if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } //服务续约 boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { //发Restful请求,即心跳 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); //404错误会触发注册逻辑 if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } //返回码200表示心跳成功 return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } } }

AbstractJerseyEurekaHttpClient的renew()方法使用PUT请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,参数为status、lastDirtyTimestamp、overriddenstatus,实现续约。

 

继续展开上面代码段中的 eurekaTransport.registrationClient.sendHeartBeat方法,源码在EurekaHttpClientDecorator类中:

@Override public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName, final String id, final InstanceInfo info, final InstanceStatus overriddenStatus) { return execute(new RequestExecutor<InstanceInfo>() { @Override public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) { //网络处理委托给代理类完成 return delegate.sendHeartBeat(appName, id, info, overriddenStatus); } @Override public RequestType getRequestType() { //请求类型为心跳 return RequestType.SendHeartBeat; } }); }

继续展开delegate.sendHeartBeat,多层调用一路展开,最终由JerseyApplicationClient类来完成操作,对应源码在父类AbstractJerseyEurekaHttpClient中,如下所示,主要工作是利用jersey库的Restful Api将自身的信息PUT到Eureka server,注意:这里不是POST,也不是GET,而是PUT:

@Override public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { //请求参数有两个:Eureka client自身状态、自身关键信息(状态、元数据等)最后一次变化的时间 WebResource webResource = jerseyClient.resource(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); //注意:这里不是POST,也不是GET,而是PUT response = requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity() && !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }

至此,Eureka client向服务续租的源码就分析完毕了,过程相对简单,DiscoveryClient、TimedSupervisorTask、JerseyApplicationClient等实例各司其职,定时发送PUT请求到Eureka server。

2、Eureka Server接收续约

Eureka Server接收续约核心流程如下图:

1)、接收续约请求

@Produces({"application/xml", "application/json"}) public class InstanceResource { @PUT public Response renewLease( // 是否是Replication模式 复制,同步 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, // 实例的覆盖状态 @QueryParam("status") String status, // 实例状态 // 实例信息在EurekClient端上次被修改的时间 @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); // 续约 boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register // 续租失败,返回404,EurekaClient端收到404后会发起注册请求 if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // Check if we need to sync based on dirty time stamp, the client // instance might have changed some value // 比较InstanceInfo的lastDirtyTimestamp属性 Response response; if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { // 验证传入的lastDirtyTimestamp和EurekaServer端保存的lastDirtyTimestamp是否相同 response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); // Store the overridden status since the validation found out the node that replicates wins if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() && (overriddenStatus != null) && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) && isFromReplicaNode) { registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); } } else { // 续约成功,返回200 response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus()); return response; } }

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的renew(...)方法续约实例信息

@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { public boolean renew(final String appName, final String id, final boolean isReplication) { // 调用父类里的renew(appName, id, isReplication)方法续约 if (super.renew(appName, id, isReplication)) { // 如果是续约请求则向其他EurekaServer节点同步续约信息 // 如果是同步信息请求则直接返回 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; } }

2)、续约应用实例信息

调用了AbstractInstanceRegistry的renew(...)方法,续约实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry { public boolean renew(String appName, String id, boolean isReplication) { // 增加续约次数到监控 RENEW.increment(isReplication); // 获取应用名对应的租约,即根据实例名称取出实例信息集合 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { // 根据实例id取出具体实例租约信息 leaseToRenew = gMap.get(id); } // 租约不存在 if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); // 获得实例的覆盖状态 InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); // 实例覆盖状态为UNKNOWN,续租失败 if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } // 实例状态与覆盖状态不一致 if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()); // 强行把实例的覆盖状态设为实例状态 // 即status = overriddenInstanceStatus instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } // 新增续租每分钟次数 renewsLastMin.increment(); // 续租(设置lastUpdateTimestamp(租约最后更新时间)) leaseToRenew.renew(); return true; } } } public class Lease<T> { enum Action { Register, Cancel, Renew }; private volatile long lastUpdateTimestamp; public void renew() { // 设置租约最后更新时间戳 lastUpdateTimestamp = System.currentTimeMillis() + duration; } }

续约的整个过程修改租约的过期时间,即使并发请求,也不会对数据的一致性产生影响,因此不需要像注册操作一样加锁。

3)、eureka引入overriddenstatus用来解决状态被覆盖问题

客户端调用updateStatus方法时,同时更新server端实例的status和overriddenStatus状态。

 

客户端调用renew方法时,也要更新server端实例的status和overriddenstatus状态,但是有以下规则的

  • (1):如果客户端上传的实例状态是down或者starting,表明客户端是重启或者healthCheck失败。此时这个实例不能作为服务提供服务。因此即使客户端调用updateStatus把实例状态更新为up,也是没用的。此时客户端实例的准确状态就是down或者starting。

  • (2):如果客户端的实例是up或者out_of_service,此时是不可信的。就像第二大节介绍的那样。有可能client端的实例状态已被改变,此时要使用overriddenstatus状态作为当前实例的状态,避免被覆盖。

  • (3):(2)中的overriddenstatus有可能不存在,缓存失效,此时要使用server端已经存在的实例的状态。

参考: https://xinchen.blog.csdn.net/article/details/82915355

https://blog.csdn.net/qq_40378034/article/details/119079180

https://blog.csdn.net/NEW_BUGGER/article/details/93710797

https://www.cnblogs.com/liujunj/p/13401808.html

上一篇:Spring Cloud——Eureka服务实例下线
下一篇:没有了
网友评论