ThinkAdmin V6 官方文档

⚡ 异步任务

ThinkAdmin 的异步任务处理是其核心特性之一,支持在 WindowsLinux 平台上并行执行多个任务,显著提高了任务处理效率。

📚 基础概念

🤔 基本介绍

异步任务是指不需要立即返回结果的操作,可以在后台慢慢处理。

简单理解:就像餐厅点餐,同步任务是"现做现吃"(用户要等待),异步任务是"下单后先给你号牌,菜好了再通知你"(用户无需等待)。

实际场景:

  • 发送邮件:发送 1000 封邮件可能需要 10 分钟,如果同步执行,用户要等 10 分钟
  • 生成报表:处理大量数据生成报表可能需要 5 分钟,如果同步执行,用户要等 5 分钟
  • 图片处理:压缩 100 张图片可能需要 3 分钟,如果同步执行,用户要等 3 分钟

使用异步任务后:

  • 用户提交请求后立即返回(1 秒内)
  • 后台慢慢处理任务(可能需要几分钟)
  • 处理完成后通知用户(通过进度条或消息)

🤔 使用优势

问题1:用户体验差

如果使用同步任务,用户需要等待很长时间:

// ❌ 不推荐:同步处理(用户要等待)
public function sendEmails()
{
 $users = User::select(); // 1000 个用户
 foreach ($users as $user) {
 // 发送邮件,每个需要 1 秒
 Mail::send($user->email, '通知');
 }
 // 用户需要等待 1000 秒(约 17 分钟)!
 $this->success('发送完成');
}

使用异步任务后:

// ✅ 推荐:异步处理(用户立即返回)
public function sendEmails()
{
 // 创建异步任务,立即返回
 $this->_queue('批量发送邮件', 'email:send', 0, ['user_ids' => $ids]);
 // 用户立即看到"任务已创建"的提示,无需等待
 $this->success('任务已创建,正在后台处理');
}

问题2:系统性能

同步任务会长时间占用 Web 进程,影响其他用户访问:

// ❌ 不推荐:占用 Web 进程
public function processData()
{
 // 处理大量数据,需要 5 分钟
 // 这 5 分钟内,这个 Web 进程被占用,无法处理其他请求
 processLargeData();
}

使用异步任务后:

// ✅ 推荐:独立进程处理
public function processData()
{
 // 创建异步任务,Web 进程立即释放
 $this->_queue('处理数据', 'data:process', 0, []);
 // Web 进程可以立即处理其他用户的请求
}

🔄 同步 vs 异步

同步任务流程:

用户提交请求

服务器处理(用户等待中...)

处理完成(可能需要几分钟)

返回结果给用户

异步任务流程:

用户提交请求

立即返回"任务已创建"

后台独立进程处理(用户无需等待)

处理完成后更新状态

用户可以通过进度条查看进度

⚙️ 工作原理

ThinkAdmin 使用队列机制处理异步任务,整个过程是自动的:

步骤1:创建任务

将任务信息保存到数据库:

// 在控制器中创建任务
$this->_queue('任务标题', 'command:name', 0, ['param' => 'value']);

系统自动执行:

// 系统自动将任务保存到数据库
// 表名:system_queue
// 字段:title(标题)、command(命令)、data(数据)、status(状态)等

步骤2:监听进程扫描

后台监听进程每 0.5 秒扫描一次数据库:

# 监听进程自动执行(无需手动操作)
php think xadmin:queue listen

扫描逻辑(系统自动执行):

// 每 0.5 秒执行一次
while (true) {
 // 查询待处理的任务(status = 1)
 $tasks = Queue::where('status', 1)->select();

 foreach ($tasks as $task) {
 // 创建独立进程执行任务
 exec("php think {$task->command} > /dev/null 2>&1 &");
 }

 sleep(0.5); // 等待 0.5 秒后继续扫描
}

步骤3:执行任务

系统创建独立的 PHP-CLI 进程执行任务:

# 系统自动创建进程(无需手动操作)
php think command:name --data='{"param":"value"}'

步骤4:更新状态

任务执行完成后,自动更新状态:

// 在任务中调用(系统自动处理)
$this->setQueueSuccess('任务完成'); // 更新状态为成功
// 或
$this->setQueueError('任务失败'); // 更新状态为失败

