当前位置 : 主页 > 编程语言 > java >

Spring Cloud——Eureka多级缓存机制

来源:互联网 收集:自由互联 发布时间:2023-02-04
一、Eureka Server Eureka Server为了避免同时读写内存数据结构造成的并发冲突问题,采用了多级缓存机制来进一步提升服务请求的响应速度。 Eureka Server存在三个变量:(registry、readWriteCa

一、Eureka Server

Eureka Server为了避免同时读写内存数据结构造成的并发冲突问题,采用了多级缓存机制来进一步提升服务请求的响应速度。

Eureka Server存在三个变量:(registry、readWriteCacheMap、readOnlyCacheMap)保存服务注册信息,默认情况下定时任务每30s将readWriteCacheMap同步至readOnlyCacheMap,每60s清理超过90s未续约的节点,Eureka Client每30s从readOnlyCacheMap更新服务注册信息,而UI则从registry更新服务注册信息。

三级缓存

缓存 类型 说明 registry ConcurrentHashMap 实时更新,类AbstractInstanceRegistry成员变量,UI端请求的是这里的服务注册信息 readWriteCacheMap Guava Cache/LoadingCache 实时更新,类ResponseCacheImpl成员变量,缓存时间180秒 readOnlyCacheMap ConcurrentHashMap 周期更新,类ResponseCacheImpl成员变量,默认每30s从readWriteCacheMap更新,Eureka client默认从这里更新服务注册信息,可配置直接从readWriteCacheMap更新

注意: readWriteCacheMap:是Guava缓存,数据主要同步于存储层即注册表registry 。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。默认180s过期,当服务下线、过期、注册、状态变更,都会来清除此缓存中的数据。

缓存工作方式:

###缓存相关配置

配置 默认 说明 eureka.server.useReadOnlyResponseCache true Client从readOnlyCacheMap更新数据,false则跳过readOnlyCacheMap直接从readWriteCacheMap更新 eureka.server.responsecCacheUpdateIntervalMs 30000 readWriteCacheMap更新至readOnlyCacheMap周期,默认30s eureka.server.evictionIntervalTimerInMs 60000 清理未续约节点周期,默认60s eureka.instance.leaseExpirationDurationInSeconds 90 清理未续约节点超时时间,默认90s

eureka server端的多级缓存机制

  • 重点看看eureka server端的多级缓存机制的过期失效机制。

在server端,关于过期,其实有3中机制,分别是主动过期,被动过期和定时过期。

1、主动过期

主动过期主要是针对RW缓存,有新的服务注册、下线、故障都会刷新RW缓存的Map。

 

比如有一个新实例来注册,在注册逻辑最后会调用invalidateCache方法,这个方法就是去过期掉RW缓存的Map。

 

Eureka Server在接受Eureka Client服务注册的流程,即AbstractInstanceRegistry类的register方法最后会调用invalidateCache方法清理缓存为入口

public abstract class AbstractInstanceRegistry implements InstanceRegistry { private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { // 上只读锁 read.lock(); // 从本地MAP里面获取当前实例的信息。 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); //省略中间代码。。。。。。 // 放入本地Map中 gMap.put(registrant.getId(), lease); //省略中间代码。。。。。。 // 设置注册类型为添加 registrant.setActionType(ActionType.ADDED); // 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); // 清理缓存 ,传入的参数为key invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } } } public abstract class AbstractInstanceRegistry implements InstanceRegistry { protected volatile ResponseCache responseCache; private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { // 清除缓存 responseCache.invalidate(appName, vipAddress, secureVipAddress); } } public class ResponseCacheImpl implements ResponseCache { @Override public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { for (Key.KeyType type : Key.KeyType.values()) { for (Version v : Version.values()) { invalidate( new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact) ); if (null != vipAddress) { invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full)); } if (null != secureVipAddress) { invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)); } } } } }

在这里会调用readWriteCacheMap.invalidate(key)来过期RW缓存Map的数据,服务下线、故障都会走类似的逻辑。

public class ResponseCacheImpl implements ResponseCache { private final LoadingCache<Key, Value> readWriteCacheMap; public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); Collection<Key> keysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } } }

2、被动过期

被动过期,主要是针对RO缓存,readOnlyCacheMap默认是每隔30秒,执行一个定时调度的线程任务,TimerTask,对readOnlyCacheMap和readWriteCacheMap中的数据进行一个比对,如果两块数据不一致的,那么就将readWriteCacheMap中的数据放到readOnlyCacheMap中来。

 

比如说readWriteCacheMap中,ALL_APPS这个key对应的缓存没了,那么最多30秒过后,就会同步到readOnelyCacheMap中去。

 

这段代码依然在ResponseCacheImpl的构造方法里,这个timer叫做一个eureka缓存填充的timer。

