ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

TP5 redis 延迟队列

2021-06-12 15:05:39  阅读:187  来源: 互联网

标签:function return getInstance 队列 redis param queue TP5 key


TP5.1 下载安装Redis

配置redis信息

<?php
namespace app\common\redis;

class RedisHandler
{
    public $provider;

    //创建实例子
    private static $_instance = null;
    //创建redis实例子
    private function __construct()
    {
        $this->provider = new \Redis();

        $this->provider->connect(config('redis.redis_host'),config('redis.redis_port'));
    }

    final private function __clone()
    {
        
    }

    public static function getInstance(){
        if(!self::$_instance){
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }

    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的数据
     * @return int
     */
    public function zAdd($key,$score,$value){
        return $this->provider->zAdd($key,$score,$value);
    }

    /**
     * 获取有序集合数据
     * @param $key
     * @param $start
     * @param $end
     * @param null $withsscores
     * @param array
     */
    public function zRange($key,$start,$end,$withsscores = null){
        return $this->provider->zRange($key,$start,$end,$withsscores);
    }

    /**
     * 删除有序集合
     * @param $key
     * @param $member
     * @param int
     */
    public function zRem($key,$member){
        return $this->provider->zRem($key,$member);
    }
}

创建一个命令

目录为
namespace app\command;

<?php
namespace app\command;

use app\common\delayqueue\DelayQueue;

use think\console\Command;

use think\console\Input;

use think\console\Output;

 

class DelayQueueWorker extends Command

{

    const COMMAND_ARGV_1 = 'queue';

 

    protected function configure()

    {

        $this->setName('delay-queue')->setDescription('延迟队列任务进程');

        $this->addArgument(self::COMMAND_ARGV_1);

    }

 

    protected function execute(Input $input, Output $output)

    {

        $queue = $input->getArgument(self::COMMAND_ARGV_1);

        //参数1 延迟队列表名,对应与redis的有序集key名
        //这边是使用while死循环 来监听 
        while (true) {
            echo $queue."===".time()."\n";

            DelayQueue::getInstance($queue)->perform();

            usleep(1000000);

        }

    }

}

队列创建方法

<?php
namespace app\common\delayqueue;

use app\common\redis\RedisHandler;

class DelayQueue
{
    //默认参数
    private $prefix = "delay_queue:";

    private $queue;
    //创建一个实例
    private static $_instance = null;
    //初始化
    private function __construct($queue)
    {
        $this->queue = $queue;   
    }

    //最终类不允许被克隆
    final private function __clone()
    {
        
    }

    //创建工厂方法
    public static function getInstance($queue=''){
        if(!self::$_instance){
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }

    /**
     *@Description: 
     *@MethodAuthor: lijian
     *@Date: 2021-06-12 11:44:12
     *@param:
     *@return:
    */
    public function addTask($jobClass,$runTime,$args = null){
        
        $key = $this->prefix.$this->queue;
        //导入数据到redis中
        $params = [
            'class'=> $jobClass,
            'args' => $args,
            'runtime'=>$runTime
        ];

        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }

    /**
     *@Description: 执行job
     *@MethodAuthor: lijian
     *@Date: 2021-06-12 11:49:21
    */
    public function perform(){
        
        $key = $this->prefix.$this->queue;

        //取出有序集合第一个元素
        $result = RedisHandler::getInstance()->zRange($key,0,0);

        if(!$result) return false;
        //序列化
        $jobInfo = unserialize($result['0']);

        print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);

        $jobClass = $jobInfo['class'];

        if(!@class_exists($jobClass)){
            print_r($jobClass.'undefined',PHP_EOL);
            RedisHandler::getInstance()->zRem($key,$result[0]);

            return false;
        }

        //到时间执行
        if(time() >= $jobInfo['runtime']){
            $job = new $jobClass;
            //获取订单号
            $job->setPayload($jobInfo['args']);
            $jobResult = $job->perform();
            if($jobResult){
                //将任务移除
                RedisHandler::getInstance()->zRem($key,$result[0]);
                return true;
            }
        }
        return false;
    }
}

检验数据

    DelayQueue::getInstance('delay_job')->addTask(

        'app\common\delayqueue\CloseOrder', // 自己实现的job
   
        time()+50, // 订单失效时间
   
        ['order_id'=>123456] // 传递给job的参数
   
    );

标签:function,return,getInstance,队列,redis,param,queue,TP5,key
来源: https://www.cnblogs.com/corvus/p/14878257.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有