📖 相关概念

队列(Queue)

  • 待处理任务的列表
  • 存储在数据库 system_queue 表中
  • 每个任务有唯一编号(以 Q 开头)

进程(Process)

  • 执行任务的独立程序
  • 与 Web 进程分离,互不影响
  • Windows 使用 wmic 创建,Linux 使用 & 创建

监听(Listen)

  • 定期检查是否有新任务
  • 默认每 0.5 秒扫描一次
  • 发现任务后自动创建进程执行

命令(Command)

  • 执行任务的具体代码
  • 继承 think\admin\Command
  • 或继承 think\admin\Queue

🎯 使用场景

场景1:批量发送邮件

// 需要发送 1000 封邮件,每封需要 1 秒
// 同步:用户等待 1000 秒(约 17 分钟)
// 异步:用户立即返回,后台慢慢发送
$this->_queue('批量发送邮件', 'email:batch', 0, ['user_ids' => $ids]);

场景2:处理大量数据

// 需要处理 10000 条数据,每条需要 0.1 秒
// 同步:用户等待 1000 秒(约 17 分钟)
// 异步:用户立即返回,后台慢慢处理
$this->_queue('处理数据', 'data:process', 0, ['data' => $data]);

场景3:生成报表

// 需要生成复杂的 Excel 报表,需要 5 分钟
// 同步:用户等待 5 分钟
// 异步:用户立即返回,生成完成后通知下载
$this->_queue('生成报表', 'report:generate', 0, ['params' => $params]);

场景4:图片处理

// 需要压缩 100 张图片,每张需要 2 秒
// 同步:用户等待 200 秒(约 3 分钟)
// 异步:用户立即返回,后台慢慢处理
$this->_queue('压缩图片', 'image:compress', 0, ['images' => $images]);

🚀 主要特性

  • 并行处理: 支持多个任务同时执行
  • 跨平台: 兼容 Windows 和 Linux 系统
  • 自动监控: 自动扫描和执行待处理任务
  • 进程管理: 独立的 PHP-CLI 进程处理任务
  • 异常恢复: 支持自动重启和异常处理

⚙️ 工作原理

ThinkAdmin 的异步任务机制基于队列和进程管理,整个过程是自动化的。

🔍 任务扫描机制

监听进程定期扫描数据库,查找待处理的任务:

扫描频率:

  • 默认每 0.5 秒 扫描一次
  • 可以通过配置调整扫描频率
  • 扫描不会影响系统性能(查询很快)

扫描过程(系统自动执行):

// 系统自动执行的逻辑(无需手动编写)
while (true) {
 // 1. 查询待处理的任务(status = 1)
 $tasks = Db::name('system_queue')
 ->where('status', 1)
 ->where('exec_at', '<=', time())
 ->select();

 // 2. 为每个任务创建独立进程
 foreach ($tasks as $task) {
 // 更新任务状态为处理中
 $task->status = 2;
 $task->save();

 // 创建独立进程执行任务
 if (PHP_OS === 'WINNT') {
 // Windows 系统使用 wmic
 exec("wmic process call create \"php think {$task->command}\"");
 } else {
 // Linux 系统使用 &
 exec("php think {$task->command} > /dev/null 2>&1 &");
 }
 }

 // 3. 等待 0.5 秒后继续扫描
 sleep(0.5);
}

多任务并行:

  • 系统支持同时执行多个任务
  • 每个任务都是独立的进程
  • 互不影响,可以并行处理

🎮 进程管理

ThinkAdmin 提供了完整的进程管理命令:

LISTEN - 监听进程

启动监听进程,负责扫描和执行任务:

# 启动监听进程(前台运行,可以看到日志)
php think xadmin:queue listen

特点:

  • 前台运行,可以看到实时日志
  • 适合开发环境调试
  • 进程异常退出后不会自动重启

START - 启动守护进程

启动守护进程,自动管理监听进程:

# 启动守护进程(推荐方式)
php think xadmin:queue start

特点:

  • 后台运行,自动管理监听进程
  • 监听进程异常退出后会自动重启
  • 适合生产环境使用

STOP - 停止进程

停止所有正在执行的任务进程:

# 停止所有任务进程
php think xadmin:queue stop

