您现在的位置是:首页 > 数码 > 

think\queue 消息队列

2025-07-26 15:55:29
think\queue 消息队列 简介 TP 中使用 think-queue 可以实现普通队列和延迟队列。 think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性: 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,

think\queue 消息队列

简介

TP 中使用 think-queue 可以实现普通队列和延迟队列。

think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

  • 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
  • 队列的多队列, 内存限制 ,启动,停止,守护等
  • 消息队列可降级为同步执行
消息队列实现过程
  1. 通过生产者推送消息到消息队列服务中
  2. 消息队列服务将收到的消息存入redis队列中(zset)
  3. 消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
  4. 处理获取下来的消息调用业务类进行处理相关业务
  5. 业务处理后,需要从队列中删除消息

安装queue

composer 安装 think-queue

composer require topthink/think-queue

配置文件

在 \application\extra 新建queue.php 为 queue 的配置文件
声明启动redis (服务器redis需要先开启安装redis扩展)


return [	connector	  => 	Redis	,          // Redis 驱动	expire	     => null,             // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null	default	    => 	default	,        // 默认的队列名称	host	       => 	127.0.0.1	,      // redis 主机ip	port	       => 679,             // redis 端口	password	   => 		,               // redis 密码	select	     => 0,                // 使用哪一个 db,默认为 db0	timeout	    => 0,                // redis连接的超时时间	persistent	 => false,            // 是否是长连接
];

queue服务基类

首先要写一个声明一个服务基类 命名空间及文件夹名 自己定义好 到直接位置
我这里是在 \application\common\library\task\OrderClose.php 这个路径

