ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

TP6(thinkphp6)队列与延时队列

2021-12-17 20:01:46  阅读:321  来源: 互联网

标签:return 队列 type thinkphp6 queue TP6 job data


 

安装

在此我就不再略过TP6的项目创建过程了,大致就是安装composer工具,安装成功以后,再使用composer去创建项目即可。

think-queue 安装

composer require topthink/think-queue

 

项目中添加驱动配置

我们需要在安装好的config下找到 queue.php

<?php
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 4,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

 

生产者

<?php
namespace app\controller;

use app\BaseController;
use think\facade\Queue;

class Index extends BaseController
{
    public function queue()
    {
        //当前任务将由哪个类来负责处理。
        //当轮到该任务时,系统将生成一个该类的实例,并默认调用其 fire 方法
        $jobHandlerClassName = 'app\Job\Order';

        //当前任务归属的队列名称,如果为新队列,会自动创建
        //php think queue:work --queue orderJobQueue
        //php think queue:work --queue orderJobQueue --daemon

        $jobQueueName = "orderJobQueue";

        //数组数据
        $orderData = [
            'id'      => uniqid(),
            'time'    => time(),
            'message' => 'later message83'
        ];

        //将该任务推送到消息队列,等待对应的消费者去执行
        //这里只是负责将数据添加到相应的队列名称的队列里,消费者与生产者并无联系

        //立即执行
        $isPushed = Queue::push($jobHandlerClassName, $orderData, $jobQueueName);
        //延迟10秒后执行
        //$isPushed = Queue::later(10, $jobHandlerClassName, $orderData, $jobQueueName);

        if ($isPushed !== false) {
            echo date('Y-m-d H:i:s') . " 队列添加成功";
        } else {
            echo '队列添加失败';
        }
    }
}

 

消费者

<?php
namespace app\Job;

use think\facade\Log;
use think\queue\Job;

/**
 * @Title: app\task\job$Order
 * @Package package_name
 * @Description: todo(测试订单消费者)
 * @author Jack
 */
class Order
{
    /**
     * @Title: fire
     * @Description: todo(fire方法是消息队列默认调用的方法)
     * @param Job $job
     * @param array $data
     * @author Jack
     * @throws
     */
    public function fire(Job $job, array $data)
    {
        //有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }
        $jobId =  $job->getJobId();
        $isJobDone = $this->orders($data, $jobId);
        if ($isJobDone) {
            //如果任务执行成功,记得删除任务
            $job->delete();
        } else {
            //通过这个方法可以检查这个任务已经重试了几次了
            if ($job->attempts() > 3){
                Log::error('试了3次了');
                $job->delete();

                //也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }

    /**
     * @Title: checkDatabaseToSeeIfJobNeedToBeDone
     * @Description: todo(有些消息在到达消费者时,可能已经不再需要执行了)
     * @param array $data
     * @return boolean
     * @author Jack
     * @throws
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data)
    {
        return true;
    }

    /**
     * @Title: orders
     * @Description: todo(数据处理)
     * @param array $data
     * @author Jack
     * @throws
     */
    public function orders(array $data,  $jobId)
    {
        //对订单进行数据库操作或其他等等
        Log::info(date('Y-m-d H:i:s') . ' - data:' . json_encode($data));
        return true;
    }
}

 

服务器执行常驻命令

php think queue:work --queue orderJobQueue

 

标签:return,队列,type,thinkphp6,queue,TP6,job,data
来源: https://www.cnblogs.com/dawuge/p/15703634.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有