使用场景:

  • 系统维护时需要停止所有任务
  • 任务执行异常需要强制停止
  • 更新代码后需要重启进程

QUERY - 查询进程

查询当前正在执行的任务进程:

# 查询当前运行的任务
php think xadmin:queue query

输出示例:

正在执行的任务:
- Q20241110123456: 批量发送邮件 (进度: 50%)
- Q20241110123457: 处理数据 (进度: 30%)

STATUS - 查看状态

查看守护进程的运行状态:

# 查看守护进程状态
php think xadmin:queue status

输出示例:

守护进程状态:运行中
监听进程状态:运行中
当前任务数:2

🔄 完整工作流程

流程1:创建任务

// 1. 用户在控制器中创建任务
public function sync()
{
 $this->_queue('同步数据', 'data:sync', 0, []);
}

流程2:任务入库

// 2. 系统自动将任务保存到数据库
// system_queue 表
// - title: '同步数据'
// - command: 'data:sync'
// - status: 1 (待处理)
// - exec_at: 当前时间

流程3:监听扫描

# 3. 监听进程扫描数据库(每 0.5 秒)
# 发现待处理任务,创建独立进程执行

流程4:执行任务

// 4. 独立进程执行任务
class DataSync extends Command
{
 protected function execute()
 {
 // 处理业务逻辑
 $this->setQueueProgress('处理中...', 50);
 // ...
 $this->setQueueSuccess('完成');
 }
}

流程5:更新状态

// 5. 任务完成后,自动更新数据库状态
// status: 3 (成功) 或 4 (失败)

流程6:前端显示

// 6. 前端通过任务编号查询进度
$.loadQueue('Q20241110123456');
// 显示进度条和状态

⚠️ 重要提示

单进程限制: 为避免重复执行任务,请确保只启动一个监听进程 LISTEN

推荐启动方式: 使用 START 指令启动进程,异常时会自动重启 LISTEN 进程。

Linux 环境: 任务主进程会定期执行 ps ax 命令检查进程状态,某些安全工具可能发出警告,可放心忽略。

PHP 函数要求: 需要启用 execproc_openposix_kill 等函数。

🔧 守护进程管理

进程监控

  • 定时检查: 设置定时任务以定期检查监听进程 LISTEN 的状态
  • 自动重启: 如果发现 LISTEN 进程未运行或异常时,则自动执行指令 START 来重新启动它
  • 状态监控: 实时监控进程运行状态和健康度
  • 异常处理: 自动处理进程异常和重启

用户权限

  • 指定用户: 在 Linux 服务器上运行时,建议使用指定用户执行
  • 权限避免: 避免 CLI、CGI 和 FPM 这三种运行方式的缓存文件权限冲突
  • 日志检查: 在排查问题时,请检查相关日志
  • 用户命令: 建议使用 sudo -u www php think xadmin:queue start 命令

环境要求

  • exec 函数: 任务进程的创建是通过 PHP 的 exec 函数实现的
  • 函数启用: 请确保 exec 函数未被禁用
  • 异常处理: 在执行过程中遇到异常时,请根据错误提示调整 PHP 的运行环境设置
  • 权限配置: 确保 PHP 有足够的权限执行系统命令

进程管理相关指令

  • 执行 php think xadmin:queue stop # 停止所有正在执行的异步任务进程
  • 执行 php think xadmin:queue query # 查询当前所有正在执行的任务进程
  • 执行 php think xadmin:queue start # 开启异步任务守护进程(后台进程)
  • 执行 php think xadmin:queue listen # 启动异步任务监听进程(前台进程)
  • 执行 php think xadmin:queue status # 查看当前守护进程的后台运行状态

➕ 创建任务

ThinkAdmin 提供了两种方式创建异步任务:函数方式和控制器方式。

🎯 控制器方式

在控制器中使用 $this->_queue() 方法创建任务,这是最常用的方式:

<?php
declare(strict_types=1);

namespace app\admin\controller;

use think\admin\Controller;

class Member extends Controller
{
 /**
 * 同步会员数据
 */
 public function sync()
 {
 // 创建异步任务
 $this->_queue(
 '同步会员数据', // 参数1:任务标题
 'member:sync', // 参数2:执行命令
 0, // 参数3:延时时间(秒)
 ['type' => 'all'], // 参数4:任务数据
 0, // 参数5:允许重复创建(0=不允许,1=允许)
 0 // 参数6:是否循环执行(0=不循环,1=循环)
 );
 }
}

