
Spring Cloud Alibaba: Nacos AP Consistency Strategy (Distro)

Source: Internet. Collected by: 自由互联. Published: 2023-02-04

Eureka consistency strategy

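Eureka is a service discovery framework that runs in AP mode. In a Eureka cluster, the servers replicate and update data by broadcasting their own data to one another. A client can also still obtain service registration information when the network between it and the registry fails, because Eureka caches the registration information on the client side.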

    @Singleton
    public class DiscoveryClient implements EurekaClient {

        private boolean fetchRegistryFromBackup() {
            try {
                @SuppressWarnings("deprecation")
                BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
                if (null == backupRegistryInstance) {
                    // backward compatibility with the old protected method, in case it is being used.
                    backupRegistryInstance = backupRegistryProvider.get();
                }

                if (null != backupRegistryInstance) {
                    Applications apps = null;
                    if (isFetchingRemoteRegionRegistries()) {
                        String remoteRegionsStr = remoteRegionsToFetch.get();
                        if (null != remoteRegionsStr) {
                            apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                        }
                    } else {
                        apps = backupRegistryInstance.fetchRegistry();
                    }
                    if (apps != null) {
                        final Applications applications = this.filterAndShuffle(apps);
                        applications.setAppsHashCode(applications.getReconcileHashCode());
                        localRegionApps.set(applications);
                        logTotalInstances();
                        logger.info("Fetched registry successfully from the backup");
                        return true;
                    }
                } else {
                    logger.warn("No backup registry instance defined & unable to find any discovery servers.");
                }
            } catch (Throwable e) {
                logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
            }
            return false;
        }
    }

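Eureka keeps this client-side cache so that consumers can still call service providers when the Eureka cluster is not working. The consequence is that Eureka cannot guarantee strong consistency of the registration information (CP mode); it can only offer eventual consistency (AP mode).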
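
To make the trade-off concrete, here is a minimal sketch of a client that keeps the last successfully fetched registry and answers from it when the server cannot be reached. CachingRegistryClient, its Supplier-based fetcher, and the service names are hypothetical and are not Eureka APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch, not Eureka code: serve the last known registry when a fetch fails.
class CachingRegistryClient {

    // Last snapshot that was fetched successfully; starts empty.
    private final AtomicReference<Map<String, List<String>>> cache = new AtomicReference<>(Map.of());

    // Stand-in for the remote call; it may throw when the registry is unreachable.
    private final Supplier<Map<String, List<String>>> remoteFetcher;

    CachingRegistryClient(Supplier<Map<String, List<String>>> remoteFetcher) {
        this.remoteFetcher = remoteFetcher;
    }

    // Refresh the cache if possible; on failure keep the stale snapshot.
    boolean refresh() {
        try {
            cache.set(remoteFetcher.get());
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    // Always answers from the local cache, so lookups survive a registry outage.
    List<String> instancesOf(String service) {
        return cache.get().getOrDefault(service, List.of());
    }
}
```

The price of answering from the cache is that a client may keep seeing instances that have already gone away, which is the eventual-consistency side of the trade.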

Nacos AP consistency strategy: Distro

In AP mode, Nacos uses a consistency strategy similar to Eureka's: the servers synchronize data with one another so that it is replicated across the cluster.
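
As a rough illustration of what peer-to-peer synchronization implies, the sketch below models nodes that accept a write locally and replicate it to their peers later. PeerNode and its outbox are hypothetical and greatly simplified; they are not Nacos classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of AP-style replication between peers; not Nacos code.
class PeerNode {

    final String name;
    final Map<String, String> data = new HashMap<>();
    final List<PeerNode> peers = new ArrayList<>();
    // Replication work that has been accepted but not yet delivered to peers.
    final Queue<Runnable> outbox = new ArrayDeque<>();

    PeerNode(String name) {
        this.name = name;
    }

    // Accept the write locally right away, then queue a copy for every peer.
    void put(String key, String value) {
        data.put(key, value);
        for (PeerNode peer : peers) {
            outbox.add(() -> peer.data.put(key, value));
        }
    }

    // Deliver the queued updates; in a real cluster this happens asynchronously.
    void flush() {
        Runnable update;
        while ((update = outbox.poll()) != null) {
            update.run();
        }
    }
}
```

Between put and flush the peers still hold the old view. That window is what "eventual" means here: the write is available immediately on one node and reaches the others a little later.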

Triggering the data broadcast

    @DependsOn("ProtocolManager")
    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

        private final GlobalConfig globalConfig;

        private final DistroProtocol distroProtocol;

        @Override
        public void put(String key, Record value) throws NacosException {
            // Apply the change locally first
            onPut(key, value);
            // Then schedule a delayed sync of this key to the other servers
            distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                    globalConfig.getTaskDispatchPeriod() / 2);
        }
    }

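When a service instance is registered or deregistered, that is, when the register or deregister method of InstanceController is called, the addInstance or removeInstance method of ServiceManager calls the put or remove method defined by the ConsistencyService interface. This changes data on the server, so a DistroDelayTask is created and the key of the changed data is wrapped inside it.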

    @Component
    public class DistroProtocol {

        public void sync(DistroKey distroKey, DataOperation action, long delay) {
            // Iterate over every other server in the cluster and sync the data to each of them
            for (Member each : memberManager.allMembersWithoutSelf()) {
                DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                        each.getAddress());
                // Create the delayed task
                DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
                // In effect this is also a scheduled task: the engine polls its task map periodically
                distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
                }
            }
        }
    }

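The DistroDelayTask is then submitted to Nacos's delayed task engine, NacosDelayTaskExecuteEngine, which stores it in its ConcurrentHashMap<Object, AbstractDelayTask> tasks. In its constructor, NacosDelayTaskExecuteEngine creates a ScheduledExecutorService and schedules a ProcessRunnable that periodically takes the AbstractDelayTask entries out of tasks and processes them.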

    public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {

        private final ScheduledExecutorService processingExecutor;

        protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

        protected final ReentrantLock lock = new ReentrantLock();

        // Constructor of NacosDelayTaskExecuteEngine
        public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
            super(logger);
            // Initialize tasks
            tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
            // Create the scheduled thread pool
            processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
            // Run ProcessRunnable repeatedly, with processInterval milliseconds between runs
            processingExecutor
                    .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
        }

        @Override
        public void addTask(Object key, AbstractDelayTask newTask) {
            lock.lock();
            try {
                AbstractDelayTask existTask = tasks.get(key);
                // If a task with the same key already exists, merge the two
                if (null != existTask) {
                    newTask.merge(existTask);
                }
                // Store the AbstractDelayTask in tasks
                tasks.put(key, newTask);
            } finally {
                lock.unlock();
            }
        }
    }
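
One effect of keying the map by task key and merging in addTask is that several changes to the same key within one polling interval collapse into a single pending task. The sketch below shows that behaviour in isolation. CoalescingTaskMap, PendingTask, and the mergedUpdates counter are hypothetical; the real merge logic lives in the AbstractDelayTask subclasses and is not shown in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of key-based coalescing; the names do not come from Nacos.
class CoalescingTaskMap {

    // A pending task that remembers how many updates were folded into it.
    static final class PendingTask {
        int mergedUpdates = 1;

        // Fold an older pending task for the same key into this newer one.
        void merge(PendingTask older) {
            mergedUpdates += older.mergedUpdates;
        }
    }

    private final Map<String, PendingTask> tasks = new HashMap<>();

    // Same shape as addTask above: merge with any existing task, then store the new one.
    synchronized void addTask(String key, PendingTask newTask) {
        PendingTask existing = tasks.get(key);
        if (existing != null) {
            newTask.merge(existing);
        }
        tasks.put(key, newTask);
    }

    synchronized int pendingCount() {
        return tasks.size();
    }

    synchronized PendingTask remove(String key) {
        return tasks.remove(key);
    }
}
```

Five quick updates for one key and target therefore lead to one sync attempt rather than five, which keeps the broadcast traffic bounded when instances flap.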

The ProcessRunnable processes the tasks stored in the tasks map (ConcurrentHashMap<Object, AbstractDelayTask>) of the delayed task engine NacosDelayTaskExecuteEngine.

    public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {

        private final ScheduledExecutorService processingExecutor;

        protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

        /**
         * process tasks in execute engine.
         */
        protected void processTasks() {
            // Get all keys currently in tasks
            Collection<Object> keys = getAllTaskKeys();
            // Iterate over the task keys
            for (Object taskKey : keys) {
                // Remove the AbstractDelayTask from tasks
                AbstractDelayTask task = removeTask(taskKey);
                if (null == task) {
                    continue;
                }
                // Look up the NacosTaskProcessor; what is actually returned here is DistroDelayTaskProcessor.
                // DistroHttpRegistry.doRegister registers a DistroHttpDelayTaskProcessor in advance,
                // but a lookup with taskKey as the key does not find it.
                // The DistroTaskEngineHolder constructor sets DistroDelayTaskProcessor as the
                // default processor of DistroDelayTaskExecuteEngine, so the default is used.
                NacosTaskProcessor processor = getProcessor(taskKey);
                if (null == processor) {
                    getEngineLog().error("processor not found for task, so discarded. " + task);
                    continue;
                }
                try {
                    // ReAdd task if process failed
                    // The processor executes the AbstractDelayTask
                    if (!processor.process(task)) {
                        // Processing failed: update the last process time and put the task back into tasks for a later retry
                        retryFailedTask(taskKey, task);
                    }
                } catch (Throwable e) {
                    getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                    // Processing threw: update the last process time and put the task back into tasks for a later retry
                    retryFailedTask(taskKey, task);
                }
            }
        }

        private void retryFailedTask(Object key, AbstractDelayTask task) {
            // Update the last process time
            task.setLastProcessTime(System.currentTimeMillis());
            // Put the task back into tasks so that it runs again later
            addTask(key, task);
        }

        private class ProcessRunnable implements Runnable {

            @Override
            public void run() {
                try {
                    // Process the tasks
                    processTasks();
                } catch (Throwable e) {
                    getEngineLog().error(e.toString(), e);
                }
            }
        }
    }
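
The remove, process, re-add-on-failure loop above can be reduced to a few lines. The sketch below is a simplified, single-threaded version under stated assumptions: RetryingTaskLoop is hypothetical, tasks are plain strings, and the processor is a Predicate that returns false or throws on failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of "re-add on failure"; not the Nacos implementation.
class RetryingTaskLoop {

    private final Map<String, String> tasks = new LinkedHashMap<>();
    private final Predicate<String> processor;

    RetryingTaskLoop(Predicate<String> processor) {
        this.processor = processor;
    }

    void addTask(String key, String payload) {
        tasks.put(key, payload);
    }

    // One polling round: remove each task, run it, and put it back if it failed or threw.
    void processOnce() {
        List<String> keys = new ArrayList<>(tasks.keySet()); // snapshot, because the map is mutated below
        for (String key : keys) {
            String payload = tasks.remove(key);
            if (payload == null) {
                continue;
            }
            boolean ok;
            try {
                ok = processor.test(payload);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (!ok) {
                tasks.put(key, payload); // picked up again on the next round
            }
        }
    }

    int pending() {
        return tasks.size();
    }
}
```

Because a failed task simply goes back into the map, retries happen at the pace of the polling interval and there is no separate retry queue to manage.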

The constructor of DistroTaskEngineHolder sets DistroDelayTaskProcessor as the default task processor of DistroDelayTaskExecuteEngine.

    public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implements NacosTaskExecuteEngine<T> {

        private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();

        private NacosTaskProcessor defaultTaskProcessor;

        @Override
        public NacosTaskProcessor getProcessor(Object key) {
            // Return the processor registered for the key, or the default processor if there is none
            return taskProcessors.containsKey(key) ? taskProcessors.get(key) : defaultTaskProcessor;
        }
    }

    @Component
    public class DistroHttpRegistry {

        private final DistroComponentHolder componentHolder;

        private final DistroTaskEngineHolder taskEngineHolder;

        @PostConstruct
        public void doRegister() {
            componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                    new DistroDataStorageImpl(dataStore, distroMapper));
            componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
            componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                    new DistroHttpCombinedKeyTaskFailedHandler(globalConfig, taskEngineHolder));
            // Once DistroHttpRegistry has been constructed, it registers DistroHttpDelayTaskProcessor
            taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                    new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
            componentHolder.registerDataProcessor(consistencyService);
        }
    }

    @Component
    public class DistroTaskEngineHolder {

        private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

        private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();

        public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
            DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
            // Set DistroDelayTaskProcessor as the default task processor of DistroDelayTaskExecuteEngine
            delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
        }
    }
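
The fallback in getProcessor explains why the default processor ends up handling these tasks: a processor registered under one key is only found when the lookup key equals it. The sketch below illustrates this with a hypothetical ProcessorRegistry; the key values are made up, and the task key is modelled as a list only to stand in for a composite key object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "specific processor, else default"; not Nacos code.
class ProcessorRegistry {

    interface Processor {
        String name();
    }

    private final Map<Object, Processor> processors = new ConcurrentHashMap<>();
    private volatile Processor defaultProcessor;

    void register(Object key, Processor processor) {
        processors.put(key, processor);
    }

    void setDefault(Processor processor) {
        defaultProcessor = processor;
    }

    // A lookup only hits when the key equals the registered key; otherwise fall back to the default.
    Processor getProcessor(Object key) {
        Processor processor = processors.get(key);
        return processor != null ? processor : defaultProcessor;
    }
}
```

Registering under a string prefix and then looking up with a composite task key misses the specific entry, so the default processor is returned, which matches what the comments in processTasks describe.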

DistroDelayTaskProcessor executes the AbstractDelayTask

  • The process method of DistroDelayTaskProcessor creates a DistroSyncChangeTask.
    public class DistroDelayTaskProcessor implements NacosTaskProcessor {

        private final DistroTaskEngineHolder distroTaskEngineHolder;

        @Override
        public boolean process(NacosTask task) {
            if (!(task instanceof DistroDelayTask)) {
                return true;
            }
            DistroDelayTask distroDelayTask = (DistroDelayTask) task;
            DistroKey distroKey = distroDelayTask.getDistroKey();
            if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
                // Create another task, this time an execute task that performs the actual sync
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            }
            return false;
        }
    }

The DistroSyncChangeTask is added to the BlockingQueue held by a TaskExecuteWorker so that it can run asynchronously.

The constructor of TaskExecuteWorker starts an InnerWorker thread, which continuously watches the queue and consumes the tasks in it.

    public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {

        private final TaskExecuteWorker[] executeWorkers;

        @Override
        public void addTask(Object tag, AbstractExecuteTask task) {
            NacosTaskProcessor processor = getProcessor(tag);
            if (null != processor) {
                processor.process(task);
                return;
            }
            TaskExecuteWorker worker = getWorker(tag);
            // Hand the task to the worker, which puts it on its queue
            worker.process(task);
        }
    }

    public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {

        /**
         * Max task queue size 32768.
         */
        private static final int QUEUE_CAPACITY = 1 << 15;

        private final Logger log;

        private final String name;

        private final BlockingQueue<Runnable> queue;

        public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
            this.name = name + "_" + mod + "%" + total;
            // Initialize the queue
            this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
            this.closed = new AtomicBoolean(false);
            this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
            // Start the InnerWorker thread, which watches the queue and consumes its tasks
            new InnerWorker(name).start();
        }

        @Override
        public boolean process(NacosTask task) {
            // DistroSyncChangeTask is a subclass of AbstractExecuteTask
            if (task instanceof AbstractExecuteTask) {
                // Add the task
                putTask((Runnable) task);
            }
            return true;
        }

        private void putTask(Runnable task) {
            try {
                // Put the task on the queue
                queue.put(task);
            } catch (InterruptedException ire) {
                log.error(ire.toString(), ire);
            }
        }
    }

The InnerWorker thread started in the TaskExecuteWorker constructor takes each task off the queue and runs it.

    public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {

        private final BlockingQueue<Runnable> queue;

        public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
            this.name = name + "_" + mod + "%" + total;
            // Initialize the queue
            this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
            this.closed = new AtomicBoolean(false);
            this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
            // Start the InnerWorker thread, which watches the queue and consumes its tasks
            new InnerWorker(name).start();
        }

        private class InnerWorker extends Thread {

            InnerWorker(String name) {
                setDaemon(false);
                setName(name);
            }

            @Override
            public void run() {
                while (!closed.get()) {
                    try {
                        // Take the next task from the queue
                        Runnable task = queue.take();
                        long begin = System.currentTimeMillis();
                        // Run the task
                        task.run();
                        long duration = System.currentTimeMillis() - begin;
                        if (duration > 1000L) {
                            log.warn("distro task {} takes {}ms", task, duration);
                        }
                    } catch (Throwable e) {
                        log.error("[DISTRO-FAILED] " + e.toString(), e);
                    }
                }
            }
        }
    }
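
The producer and consumer halves of this worker can be seen together in the following minimal sketch. QueueWorker is hypothetical and deliberately simplified: it uses a smaller queue, a daemon thread (unlike the non-daemon InnerWorker above), and an interrupt on close so that the example shuts down cleanly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a single-consumer worker; simplified from the pattern above.
class QueueWorker implements AutoCloseable {

    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Thread thread;

    QueueWorker(String name) {
        thread = new Thread(this::drain, name);
        thread.setDaemon(true); // assumption: daemon, so the sketch never blocks JVM exit
        thread.start();
    }

    // Producer side: blocks while the queue is full.
    void submit(Runnable task) throws InterruptedException {
        queue.put(task);
    }

    // Consumer side: take tasks one by one and keep going if a task throws.
    private void drain() {
        while (!closed.get()) {
            try {
                queue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // a failing task must not kill the worker thread
            }
        }
    }

    @Override
    public void close() {
        closed.set(true);
        thread.interrupt(); // wake the thread if it is parked in take()
    }
}
```

With one consumer per queue, tasks submitted to the same worker run in submission order, and a slow or failing task delays only the tasks queued behind it on that worker.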

The DistroSyncChangeTask task

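  • The task first uses the keys in the DistroKey to look up the corresponding data in the DataStore and serializes that data into a byte[] array. It then sends the data over HTTP with NamingProxy.syncData(data, task.getTargetServer()). If the broadcast fails, the failed task is retried: it is wrapped in a DistroDelayTask again and executed later.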
    public class DistroSyncChangeTask extends AbstractDistroExecuteTask {

        private final DistroComponentHolder distroComponentHolder;

        @Override
        public void run() {
            Loggers.DISTRO.info("[DISTRO-START] {}", toString());
            try {
                String type = getDistroKey().getResourceType();
                // Look up the data for the keys in the DataStore
                DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
                distroData.setType(DataOperation.CHANGE);
                // Call the target server's endpoint so that it applies the changed service data
                boolean result = distroComponentHolder.findTransportAgent(type)
                        .syncData(distroData, getDistroKey().getTargetServer());
                if (!result) {
                    // Retry the failed task
                    handleFailedTask();
                }
                Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
            } catch (Exception e) {
                Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
                handleFailedTask();
            }
        }
    }

    public class DistroHttpAgent implements DistroTransportAgent {

        private final ServerMemberManager memberManager;

        @Override
        public boolean syncData(DistroData data, String targetServer) {
            if (!memberManager.hasMember(targetServer)) {
                return true;
            }
            byte[] dataContent = data.getContent();
            // Sync the data
            return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
        }
    }

    public class NamingProxy {

        private static final String DATA_ON_SYNC_URL = "/distro/datum";

        public static boolean syncData(byte[] data, String curServer) {
            Map<String, String> headers = new HashMap<>(128);

            headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
            headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
            headers.put(HttpHeaderConsts.ACCEPT_ENCODING, "gzip,deflate,sdch");
            headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
            headers.put(HttpHeaderConsts.CONTENT_ENCODING, "gzip");

            try {
                // Send a PUT request to the /distro/datum endpoint
                RestResult<String> result = HttpClient.httpPutLarge(
                        "http://" + curServer + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                                + DATA_ON_SYNC_URL, headers, data);
                if (result.ok()) {
                    return true;
                }
                if (HttpURLConnection.HTTP_NOT_MODIFIED == result.getCode()) {
                    return true;
                }
                throw new IOException("failed to req API:" + "http://" + curServer + EnvUtil.getContextPath()
                        + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.getCode()
                        + " msg: " + result.getData());
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("NamingProxy", e);
            }
            return false;
        }
    }
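
For readers who want to see the shape of that request outside Nacos, here is a small sketch built on the JDK's java.net.http API (Java 11+). DatumSyncRequest is hypothetical. The "/nacos" context path and "/v1/ns" naming context are assumed values, and treating HTTP 200 as the meaning of result.ok() is also an assumption; Nacos itself goes through its own HttpClient wrapper.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch using the JDK HTTP client; not how Nacos sends the request.
class DatumSyncRequest {

    // Build (but do not send) a PUT carrying the serialized data to the target server.
    static HttpRequest build(String targetServer, byte[] payload) {
        // Assumed path pieces: context path "/nacos" + naming context "/v1/ns" + "/distro/datum"
        URI uri = URI.create("http://" + targetServer + "/nacos" + "/v1/ns" + "/distro/datum");
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.ofByteArray(payload))
                .build();
    }

    // Follows the success rule in syncData above: an OK result (assumed to be 200)
    // or 304 Not Modified counts as synced; anything else is a failure to retry.
    static boolean isSynced(int statusCode) {
        return statusCode == 200 || statusCode == 304;
    }
}
```

Any other status, or an exception while sending, corresponds to the false return in syncData, which is what sends the task down the retry path.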

The data is submitted with PUT http://ip:port/nacos/v1/ns/distro/datum. This URL is handled by the onSyncDatum method of DistroController.

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
    public class DistroController {

        @Autowired
        private DistroProtocol distroProtocol;

        @Autowired
        private ServiceManager serviceManager;

        @PutMapping("/datum")
        public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {

            if (dataMap.isEmpty()) {
                Loggers.DISTRO.error("[onSync] receive empty entity!");
                throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
            }

            for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
                if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
                            .isDefaultInstanceEphemeral()) {
                        serviceManager.createEmptyService(namespaceId, serviceName, true);
                    }
                    DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
                    distroProtocol.onReceive(distroHttpData);
                }
            }
            return ResponseEntity.ok("ok");
        }
    }

This call reaches DistroConsistencyServiceImpl.processData(DistroData distroData), which in turn calls DistroConsistencyServiceImpl.onPut(String key, Record value). onPut adds the data to tasks, the ArrayBlockingQueue in Notifier, from which the service registration is carried out.
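
The receiving side can be pictured as "store the newest value, then queue a notification for the key". The sketch below is a simplified model under assumptions: ReceivedDataNotifier, its string values, and the set that prevents a key from being queued twice are all hypothetical and only illustrate the idea of a queue-backed notifier.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the receiving side; field and method names are illustrative only.
class ReceivedDataNotifier {

    private final Map<String, String> dataStore = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    // Assumption: keys already waiting in the queue are tracked so they are not queued twice.
    private final Set<String> queuedKeys = ConcurrentHashMap.newKeySet();

    // Store the newest value, then queue a change notification for the key.
    void onPut(String key, String value) {
        dataStore.put(key, value);
        if (queuedKeys.add(key) && !tasks.offer(key)) {
            queuedKeys.remove(key); // queue was full; allow a later put to try again
        }
    }

    // Take one notification and return the latest value for its key, or null if none is pending.
    String handleNext() {
        String key = tasks.poll();
        if (key == null) {
            return null;
        }
        queuedKeys.remove(key);
        return dataStore.get(key);
    }

    int pending() {
        return tasks.size();
    }
}
```

Because the handler reads the value at the time it runs, two quick updates to the same key result in one notification that sees the latest data.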

 

Nacos server-side registration flow: https://www.jianshu.com/p/99f67f6f2577

This completes the eventual-consistency handling of data on Nacos Server in AP mode.

Reference: https://www.liaochuntao.cn/2019/05/09/java-web-32/
