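Using scheduled tasks in Spring Boot takes only two steps: enable scheduling support with @EnableScheduling, then add the @Scheduled annotation to each method that should be scheduled. The execution period and frequency can then be controlled through attributes such as cron, fixedRate and fixedDelay.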
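To make the difference between fixedRate and fixedDelay concrete, here is a minimal sketch in plain Java, without Spring. The class FixedScheduleDemo and its methods are made up for illustration, and the sketch assumes every run takes less time than the period.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Spring code: computes when a task would start
// under the two fixed policies, assuming each run is shorter than the period.
class FixedScheduleDemo {

    // fixedRate: each start is measured from the previous *start*.
    static List<Long> fixedRateStarts(long periodMillis, int runs) {
        List<Long> starts = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            starts.add(i * periodMillis);
        }
        return starts;
    }

    // fixedDelay: each start is measured from the previous *completion*.
    static List<Long> fixedDelayStarts(long delayMillis, long execMillis, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < runs; i++) {
            starts.add(start);
            start += execMillis + delayMillis;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(fixedRateStarts(5000, 3));        // [0, 5000, 10000]
        System.out.println(fixedDelayStarts(5000, 1000, 3)); // [0, 6000, 12000]
    }
}
```

With a 1-second task, fixedRate keeps a 5-second rhythm while fixedDelay drifts to 6 seconds between starts.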
2.1 @EnableScheduling
It imports the configuration class SchedulingConfiguration.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {
    }
2.2 SchedulingConfiguration
It declares a single bean, ScheduledAnnotationBeanPostProcessor. As the name suggests, that class implements the BeanPostProcessor interface.
    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {

        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    }
2.3 ScheduledAnnotationBeanPostProcessor
The postProcessAfterInitialization implementation below shows that the method which actually handles @Scheduled and sets up the scheduled task is processScheduled.
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass) &&
                AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
            // Collect the bean's methods together with their @Scheduled annotations
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else {
                // Non-empty set of methods: process every @Scheduled annotation
                annotatedMethods.forEach((method, scheduledMethods) ->
                        scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
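The core idea of this post-processing step, scanning a bean's methods for an annotation and collecting the matches, can be sketched with plain reflection. The @Every annotation and the AnnotationScanDemo class below are hypothetical stand-ins so that the example runs without Spring; the real processor also handles proxies, repeatable annotations and meta-annotations, which this sketch ignores.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationScanDemo {

    static class ReportJob {
        @Every(millis = 5000)
        public void report() { }

        @Every(millis = 60000)
        public void cleanup() { }

        public void helper() { } // not annotated, so it is skipped
    }

    // Maps each annotated method name to its interval.
    static Map<String, Long> scan(Class<?> targetClass) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(ReportJob.class)); // {cleanup=60000, report=5000}
    }
}

// Hypothetical stand-in for @Scheduled.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Every {
    long millis();
}
```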
2.4 ScheduledAnnotationBeanPostProcessor.processScheduled
Only the key part of the method, the handling of the cron expression, is shown below.
    private final ScheduledTaskRegistrar registrar;

    public ScheduledAnnotationBeanPostProcessor() {
        this.registrar = new ScheduledTaskRegistrar();
    }

    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            // Wrap the scheduled method in a Runnable
            Runnable runnable = createRunnable(bean, method);
            boolean processedSchedule = false;
            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

            // Determine initial delay
            // Handling of scheduled.initialDelay() omitted...

            // Check cron expression
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    // Resolve ${...} placeholders
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            timeZone = TimeZone.getDefault();
                        }
                        // Create the CronTrigger and register the CronTask
                        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }
            // Handling of fixedDelay and fixedRate, and storing the ScheduledTask for later destruction, omitted...
        }
        // catch Exception omitted...
    }
The code above registers (initializes) the cron task through this.registrar.scheduleCronTask.
Implementation approach: override ScheduledAnnotationBeanPostProcessor.processScheduled, change the part that handles cron, and register (initialize) the task with this.registrar.scheduleTriggerTask instead.
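Why a trigger task helps: a CronTrigger is built once from a fixed expression, whereas a task registered with a custom Trigger is asked for its next execution time after each run, so the trigger can look up the current expression every time. The following simulation shows that effect in plain Java; DynamicTriggerDemo and periodAt are hypothetical names, and plain periods in seconds stand in for cron expressions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical simulation, not Spring code: the next fire time is recomputed
// before every run, so a changed period takes effect on the following run.
class DynamicTriggerDemo {

    // periodAt returns the period (in seconds) configured at the given time.
    static List<Long> fireTimes(LongUnaryOperator periodAt, int runs) {
        List<Long> times = new ArrayList<>();
        long now = 0;
        for (int i = 0; i < runs; i++) {
            now += periodAt.applyAsLong(now); // re-evaluated for each run
            times.add(now);
        }
        return times;
    }

    public static void main(String[] args) {
        // The period is 5 seconds until t = 10 and 2 seconds afterwards.
        System.out.println(fireTimes(t -> t < 10 ? 5 : 2, 4)); // [5, 10, 12, 14]
    }
}
```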
[Class diagram: DynamicCronScheduleTaskManager (implements DisposableBean); AbstractDynamicCronHandler; EnvironmentDynamicCronHandler (extends AbstractDynamicCronHandler, implements EnvironmentAware); DynamicCronTrigger (implements Trigger); the @ScheduledDynamicCron annotation with value() and cronName().]
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.scheduling.config.ScheduledTask;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    import org.springframework.scheduling.config.TriggerTask;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * @author HuangJS
     * @date 2021-12-11 3:04 PM
     */
    public class DynamicCronScheduleTaskManager implements DisposableBean {
        private Map<String, ScheduledTask> dynamicScheduledTaskMap = new HashMap<>();
        // Assigned by DynamicScheduledAnnotationBeanPostProcessor
        ScheduledTaskRegistrar registrar;

        // Add a scheduled task, replacing any task already registered under cronName
        public ScheduledTask addTriggerTask(String cronName, TriggerTask task) {
            ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
            if (scheduledTask != null) {
                scheduledTask.cancel();
            }
            scheduledTask = this.registrar.scheduleTriggerTask(task);
            dynamicScheduledTaskMap.put(cronName, scheduledTask);
            return scheduledTask;
        }

        public boolean contains(String cronName) {
            return this.dynamicScheduledTaskMap.containsKey(cronName);
        }

        // Update the trigger time of a task: cancel it and schedule the same TriggerTask again
        public void updateTriggerTask(String cronName) {
            ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
            if (scheduledTask == null) {
                throw new IllegalStateException("Invalid cronName \"" + cronName + "\", no ScheduledTask found");
            }
            scheduledTask.cancel();
            scheduledTask = this.registrar.scheduleTriggerTask((TriggerTask) scheduledTask.getTask());
            dynamicScheduledTaskMap.put(cronName, scheduledTask);
        }

        // Remove a scheduled task
        public void removeTriggerTask(String cronName) {
            ScheduledTask scheduledTask = dynamicScheduledTaskMap.remove(cronName);
            if (scheduledTask != null) {
                scheduledTask.cancel();
            }
        }

        @Override
        public void destroy() throws Exception {
            for (ScheduledTask value : dynamicScheduledTaskMap.values()) {
                value.cancel();
            }
            this.dynamicScheduledTaskMap.clear();
        }
    }
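The cancel-and-reschedule pattern used by the manager can be tried out with the JDK alone. The sketch below is a hypothetical analogue built on ScheduledExecutorService (NamedTaskRegistry is an invented name, not part of the original code). It uses a ConcurrentHashMap on the assumption that tasks may be added or refreshed from several threads, for example web requests and event listeners.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogue of the manager: one scheduled handle per name;
// registering a name again replaces and cancels the previous handle.
class NamedTaskRegistry {
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    ScheduledFuture<?> add(String name, Runnable task, long periodSeconds) {
        ScheduledFuture<?> next =
                executor.scheduleAtFixedRate(task, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        ScheduledFuture<?> previous = tasks.put(name, next);
        if (previous != null) {
            previous.cancel(false);
        }
        return next;
    }

    boolean contains(String name) {
        return tasks.containsKey(name);
    }

    void remove(String name) {
        ScheduledFuture<?> removed = tasks.remove(name);
        if (removed != null) {
            removed.cancel(false);
        }
    }

    // Counterpart of destroy(): cancel everything and stop the worker thread.
    void shutdown() {
        tasks.values().forEach(future -> future.cancel(false));
        tasks.clear();
        executor.shutdownNow();
    }
}
```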
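    import org.springframework.beans.factory.annotation.Autowired;

    public abstract class AbstractDynamicCronHandler {
        @Autowired
        protected DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;

        /**
         * Returns the cron expression registered under the given name.
         * @param cronName name of the dynamic cron
         * @return the cron expression
         */
        public abstract String getCronExpression(String cronName);

        /**
         * Updates the trigger time of the scheduled task registered under cronName.
         * @param cronName name of the dynamic cron
         */
        public void updateTriggerTask(String cronName) {
            dynamicCronScheduleTaskManager.updateTriggerTask(cronName);
        }
    }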
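The following handler is based on Environment: when the configuration is refreshed, the trigger time of the scheduled task is refreshed automatically, which also works for distributed, multi-node cluster deployments.
For example, when the cron expression is stored in nacos, updating the configuration in nacos updates the trigger time of the task, because the handler listens for EnvironmentChangeEvent.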
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
    import org.springframework.context.EnvironmentAware;
    import org.springframework.context.event.EventListener;
    import org.springframework.core.env.Environment;

    /**
     * @author HuangJS
     * @date 2021-12-11 11:46 AM
     */
    public class EnvironmentDynamicCronHandler extends AbstractDynamicCronHandler implements EnvironmentAware {
        private final Logger logger = LoggerFactory.getLogger(EnvironmentDynamicCronHandler.class);
        private Environment environment;

        @Override
        public String getCronExpression(String cronName) {
            try {
                return environment.getProperty(cronName);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return null;
        }

        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }

        // Refresh only those tasks whose cron key is among the changed keys
        @EventListener
        public void environmentChangeEvent(EnvironmentChangeEvent event) {
            for (String key : event.getKeys()) {
                if (this.dynamicCronScheduleTaskManager.contains(key)) {
                    this.dynamicCronScheduleTaskManager.updateTriggerTask(key);
                }
            }
        }
    }
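The listener's filtering step, refreshing only the changed keys that belong to a registered dynamic cron, can be shown in isolation. The class and method names below are invented for the example, and plain sets stand in for the event's keys and the manager's registry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the listener's filtering step.
class ChangedKeyFilterDemo {

    // Returns, in sorted order, the changed keys that are registered cron names.
    static List<String> keysToRefresh(Set<String> changedKeys, Set<String> registeredCronNames) {
        Set<String> hits = new TreeSet<>(changedKeys);
        hits.retainAll(registeredCronNames);
        return new ArrayList<>(hits);
    }

    public static void main(String[] args) {
        Set<String> changed = Set.of("cron.name1", "server.port");
        Set<String> registered = Set.of("cron.name1", "cron.name2");
        System.out.println(keysToRefresh(changed, registered)); // [cron.name1]
    }
}
```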
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.Trigger;
    import org.springframework.scheduling.TriggerContext;
    import org.springframework.scheduling.support.CronSequenceGenerator;

    import java.util.Date;

    public class DynamicCronTrigger implements Trigger {
        private final static Logger LOGGER = LoggerFactory.getLogger(DynamicCronTrigger.class);
        private String cronName;
        private AbstractDynamicCronHandler dynamicCronHandler;
        private String cronExpression;
        private CronSequenceGenerator sequenceGenerator;

        public DynamicCronTrigger(String cronName, AbstractDynamicCronHandler dynamicCronHandler) {
            this.cronName = cronName;
            this.dynamicCronHandler = dynamicCronHandler;
        }

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            // Ask the handler for the current expression on every call
            String cronExpression = dynamicCronHandler.getCronExpression(cronName);
            if (cronExpression == null) {
                return null;
            }
            // Rebuild the generator only when the expression has changed
            if (this.sequenceGenerator == null || !cronExpression.equals(this.cronExpression)) {
                try {
                    this.sequenceGenerator = new CronSequenceGenerator(cronExpression);
                    this.cronExpression = cronExpression;
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            if (this.sequenceGenerator == null) {
                // The very first expression was invalid, so there is nothing to schedule yet
                return null;
            }

            Date date = triggerContext.lastCompletionTime();
            if (date != null) {
                Date scheduled = triggerContext.lastScheduledExecutionTime();
                if (scheduled != null && date.before(scheduled)) {
                    // Previous task apparently executed too early...
                    // Let's simply use the last calculated execution time then,
                    // in order to prevent accidental re-fires in the same second.
                    date = scheduled;
                }
            } else {
                date = new Date();
            }
            return this.sequenceGenerator.next(date);
        }
    }
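    import org.springframework.core.annotation.AliasFor;

    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface ScheduledDynamicCron {
        /**
         * Name of the dynamic cron.
         */
        @AliasFor("cronName")
        String value() default "";

        /**
         * Name of the dynamic cron.
         */
        @AliasFor("value")
        String cronName() default "";

        /**
         * Handler class for the dynamic cron; defaults to the Environment-based handler.
         */
        Class<? extends AbstractDynamicCronHandler> handler() default EnvironmentDynamicCronHandler.class;
    }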
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
    import org.springframework.scheduling.config.*;
    import org.springframework.scheduling.support.CronTrigger;
    import org.springframework.util.Assert;
    import org.springframework.util.StringUtils;
    import org.springframework.util.StringValueResolver;

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.time.Duration;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.TimeZone;

    /**
     * @author HuangJS
     * @date 2020/5/14 17:23
     */
    public class DynamicScheduledAnnotationBeanPostProcessor extends ScheduledAnnotationBeanPostProcessor {
        private StringValueResolver embeddedValueResolver;
        private BeanFactory beanFactory;
        private DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;
        // These fields are private in the parent class, so they are read through reflection
        private final ScheduledTaskRegistrar registrar =
                (ScheduledTaskRegistrar) getFieldValueFromParentClass("registrar");
        @SuppressWarnings("unchecked")
        private final Map<Object, Set<ScheduledTask>> scheduledTasks =
                (Map<Object, Set<ScheduledTask>>) getFieldValueFromParentClass("scheduledTasks");

        public DynamicScheduledAnnotationBeanPostProcessor(DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager) {
            this.dynamicCronScheduleTaskManager = dynamicCronScheduleTaskManager;
            this.dynamicCronScheduleTaskManager.registrar = this.registrar;
        }

        @Override
        public void setEmbeddedValueResolver(StringValueResolver resolver) {
            super.setEmbeddedValueResolver(resolver);
            this.embeddedValueResolver = resolver;
        }

        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            super.setBeanFactory(beanFactory);
            this.beanFactory = beanFactory;
        }

        /**
         * @param scheduled
         * @param method
         * @param bean
         * @see ScheduledAnnotationBeanPostProcessor
         */
        @Override
        protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
            try {
                Runnable runnable = createRunnable(bean, method);
                boolean processedSchedule = false;
                String errorMessage =
                        "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

                Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

                // Determine initial delay
                long initialDelay = scheduled.initialDelay();
                String initialDelayString = scheduled.initialDelayString();
                if (StringUtils.hasText(initialDelayString)) {
                    Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                    if (this.embeddedValueResolver != null) {
                        initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                    }
                    if (StringUtils.hasLength(initialDelayString)) {
                        try {
                            initialDelay = parseDelayAsLong(initialDelayString);
                        } catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                        }
                    }
                }

                // Handle the @ScheduledDynamicCron annotation
                if (method.isAnnotationPresent(ScheduledDynamicCron.class)) {
                    ScheduledDynamicCron dynamicCron = method.getAnnotation(ScheduledDynamicCron.class);
                    String cronName = dynamicCron.value();
                    if (StringUtils.hasText(dynamicCron.cronName())) {
                        cronName = dynamicCron.cronName();
                    } else {
                        Assert.isTrue(StringUtils.hasText(cronName),
                                "@ScheduledDynamicCron 'cronName' or 'value' attributes is required");
                    }

                    DynamicCronTrigger trigger =
                            new DynamicCronTrigger(cronName, this.beanFactory.getBean(dynamicCron.handler()));
                    ScheduledTask scheduledTask =
                            this.dynamicCronScheduleTaskManager.addTriggerTask(cronName, new TriggerTask(runnable, trigger));
                    tasks.add(scheduledTask);
                    processedSchedule = true;
                } else {
                    // Check cron expression
                    String cron = scheduled.cron();
                    if (StringUtils.hasText(cron)) {
                        String zone = scheduled.zone();
                        if (this.embeddedValueResolver != null) {
                            cron = this.embeddedValueResolver.resolveStringValue(cron);
                            zone = this.embeddedValueResolver.resolveStringValue(zone);
                        }
                        if (StringUtils.hasLength(cron)) {
                            Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                            processedSchedule = true;
                            if (!Scheduled.CRON_DISABLED.equals(cron)) {
                                TimeZone timeZone;
                                if (StringUtils.hasText(zone)) {
                                    timeZone = StringUtils.parseTimeZoneString(zone);
                                } else {
                                    timeZone = TimeZone.getDefault();
                                }
                                tasks.add(this.registrar.scheduleCronTask(
                                        new CronTask(runnable, new CronTrigger(cron, timeZone))));
                            }
                        }
                    }
                }

                // At this point we don't need to differentiate between initial delay set or not anymore
                if (initialDelay < 0) {
                    initialDelay = 0;
                }

                // Check fixed delay
                long fixedDelay = scheduled.fixedDelay();
                if (fixedDelay >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
                String fixedDelayString = scheduled.fixedDelayString();
                if (StringUtils.hasText(fixedDelayString)) {
                    if (this.embeddedValueResolver != null) {
                        fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                    }
                    if (StringUtils.hasLength(fixedDelayString)) {
                        Assert.isTrue(!processedSchedule, errorMessage);
                        processedSchedule = true;
                        try {
                            fixedDelay = parseDelayAsLong(fixedDelayString);
                        } catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                        }
                        tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                    }
                }

                // Check fixed rate
                long fixedRate = scheduled.fixedRate();
                if (fixedRate >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
                String fixedRateString = scheduled.fixedRateString();
                if (StringUtils.hasText(fixedRateString)) {
                    if (this.embeddedValueResolver != null) {
                        fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                    }
                    if (StringUtils.hasLength(fixedRateString)) {
                        Assert.isTrue(!processedSchedule, errorMessage);
                        processedSchedule = true;
                        try {
                            fixedRate = parseDelayAsLong(fixedRateString);
                        } catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                        }
                        tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                    }
                }

                // Check whether we had any attribute set
                Assert.isTrue(processedSchedule, errorMessage);

                // Finally register the scheduled tasks
                synchronized (this.scheduledTasks) {
                    Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                    regTasks.addAll(tasks);
                }
            } catch (IllegalArgumentException ex) {
                throw new IllegalStateException(
                        "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
            }
        }

        // Reads a private field of ScheduledAnnotationBeanPostProcessor through reflection
        private Object getFieldValueFromParentClass(String fieldName) {
            try {
                Field field = ScheduledAnnotationBeanPostProcessor.class.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(this);
            } catch (Exception e) {
                logger.error("Failed to read ScheduledAnnotationBeanPostProcessor." + fieldName + " through reflection", e);
                throw new IllegalArgumentException(e);
            }
        }

        // Plain numbers are milliseconds; values such as "PT5S" are parsed as ISO-8601 durations
        private static long parseDelayAsLong(String value) throws RuntimeException {
            return value.length() <= 1 || (!isP(value.charAt(0)) && !isP(value.charAt(1)))
                    ? Long.parseLong(value)
                    : Duration.parse(value).toMillis();
        }

        private static boolean isP(char ch) {
            return ch == 'P' || ch == 'p';
        }
    }
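The parseDelayAsLong helper at the end of the class accepts two notations, which is easy to miss. The standalone sketch below uses the same rule (a value whose first or second character is P is treated as an ISO-8601 duration, anything else as milliseconds); DelayParseDemo is a made-up wrapper class so that it can be run on its own.

```java
import java.time.Duration;

// Standalone copy of the parsing rule, wrapped in a hypothetical demo class.
class DelayParseDemo {

    static long parseDelayAsLong(String value) {
        // The second character is checked as well so that signed durations such as "-PT1S" match.
        boolean looksLikeDuration = value.length() > 1
                && (isP(value.charAt(0)) || isP(value.charAt(1)));
        return looksLikeDuration ? Duration.parse(value).toMillis() : Long.parseLong(value);
    }

    private static boolean isP(char ch) {
        return ch == 'P' || ch == 'p';
    }

    public static void main(String[] args) {
        System.out.println(parseDelayAsLong("5000"));  // 5000
        System.out.println(parseDelayAsLong("PT5S"));  // 5000
        System.out.println(parseDelayAsLong("PT1M"));  // 60000
        System.out.println(parseDelayAsLong("-PT1S")); // -1000
    }
}
```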
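    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.BeanDefinition;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Primary;
    import org.springframework.context.annotation.Role;
    import org.springframework.scheduling.TaskScheduler;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    import org.springframework.scheduling.config.TaskManagementConfigUtils;

    import java.util.concurrent.ScheduledThreadPoolExecutor;

    @EnableScheduling
    public class SchedulerConfiguration implements SchedulingConfigurer {
        /**
         * Number of threads that execute the tasks
         */
        @Value("${app.scheduler.thread.count:10}")
        private int schedulerThreadCount;

        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            TaskScheduler taskScheduler = new ConcurrentTaskScheduler(new ScheduledThreadPoolExecutor(schedulerThreadCount));
            taskRegistrar.setTaskScheduler(taskScheduler);
        }

        @Bean
        public EnvironmentDynamicCronHandler environmentDynamicCronHandler() {
            return new EnvironmentDynamicCronHandler();
        }

        @Bean
        public DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager() {
            return new DynamicCronScheduleTaskManager();
        }

        // Replaces Spring's default post-processor by reusing its bean name
        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        @Primary
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor(DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager) {
            return new DynamicScheduledAnnotationBeanPostProcessor(dynamicCronScheduleTaskManager);
        }
    }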
Import SchedulerConfiguration in the application class, as in the following example:
    @SpringBootApplication
    @Import(SchedulerConfiguration.class) // import the configuration class
    public class DemoApplication {
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }
Configuration in nacos:
    cron:
      name1: "0/5 * * * * ?" # runs every 5 seconds
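A Spring cron expression has six fields: second, minute, hour, day of month, month and day of week. In 0/5 * * * * ? only the seconds field is restricted, and start/step means every step seconds beginning at start. The toy expander below handles just the seconds field to show which seconds match; it is a hypothetical helper for illustration, not Spring's parser, and supports only the forms *, N and start/step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: expands a cron *seconds* field of the form
// "*", "N" or "start/step" into the list of matching seconds.
class CronSecondsDemo {

    static List<Integer> expandSeconds(String field) {
        String[] parts = field.split("/");
        boolean wildcard = parts[0].equals("*");
        int start = wildcard ? 0 : Integer.parseInt(parts[0]);
        List<Integer> seconds = new ArrayList<>();
        if (parts.length == 1 && !wildcard) {
            seconds.add(start); // a plain number matches exactly one second
            return seconds;
        }
        int step = parts.length > 1 ? Integer.parseInt(parts[1]) : 1;
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(expandSeconds("0/5")); // [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
        System.out.println(expandSeconds("0/2").size()); // 30
    }
}
```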
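Use @ScheduledDynamicCron to specify the configuration key of the cron expression, cron.name1. When handler() is not specified, EnvironmentDynamicCronHandler is used by default; it reads the cron expression from nacos using the given key cron.name1.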
    @Component
    public class DynamicTask {
        @Scheduled
        @ScheduledDynamicCron("cron.name1")
        public void dynamicCronForEnvironment() {
            System.out.println("dynamicCronForEnvironment:" + LocalDateTime.now());
        }
    }
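@Scheduled must still be present, but its cron attribute is ignored.
Change cron.name1 in nacos to 0/2 * * * * ? and publish it: the task immediately switches from running every 5 seconds to running every 2 seconds.
To read the cron expression from a database instead, extend AbstractDynamicCronHandler: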
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class DynamicCronHandler extends AbstractDynamicCronHandler {
        @Autowired
        private ParametersService parametersService;

        @Override
        public String getCronExpression(String cronName) {
            // Look up the cron expression stored in the database by cronName
            Parameters ap = parametersService.getByName(cronName);
            // No record means no expression yet, so the trigger will not schedule the task
            return ap != null ? ap.getValue() : null;
        }
    }
The scheduled task class:
    @Component
    public class DynamicTask {
        @Scheduled
        // The handler must be set to the DynamicCronHandler defined above
        @ScheduledDynamicCron(cronName = "cron.name2", handler = DynamicCronHandler.class)
        public void dynamicCronForDb() {
            System.out.println("dynamicCronForDb:" + LocalDateTime.now());
        }
    }
The trigger time of the task has to be refreshed whenever the database configuration is updated:
    @RestController
    @RequestMapping("parameters")
    public class ParametersController {
        @Autowired
        private ParametersService parametersService;
        @Autowired
        private DynamicCronHandler dynamicCronHandler;

        @PostMapping("/update")
        public Result update(Parameters parameters) {
            if (parametersService.update(parameters)) {
                if ("cron.name2".equals(parameters.getName())) {
                    // After the database configuration is updated, refresh the task's trigger time
                    dynamicCronHandler.updateTriggerTask(parameters.getName());
                }
            }
            return Result.success();
        }
    }
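Refreshing the trigger time right after the database update, as above, only takes effect on the node that handled the request; the other service nodes in the cluster are not updated.
The other nodes can be updated through a message bus: for example, broadcast a message through MQ and have every node call the following method when it consumes the message: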
dynamicCronHandler.updateTriggerTask(cronName);
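The broadcast idea can be sketched without any particular MQ product. In the example below an in-memory list of subscribers stands in for a broadcast topic, and each subscriber plays the role of one service node; CronChangeBroadcastDemo and its methods are invented for illustration, and a real setup would use the messaging client of whatever broker is in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for an MQ broadcast topic.
class CronChangeBroadcastDemo {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Each service node registers one listener.
    void subscribe(Consumer<String> nodeListener) {
        subscribers.add(nodeListener);
    }

    // Every subscribed node receives the name of the changed cron.
    void publish(String cronName) {
        subscribers.forEach(subscriber -> subscriber.accept(cronName));
    }

    public static void main(String[] args) {
        CronChangeBroadcastDemo topic = new CronChangeBroadcastDemo();
        // On a real node the listener would call dynamicCronHandler.updateTriggerTask(name).
        topic.subscribe(name -> System.out.println("node-1 refreshes " + name));
        topic.subscribe(name -> System.out.println("node-2 refreshes " + name));
        topic.publish("cron.name2");
    }
}
```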
A web endpoint for adding tasks:
    @RestController
    @RequestMapping("dynamicCron")
    public class DynamicCronController {
        @Autowired
        private DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;
        @Autowired
        private EnvironmentDynamicCronHandler environmentDynamicCronHandler;

        @PostMapping("/addTask")
        public Result addTask(String cronName) {
            // Create the Runnable that should run on a schedule
            Runnable runnable = () -> System.out.println("run task:" + LocalDateTime.now());

            // Create the trigger, backed by EnvironmentDynamicCronHandler
            DynamicCronTrigger trigger = new DynamicCronTrigger(cronName, environmentDynamicCronHandler);
            // Add the scheduled task
            dynamicCronScheduleTaskManager.addTriggerTask(cronName, new TriggerTask(runnable, trigger));
            return Result.success();
        }
    }
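After the endpoint returns, the task does not run yet, because the key passed as cronName (cron.name2 in this example) has not been configured. Once the cron expression is configured in nacos, the task starts being scheduled.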
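The reason the task stays idle is that DynamicCronTrigger returns null while no expression can be found, and a null next execution time means nothing is scheduled. The sketch below mimics that behaviour with an OptionalLong and a map standing in for the configuration source; all names are hypothetical, and plain periods in seconds replace cron expressions.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration: no configured value means no next fire time.
class MissingConfigDemo {

    static OptionalLong nextFireTime(Map<String, Long> periodByName, String cronName, long now) {
        Long period = periodByName.get(cronName);
        return period == null ? OptionalLong.empty() : OptionalLong.of(now + period);
    }

    public static void main(String[] args) {
        Map<String, Long> config = new ConcurrentHashMap<>();
        System.out.println(nextFireTime(config, "cron.name2", 100)); // OptionalLong.empty
        config.put("cron.name2", 5L); // the value is configured later
        System.out.println(nextFireTime(config, "cron.name2", 100)); // OptionalLong[105]
    }
}
```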
Console output after configuring nacos