ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spring quartz集群搭建

2021-10-28 16:30:00  阅读:211  来源: 互联网

标签:quartz String jobGroupName spring jobName 集群 scheduler org


文章目录

背景

quartz 可用于管理调度定时任务,有集群模式和单机模式,quartz 的单机模式部署,所有任务执行信息都在内存中保存,存在单点故障,quartz 的集群模式具备高可用,自动负载均衡等特点,可保障定时任务的执行。

1.1 SpringBoot + Mysql + Quartz 集群模式搭建

注: 集群模式依赖实例所在机器之间的时间同步,请自行部署 ntp 服务进行时间同步。
1.1 Quartz 相关表建立

  • 去官网下载 quartz,下载地址,需要下载2.2.3或者更低版本
  • 解压后,执行 docs/dbTables/tables_mysql_innodb.sql 脚本建表
  • 检查 db 中是否存在以下 11 个表
+--------------------------+
| QRTZ_BLOB_TRIGGERS       |
| QRTZ_CALENDARS           |
| QRTZ_CRON_TRIGGERS       |
| QRTZ_FIRED_TRIGGERS      |
| QRTZ_JOB_DETAILS         |
| QRTZ_LOCKS               |
| QRTZ_PAUSED_TRIGGER_GRPS |
| QRTZ_SCHEDULER_STATE     |
| QRTZ_SIMPLE_TRIGGERS     |
| QRTZ_SIMPROP_TRIGGERS    |
| QRTZ_TRIGGERS            |
+--------------------------+

1.2 maven 中引入 Quartz 相关包

        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>

1.3 创建quartz配置文件

#默认或是自己改名字都行
org.quartz.scheduler.instanceName=DefaultQuartzScheduler

#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.dataSource=qzDS
# 开启集群模式
org.quartz.jobStore.isClustered=true
# 集群实例检测时间间隔 ms
org.quartz.jobStore.clusterCheckinInterval=5000

# misfire 任务的超时阈值 ms
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate


org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false

# 工作线程的线程池设置
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=5
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

#============================================================================
# Configure Datasources
#============================================================================
#配置数据源
org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/dbName?characterEncoding=utf8&useSSL=true
org.quartz.dataSource.qzDS.user=xxx
org.quartz.dataSource.qzDS.password=xxx
org.quartz.dataSource.qzDS.validationQuery=select 0 from dual

特别解释一下这个参数 org.quartz.jobStore.misfireThreshold = 60000, misfire 任务为错过调度触发时间的任务,而 misfireThreshold 为判定触发任务为 misfire 的判定条件,比如规定 11:30 要执行一次 Job, 如果因为实例挂掉或者线程池忙导致 11:33 才触发调度,超时了 3 分钟,超时时间 > 60000ms, 因此判定为 misfire。

判定为 misfire 的处理规则在后面的原理介绍相关文章会提及。

1.4 创建job 实例工厂,解决spring注入问题,如果使用默认会导致spring的@Autowired 无法注入问题(很重要

@Component
public class MyJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    private transient AutowireCapableBeanFactory beanFactory;

    @Override
    public void setApplicationContext(final ApplicationContext context) {
        beanFactory = context.getAutowireCapableBeanFactory();
    }

    @Override
    protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        beanFactory.autowireBean(job);
        return job;
    }
}

1.5 quartz的初始化配置,生成 ScheduleFactory Bean

@Configuration
public class SchedulerConfiguration {

    @Autowired
    private MyJobFactory myJobFactory;

    @Bean(name = "schedulerFactoryBean")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        //获取配置属性
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("quartz.properties"));
        //在quartz.properties中的属性被读取并注入后再初始化对象
        propertiesFactoryBean.afterPropertiesSet();
        //创建SchedulerFactoryBean
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        Properties pro = propertiesFactoryBean.getObject();
        factory.setOverwriteExistingJobs(true);
        factory.setAutoStartup(true);
        factory.setQuartzProperties(pro);
        factory.setJobFactory(myJobFactory);
        return factory;
    }

}

1.6 任务管理实现类

package com.tencent.oa.fm.digital.ops.intelligent.alarm.server.common.schedules;


import com.alibaba.fastjson.JSONObject;
import com.tencent.oa.fm.digital.ops.intelligent.alarm.contract.SysScheduleTaskDTO;
import com.tencent.oa.fm.digital.ops.intelligent.alarm.server.common.util.LogUtils;
import lombok.extern.log4j.Log4j2;
import org.joda.time.DateTime;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.*;