参数说明:

参数说明示例
$title任务标题'同步会员数据'
$command执行命令'member:sync''app\store\queue\MemberSync'
$later延时时间(秒)0 表示立即执行,60 表示 60 秒后执行
$data任务数据(数组)['type' => 'all', 'ids' => [1,2,3]]
$rscript允许重复创建0 不允许,1 允许
$loops是否循环执行0 不循环,1 循环

返回值:

  • 成功:自动返回 success 响应,data 字段包含任务编号(如 Q20241110123456)
  • 失败:自动返回 error 响应

🎯 函数方式

使用 sysqueue() 函数创建任务,可以获取任务编号:

<?php
// 创建任务并获取任务编号
$code = sysqueue(
 '同步会员数据', // 任务标题
 'member:sync', // 执行命令
 0, // 延时时间
 ['type' => 'all'], // 任务数据
 1, // 允许重复创建
 0 // 是否循环执行
);

// $code 是任务编号,如 'Q20241110123456'
echo "任务编号:{$code}";

使用场景:

  • 需要在代码中获取任务编号
  • 需要在非控制器环境中创建任务
  • 需要手动处理返回值

⏰ 延时执行

可以设置任务延时执行,适用于定时任务场景:

// 立即执行
$this->_queue('任务1', 'command:name', 0, []);

// 60 秒后执行
$this->_queue('任务2', 'command:name', 60, []);

// 1 小时后执行(3600 秒)
$this->_queue('任务3', 'command:name', 3600, []);

实际应用:

  • 订单支付后,30 分钟后检查是否发货
  • 用户注册后,5 分钟后发送欢迎邮件
  • 数据备份,每天凌晨 2 点执行

🔄 防止重复创建

某些任务不应该重复创建,可以通过 $rscript 参数控制:

// 不允许重复创建(推荐)
$this->_queue('同步会员数据', 'member:sync', 0, [], 0);
// 如果任务已存在(待处理或处理中),则不会创建新任务

// 允许重复创建
$this->_queue('发送邮件', 'email:send', 0, [], 1);
// 即使任务已存在,也会创建新任务

判断规则:

  • 系统根据任务标题判断是否重复
  • 如果标题相同且状态为"待处理"或"处理中",则视为重复
  • 建议在标题中加入唯一标识,如:"同步会员数据-{$userId}"

🔁 循环执行

某些任务需要定期执行,可以设置循环执行:

// 循环执行(每 60 秒执行一次)
$this->_queue('定时检查', 'check:status', 0, [], 0, 1);

注意事项:

  • 循环任务会定期自动重新创建
  • 需要确保任务执行时间小于循环间隔
  • 建议在任务中控制循环次数,避免无限循环

📦 任务数据传递

可以通过 $data 参数传递数据给任务:

// 传递数据
$this->_queue('处理订单', 'order:process', 0, [
 'order_id' => 12345,
 'user_id' => 67890,
 'action' => 'refund'
]);

// 在任务中获取数据
class OrderProcess extends Command
{
 protected function execute(Input $input, Output $output)
 {
 // 获取任务数据
 $data = $this->queue->data;
 $orderId = $data['order_id'];
 $userId = $data['user_id'];
 $action = $data['action'];

 // 处理业务逻辑
 // ...
 }
}

🎯 创建子任务

如果要在任务中创建新任务,需要创建新的任务对象:

<?php
class ParentTask extends Command
{
 protected function execute(Input $input, Output $output)
 {
 // 处理主任务
 $this->setQueueProgress('处理中...', 50);

 // 创建子任务(注意:需要创建新对象)
 $queue = QueueService::instance([], true); // 创建新任务服务
 $queue->register('子任务', 'child:task', 0, [], 0, 0);

 // 继续处理主任务
 $this->setQueueSuccess('完成');
 }
}

为什么需要创建新对象?

  • 避免替换当前任务对象
  • 确保主任务和子任务互不影响
  • 保持任务数据的独立性

📊 获取任务数据

在任务执行过程中,可以获取当前任务的数据:

