ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

TP5延迟队列

2022-07-19 19:32:54  阅读:171  来源: 互联网

标签:ad 队列 user job TP5 ecpm total data 延迟


今天在项目中遇到并发插入的问题。

实际场景

计算用户当当天的累计广告收益,这时候出现了并发问题,一个用户当天出现多条记录。
这时候该如何操作呢?加锁?加缓存?

分析

用户上报的广告数据都是真实有用的,出现多条数据的原因是同一时间用户上报两天记录,这时候我们需要先让一条写入库,后面的数据是修改该条记录

解决

使用延迟队列

   // 延迟队列
    $is_lock = Cache::store('redis')->handler()->setnx('user_ad_data:' . $this->_userId, 1);
    if($is_lock == 0){
        $data = [
            'user_id'             => $this->_userId,
            'day'                 => date('Y-m-d'),
            'type'                => $type,
            'ecpm'                => $ecpm,
        ];
        QueueService::pushLaterQueue(3,"app\job\UserAdData", $data, 'UserAdData');
    }else{
        Cache::store('redis')->handler()->expire('user_ad_data:' . $this->_userId, 60);
        UserAdDataModel::create([
            'user_id'             => $this->_userId,
            'ad_total_ecpm'       => $ecpm,
            'ad_total_show'       => 1,
            'video_ad_total_ecpm' => $type == 7 ? $ecpm : 0,
            'video_ad_total_show' => $type == 7 ? 1 : 0,
            'day'                 => date('Y-m-d'),
            'create_time'         => time(),
        ]);
        Cache::rm('user_ad_data:' . $this->_userId);
    }
  • 封装延迟方法
  // 延迟队列
    public static function pushLaterQueue($delay = 3, $job, $data, $queue)
    {
        $isPushed = Queue::later($delay, $job, $data, $queue);
        if ($isPushed == false) {
            Log::error($job . ':Error ' . $queue . ' MQ false');
        }
    }
  • 队列文件UserAdData.php
<?php

namespace app\job;

use app\common\model\UserAdDataModel;
use think\Db;
use think\queue\Job;

class UserAd
{
    /**
     * fire方法是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
     */
    public function fire(Job $job, $data)
    {
        print("<info>Start Time:" . date('Y-m-d H:i:s') . "</info>\n");

        // 有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if (!$isJobStillNeedToBeDone) {
            $job->delete();
            return;
        }

        //执行任务
        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            // 如果任务执行成功, 记得删除任务
            $job->delete();
//            print("<info>Hello Job has been done and deleted" . "</info>\n");
        } else {
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
//                print("<warn>Hello Job has been retried more than 3 times!" . "</warn>\n");
                $job->delete();
                // 也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
        print("<info>End Time:" . date('Y-m-d H:i:s') . "</info>\n");
    }

    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data)
    {
        return true;
    }

    /**
     * 根据消息中的数据进行实际的业务处理...
     */
    private function doHelloJob($data)
    {
        /*
         *  $data = [
                    'user_id'             => $this->_userId,
                    'day'                 => date('Y-m-d'),
                    'type'                => $type,
                    'ecpm'                => $ecpm,
                ];
         * */

        $ecpm = $data['ecpm'];

        $update = [
            'ad_total_ecpm' => Db::raw("ad_total_ecpm+{$ecpm}"),
            'ad_total_show' => Db::raw("ad_total_show+1"),
        ];
        if ($data['type'] == 7) {
            $update['video_ad_total_ecpm'] = Db::raw("video_ad_total_ecpm+{$ecpm}");
            $update['video_ad_total_show'] = Db::raw("video_ad_total_show+1");
        }

        $res = UserAdDataModel::where(['user_id' => $data['user_id'], 'day' => $data['day']])->update($update);
        if($res){
            return true;
        }
    }
}

这样前一个进程获取到redis锁,后面进程进来的数据无法持有锁,将要修改的数据插入队列,延迟三秒后,第一个进程的数据已经入口了,然后修改

标签:ad,队列,user,job,TP5,ecpm,total,data,延迟
来源: https://www.cnblogs.com/obeigong/p/16495363.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有