/**
 *
 * @ClassName: DistributeQuartzManager
 * @Description 分布式集群quartz定时任务管理增删改
 * @date 2019/10/1211:04
 */
@Log4j2
@Component
public class DistributeQuartzManager {

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean schedulerFactory;

    /**
     * 判断一个job是否存在
     *
     * @param jobName
     *            任务名
     * @param jobGroupName
     *            任务组名
     * @return
     */
    public  boolean isExistJob(String jobName, String jobGroupName) {
        boolean exist = false;
        try {

            Scheduler sched = schedulerFactory.getScheduler();
            JobKey jobKey = new JobKey(jobName, jobGroupName);
            exist = sched.checkExists(jobKey);
        }
        catch (SchedulerException e) {
            e.printStackTrace();
        }
        if (exist) {
            log.debug("触发器[" + jobName + "]重复");
        }
        else {
            log.debug("触发器[" + jobName + "]可用");
        }
        return exist;

    }

    /**
     * @Description: 添加一个定时任务
     *
     * @param jobName
     *            任务名
     * @param jobGroupName
     *            任务组名
     * @param triggerName
     *            触发器名
     * @param triggerGroupName
     *            触发器组名
     * @param jobClass
     *            任务
     * @param cron
     *            时间设置,参考quartz说明文档
     */
    public JobDetail addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
                                   @SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
        return doAddJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
    }

    private JobDetail doAddJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, JobDataMap jMap, String cron) {
        JobDetail jobDetail = null;
        if(StringUtils.isEmpty(jobGroupName)){
            jobGroupName = Scheduler.DEFAULT_GROUP;
        }

        if(StringUtils.isEmpty(triggerGroupName)){
            triggerGroupName = Scheduler.DEFAULT_GROUP;
        }

        try {
            Scheduler sched = schedulerFactory.getScheduler();
            // 任务名,任务组,任务执行类
            JobBuilder jobBuilder = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName);
            if(jMap != null && jMap.size() > 0){
                jobBuilder = jobBuilder.usingJobData(jMap);
            }
            jobDetail = jobBuilder.build();

            // 触发器
            TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
            // 触发器名,触发器组
            triggerBuilder.withIdentity(triggerName, triggerGroupName);
            triggerBuilder.startNow();
            // 触发器时间设定
            triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
            // 创建Trigger对象
            CronTrigger trigger = (CronTrigger) triggerBuilder.build();

            // 调度容器设置JobDetail和Trigger
            sched.scheduleJob(jobDetail, trigger);
            Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
            // |-NONE 无
            // |-NORMAL 正常状态
            // |-PAUSED 暂停状态
            // |-COMPLETE 完成
            // |-ERROR 错误
            // |-BLOCKED 堵塞

            log.debug("JobName:" + jobName + ",状态:" + triggerState + ",GroupName:" + jobGroupName);
            // 启动
            if (!sched.isShutdown()) {
                sched.start();
            }

            // 按新的trigger重新设置job执行
//            sched.rescheduleJob(trigger.getKey(), trigger);
        } catch (Exception e) {
            log.error("添加一个定时任务发生异常:" +  e);
        }

        return jobDetail;
    }

    /**
     * 启动一个定时作业,如果原来已经启动该作业,先进行停止,删除操作,然后再重新添加启动作业
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     * @param triggerGroupName
     * @param jobClass
     * @param jMap
     * @param cron
     */
    public JobDetail startJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
                                     @SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
        //存在定时作业,先进行删除
        if(isExistJob(jobName, jobGroupName) == true) {
            removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
        }
        //添加并启动job
        return addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
    }

    public void startJob(JobDetail jobDetail, CronTrigger trigger) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            // 调度容器设置JobDetail和Trigger
            sched.scheduleJob(jobDetail, trigger);
            Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
            // |-NONE 无
            // |-NORMAL 正常状态
            // |-PAUSED 暂停状态
            // |-COMPLETE 完成
            // |-ERROR 错误
            // |-BLOCKED 堵塞


            log.info("addJob JobKey:" + jobDetail.getKey() + ",状态:" + triggerState);
            // 启动
            if (!sched.isShutdown()) {
                sched.start();
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Description: 修改一个任务的触发时间
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     *            触发器名
     * @param triggerGroupName
     *            触发器组名
     * @param cron
     *            时间设置,参考quartz说明文档
     */
    public void modifyJobTime(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
                                     @SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
        /** 方式一 :调用 rescheduleJob 开始 */
        // 触发器
        // TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
        // 触发器名,触发器组
        // triggerBuilder.withIdentity(triggerName, triggerGroupName);
        // triggerBuilder.startNow();
        // 触发器时间设定
        // triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
        // 创建Trigger对象
        // trigger = (CronTrigger) triggerBuilder.build();
        // 方式一 :修改一个任务的触发时间
        // sched.rescheduleJob(triggerKey, trigger);
        /** 方式一 :调用 rescheduleJob 结束 */

        /** 方式二:先删除,然后在创建一个新的Job */
        removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
        addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
        log.info(String.format("修改【%s】定时任务成功!",jobName));

        /** 方式二 :先删除,然后在创建一个新的Job */
    }

    /**
     * @Description: 移除一个任务
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     * @param triggerGroupName
     */
    public void removeJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) {
        try {
           /* ApplicationContext context = SpringContextUtils.getApplicationContext();
            RedisDistributedLock redLock = context.getBean(RedisDistributedLock.class);
            String lockKey = DOS + CacheConstant.LOCK_KEY + CacheConstant.SEPARATOR + jobGroupName + CacheConstant.SEPARATOR + jobName + CacheConstant.SEPARATOR + "Execute";
            redLock.unlockAsync(lockKey);*/

            Scheduler sched = schedulerFactory.getScheduler();

            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);

            sched.pauseTrigger(triggerKey);// 停止触发器
            sched.unscheduleJob(triggerKey);// 移除触发器
            sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务

            List<String> jobGroupNames = sched.getJobGroupNames();
            log.debug("移除任务组开始-->groupsNames=[");
            for (String string : jobGroupNames) {
                GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(string);
                Set<JobKey> jobKeys = sched.getJobKeys(matcher);
                log.debug(string + "下的JOB为[");
                for (JobKey jobKey : jobKeys) {
                    log.debug(jobKey.getName() + ",");
                }
                log.debug("]");

            }
            log.debug("]移除任务组结束。");
        } catch (Exception e) {
            log.error("移除job任务发生异常:" + e);
        }
    }

    public void getSchedulerStatus() {
        try {
                Scheduler scheduler = schedulerFactory.getScheduler();
                List<String> jobGroupNames = scheduler.getJobGroupNames();
                for (String jobGroupName : jobGroupNames) {
                    GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(jobGroupName);
                    Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
                    for (JobKey jobKey : jobKeys) {
                        List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                        String cron = "";
                        for (Trigger trigger : triggers) {
                            if (trigger instanceof CronTrigger) {
                                CronTrigger cronTrigger = (CronTrigger) trigger;
                                cron = cronTrigger.getCronExpression();
                            }
                        }

                        log.info("-------------job name=" + jobKey.getName() + ",group name=" + jobGroupName + ",scheduler name=" + scheduler.getSchedulerName() + ",cron=" + cron);
                    }
                }
                List<JobExecutionContext> jobExecutionContexts = scheduler.getCurrentlyExecutingJobs();
                for(JobExecutionContext jobExecutionContext : jobExecutionContexts){
                    JobDetail jobDetail = jobExecutionContext.getJobDetail();
                    JobKey jobKey = jobDetail.getKey();
                    String fireTime = new DateTime(jobExecutionContext.getFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
                    String previousTime = new DateTime(jobExecutionContext.getPreviousFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
                    String nextFireTime = new DateTime(jobExecutionContext.getNextFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
                    log.info("---------current running job key=" + jobKey.getName() + ",group name=" + jobKey.getGroup() + ",scheduler name=" + scheduler.getSchedulerName()
                            + LogUtils.formatScheduledJobLogInfo(jobExecutionContext) + ",class=" + jobKey.getClass().getSimpleName() +
                            ",description=" + jobDetail.getDescription());

                }

                Set<String> pauseGroupNames = scheduler.getPausedTriggerGroups();
                for (String jobGroupName : pauseGroupNames) {
                    GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(jobGroupName);
                    Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
                    for (JobKey jobKey : jobKeys) {
                        log.info("-------------pause job name=" + jobKey.getName() + ",group name=" + jobGroupName + ",scheduler name=" + scheduler.getSchedulerName());
                    }
                }


        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Description:启动所有定时任务
     */
    public void startAllJobs() {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            sched.start();
        }
        catch (Exception e) {
            log.error("启动所有定时任务发生异常:", e);
            throw new RuntimeException(e);
        }
    }

    public static Map<String,String> parseJobDataMap(String jsonStr){
        Map<String,String> map = new HashMap<>();
        if(StringUtils.isEmpty(jsonStr)){
            return map;
        }
        try{
            JSONObject json = JSONObject.parseObject(jsonStr);
            for (String key : json.keySet()) {
                String value = json.getString(key);
                map.put(key,value);
            }
        }catch (Exception e){
            log.error("parseJobDataMap error is:{}", e);
        }
        return map;
    }

    /**
     * @Description:关闭所有定时任务
     */
    public void shutdownAllJobs() {
        try {
            Scheduler scheduler = schedulerFactory.getScheduler();
            if (!scheduler.isShutdown()) {
                scheduler.shutdown();
            }
        }
        catch (Exception e) {
            log.error("关闭所有定时任务发生异常:", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * 计划日程待调度的任务
     * @return
     */
    public List<SysScheduleTaskDTO> queryAllJobs(){

        List<SysScheduleTaskDTO> jobConfigs = new ArrayList<>();
        try {
            Scheduler scheduler = schedulerFactory.getScheduler();
            for(String groupJob: scheduler.getJobGroupNames()){
                for(JobKey jobKey: scheduler.getJobKeys(GroupMatcher.groupEquals(groupJob))){
                    List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                    for (Trigger trigger: triggers) {
                        Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                        JobDetail jobDetail = scheduler.getJobDetail(jobKey);

                        SysScheduleTaskDTO jobConfig = new SysScheduleTaskDTO();
                        String cronExpression = "";
                        if (trigger instanceof CronTrigger) {
                            CronTrigger cronTrigger = (CronTrigger) trigger;
                            cronExpression = cronTrigger.getCronExpression();
                            TriggerKey triggerKey =cronTrigger.getKey();
                            jobConfig.setTriggerName(triggerKey.getName());
                            jobConfig.setTriggerGroupName(triggerKey.getGroup());
                        }

                        Class jobClazz = jobDetail.getJobClass();
                        String classCode = JobClassEnum.getCodeByClass(jobClazz);
                        jobConfig.setJobClass(classCode);
                        jobConfig.setJobName(jobKey.getName());
                        jobConfig.setJobGroupName(jobKey.getGroup());
                        jobConfig.setDescription(jobDetail.getDescription());
                        jobConfig.setStatus(triggerState.name());
                        jobConfig.setCron(cronExpression);
                        jobConfigs.add(jobConfig);
                    }
                }
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("查询所有定时任务发生异常:", e);
            throw new RuntimeException(e);
        }
        return jobConfigs;
    }

    /**
     * 正在运行中的任务
     * @return
     */
    public  List<SysScheduleTaskDTO> getRunningJobs(){
        List<SysScheduleTaskDTO> jobList = new ArrayList<>();
        try {
            Scheduler scheduler = schedulerFactory.getScheduler();
            List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();

            for (JobExecutionContext executingJob : executingJobs) {
                SysScheduleTaskDTO job = new SysScheduleTaskDTO();
                Trigger trigger = executingJob.getTrigger();
                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                TriggerKey triggerKey =trigger.getKey();
                job.setTriggerName(triggerKey.getName());
                job.setTriggerGroupName(triggerKey.getGroup());

                JobDetail jobDetail = executingJob.getJobDetail();
                JobKey jobKey = jobDetail.getKey();

                Class jobClazz = jobDetail.getJobClass();
                String classCode = JobClassEnum.getCodeByClass(jobClazz);
                job.setJobClass(classCode);
                job.setJobName(jobKey.getName());
                job.setJobGroupName(jobKey.getGroup());
                job.setDescription(jobDetail.getDescription());
                job.setStatus(triggerState.name());
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String cronExpression = cronTrigger.getCronExpression();
                    job.setCron(cronExpression);
                }
                job.setDescription("触发器:" + trigger.getKey());
                jobList.add(job);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }
}

1.7 启动程序

quartz 集群和其他分布式集群不一样,集群实例之间不需要互相通信,只需要和DB 交互,通过 DB 感知其他势力,实现 Job 调度。因此只需要按照普通 java 程序启动即可,扩容也只需要新启动实例,不需要做额外配置。

标签:quartz,String,jobGroupName,spring,jobName,集群,scheduler,org
来源: https://blog.csdn.net/DuShiJiMoReDeHuo/article/details/121017331

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有