class MyTask extends Command
{
 protected function execute(Input $input, Output $output)
 {
 // 获取任务编号
 $code = $this->queue->code;
 // 输出:Q20241110123456

 // 获取任务参数
 $data = $this->queue->data;
 // 输出:['type' => 'all', 'ids' => [1,2,3]]

 // 获取任务记录
 $record = $this->queue->record;
 // 输出:完整的任务记录对象

 // 使用数据
 $type = $data['type'];
 $ids = $data['ids'];
 }
}

🎨 两种任务类型

ThinkAdmin 支持两种任务类型:

类型1:Command 指令(推荐)

继承 think\admin\Command 类,使用 ThinkPHP 的命令机制:

class MemberSync extends Command
{
 protected function configure()
 {
 $this->setName('member:sync')
 ->setDescription('同步会员数据');
 }

 protected function execute(Input $input, Output $output)
 {
 // 处理逻辑
 }
}

类型2:Queue 类

继承 think\admin\Queue 类,直接实现任务逻辑:

class MemberSyncQueue extends Queue
{
 public function execute(array $data)
 {
 // 处理逻辑
 }
}

创建任务时:

// 使用 Command 指令
$this->_queue('同步会员', 'member:sync', 0, []);

// 使用 Queue 类
$this->_queue('同步会员', 'app\store\queue\MemberSyncQueue', 0, []);

📊 任务进度

ThinkAdmin 提供了完整的任务进度跟踪功能,前端可以实时显示任务执行进度。

🎯 任务编号

创建任务后,系统会自动生成任务编号:

// 创建任务
$this->_queue('同步数据', 'data:sync', 0, []);

// 返回的任务编号格式:Q + 时间戳 + 随机数
// 示例:Q20241110123456

任务编号格式:

  • 以字母 Q 开头
  • 后面是时间戳和随机数
  • 用于唯一标识任务

📡 前端显示进度

前端可以使用 $.loadQueue() 方法显示任务进度:

<!-- 方式1:使用 data-queue 属性(推荐) -->
<button data-queue="{:url('sync')}" class="layui-btn">
 同步数据
</button>

<!-- 方式2:手动调用 -->
<button onclick="startSync()" class="layui-btn">
 同步数据
</button>

<script>
function startSync() {
 // 1. 调用后端接口创建任务
 $.post('{:url("sync")}', {}, function(result) {
 if (result.code === 1) {
 // 2. 获取任务编号
 var queueCode = result.data;

 // 3. 显示进度弹窗
 $.loadQueue(queueCode);
 }
 });
}
</script>

进度弹窗功能:

  • 实时显示任务进度(百分比)
  • 显示任务状态信息
  • 任务完成后自动关闭
  • 任务失败时显示错误信息

📈 后端更新进度

在任务执行过程中,可以更新任务进度:

class DataSync extends Command
{
 protected function execute(Input $input, Output $output)
 {
 $total = 1000;
 $count = 0;

 // 处理数据
 foreach ($data as $item) {
 $count++;

 // 更新进度(百分比)
 $progress = intval($count / $total * 100);
 $this->setQueueProgress(
 "处理中:{$count}/{$total}", // 提示信息
 $progress // 进度百分比(0-100)
 );

 // 处理业务逻辑
 processItem($item);
 }

 // 设置任务完成
 $this->setQueueSuccess("完成 {$count} 条数据处理");
 }
}

进度更新方法:

// 更新进度(继续执行)
$this->setQueueProgress('处理中...', 50); // 50% 完成

// 设置任务成功(结束执行)
$this->setQueueSuccess('任务完成');

// 设置任务失败(结束执行)
$this->setQueueError('任务失败:错误信息');

参数说明:

  • setQueueProgress($message, $progress): 更新进度,任务继续执行
    • $message: 提示信息(字符串)
    • $progress: 进度百分比(0-100 的整数)
  • setQueueSuccess($message): 设置任务成功,任务结束执行
    • $message: 成功提示信息(可选)
  • setQueueError($message): 设置任务失败,任务结束执行
    • $message: 错误提示信息(必填)

🔄 完整示例

前端代码:

<!-- 触发任务的按钮 -->
<button data-queue="{:url('member/sync')}" class="layui-btn">
 <i class="layui-icon layui-icon-refresh"></i> 同步会员数据
