搜索
系统检测到您的用户名不符合规范:

tp自带的queue支持rabbitmq

浏览:6434 发布日期:2019年07月19日 分类:功能实现 关键字: rabbitmq amqp queue
thinkphp【5.1】自带的queue【版本2.0】增加支持rabbitmq
----------------------------
更新:
修复延时消息投递的问题。
增加消费者模式http://www.thinkphp.cn/code/7131.html
-----------------------------
队列用法示例think-queue
https://packagist.org/packages/topthink/think-queue

需要php-amqplib
https://packagist.org/packages/php-amqplib/php-amqplib

config/queue.phpreturn [
'connector' => 'Amqp',
'expire' => 60,
'default' => 'default',
'host' => '127.0.0.1',
'username' => 'guest',
'password' => 'guest',
'port' => 5672,
'vhost' => '/',
'select' => 0,
'timeout' => 0,
'persistent' => false, // 是否是长连接
];
vendor\topthink\think-queue\src\queue\connector\Amqp.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\connector;

use Exception;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Amqp as AmqpJob;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;


class Amqp extends Connector
{
protected $connection;
protected $channel;


protected $options = [
'expire' => 60,
'host' => '127.0.0.1',
'port' => 5672,
'username' => 'guest',
'password' => 'guest',
'timeout' => 0
];

public function __construct(array $options)
{
if (!extension_loaded('sockets')) {
throw new Exception('sockets扩展未安装');
}
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}

$this->connection = new AMQPStreamConnection($this->options['host'], $this->options['port'], $this->options['username'], $this->options['password']);
$this->channel = $this->connection->channel();
}

public function __destruct()
{
$this->channel->close();
$this->connection->close();
}

public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}

public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);

$queue_name = $this->getQueue($queue);
$topic_name = $queue_name.'_topic';
$queue_name_delay = $queue_name.'_delay';
$topic_name_delay = $queue_name.'_topic_delay';

if (empty($delay)) {
$delay = 5 * 1000;
}else{
$delay = $delay * 1000;
}

$this->channel->exchange_declare($topic_name, 'topic', false, true, true);
$this->channel->queue_declare($queue_name, false, true, false, true);
$this->channel->queue_bind($queue_name, $topic_name);

$option = new AMQPTable();
$option->set('x-message-ttl', $delay);
$option->set('x-dead-letter-exchange', $topic_name);
//$option->set('x-dead-letter-routing-key','routing_key');

$this->channel->exchange_declare($topic_name_delay, 'topic', false, true, true);
$this->channel->queue_declare($queue_name_delay, false, true, false, true, false, $option);
$this->channel->queue_bind($queue_name_delay, $topic_name_delay);

$msg = new AMQPMessage($payload);
$this->channel->basic_publish($msg, $topic_name_delay);
}

public function pop($queue = null)
{
$queue_name = $this->getQueue($queue);
$topic_name = $queue_name.'_topic';
$this->channel->exchange_declare($topic_name, 'topic', false, true, true);
$this->channel->queue_declare($queue_name, false, true, false, true);
$this->channel->queue_bind($queue_name, $topic_name);

//拉取模式
$msg = $this->channel->basic_get($queue_name,false);
if (!empty($msg)) {
$job = $msg->body;
$this->channel->basic_ack($msg->delivery_info['delivery_tag']);

return new AmqpJob($this, $job, $queue);
}


//消费者模式,暂不兼容
// $callback = function($msg) use ($original){
// echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
// $job = $msg->body;
// return new AmqpJob($this, $job, $original);
// };
//只有consumer已经处理并确认了上一条message时queue才分派新的message给它
// $this->channel->basic_qos(0, 1, false);
// $this->channel->basic_consume($queue_name,'',false,false,false,false, $callback);
// while (count($this->channel->callbacks)) {
// $this->channel->wait();
// }

}

/**
* 重新发布任务
*
* @param string $queue
* @param string $payload
* @param int $delay
* @param int $attempts
* @return void
*/
public function release($queue, $payload, $delay, $attempts)
{
$payload = $this->setMeta($payload, 'attempts', $attempts);

$queue_name = $this->getQueue($queue);
$queue_name_delay = $queue_name.'_delay';
$topic_name_delay = $queue_name.'_topic_delay';

if (empty($delay)) {
$delay = 5 * 1000;
}else{
$delay = $delay * 1000;
}

$option = new AMQPTable();
$option->set('x-message-ttl', $delay);
$option->set('x-dead-letter-exchange', $queue_name.'_topic');
//$option->set('x-dead-letter-routing-key','routing_key');

$this->channel->exchange_declare($topic_name_delay, 'topic', false, true, true);
$this->channel->queue_declare($queue_name_delay, false, true, false, true, false, $option);
$this->channel->queue_bind($queue_name_delay, $topic_name_delay);

$msg = new AMQPMessage($payload);
$this->channel->basic_publish($msg, $topic_name_delay);

}

public function pushRaw($payload, $queue = null)
{
$queue_name = $this->getQueue($queue);
$topic_name = $queue_name.'_topic';

$this->channel->exchange_declare($topic_name, 'topic', false, true, true);
$this->channel->queue_declare($queue_name, false, true, false, true);
$this->channel->queue_bind($queue_name, $topic_name);

$msg = new AMQPMessage($payload);
$this->channel->basic_publish($msg, $topic_name);

//测试延迟队列
//$this->release($queue, $payload, 0, 2);

return json_decode($payload, true)['id'];
}

