ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java高并发专题之37、如何实现一个通用的延迟队列?

2022-05-05 00:00:24  阅读:199  来源: 互联网

标签:Java Thread 队列 37 订单 main public 延迟


电商大家都用过吧,下单后若未支付,通常都有一段支付倒计时,比如15分钟,若时间到了之后,还未支付的,订单将被关闭,库存将被释放。

这种业务就需要用到延迟队列的功能,将任务丢到延迟队列、设置一个延迟时间、回调函数,到了时间之后,延迟队列将回调指定的函数消费指定的任务。

下面代码是一个通用的延迟队列的实现,大家可以直接拿去用。

代码还是比较简单的,技术要点:

  • 调用addTask方法将任务丢到延迟队列中,主要参数(延迟时间、任务信息、回调【任务到期后会进行回调】)
  • 使用到了java中的延迟队列DelayQueue来存放延迟任务
  • 下面的构造方法会自动调用一个start方法,start方法中会自动启动一个线程,线程轮询从延迟队列中拉取到期的任务,然后丢到线程池executorService.submit中进行处理,会自动调用创建延迟任务中指定的回调函数
  • main方法中有使用步骤
  1. import java.util.concurrent.*;
  2. import java.util.function.Consumer;
  3. import java.util.logging.Logger;
  4. public class DelayQueueService<T> {
  5. Logger logger = Logger.getLogger(DelayQueueService.class.getName());
  6. //延迟队列名称
  7. private String delayQueueName;
  8. private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
  9. //处理队列中任务的线程池
  10. private ExecutorService executorService;
  11. public DelayQueueService(String delayQueueName) {
  12. this(delayQueueName, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4));
  13. }
  14. public DelayQueueService(String delayQueueName, ExecutorService executorService) {
  15. this.delayQueueName = delayQueueName;
  16. this.executorService = executorService;
  17. //启动队列消费
  18. this.start();
  19. }
  20. /**
  21. * 添加任务
  22. *
  23. * @param delayedTimeUnit 延迟时间单位
  24. * @param delayedTime 延迟时间
  25. * @param task 任务
  26. * @param consumer 任务消费者(到期了会回调)
  27. */
  28. public void addTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) {
  29. this.delayQueue.offer(new DelayedTask(delayedTimeUnit, delayedTime, task, consumer));
  30. }
  31. private void start() {
  32. //轮询从延迟队列中拉取任务,然后调用线程池进行处理
  33. Thread pollThread = new Thread(() -> {
  34. while (true) {
  35. try {
  36. DelayedTask delayedTask = this.delayQueue.poll(100, TimeUnit.MILLISECONDS);
  37. if (this.executorService.isShutdown()) {
  38. break;
  39. }
  40. if (delayedTask != null) {
  41. executorService.submit(() -> {
  42. delayedTask.consumer.accept(delayedTask.task);
  43. });
  44. }
  45. } catch (InterruptedException e) {
  46. logger.warning(e.getMessage());
  47. }
  48. }
  49. });
  50. pollThread.setDaemon(Thread.currentThread().isDaemon());
  51. pollThread.setName(this.getClass().getName() + "-pollThread-" + this.delayQueueName);
  52. pollThread.start();
  53. }
  54. public void close() {
  55. if (!this.executorService.isShutdown()) {
  56. this.executorService.shutdown();
  57. }
  58. }
  59. public class DelayedTask implements Delayed {
  60. //延迟时间单位
  61. private TimeUnit delayedTimeUnit;
  62. //延迟时间
  63. private long delayedTime;
  64. //到期时间(毫秒)
  65. private long endTime;
  66. //延迟任务信息
  67. private T task;
  68. //消费者
  69. private Consumer<T> consumer;
  70. public DelayedTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) {
  71. this.delayedTimeUnit = delayedTimeUnit;
  72. this.delayedTime = delayedTime;
  73. this.task = task;
  74. this.endTime = System.currentTimeMillis() + delayedTimeUnit.toMillis(delayedTime);
  75. this.consumer = consumer;
  76. }
  77. @Override
  78. public long getDelay(TimeUnit unit) {
  79. return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  80. }
  81. @Override
  82. public int compareTo(Delayed o) {
  83. DelayedTask task = (DelayedTask) o;
  84. return Long.compare(this.endTime, task.endTime);
  85. }
  86. }
  87. public static void main(String[] args) {
  88. //创建一个延迟队列:用来对超过支付日期的订单进行关闭
  89. String delayQueueName = "orderCloseDelayQueue";
  90. //1、创建延迟队列
  91. DelayQueueService<String> orderCloseDelayQueue = new DelayQueueService<String>(delayQueueName);
  92. for (int i = 1; i <= 10; i++) {
  93. //2、调用addTask将延迟任务加入延迟队列
  94. orderCloseDelayQueue.addTask(TimeUnit.SECONDS, i, "订单" + i, new Consumer<String>() {
  95. @Override
  96. public void accept(String s) {
  97. System.out.println(System.currentTimeMillis() + "," + Thread.currentThread() + ",关闭订单:" + s);
  98. }
  99. });
  100. }
  101. //3、系统关闭的时候,调用延迟队列的close方法
  102. //orderCloseDelayQueue.close();
  103. }
  104. }

main方法中模拟了10个延迟任务,运行看看效果,输出

  1. 1614346780438,Thread[pool-1-thread-1,5,main],关闭订单:订单1
  2. 1614346781437,Thread[pool-1-thread-2,5,main],关闭订单:订单2
  3. 1614346782436,Thread[pool-1-thread-3,5,main],关闭订单:订单3
  4. 1614346783437,Thread[pool-1-thread-4,5,main],关闭订单:订单4
  5. 1614346784437,Thread[pool-1-thread-5,5,main],关闭订单:订单5
  6. 1614346785437,Thread[pool-1-thread-6,5,main],关闭订单:订单6
  7. 1614346786437,Thread[pool-1-thread-7,5,main],关闭订单:订单7
  8. 1614346787436,Thread[pool-1-thread-8,5,main],关闭订单:订单8
  9. 1614346788437,Thread[pool-1-thread-9,5,main],关闭订单:订单9
  10. 1614346789437,Thread[pool-1-thread-10,5,main],关闭订单:订单10
来源:http://itsoku.com/course/1/205

标签:Java,Thread,队列,37,订单,main,public,延迟
来源: https://www.cnblogs.com/konglxblog/p/16223042.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有