</button>

后端代码:

<?php
namespace app\admin\controller;

use think\admin\Controller;

class Member extends Controller
{
 public function sync()
 {
 // 创建任务,自动返回任务编号
 $this->_queue('同步会员数据', 'member:sync', 0, []);
 }
}

任务代码:

<?php
namespace app\store\command;

use think\admin\Command;

class MemberSync extends Command
{
 protected function configure()
 {
 $this->setName('member:sync')
 ->setDescription('同步会员数据');
 }

 protected function execute(Input $input, Output $output)
 {
 $total = 1000;
 $count = 0;

 // 分批处理
 Db::name('StoreMember')->chunk(100, function($members) use (&$count, $total) {
 foreach ($members as $member) {
 $count++;

 // 更新进度
 $progress = intval($count / $total * 100);
 $this->setQueueProgress(
 "处理会员:{$member['id']} ({$count}/{$total})",
 $progress
 );

 // 处理业务逻辑
 MemberService::sync($member['id']);
 }
 });

 // 任务完成
 $this->setQueueSuccess("完成 {$count} 个会员的同步");
 }
}

执行效果:

  1. 用户点击按钮
  2. 前端自动调用后端接口
  3. 后端创建任务并返回任务编号
  4. 前端显示进度弹窗
  5. 后端任务执行,实时更新进度
  6. 任务完成后,前端显示完成提示

📝 任务案例

前端代码(属性 data-queue 触发 $.loadQueue(Code) 展示任务进度)

<button data-queue="{:url('sync')}" class='layui-btn layui-btn-sm layui-btn-primary'>
 <i class="layui-icon layui-icon-refresh"></i> 刷新会员数据
</button>

说明: 点击按钮后,会自动调用后端接口创建任务,并显示任务进度弹窗。

后端代码(调用$this->_queue()会返回响应对象,响应对象data默认为任务编号)

<?php
declare(strict_types=1);

namespace app\admin\controller;

use think\admin\Controller;

/**
 * 会员管理
 * @class Member
 * @package app\admin\controller
 */
class Member extends Controller
{
 /**
 * 刷新会员数据
 * @auth true
 */
 public function sync()
 {
 // 创建异步任务
 // 参数1: 任务标题
 // 参数2: 执行命令(指令名称)
 // 参数3: 延时时间(秒)
 // 参数4: 任务数据(数组)
 // 参数5: 允许重复创建(0=不允许,1=允许)
 $this->_queue('重新计算所有会员级别', "xsync:member", 1, [], 0);
 }
}

对应指令(指令需要注册,通过 sys.php 注册或 Service.php配置,执行 php think 可查看是否已生效)

<?php
// app/admin/sys.php 动态注册操作指令
use think\Console;

\think\Console::starting(function (Console $console) {
 $console->addCommand(\app\store\command\MemberRun::class);
});

在任务处理时可以使用 $this->queue 获取到数据参数(实际对象为 QueueService ,具体可以查阅对象代码)

<?php
declare(strict_types=1);

namespace app\store\command;

use think\admin\Command;
use think\admin\Command\Input;
use think\admin\Command\Output;
use think\Collection;

/**
 * 重新计算会员数据
 * @class MemberRun
 * @package app\store\command
 */
class MemberRun extends Command
{
 /**
 * 配置命令
 */
 protected function configure()
 {
 $this->setName('xsync:member')
 ->setDescription('[ 商城 - 不需执行 ] 重新计算所有会员的等级');
 }

 /**
 * 执行命令
 * @param Input $input
 * @param Output $output
 * @throws \think\admin\Exception
 */
 protected function execute(Input $input, Output $output)
 {
 // 获取总数
 $total = $this->app->db->name('StoreMember')->count();
 $count = 0;

 // 分批处理数据
 $this->app->db->name('StoreMember')->chunk(100, function (Collection $data) use (&$count, $total) {
 foreach ($data->toArray() as $vo) {
 $count++;
 // 同步会员等级
 $state = MemberService::instance()->syncLevel($vo['id']) ? '成功' : '失败';

 // 更新任务进度(百分比)
 $this->setQueueProgress(
 "调整会员 {$vo['id']} {$vo['phone']} {$vo['nickname']} 级别{$state}", 
 intval($count / $total * 100)
 );
 }
 });

 // 设置任务完成
 $this->setQueueSuccess("重新计算 {$count} 个会员的级别!");
 }
}

