简述thinkphp自带队列think-queue的使用以及通过supervisor实现常驻进程

简述thinkphp自带队列think-queue的使用以及通过supervisor实现常驻进程

think-queue是thinkphp官方提供的一个消息队列服务,适用于大并发、返回结果时间较长、需要批量操作等专门支持队列服务的扩展包。例如短信发送、模板消息邮件等推送。可以进行发布、获取、执行、删除、重发、失败处理、延迟执行、操作控制等操作。

1、安装think-queue

使用composer进行安装,在项目根目录,执行: composer require topthink/think-queue

2、配置消息队列的存储环境

执行完composer安装命令后,会自动生成一个queue.php的配置文件。
这里我们用redis驱动。当然还可以使用别的驱动,例如:database驱动(与redis相比不推荐)、sync等。

<?php
 /**  * 消息队列配置  
 * 内置驱动:redis、database、topthink、sync
*/ 
use think\facade\Env; 
return [
    'default'     => 'redis',
    'prefix'      => 'prefix_',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => Env::get('queue.queue_name', ''),
            'host'       => Env::get('redis.redis_hostname', '127.0.0.1'),
            'port'       => Env::get('redis.port', 6379),
            'password'   => Env::get('redis.redis_password', ''),
            'select'     => Env::get('redis.select', 0),
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ], 
];

3、队列生产者

修改app/api/controller/Test.php里使用Queue::later或者Queue::push发布任务。
queue::push(); //创建同步任务
Queue::later(); //创建异步任务,延时执行

<?php
namespace app\api\controller;
use think\Queue;
class Test extends Controller;
{
    public function queue(){
        //获取参数
        $data = file_get_contents("php://input");
        if(empty($data)){
            $this->error("post is null");
        }
        $params = json_decode($data, true);
        /*创建新消息并推送到消息队列*/
        // 当前任务由哪个类负责处理
        $job_handler_classname = "app\jobs\Send";
        // 当前队列归属的队列名称
        $job_queue_name = "send_job_queue";
        // 当前任务所需的业务数据
        $job_data = ["ts"=>time(),"uniqid"=>uniqid(), "params"=>$params];
        // 将任务推送到消息队列等待对应的消费者去执行
        $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
        if($is_pushed == false){
            $this->error("send job queue went wrong");
        }
        //操作成功
        $this->success('success');
    }
}

4、消息的消费和删除。

在app目录下创建jobs文件夹,并创建Send.php类。

<?php
namespace app\jobs;
use think\queue\Job;
/**
 * 消费者类
 * 用于处理 send_job_queue 队列中的任务
*/
class Send
{
    /**
     * fire是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
    */
    public function fire(Job $job, $data)
    {
        //有效消息到达消费者时可能已经不再需要执行了
        if(!$this->checkJob($data)){
            $job->delete();
            return;
        }
        //执行业务处理
        if($this->doJob($data)){
            $job->delete();//任务执行成功后删除
        }else{
            //检查任务重试次数
            if($job->attempts() > 3){
                $job->delete();
            }
        }
    }
    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean 任务执行的结果
     */
    private function checkJob($data)
    {
        $ts = $data["ts"];
        $uniqid = $data["uniqid"];
        $params = $data["params"];
        return true;
    }
    /**
     * 根据消息中的数据进行实际的业务处理
    */
    private function doJob($data)
    {
      // 实际业务流程处理
      return true;
    }
}

5、监听任务并执行

work和listen两种,具体的用法和可选参数可以输入命令加 –help 查看

php think queue:listen
php think queue:work 

以上queue配置、发布、删除、执行、监听等一个简单的队列完成。实际生产中,我们需要用常驻进程的方式运行,防止因为服务器出现问题导致监听退出手动再次启动,这就得用的我们重点提到的Supervisor。Supervisord 是用 Python 实现的一款的进程管理工具,supervisord 要求管理的程序是非 daemon 程序,supervisord 会帮你把它转成 daemon 程序,因此如果用 supervisord 来管理进程,进程需要以非daemon的方式启动。

6、安装Supervisor

yum install -y supervisor

7、查看Supervisor安装位置

supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序。

[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisord
supervisord: /usr/bin/supervisord /etc/supervisord.d /etc/supervisord.conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis echo_supervisord_conf
echo_supervisord_conf: /usr/bin/echo_supervisord_conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisorctl    
supervisorctl: /usr/bin/supervisorctl
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 

8、修改配置文件

vi /etc/supervisord.conf
[include]
files = supervisord.d/*.ini

9、自定义待守护进程配置文件

/etc/supervisord.d 下创建以.ini 后缀的文件。编辑如下配置:
[program:shop-queue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /www/wwwroot/shop/think queue:listen
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwroot/shop/public/queue/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwroot/shop/public/queue/worker.log  ;输出日志文件

10、Surpervisor的启动

# supervisord二进制启动
supervisord -c /etc/supervisord.conf
# 检查进程
ps aux | grep supervisord
# 更新Supervisor
supervisorctl update

欢迎留言讨论。

                       

点击阅读全文

上一篇 2023年 5月 26日 am10:48
下一篇 2023年 5月 26日 am10:49