public class ResponseCacheImpl implements ResponseCache { private final java.util.Timer timer = new java.util.Timer("Eureka-CacheFillTimer", true); ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e); } } }

可以看到它的getCacheUpdateTask()方法直接返回一个TimerTask,就是完成RW缓存和RO缓存数据交互的逻辑。

public class ResponseCacheImpl implements ResponseCache { private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>(); private final LoadingCache<Key, Value> readWriteCacheMap; private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { if (logger.isDebugEnabled()) { logger.debug("Updating the client cache from response cache for key : {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType()); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); //如果RO缓存中的数据和RW不一致,则put if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); } finally { CurrentRequestVersion.remove(); } } } }; } }

而这个responseCacheUpdateIntervalMs,默认30s。

@Singleton public class DefaultEurekaServerConfig implements EurekaServerConfig { @Override public long getResponseCacheUpdateIntervalMs() { return configInstance.getIntProperty( namespace + "responseCacheUpdateIntervalMs", (30 * 1000)).get(); } }

3、定时过期

这个定时过期,实际上也是针对RW缓存的那个readWriteCacheMap的,在构建的时候会指定一个自动过期的时间,默认是180s,因此放入RW缓存中的数据默认会在3分钟之内过期掉。

public class ResponseCacheImpl implements ResponseCache { private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>(); private final LoadingCache<Key, Value> readWriteCacheMap; ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener<Key, Value>() { @Override public void onRemoval(RemovalNotification<Key, Value> notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader<Key, Value>() { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); return value; } }); //省略部分代码...... } }

通过源码可以明确,这个getResponseCacheAutoExpirationInSeconds()的默认值就是180s。

@Singleton public class DefaultEurekaServerConfig implements EurekaServerConfig { @Override public long getResponseCacheAutoExpirationInSeconds() { return configInstance.getIntProperty( namespace + "responseCacheAutoExpirationInSeconds", 180).get(); } }

Eureka Client获取注册信息

Eureka Client获取注册信息通过ApplicationsResource类的getContainers方法为入口

@Path("/{version}/apps") @Produces({"application/xml", "application/json"}) public class ApplicationsResource { @GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. // EurekaServer无法提供服务,返回403 if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); // 设置返回数据格式,默认JSON KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; // 如果接收到的请求头部没有具体格式信息,则返回格式为XML if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } //创建一个缓存对象 构建缓存键 Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, //全量 keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; // 返回不同的编码类型的数据,去缓存中取数据的方法基本一致 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) //获取压缩的数据 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)) .build(); } CurrentRequestVersion.remove(); return response; } }

responseCache.getGZIP(cacheKey)

  • 从缓存中读取GZIP压缩数据。
public class ResponseCacheImpl implements ResponseCache { private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>(); private final LoadingCache<Key, Value> readWriteCacheMap; public byte[] getGZIP(Key key) { Value payload = getValue(key, shouldUseReadOnlyResponseCache); if (payload == null) { return null; } return payload.getGzipped(); } @VisibleForTesting Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { //首先从只读缓存中获取, 即readOnlyCacheMap final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { //只读缓存readOnlyCacheMap中没有,从readWriteCacheMap缓存中获取 payload = readWriteCacheMap.get(key); //回写只读缓存readOnlyCacheMap readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; } }

responseCache.get(cacheKey)

  • 从缓存中读取数据。
public class ResponseCacheImpl implements ResponseCache { private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>(); private final LoadingCache<Key, Value> readWriteCacheMap; public String get(final Key key) { return get(key, shouldUseReadOnlyResponseCache); } @VisibleForTesting String get(final Key key, boolean useReadOnlyCache) { Value payload = getValue(key, useReadOnlyCache); if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) { return null; } else { return payload.getPayload(); } } @VisibleForTesting Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { //首先从只读缓存中获取, 即readOnlyCacheMap final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { //只读缓存readOnlyCacheMap中没有,从readWriteCacheMap缓存中获取 payload = readWriteCacheMap.get(key); //回写只读缓存readOnlyCacheMap readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; } }

二、Eureka Client

Eureka Client存在两种角色:服务提供者和服务消费者,作为服务消费者一般配合Ribbon或Feign(Feign内部使用Ribbon)使用。Eureka Client启动后,作为服务提供者立即向Eureka Server注册,默认情况下每30s续约;作为服务消费者立即向Server全量更新服务注册信息,默认情况下每30s增量更新服务注册信息;Ribbon延时1s向Client获取使用的服务注册信息,默认每30s更新使用的服务注册信息,只保存状态为UP的服务。

二级缓存

缓存 类型 说明 localRegionApps AtomicReference 周期更新,类DiscoveryClient成员变量,Eureka Client保存服务注册信息,启动后立即向Server全量更新,默认每30s增量更新 upServerListZoneMap ConcurrentHashMap 周期更新,类LoadBalancerStats成员变量,Ribbon保存使用且状态为UP的服务注册信息,启动后延时1s向Client更新,默认每30s更新

缓存相关配置

配置 默认 说明 eureka.instance.leaseRenewalIntervalInSeconds 30 Eureka Client续约周期,默认30s eureka.client.registryFetchIntervalSeconds 30 Eureka Client增量更新周期,默认30s(正常情况下增量更新,超时或与Server端不一致等情况则全量更新) ribbon.ServerListRefreshInterval 30000 Ribbon更新周期,默认30s

EurekaClient 缓存

EurekaClient也存在缓存,应用服务实例列表信息在每个EurekaClient服务消费端都有缓存。一般的,Ribbon的LoadBalancer会读取这个缓存,来知道当前有哪些实例可以调用,从而进行负载均衡。这个loadbalancer同样也有缓存。

 

首先看这个LoadBalancer的缓存更新机制,相关类是PollingServerListUpdater:

public class PollingServerListUpdater implements ServerListUpdater { @Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { //从EurekaClient缓存中获取服务实例列表,保存在本地缓存 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; // 使用线程池周期性的执行wrapperRunnable任务 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } } }

DynamicServerListLoadBalancer.updateListOfServers()代码逻辑

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { public DynamicServerListLoadBalancer(IClientConfig clientConfig) { class NamelessClass_1 implements UpdateAction { public void doUpdate() { DynamicServerListLoadBalancer.this.updateListOfServers(); } } } @VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList(); if (this.serverListImpl != null) { servers = this.serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers); if (this.filter != null) { servers = this.filter.getFilteredListOfServers((List)servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers); } } this.updateAllServerList((List)servers); } }

serverListImpl.getUpdatedListOfServers()会调用DiscoveryEnabledNIWSServerList.obtainServersViaDiscovery()方法获取servers集合

DiscoveryEnabledNIWSServerList.obtainServersViaDiscovery()方法

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{ @Override public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); } private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; } }

从代码中可以看到,listOfInstanceInfo持有从DiscoveryClient.LocalRegionApps/remoteRegionVsApps获取到的信息后,与region和zone结合形成DiscoveryEnabledServer实例,流入到List集合返回

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { protected void updateAllServerList(List<T> ls) { if (this.serverListUpdateInProgress.compareAndSet(false, true)) { try { Iterator var2 = ls.iterator(); while(var2.hasNext()) { T s = (Server)var2.next(); s.setAlive(true); } //调用setServersList方法 this.setServersList(ls); super.forceQuickPing(); } finally { this.serverListUpdateInProgress.set(false); } } } public void setServersList(List lsrv) { // 赋值给BaseLoadBalacer.upServerList super.setServersList(lsrv); Map<String, List<Server>> serversInZones = new HashMap(); Iterator var4 = lsrv.iterator(); while(var4.hasNext()) { Server server = (Server)var4.next(); // 赋值给LoadBalancerStats.zoneStatsMap this.getLoadBalancerStats().getSingleServerStat(server); String zone = server.getZone(); if (zone != null) { zone = zone.toLowerCase(); List<Server> servers = (List)serversInZones.get(zone); if (servers == null) { servers = new ArrayList(); serversInZones.put(zone, servers); } ((List)servers).add(server); } } this.setServerListForZones(serversInZones); } protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) { LOGGER.debug("Setting server list for zones: {}", zoneServersMap); //更新upServerListZoneMap缓存 this.getLoadBalancerStats().updateZoneServerMapping(zoneServersMap); } } public class LoadBalancerStats implements IClientConfigAware { volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>(); volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>(); public void updateZoneServerMapping(Map<String, List<Server>> map) { upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>(map); // make sure ZoneStats object exist for available zones for monitoring purpose for (String zone: map.keySet()) { //更新zoneStatsMap getZoneStats(zone); } } private ZoneStats getZoneStats(String zone) { zone = zone.toLowerCase(); ZoneStats zs = zoneStatsMap.get(zone); if (zs == null){ zoneStatsMap.put(zone, new ZoneStats(this.getName(), zone, this)); zs = zoneStatsMap.get(zone); } return zs; } }

这个updateAction.doUpdate();就是从EurekaClient缓存中获取服务实例列表,保存在BaseLoadBalancer的本地缓存,入口在DynamicServerListLoadBalancer的setServersList方法的super.setServersList(lsrv)方法处:

public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware { @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList = Collections .synchronizedList(new ArrayList<Server>()); public void setServersList(List lsrv) { //写入allServerList的代码,这里略 } @Override public List<Server> getAllServers() { return Collections.unmodifiableList(allServerList); } }

这里的getAllServers会在每个负载均衡规则中被调用,例如RoundRobinRule:

public class RoundRobinRule extends AbstractLoadBalancerRule { public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); //获取服务实例列表,调用的就是刚刚提到的getAllServers List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } }

这个缓存需要注意下,有时候我们只修改了EurekaClient缓存的更新时间,但是没有修改这个LoadBalancer的刷新本地缓存时间,就是ribbon.ServerListRefreshInterval,这个参数可以设置的很小,因为没有从网络读取,就是从一个本地缓存刷到另一个本地缓存。

 

然后我们来看一下EurekaClient本身的缓存,直接看关键类DiscoveryClient的相关源码,我们这里只关心本地Region的,多Region配置我们先忽略:

@Singleton public class DiscoveryClient implements EurekaClient { //本地缓存,可以理解为是一个软链接 private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>(); /** * 初始化所有计划的任务 */ private void initScheduledTasks() { //如果配置为需要拉取服务列表,则设置定时拉取任务,这个配置默认是需要拉取服务列表 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } //其他定时任务初始化的代码,忽略 } //定时从EurekaServer拉取服务列表的任务 class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } @VisibleForTesting void refreshRegistry() { try { //多Region配置处理代码,忽略 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } //日志代码,忽略 } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } } //定时从EurekaServer拉取服务列表的核心方法 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); //判断,如果是第一次拉取,或者app列表为空,就进行全量拉取,否则就会进行增量拉取 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); //全量拉取更新 getAndStoreFullRegistry(); } else { //增量拉取更新 getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } //缓存更新完成,发送个event给观察者 onCacheRefreshed(); // 检查下远端的服务实例列表里面包括自己,并且状态是否对 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } }

全量更新本地缓存的服务列表

getAndStoreFullRegistry方法负责全量更新,代码如下所示,非常简单的逻辑:

@Singleton public class DiscoveryClient implements EurekaClient { //本地缓存,可以理解为是一个软链接 private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>(); private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; //由于并没有配置特别关注的region信息, //因此会调用eurekaTransport.queryClient.getApplications方法从服务端获取服务列表 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //返回对象就是服务列表 apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); //考虑到多线程同步,只有CAS成功的线程,才会把自己从Eureka server获取的数据来替换本地缓存 } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //localRegionApps就是本地缓存,是个AtomicReference实例 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } }

获取服务列表信息的增量更新

获取服务列表信息的增量更新是通过getAndUpdateDelta方法完成的,具体分析请看下面的中文注释:

@Singleton public class DiscoveryClient implements EurekaClient { //本地缓存,可以理解为是一个软链接 private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>(); private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; //增量信息是通过eurekaTransport.queryClient.getDelta方法完成的 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //delta中保存了Eureka server返回的增量更新 delta = httpResponse.getEntity(); } if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); //如果增量信息为空,就直接发起一次全量更新 getAndStoreFullRegistry(); } //考虑到多线程同步问题,这里通过CAS来确保请求发起到现在是线程安全的, //如果这期间fetchRegistryGeneration变了,就表示其他线程也做了类似操作,因此放弃本次响应的数据 else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { //用Eureka返回的增量数据和本地数据做合并操作,这个方法稍后会细说 updateDelta(delta); //用合并了增量数据之后的本地数据来生成一致性哈希码 reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } //Eureka server在返回增量更新数据时,也会返回服务端的一致性哈希码, //理论上每次本地缓存数据经历了多次增量更新后,计算出的一致性哈希码应该是和服务端一致的, //如果发现不一致,就证明本地缓存的服务列表信息和Eureka server不一致了,需要做一次全量更新 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { //一致性哈希码不同,就在reconcileAndLogDifference方法中做全量更新 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } } }

上面就是对于EurekaClient拉取服务实例信息的源代码分析:

  • EurekaClient第一次全量拉取,定时增量拉取应用服务实例信息,保存在缓存中。
  • EurekaClient增量拉取失败,或者增量拉取之后对比hashcode发现不一致,就会执行全量拉取,这样避免了网络某时段分片带来的问题。
  • 同时对于服务调用,如果涉及到ribbon负载均衡,那么ribbon对于这个实例列表也有自己的缓存,这个缓存定时从EurekaClient的缓存更新

参考: https://blog.csdn.net/qq_40378034/article/details/103850144

https://www.processon.com/view/5d4e871ce4b04399f59f9e22

https://blog.csdn.net/Josh_scott/article/details/119150421

https://my.oschina.net/u/3747772/blog/1588958

https://yaoyuanyy.github.io/2019/04/10/springcloud%20ribbon%E5%90%8C%E6%AD%A5eureka%20server%E6%9C%8D%E5%8A%A1%E5%88%97%E8%A1%A8%E4%B8%8E%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1%E7%AE%97%E6%B3%95%E5%88%86%E6%9E%90/

https://www.pianshen.com/article/96662003058/

https://www.cnblogs.com/ZenoLiang/p/13677493.html

上一篇:Spring Cloud——Eureka服务续约(心跳机制)
下一篇:没有了
网友评论