📖 Command 类详解

继承要求:

  • 命令类必须继承 think\admin\Command
  • 需要实现 configure() 方法配置命令
  • 需要实现 execute() 方法执行命令逻辑

常用方法:

// 1. 配置命令名称和描述
protected function configure()
{
 $this->setName('xsync:member')
 ->setDescription('重新计算所有会员的等级');
}

// 2. 获取命令参数
$openid = $input->getArgument('openid');
$code = $input->getArgument('autocode');

// 3. 设置任务进度(百分比,0-100)
$this->setQueueProgress(string $message, int $progress = 0);

// 4. 设置任务成功
$this->setQueueSuccess(string $message = '');

// 5. 设置任务失败
$this->setQueueError(string $message = '');

// 6. 获取当前任务数据(在任务中)
$this->queue->code; // 任务编号
$this->queue->data; // 任务参数
$this->queue->record; // 任务记录

完整示例:

<?php
declare(strict_types=1);

namespace app\wechat\command;

use think\admin\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;

class Auto extends Command
{
 private $openid;

 protected function configure()
 {
 $this->setName('xadmin:fansmsg');
 $this->addArgument('openid', Argument::OPTIONAL, 'wechat user openid', '');
 $this->addArgument('autocode', Argument::OPTIONAL, 'wechat auto message', '');
 $this->setDescription('Wechat Users Push AutoMessage for ThinkAdmin');
 }

 protected function execute(Input $input, Output $output)
 {
 $code = $input->getArgument('autocode');
 $this->openid = $input->getArgument('openid');

 // 参数验证
 if (empty($code)) {
 $this->setQueueError('Message Code cannot be empty');
 return;
 }
 if (empty($this->openid)) {
 $this->setQueueError('Wechat Openid cannot be empty');
 return;
 }

 // 查询数据
 $map = ['code' => $code, 'status' => 1];
 $data = WechatAuto::mk()->where($map)->find();
 if (empty($data)) {
 $this->setQueueError('Message Data Query failed');
 return;
 }

 // 处理业务逻辑
 $this->buildMessage($data->toArray());
 }

 private function buildMessage(array $data)
 {
 // 处理消息逻辑
 $result = $this->sendMessage('text', ['content' => $data['content']]);

 if (empty($result[0])) {
 $this->setQueueError($result[1]);
 } else {
 $this->setQueueSuccess($result[1]);
 }
 }
}

注意事项:

  • 命令类需要在 Service 类的 register() 方法中注册
  • 使用 setQueueError()setQueueSuccess() 会结束任务执行
  • 任务进度使用百分比(0-100),用于前端显示
  • 可以通过 $this->queue 访问当前任务数据

注册命令:

<?php
// app/admin/Service.php
namespace app\admin;

use think\admin\Plugin;

class Service extends Plugin
{
 public function register(): void
 {
 // 注册命令
 $this->commands([
 \app\store\command\MemberSync::class,
 ]);
 }
}

验证命令是否注册:

# 执行以下命令,查看是否包含你的命令
php think

# 输出示例:
# member:sync 同步会员数据

使用 Queue 类处理任务

除了使用 Command 指令,还可以直接继承 Queue 类:

<?php
declare(strict_types=1);

namespace app\store\queue;

use think\admin\Queue;

/**
 * 会员等级同步任务
 * @class MemberSyncQueue
 * @package app\store\queue
 */
class MemberSyncQueue extends Queue
{
 /**
 * 执行任务
 * @param array $data 任务数据
 * @throws \think\admin\Exception
 */
 public function execute(array $data)
 {
 $total = $this->app->db->name('StoreMember')->count();
 $count = 0;

 $this->app->db->name('StoreMember')->chunk(100, function ($members) use (&$count, $total) {
 foreach ($members as $member) {
 $count++;
 MemberService::instance()->syncLevel($member['id']);

 // 更新进度
 $this->setQueueProgress(
 "处理会员 {$member['id']}", 
 intval($count / $total * 100)
 );
 }
 });

 $this->setQueueSuccess("完成 {$count} 个会员的等级同步");
 }
}

