1 背景
我们在实际开发的过程中,多多少少都需要用到定时任务来处理一些问题,比如老数据的迁移同步,月终的报表统计等等,在这篇文章中我们会通过传统定时任务和分布式任务调度中心,来分析传统定时任务的痛点以及分布式任务调度中心的设计和使用。
2 常见的传统定时任务
这里我们主要介绍3种常见的传统定时任务,分别为通过Timer实现定时任务、通过多线程实现定时任务、通过ScheduledExecutorService实现定时任务。
2.1 通过 Timer 实现定时任务
2.1.1 Timer 介绍
Timer 是 java.utils 包提供的一种原生定时器工具,内部通过多线程处理,使用时需要和 TimerTask 配合,Timer 负责计时,在设置的时间调度注册的 TimerTask,TimerTask 是一个实现了 Runnable 接口的抽象类,每个 TimerTask 对象代表一个待执行的任务。
2.1.2 Timer 常用场景
Timer 因为实现简单,通常会被用来执行一些简单的单机定时任务,如备份数据、刷新缓存、生成报表等。
2.1.3 Timer 正常调度任务
示例代码:
class MyTimer1 { public static void main(String[] args){ TimerTask timerTask = new TimerTask { @Override public void run { System.out.println("Timer方式调度任务,当前时间:"+ new Date); } }; Timer timer = new Timer; timer.schedule(timerTask,10,500); }}
执行结果:
图1 Timer 方式调度正常任务结果
2.1.4 Timer 调度任务异常
class MyTimer2 { public static boolean flag = false; public static void main(String[] args) throws InterruptedException { TimerTask timerTask = new TimerTask { @Override public void run { System.out.println("Timer方式调度任务1,当前时间:"+ new Date); } }; TimerTask timerTask2 = new TimerTask { @Override public void run { if(flag) { throw new RuntimeException("抛出异常"); } System.out.println("Timer方式调度任务2,当前时间:" + new Date); } }; Timer timer = new Timer; timer.schedule(timerTask,10,500); timer.schedule(timerTask2,10,500); Thread.sleep(1000); flag = true; Thread.sleep(1000); flag = false; }}
执行结果:
图2 Timer 方式调度异常任务结果
2.1.5 Timer 调度任务执行时间过长
class MyTimer3 { public static void main(String[] args) throws InterruptedException { TimerTask timerTask = new TimerTask { @Override public void run { System.out.println("Timer方式调度任务1,当前时间:"+ new Date); } }; TimerTask timerTask2 = new TimerTask { @Override public void run { System.out.println("Timer方式调度任务2,当前时间:" + new Date); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace; } } }; Timer timer = new Timer; timer.schedule(timerTask,10,500); Thread.sleep(2000); timer.schedule(timerTask2,10,500); }}
执行结果:
图3 Timer 方式调度时间过长任务结果
2.1.6 Timer 总结
Timer 中也存在一些潜在的问题,比如可能会导致程序的性能下降、容易引起死锁等。因此,在使用 Timer 定时器时需要注意其使用方式和场景,以确保程序的正确性和稳定性,比如减少 Timer 的个数,减少数据库操作等。
2.2 通过多线程实现定时任务
2.2.1 多线程休眠式定时任务
代码示例:
class MyTimer4{ public static boolean flag = false; public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<>); Runnable task = -> { long step = 1000L; while(!flag) { try { long sleepTime = step - System.currentTimeMillis % step; Thread.sleep(sleepTime); System.out.println("多线程休眠式调度任务,当前时间:" + new Date); } catch (InterruptedException e) { e.printStackTrace; } } }; executor.execute(task); Thread.sleep(5000); flag = true; executor.shutdown; }}
执行结果:
图4 多线程休眠式调度正常任务结果
2.2.2 多线程休眠式调度异常任务
代码示例:
class MyTimer5{ public static boolean flag = false; public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<>); Runnable task = -> { int times = 5; long step = 1000L; while(!flag) { int currIndex = times--; if(currIndex == 1) { System.out.println("多线程休眠式调度任务,抛出异常,当前时间:" + new Date); throw new RuntimeException("抛出异常"); } try { long sleepTime = step - System.currentTimeMillis % step; Thread.sleep(sleepTime); System.out.println("多线程休眠式调度任务,当前时间:" + new Date); } catch (InterruptedException e) { e.printStackTrace; } } }; executor.execute(task); Thread.sleep(5000); flag = true; executor.shutdown; }}
执行结果:
图5 多线程休眠式调度异常任务结果
2.2.3 多线程轮询式定时任务
class MyTimer6{ public static boolean flag = false; public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<>); Runnable task = -> { long nextTime = 0; long delayTime = 1000L; while(!flag) { long currTime = System.currentTimeMillis; if(nextTime <= currTime){ nextTime = currTime + delayTime; System.out.println("多线程轮询式调度任务,当前时间:" + new Date); } } }; executor.execute(task); Thread.sleep(5000); flag = true; executor.shutdown; }}
执行结果:
图6 多线程轮询式调度正常任务结果
2.3 通过 ScheduledExecutorService 实现定时任务
2.3.1 ScheduledExecutorService 介绍
ScheduledExecutorService 是一个 ScheduledThreadPoolExecutor 线程池,和普通线程池不同的是调度时间不是由线程内的逻辑处理,而是由 DelayedWorkQueue 控制延时。
2.3.2 ScheduledExecutorService 正常调度任务
代码示例:
class MyTimer7{ public static void main(String[] args) { ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10); Runnable task = -> System.out.println("ScheduledExecutorService方式调度任务,当前时间:"+ new Date); service.scheduleAtFixedRate(task,10L, 1000L,TimeUnit.MILLISECONDS); }}
执行结果:
图7 ScheduledExecutorService方式调度正常任务结果
2.3.3 ScheduledExecutorService 调度异常任务
代码示例:
class MyTimer8{ public static void main(String[] args) { ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10); Runnable task = new Runnable { int times = 5; @Override public void run { int currIndex = times--; if(currIndex == 1) { System.out.println("ScheduledExecutorService方式调度异常任务,抛出异常,当前时间:" + new Date); throw new RuntimeException("抛出异常"); } System.out.println("ScheduledExecutorService方式调度异常任务,当前时间:" + new Date); } }; service.scheduleAtFixedRate(task,10L, 1000L,TimeUnit.MILLISECONDS); }}
执行结果:
图8 ScheduledExecutorService 方式调度异常任务结果
2.4 小结
Timer+TimerTask 方式可以作为处理定时任务的一个实现方式,但是对于一个 Timer 管理多个定时任务的场景,因为所有的任务是串行执行,所以如果其中的一个任务出现异常,会导致所有任务都终止,且当其中一个任务占用的时间过长时,会导致后续任务时间出现延迟,因此要使用 Timer 方式实现需要处理这几种情况。
多线程方式是比较安全的单机实现方式,但是因为各个任务都需要启动一个线程来计时,定时任务的增加会导致系统的线程数暴增。
3 传统定时任务的缺陷
通过对上面几个常见定时任务的分析,我们发现传统定时任务存在以下问题:
占用业务系统服务器资源
服务重启会导致定时任务调度异常
无法动态配置任务,修改任务信息需要重新发布
对业务系统代码侵入过多,难以维护
因此在定时任务较多的场景下,可以考虑使用定时任务框架来处理上述问题。
4 常见的定时任务框架选型
4.1 常见框架比较
Elastic-job | xxl-job | Quartz | |
说明 | 分布式任务调度框架,需要单独部署调度中心客户端,可视化界面可选部署 | 分布式任务调度框架,需要单独部署调度中心客户端,集成可视化界面 | 说明:基础的单体定时任务框架,是Java实现上的定时任务标准。 对任务的控制需要直接操作数据库或者调用api操作; 对任务的持久化需要引入数据库,在不考虑单服务多数据源的情况下需要写入业务库; 对任务的调度逻辑需要集成到业务系统中,数量增多会占用系统资源,影响业务性能以及自身调度精度。 |
强依赖组件 | Zookeeper | MySQL | |
业务侵入性 | 较低 | 较低 | |
高可用 | 由Zookeeper和实例共同控制 | 由数据库锁保证高可用 | |
并行调度 | 通过分片任务控制并发调度 | 系统多线程控制并发调度 | |
分片策略 | 支持多种分片策略,可自定义分片策略 | 以执行器为维度进行分片 | |
失败处理策略 | 选取空闲分片再次执行失败任务 | 策略包括:失败告警(默认)、失败重试 | |
弹性扩容 | 通过zk实现各服务的注册、控制及协调 | 通过数据库实现动态扩容,执行器超出一定数量则会影响性能 |
4.2 选型背景及结果
因为业务需求,需要定时清洗数据,整体由1次全量任务和持续的增量任务组成,因此需要使用定时任务来实现增量任务的调度,不存在较高的并发场景,但是单次任务的处理量可能较大。
就功能性来说,Xxl-job 和 Elastic-job 相差不多,且都有较大的用户基础和技术文档,而 xxl-job 只依赖数据库作为集群注册中心,不需要额外引入 Zookeeper 组件,且 xxl-job 更加侧重于业务实现的简单和管理的方便,学习成本更低,且集成的可视化界面使用起来比较友好,结合系统的并发场景来看,xxl-job是一个更好的选择。
5 分布式任务调度中心的组成结构
一般情况下,定时任务的主要包括3个组件,计时器、调度器以及执行器,计时器负责计算时间,并在目标时间通知调度器,调度器负责遍历任务,并通知目标执行器执行任务,其中计时器和调度器影响任务调度时间,执行器影响任务结果。
5.1 常见的计时器设计
作为定时任务最主要的组件之一,计时器的性能直接影响到定时任务的调度精度,常见的计时器有以下两种设计,时间轴和时间轮。
5.1.1 时间轴计时器
时间轴计时器是最基础的计时器算法,每隔一段时间比较当前时间和目标时间,优点是实现方式简单,缺点是因为长度不固定,不方便将定时任务分组。
5.1.2 时间轮计时器
时间轮就是将时间轴维护成了类似钟表的结构,低级计时器每走完一轮则使高级计时器往前一格,而各个时间的跨度可以各自定义。为了方便设置,也会定义成时间单位的跨度,如60秒为1分钟,24小时为1天,示意图如下。时间轮的好处是可以减少同时运行的计时器数量,更方便将任务分组,减少调度器需要遍历的任务数量。
图9 时间轮结构示意图
5.2 调度器的功能
调度中心作为分布式定时任务的管理者,经常会出现一个时间点内处理很多任务的场景,如果由计时器来负责任务的调度会导致计时出现偏差,比如调度10:00:00的所有任务需要10s,那么如果存在需要在10:00:05执行的任务则最早会在10:00:10开始调度,会导致后续任务出现偏差。
因此,为了保证计时器时间的准确性,需要将调度功能从计时器的功能中拆解出来,由调度器来进行某个时间点的任务调度,让计时器只需要专注于计时,降低后续调度任务的误差。
6 调度中心的设计
在设计时,将计时器、调度器以及执行器组成的关联结构称为一个任务单元,一般情况下,调度中心都是由多个任务单元组成,后面我们将会通过 xxl-job 来了解一个调度中心的设计。
图10 xxl-job架构图
6.1 定时任务计时器精度的保证机制
上文中提到传统多线程定时任务的两种实现方式,其中休眠式所占用的资源较少,但误差较大,而轮询式的资源占用较多但误差较小,而在 xxl-job 的实现中将二者结合,在保证误差较少的情况下降低了调度所需要的资源,下面截取了几段代码来展示实现逻辑
代码示例:
public class JobScheduleHelper { private class ScheduleThread extends Thread { @Override public void run { try { //线程在4~5秒后执行任务 TimeUnit.MILLISECONDS.sleep(PRE_READ_MS - System.currentTimeMillis % 1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage, e); } } while(!scheduleThreadToStop){ //计算任务执行时间 long start = System.currentTimeMillis; boolean preReadSuc = true;//读取到数据则为true List
从代码中可以看到,扫描线程在无任务的情况下,线程休眠1s来降低资源的开销,而通过预读任务的方式来提前将任务写入待执行列表,扫描到任务之后扫描线程休眠,启动轮询式计时器 RingThread,用于更高精度的计时,最终实现毫秒级的调度时机误差。
6.2 定时任务调度精度的保证机制
上文已经说完了 xxl-job 的计时器精度保证机制,在这里我们来讲讲调度器的精度保证机制。从调度器的功能上我们可以知道,影响调度器精度的两个方面是待遍历任务数和遍历效率,其中遍历效率因为涉及到各类遍历算法,这里不做赘述。
传统定时中的调度部分是通过遍历待执行列表来控制任务执行的,实现较为简单,但是这种方式可能因为调度器需要遍历的任务数量较多,导致任务的执行时机出现误差,而 xxl-job 中则通过将任务分组,提高了这种场景出现的数量条件。
代码示例:
public void resolveSchedule(long nowTime, List
从代码中可以看到,在读取待执行任务后根据任务执行时间分组,可以理解成维护了一个5s的时间轮,通过将任务分组,分散到时间轮的各个刻度上,降低调度器需要处理的任务数量。
6.3 执行器的注册和任务执行
调度中心既然只是作为一个定时任务的管理平台,那自然不会去维护对应定时任务的逻辑,各个任务的执行逻辑都由各自服务自己负责,而调度中心怎么知道对应的执行器呢?
xxl-job 中一共提供了两种执行器的注册方式,手动录入和自动注册,因为手动录入的逻辑较为简单,这里我们主要针对自动注册逻辑来进行讲解。
6.3.1 执行器的自动注册
执行器启动时会读取配置,通过遍历配置中的调度中心地址列表来向任务调度中心注册该执行器。
代码如下:
public class XxlJobExecutor { private String adminAddresses; private String accessToken; public void start throws Exception { initAdminBizList(adminAddresses, accessToken); initEmbedServer(address, ip, port, appname, accessToken); } //初始化服务管理 private static List
启动内嵌服务器:
public class EmbedServer { private ExecutorBiz executorBiz; public void start(String address, int port, String appName, String accessToken) { startRegistry(appName, address); } public void startRegistry(String appName, String address) { //启动注册线程 ExecutorRegistryThread.getInstance.start(appName, address); }}public class ExecutorRegistryThread { public void start(String appName, String address) { RegistryParam registryParam = new RegistryParam(RegistryType.EXECUTOR.name, appName, address); List
6.3.2 执行器的任务调度
任务调度由调度中心和执行器共同处理。客户端的代码如下:
public class XxlJobTrigger { public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList.size); } private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ ReturnT
在这里我们可以看出,调度中心经过一系列处理,最终访问执行器提供的 run 接口来通知执行器。执行器的代码如下:
public class EmbedServer { private ExecutorBiz executorBiz = new ExecutorBizImpl; public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler
从源码可以看到,最终的执行逻辑是由 XxlJobExecutor 中注册的 Handler 执行。
6.4 定时任务的失败处理
上面已经说完定时任务的正常调度逻辑,这里说说定时任务失败时的处理逻辑。源码如下:
public class JobFailMonitorHelper { public void start{ monitorThread = new Thread( -> { while (!toStop) { XxlJobAdminConfig adminConfig = XxlJobAdminConfig.getAdminConfig; //查询出现异常的日志Id List
根据源码可以看出,失败任务每10秒处理一次,通过 log 状态扫描执行失败的任务,对于失败任务,有两个处理逻辑:失败重试和失败告警,两者互不影响。
失败重试较为简单,将重试任务重新写入待调度列表即可,这里我们主要讲一下失败告警的逻辑。源码如下:
public class JobAlarmer implements ApplicationContextAware, InitializingBean { private List
JobAlarmer 的逻辑较为简单,处理待告警任务,通过配置的告警方式调用目标告警类的 doAlarm 方法。目前开源的版本只处理了邮件告警的实现类 EmailJobAlarm。对应源码如下:
public class EmailJobAlarm implements JobAlarm { public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog){ boolean alarmResult = true; if(ObjectUtils.isEmpty(info)){ return true; } //如果设置了告警邮箱地址,则在发生异常时发送告警邮件 if (StringUtils.isEmpty(info.getAlarmEmail)) { return true; } // 拼接邮件内容 String alarmContent = ""; //处理邮箱地址,用","分隔 List
7 总结
通过本篇文章的分析,对于少量任务,完全可以使用单机的定时任务管理,但是对于定时任务数量较大或者涉及服务较多的场景,则是建议使用中间件来统一管理,因为如果选择各个服务自行管理的话,需要在每个服务都维护一块相同的管理代码,并且需要长时间占用一部分资源来维护。
另外,因为分布式任务调度平台基本都提供了可视化的操作页面,对于任务的配置和管理相对单机管理要更加简便和直观,当然,最终方案还是需要根据业务情况来选择。
叶刀刀,来自缦图互联网中心后端团队。