标签:info task log -- 任务 缓存 Task 精准 头条
1.添加任务
1.1.每次创建文章,就添加到任务中去
文章提交中调用添加任务方法
代码
@Override
@Async
public void addNewsToTask(Integer id, Date NEWS_SCAN_TIME) {
log.info("开始添加任务------->");
// 创建任务对象
Task task=new Task();
// 设置优先级 和类型
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
task.setExecuteTime(NEWS_SCAN_TIME.getTime());
// 创建文章对象
WmNews wmNews=new WmNews();
wmNews.setId(id);
byte[] serialize = ProtostuffUtil.serialize(wmNews);
// 序列化
task.setParameters(serialize);
// 添加任务
scheduleClient.addTask(task);
log.info("任务添加完成------->");
}
这里把文章id存入到了任务中的Parameters
属性里,方便后期拉去任务时获取Id然后调用自动审核的方法,把此Id传进去。
2.缓存任务定时转换刷新
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){
// 添加Redis分布式锁
cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");
// 获取所有zset里的键
Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");
for (String futureKey : scan) {
// 换成topic_key
String topicKey=ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
//获取该组key下当前需要消费的任务数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if (!tasks.isEmpty()){
// 不为空 从future同步到topic
cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
}
}
}
把未来时间
的任务如果小于了当前时间
就转换状态供调用者拉去消费
3.清空缓存,Mysql每五分钟向Redis同步一次,保证数据同步
代码
/**
* 每5分钟刷新一次数据到缓存
*/
@PostConstruct
@Scheduled(cron = "0 */5 * * * ?")
public void reloadTask(){
log.info("-------开始刷新进缓存-------");
// 清理缓存中的数据
clearTask();
// 查询数据库中小于未来5分钟的数据
// 获取5分钟后的时间 毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
//查看小于未来5分钟的所有任务
List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
// 添加进Redis缓存中
if (allTasks!=null){
for (Taskinfo allTask : allTasks) {
Task task=new Task();
BeanUtils.copyProperties(allTask,task);
task.setExecuteTime(allTask.getExecuteTime().getTime());
//调用添加缓存的方法
addTaskToRedis(task);
}
log.info("以从数据库刷新------->缓存<--------");
}
}
4.消费者每过一秒拉去任务进行审核
代码
拉取成功后会把数据库中的任务给删除,相当于消费掉了
/**
* 按照频率拉取任务,自动审核
*/
@Override
@Scheduled(fixedRate = 1000) //每一秒拉去一次
public void scanNewsByTask() throws Exception {
log.info("文章审核---消费任务执行---begin---");
// 拉取任务
ResponseResult poll = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if (poll.getCode().equals(200)&&poll.getData()!=null){
// 获取Task对象
Task task = JSON.parseObject(JSON.toJSONString(poll.getData()), Task.class);
byte[] parameters = task.getParameters();
// 获取 WmNews
WmNews deserialize = ProtostuffUtil.deserialize(parameters, WmNews.class);
// 自动审核
wmNewsAutoScanService.autoScanWmNews(deserialize.getId());
}
log.info("文章审核---消费任务执行---end---");
}
标签:info,task,log,--,任务,缓存,Task,精准,头条 来源: https://www.cnblogs.com/sy2022/p/16552312.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。