创建任务时使用类名:

// 在控制器中
$this->_queue('同步会员等级', 'app\store\queue\MemberSyncQueue', 0, ['type' => 'level']);

❓ 常见问题

❓ 任务不执行

可能原因:

  1. 监听进程未启动
  2. 命令未注册
  3. PHP 函数被禁用

解决方法:

# 1. 检查监听进程是否运行
php think xadmin:queue status

# 2. 启动监听进程
php think xadmin:queue start

# 3. 检查命令是否注册
php think
# 查看是否包含你的命令

# 4. 检查 PHP 函数是否启用
php -m | grep exec
# 或
php -r "echo function_exists('exec') ? 'OK' : 'FAIL';"

❓ 任务失败

可能原因:

  1. 命令类不存在
  2. 命令类语法错误
  3. 数据库连接失败

解决方法:

// 1. 检查命令类是否存在
// 确保类文件路径正确
// 确保命名空间正确

// 2. 检查命令类语法
// 确保继承正确的类
// 确保实现了必要的方法

// 3. 查看任务日志
// 在任务中使用 try-catch 捕获异常
try {
 // 任务逻辑
} catch (\Exception $e) {
 $this->setQueueError('任务失败:' . $e->getMessage());
}

❓ 查看日志

方法1:查看数据库

-- 查看任务列表
SELECT * FROM system_queue ORDER BY create_at DESC LIMIT 10;

-- 查看失败的任务
SELECT * FROM system_queue WHERE status = 4;

方法2:查看系统日志

# 查看 ThinkAdmin 日志
tail -f runtime/log/think.log

# 查看 PHP 错误日志
tail -f /var/log/php_errors.log

❓ 停止任务

方法1:使用命令停止

# 停止所有任务
php think xadmin:queue stop

方法2:手动停止

# 查询正在执行的任务
php think xadmin:queue query

# 手动杀死进程(Linux)
kill -9 <进程ID>

❓ 执行超时

优化建议:

  1. 分批处理:将大任务拆分成多个小任务
// ❌ 不推荐:一次性处理所有数据
foreach ($allData as $item) {
 processItem($item);
}

// ✅ 推荐:分批处理
Db::name('Table')->chunk(100, function($data) {
 foreach ($data as $item) {
 processItem($item);
 }
});
  1. 设置超时时间:在任务中设置最大执行时间
$startTime = time();
$maxTime = 300; // 最大执行 5 分钟

foreach ($data as $item) {
 if (time() - $startTime > $maxTime) {
 $this->setQueueError('任务执行超时');
 return;
 }
 processItem($item);
}
  1. 拆分任务:将大任务拆分成多个小任务
// 创建多个小任务
foreach ($batches as $batch) {
 $this->_queue("处理批次-{$batch['id']}", 'data:process', 0, [
 'batch_id' => $batch['id']
 ]);
}

❓ 任务重试

方法1:在任务中实现重试逻辑

class RetryTask extends Command
{
 protected function execute(Input $input, Output $output)
 {
 $maxRetries = 3;
 $retry = 0;

 while ($retry < $maxRetries) {
 try {
 // 执行任务
 $this->doWork();
 $this->setQueueSuccess('完成');
 return;
 } catch (\Exception $e) {
 $retry++;
 if ($retry >= $maxRetries) {
 $this->setQueueError('重试失败:' . $e->getMessage());
 return;
 }
 sleep(5); // 等待 5 秒后重试
 }
 }
 }
}

方法2:创建新任务重试

class TaskWithRetry extends Command
{
 protected function execute(Input $input, Output $output)
 {
 try {
 // 执行任务
 $this->doWork();
 $this->setQueueSuccess('完成');
 } catch (\Exception $e) {
 // 失败后创建新任务重试
 $retryCount = $this->queue->data['retry'] ?? 0;
 if ($retryCount < 3) {
 $queue = QueueService::instance([], true);
 $queue->register(
 '重试任务',
 'task:retry',
 60, // 60 秒后重试
 ['retry' => $retryCount + 1],
 0,
 0
 );
 } else {
 $this->setQueueError('重试次数超限');
 }
 }
 }
}
最近更新:
Contributors: 邹景立, Anyon

AltStyle によって変換されたページ (->オリジナル) /