当前位置 : 主页 > 编程语言 > java >

一分钟掌握Java ElasticJob分布式定时任务

来源:互联网 收集:自由互联 发布时间:2023-05-14
目录 前言 架构 功能和特性 入门角色 写个例子 任务执行流程 ScheduleJobBootstrap初始化 ScheduleJobBootstrap执行 执行流程总结 分片的策略 前言 ElasticJob 是面向互联网生态和海量任务的分布式
目录
  • 前言
  • 架构
  • 功能和特性
  • 入门角色
  • 写个例子
  • 任务执行流程
    • ScheduleJobBootstrap初始化
    • ScheduleJobBootstrap执行
    • 执行流程总结
  • 分片的策略

    前言

    ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及任务治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的任务生态。 它的各个产品使用统一的任务 API,开发者仅需一次开发,即可随意部署。

    架构

    elasticjob由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成组成,这是ElasticJob-Lite 的架构图:

    从架构图可以看到,左上角App1和App2两个业务模块中的Elastic-Job往zk中注册了信息,右边的Elastic-Job-Lite是监听了zk的,因此,整个任务的调度是由zk来完成的。下面的console通过Rest API去获取zk中的信息,得到调度数据和日志,并存盘。

    这是ElasticJob-Cloud的架构图:

    ElasticJob-Cloud的调度是依赖Mesos的,从架构图的理解,Mesos和zk结合做好任务调度,再分发给Mesos的代理并执行。

    功能和特性

    以下是ElasticJob的特性优点

    • 支持任务在分布式场景下的分片和高可用
    • 能够水平扩展任务的吞吐量和执行效率
    • 任务处理能力随资源配备弹性伸缩
    • 优化任务和资源调度
    • 相同任务聚合至相同的执行器统一处理
    • 动态调配追加资源至新分配的任务
    • 失效转移
    • 错过任务重新执行
    • 分布式环境下任务自动诊断和修复
    • 基于有向无环图 (DAG) 的任务依赖
    • 基于有向无环图 (DAG) 的任务项目依赖
    • 可扩展的任务类型统一接口
    • 支持丰富的任务类型库--包括数据流、脚本、HTTP、文件、大数据
    • 易于对接业务任务--兼容 Spring IOC
    • 任务管控端
    • 任务事件追踪
    • 注册中心管理

    入门角色

    既然这么多优点,我们就入门试试吧。入门elasticjob-lite也继承了Quartz框架,同样的很简单,只要三个角色:

    SimpleJob:任务主体。如果用过Quartz,那么应该能够理解这个,基本上和Quartz的Job接口类似,只要实现一个execute方法就行了,入门用这个就行;

    JobConfiguration:任务配置。同样的可以理解为类似Quartz框架中的Trigger,最重要的就是配置任务的执行频率;

    ScheduleJobBootstrap:调度主体。这个一样,参考Quartz框架中的Scheduler对象,它把任务和配置结合起来,任务按照配置中的频率执行。

    写个例子

    我们创建这三种角色,首先创建任务主体:

    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    
    /**
     * (这个类的说明)
     *
     * @author mars酱
     */
    
    public class MarsSimpleJob implements SimpleJob {
        
        @Override
        public void execute(final ShardingContext shardingContext) {
            System.out.printf("Item: %s | Time: %s | Thread: %s | %s%n",
                   			  shardingContext.getShardingItem(), 
                              new SimpleDateFormat("HH:mm:ss").format(new Date()), 
                              Thread.currentThread().getId(), 
                              "就是这么简单~");
        }
    }

    再创建任务配置:

    import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
    import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
    import javax.sql.DataSource;
    import java.util.Objects;
    /**
     * (这个类的说明)
     *
     * @author mars酱
     */
    public class JobConfigurationBuilder {
        public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) {
            JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3)
                    .cron(cronExpression)
                    .shardingItemParameters("0=a,1=b,2=c");
            if (Objects.nonNull(tracingConfig)) {
                builder.addExtraConfigurations(tracingConfig);
            }
            return builder.build();
        }
    }

    最后创建调度器,并执行:

    import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
    import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob;
    import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
    import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
    import javax.sql.DataSource;
    /**
     * (这个类的说明)
     *
     * @author mars酱
     */
    public final class SchedulerMain {
        private static final int EMBED_ZOOKEEPER_PORT = 4181;
        private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;
        private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java";
        // CHECKSTYLE:OFF
        public static void main(final String[] args) {
            // 内嵌zk服务
            EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
            CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
            // 简单作业
            setUpSimpleJob(regCenter, null);
        }
        private static CoordinatorRegistryCenter setUpRegistryCenter() {
            ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
            CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
            result.init();
            return result;
        }
        private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) {
            new ScheduleJobBootstrap(regCenter,
                    new MarsSimpleJob(),
                    JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule();
        }
    }

    运行的效果:

    截图中Item是处理的分片项,Thread是当前线程的id,看到了Quartz框架的影子...。

    任务执行流程

    既然能成功运行,我们看看内部的处理逻辑吧。Mars酱本机并没有安装zk,所以copy了官方的例子,在程序运行前先启用了一个内嵌的zk服务:

    EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);

    这个只能在模拟的时候使用,千万不能拿去放生产环境。接下来就是注册中心的配置了,我们需要的是CoordinatorRegistryCenter对象:

    private static CoordinatorRegistryCenter setUpRegistryCenter() {
    	ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
        CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
        result.init();
        return result;
    }

    好了,zk的部分处理完成,下面就是直接SchedulerJobBootstrap的部分了。

    ScheduleJobBootstrap初始化

    ScheduleJobBootstrap的初始化在例子中需要三个参数:

    CoordinatorRegistryCenter:这个是协调用的注册中心。是一个接口类,它的实现在ElasticJob里面只有一个ZookeeperRegisterCenter对象,未来是不是会支持其他的注册中心呢?

    ElasticJob: Mars酱理解为任务对象。但是ElasticJob这个对象本身是个空接口,有两个子接口SimpleJobDataflowJob,前者Mars酱的理解是和Quartz中的Job对象类似,只要实现execute函数就行,后者有需要实现两个接口,一个fetchData获取数据,一个processData处理数据。所以,ElasticJob这个接口留空,是为了还有其他扩展吧?

    JobConfiguration:弹性任务配置项。构建这个对象不能直接设置,只能用buider的方式构建。需要配置的属性很多,但是核心属性大致就是几个:任务名称、分片数、执行频率、分片参数。JobConfiguration的所有属性如下:

    属性名说明String jobName任务名称String croncron表达式String timeZone任务运行的时区int shardingTotalCount任务分片总数String shardingItemParameters分片序号和参数,多个键值对之间用逗号分隔,从0开始,但是不能大于或等于任务分片的总数String jobParameter任务自定义任务参数boolean monitorExecution是否监听执行boolean failover是否启用故障转移。开启表示如果任务在一次任务执行中途宕机,允许将该次未完成的任务在另一任务节点上补偿执行boolean misfire不发火。哈哈,其实是是否开启错过任务重新执行int maxTimeDiffSeconds最大时差int reconcileIntervalMinutes间隔时长String jobShardingStrategyType任务分片策略类型,总共三种String jobExecutorServiceHandlerType任务执行程序服务处理程序类型String jobErrorHandlerType任务错误处理类型Collection jobListenerTypes任务监听类型Collection extraConfigurations附加配置信息String description任务描述Properties props扩展用属性值boolean disabled是否禁用boolean overwrite是否覆盖String label标签boolean staticSharding是否支持静态分片

    ScheduleJobBootstrap执行

    同样的,例子中的MarsSimpleJob的execute函数,最终会被ElasticJob框架调用,我们按照被执行的反向顺序往上找。MarsSimpleJob是继承SimpleJob的, 而SimpleJob的execute函数是被SimpleJobExecutor所调用:

    /**
     * Simple job executor.
     */
    public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {
        @Override
        public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
            // 这里调用execute函数
            elasticJob.execute(shardingContext);
        }
        @Override
        public Class<SimpleJob> getElasticJobClass() {
            return SimpleJob.class;
        }
    }

    再继续往上找,process的核心流程就是在ElasticJobExecutor里面了,调用process的部分在ElasticJobExcutor中几个重载的process方法调用的,两个process函数完成不同的功能,调用SimpleExecutor的process部分是这样:

    @SuppressWarnings("unchecked")
    private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
            jobFacade.postJobExecutionEvent(startEvent);
            log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
            JobExecutionEvent completeEvent;
            try {
                // 这里调用SimpleJobExecutor的process
                jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item));
                completeEvent = startEvent.executionSuccess();
                log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
                jobFacade.postJobExecutionEvent(completeEvent);
                // CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                // CHECKSTYLE:ON
                completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
                jobFacade.postJobExecutionEvent(completeEvent);
                itemErrorMessages.put(item, ExceptionUtils.transform(cause));
                JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
                jobErrorHandler.handleException(jobConfig.getJobName(), cause);
            }
    }

    上面这个process负责最终任务的执行部分,由JobItemExecutor对象调用,SimpleJobExecutor被JobItemExecutor接口定义。整个这个proces由guava包的EventBus处理消息事件,执行之前有startEvent,执行完成有completeEvent,异常也有对应的失败event,方面架构图中存盘事件日志、ELK日志收集动作。

    调用这个process的部分,由另一个process完成,长这样的:

    private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
            Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
            if (1 == items.size()) {
                int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
                JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
                process(jobConfig, shardingContexts, item, jobExecutionEvent);
                return;
            }
            CountDownLatch latch = new CountDownLatch(items.size());
            for (int each : items) {
                JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
                ExecutorService executorService = executorContext.get(ExecutorService.class);
                if (executorService.isShutdown()) {
                    return;
                }
                // 提交给线程池执行
                executorService.submit(() -> {
                    try {
                        process(jobConfig, shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            try {
                latch.await();
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
    }

    上面这个process负责把分片参数依次组装好,设置好JobExecutionEvent中的ip、主机名等参数,然后放入线程池中去执行。再往上,看现在这个process被调用的部分:

    private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    	if (shardingContexts.getShardingItemParameters().isEmpty()) {
    		jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
            return;
        }
        // 往注册中心注册ShardingContexts信息
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        // 发送跟踪日志,标记任务正在运行
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
        try {
            // 调用process
            process(jobConfig, shardingContexts, executionSource);
        } finally {
            // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
            // 告知注册中心任务完成
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                // 没有失败信息,通知任务完成
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            } else {
                // 否则通知失败
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                itemErrorMessages.clear();
            }
        }
    }

    方法execute从注册中心注册ShardingContext信息,并发送跟踪日志事件,然后调用process,最后发送跟踪消息标记任务完成。再有一个重载的execute方法调用上面这个execute方法,如下:

    public void execute() {
        // job的配置信息
        JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
        executorContext.reloadIfNecessary(jobConfig);
        JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
        try {
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
        // 这里有玄机
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        // 发送时间消息总线
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), 
                                              State.TASK_FINISHED, 
                                              String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", 
                                                  jobConfig.getJobName(),
                                                  shardingContexts.getShardingItemParameters().keySet()));
                return;
        }
        try {
            // 任务执行的前置流程
            jobFacade.beforeJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
        // 调用上面的execute方法
        execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
        }
        // 故障转移
        jobFacade.failoverIfNecessary();
        try {
            // 任务执行的后置流程
            jobFacade.afterJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
    }

    这个execute就由Quartz的JobRunShell调用了,Quartz的调用的过程在 Java | 一分钟掌握定时任务 | 6 - Quartz定时任务里面还好Mars酱分析过了。

    执行流程总结

    那么,追踪完源代码,大致的流程就应该是如下:

    1.组装基本参数(任务、频率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任务属性 -> 4.设置各种facade -> 5.初始化ElasticJobExecutor -> 6.调用scheduler执行任务 -> 7.获取任务执行器(SimpleJobExecutor) -> 8.各种校验逻辑 -> 9. 处理分片参数 -> 10. 设置任务为运行状态 -> 11. 提交任务到线程池 -> 12.执行任务 -> 13.处理任务后续逻辑

    任务的调度过程由zk完成,取决于zk的任务调度策略吧?如果一台机器的定时运行时挂了,zk会转移到另一台运行中的机器中去。-- Mars酱

    分片的策略

    任务的分片策略,用于将任务在分布式环境下分解成为任务使用。

    SPI 名称详细说明JobShardingStrategy作业分片策略接口 已知实现类详细说明AverageAllocationJobShardingStrategy根据分片项平均分片OdevitySortByNameJobShardingStrategy根据任务名称哈希值的奇偶数决定按照任务服务器 IP 升序或是降序的方式分片RoundRobinByNameJobShardingStrategy根据任务名称轮询分片

    那么任务的分片策略在哪里使用的呢?就在代码中注释的“这里有玄机”那行。在getShardingContexts的方法中会调用ShardingService,它会去获取JobConfiguration中配置的分片策略方式:

    public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherShardingItemCompleted();
        JobConfiguration jobConfig = configService.load(false);
        int shardingTotalCount = jobConfig.getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        resetShardingInfo(shardingTotalCount);
        // 获取任务分片策略
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
        jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }

    如果不设置,默认使用的是平均分片策略。

    以上就是一分钟掌握Java ElasticJob分布式定时任务的详细内容,更多关于Java ElasticJob定时任务的资料请关注自由互联其它相关文章!

    上一篇:MyBatis-Plus自定义通用的方法实现
    下一篇:没有了
    网友评论