protected function createPayload($job, $data = '', $queue = null)
{
$payload = $this->setMeta(
parent::createPayload($job, $data), 'id', $this->getRandomId()
);

return $this->setMeta($payload, 'attempts', 1);
}

/**
* 删除任务
*
* @param string $queue
* @param string $job
* @return void
*/
public function deleteReserved($queue, $job)
{

}


/**
* 随机id
*
* @return string
*/
protected function getRandomId()
{
return Str::random(32);
}

/**
* 获取队列名
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
{
return 'queues_' . ($queue ?: $this->options['default']);
}
}
vendor\topthink\think-queue\src\queue\job\Amqp.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\job;

use think\queue\Job;
use think\queue\connector\Amqp as AmqpQueue;

class Amqp extends Job
{

/**
* The redis queue instance.
* @var RedisQueue
*/
protected $amqp;

/**
* The database job payload.
* @var Object
*/
protected $job;

public function __construct(AmqpQueue $amqp, $job, $queue)
{
$this->job = $job;
$this->queue = $queue;
$this->amqp = $amqp;
}

/**
* Fire the job.
* @return void
*/
public function fire()
{
$this->resolveAndFire(json_decode($this->getRawBody(), true));
}

/**
* Get the number of times the job has been attempted.
* @return int
*/
public function attempts()
{
return json_decode($this->job, true)['attempts'];
}

/**
* Get the raw body string for the job.
* @return string
*/
public function getRawBody()
{
return $this->job;
}

/**
* 删除任务
*
* @return void
*/
public function delete()
{
parent::delete();

$this->amqp->deleteReserved($this->queue, $this->job);
}

/**
* 重新发布任务
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);

$this->delete();

$this->amqp->release($this->queue, $this->job, $delay, $this->attempts() + 1);
}
}
修改了获取异常,防止无限制获取的问题
vendor\topthink\think-queue\src\queue\Worker.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue;

use Exception;
use think\facade\Hook;
use think\Queue;

class Worker
{

/**
* 执行下个任务
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @return array
*/
public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
{
//处理获取错误问题,防止无限制获取
try {
$job = $this->getNextJob($queue);
} catch (\Exception $e) {
Hook::listen('worker_before_sleep', $queue);
$this->sleep($sleep);
//重新抛出错误
throw $e;
return ['job' => null, 'failed' => false];
}

if (!is_null($job)) {
Hook::listen('worker_before_process', $queue);
return $this->process($job, $maxTries, $delay);
}

Hook::listen('worker_before_sleep', $queue);
$this->sleep($sleep);

return ['job' => null, 'failed' => false];
}

/**
* 获取下个任务
* @param string $queue
* @return Job
*/
protected function getNextJob($queue)
{
if (is_null($queue)) {
return Queue::pop();
}

foreach (explode(',', $queue) as $queue) {
if (!is_null($job = Queue::pop($queue))) {
return $job;
}
}
}

/**
* Process a given job from the queue.
* @param \think\queue\Job $job
* @param int $maxTries
* @param int $delay
* @return array
* @throws Exception
*/
public function process(Job $job, $maxTries = 0, $delay = 0)
{
if ($maxTries > 0 && $job->attempts() > $maxTries) {
return $this->logFailedJob($job);
}

try {
$job->fire();

return ['job' => $job, 'failed' => false];
} catch (Exception $e) {
if (!$job->isDeleted()) {
$job->release($delay);
}

throw $e;
}
}

/**
* Log a failed job into storage.
* @param \Think\Queue\Job $job
* @return array
*/
protected function logFailedJob(Job $job)
{
if (!$job->isDeleted()) {
try {
$job->delete();
$job->failed();
} finally {
Hook::listen('queue_failed', $job);
}
}

return ['job' => $job, 'failed' => true];
}

/**
* Sleep the script for a given number of seconds.
* @param int $seconds
* @return void
*/
public function sleep($seconds)
{
sleep($seconds);
}

}
Worker.php和mq队列没有关系。
使用的是拉取模式,效率上和消费者模式还是差点。
还在测试阶段有问题留言反馈。

附件 think-queue.zip ( 32.65 KB 下载:52 次 )

评论() 相关
后面还有条评论,
评论支持使用[code][/code]标签添加代码
您需要登录后才可以评论 登录 | 立即注册
收藏
q4323636
积分:3047 等级:LV4
热点推荐
(追記) (追記ここまで)
最新更新

我们

合作

网站

信息

ThinkPHP 是一个免费开源的,快速、简单的面向对象的 轻量级PHP开发框架 ,创立于2006年初,遵循Apache2开源协议发布,是为了敏捷WEB应用开发和简化企业应用开发而诞生的。ThinkPHP从诞生以来一直秉承简洁实用的设计原则,在保持出色的性能和至简的代码的同时,也注重易用性。并且拥有众多的原创功能和特性,在社区团队的积极参与下,在易用性、扩展性和性能方面不断优化和改进,已经成长为国内最领先和最具影响力的WEB应用开发框架,众多的典型案例确保可以稳定用于商业以及门户级的开发。

AltStyle によって変換されたページ (->オリジナル) /