namespace app\common\library\task;use app\common\controller\Frontend;
use think\Lang;
use think\Respe;
use think\queue\Job;/*** 未支付订单到期自动关闭任务*/
class OrderClose
{/*** fire方法是消息队列默认调用的方法* @param Job            $job      当前的任务对象* @param array|mixed    $data     发布任务时自定义的数据*/public function fire(Job $job, $data){// 此处做一些 check,提前判断是否需要执行// $isJobStilleedToBeDone = $this->checkJob($data);// if(! $isJobStilleedToBeDone){//     $job->delete();//     return;// }// if ($job->attempts() > 6) {//     $job->delete();//     // 也可以重新发布这个任务//     //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行// }// 执行逻辑处理(即:你需要该消息队列做什么)$isJobDone = $this->doJob($data);if ($isJobDone) {// 如果任务执行成功,记得删除任务$job->delete();// $job->release(10);} else {//删除任务$job->delete();// 通过这个方法可以检查这个任务已经重试了几次了}}/*** 有些消息在到达消费者时,可能已经不再需要执行了* @param $data 发布任务时自定义的数据* @return bool 任务执行的结果*/private function checkJob($data){// $order_id = !empty($data[	order_id	]) ? $data[	order_id	] : 		;// if(!$order_id) return false;// $orderModel = new \app\api\model\order\Order;// $orderInfo = $orderModel->where([	id	=>$order_id])->find();// if(!$orderInfo) return false;// if($orderInfo->status != 	1	) return false;// //更新订单数据// $updateData = [//     	status	=>	10	,//     	close_time	=>time(),//     	remarks	=>	未支付订单超时自动关闭	// ];// $ret = $orderModel->save($updateData);// if($ret){//     return true;// }else{//     return false;    // }}/*** 根据消息中的数据进行实际的业务处理...* @param $data* @return bool*/private function doJob($data){$order_id = !empty($data[	order_id	]) ? $data[	order_id	] : 		;if(!$order_id) return false;$orderInfo = \app\api\model\order\Order::where([	id	=>$order_id])->find();// \think\Log::write([	msg	=>	测试队列7878	,	data	=>$orderInfo,	order_id	=>$order_id,	activity_id	=>$orderInfo[	activity_id	]],	error	);if(!$orderInfo) return false;if($orderInfo->status != 	1	) return false;//更新订单数据$updateData = [	status	=>	10	,	close_time	=>time(),	remarks	=>	未支付订单超时自动关闭	];$ret = \app\api\model\order\Order::where([	id	=>$order_id])->update($updateData);//归还库存if($orderInfo[	activity_id	]){// \think\Log::write([	msg	=>	测试队列返还库存	,	data	=>$orderInfo,	order_id	=>$order_id,	activity_id	=>$orderInfo[	activity_id	]],	error	);$orderProductInfo = \app\api\model\order\OrderProduct::where(	order_id	,$order_id)->find();$backStock = \app\api\model\activity\Activity::where(	id	,$orderInfo[	activity_id	])->setInc(	stock	,$orderProductInfo[	number	]);}if($ret){return true;}else{return false;    }}}

$job->release(2); 重新调用任务 2是延迟2秒执行
$job->delete(); 删除任务

调用

在实际应用中调用该基类

//创建订单自动关闭任务
$jobHandlerClassame = 	app\common\library\task\OrderClose	;
$jobData = [	order_id	=>$orderProductDatas[	order_id	]];
$jobQueueame = 	order_close	;
//延迟执行
$isPushed = \think\Queue::later(0 * 60,$jobHandlerClassame, $jobData,$jobQueueame);
//立即执行
$isPushed = \think\Queue::push($jobHandlerClassame, $jobData,$jobQueueame);

两个方法,前者是(0 * 60)秒后执行,后者立即执行
注意的是
later 延迟执行 无反参 我执行成功 返回了null
push 有反参 返回随机的字符串

开启守护进程

我服务器是用宝塔的守护进程

在这里安装Supervisor管理器
安装完需要配置一下守护进程的命令行


名称必须是英文
运行目录要选择项目根目录
命令行如下 注意order_close 是你项目基类名称

php think queue:listen --queue order_close

填写完成之后 保存就可以了

如果不是宝塔环境 请看一下 supervisor—进程管理神器
给我大哥点点关注

结束

这里的配置就基本结束了 基类中的 注释 表明了一下需要参数和方法 需要自己多揣摩一下
还有自己的业务逻辑也要修改 这里只作为参考 大家自行删除就可以

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/shuma/731244.html

相关标签:无
上传时间: 2023-12-02 18:30:29
留言与评论(共有 13 条评论)
本站网友 android开发工具
2分钟前 发表
本站网友 车辆购置税纳税申报表
26分钟前 发表
order_id =>$order_id
本站网友 mpeg转mp4
8分钟前 发表
data =>$orderInfo
本站网友 cmct论坛
13分钟前 发表
where([ id =>$order_id])->find();// \think\Log
本站网友 鄂州美食
2分钟前 发表
$data){// 此处做一些 check,提前判断是否需要执行// $isJobStilleedToBeDone = $this->checkJob($data);// if(! $isJobStilleedToBeDone){// $job->delete();// return;// }// if ($job->attempts() > 6) {// $job->delete();// // 也可以重新发布这个任务// //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行// }// 执行逻辑处理(即:你需要该消息队列做什么)$isJobDone = $this->doJob($data);if ($isJobDone) {// 如果任务执行成功,记得删除任务$job->delete();// $job->release(10);} else {//删除任务$job->delete();// 通过这个方法可以检查这个任务已经重试了几次了}}/*** 有些消息在到达消费者时
本站网友 马尾二手房
16分钟前 发表
本站网友 婴儿奶粉排名
29分钟前 发表
activity_id =>$orderInfo[ activity_id ]]
本站网友 丝瓜的药用价值
30分钟前 发表
order_id =>$order_id
本站网友 镭射去痘印
26分钟前 发表
where([ id =>$order_id])->update($updateData);//归还库存if($orderInfo[ activity_id ]){// \think\Log
本站网友 igcc
19分钟前 发表
where([ id =>$order_id])->update($updateData);//归还库存if($orderInfo[ activity_id ]){// \think\Log
本站网友 奇迹按键精灵脚本
16分钟前 发表
error );if(!$orderInfo) return false;if($orderInfo->status != 1 ) return false;//更新订单数据$updateData = [ status => 10
本站网友 低回的意思
12分钟前 发表
push($jobHandlerClassame