Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Sunk011/MultiTaskFlow

Repository files navigation

MultiTaskFlow 多任务流管理工具

MultiTaskFlow 是一个轻量级的多任务流管理工具,用于按顺序执行和监控一系列任务。它可以帮助您管理数据处理、模型训练、评估等一系列需要顺序执行的任务,并提供实时状态更新和执行结果跟踪。

功能特点

  • 基于YAML配置文件定义任务流
  • 支持Python脚本和Shell命令的执行
  • 提供任务状态实时监控
  • 自动执行失败任务的重试逻辑
  • 支持任务之间的依赖关系
  • 完整的日志记录和任务执行历史
  • 进程PID跟踪与管理
  • 优雅的信号处理和任务终止
  • 支持静默模式,可跳过消息通知

安装方法

要求

  • Python 3.7+
  • PyYAML
  • 其他依赖库(如有)

配置消息推送令牌和静默模式

消息推送令牌

在使用消息推送功能前,需要配置 MSG_PUSH_TOKEN 环境变量。以下是配置方法:

1. 永久配置(推荐)

~/.bashrc~/.zshrc 文件中添加:

# MultiTaskFlow 消息推送配置
export MSG_PUSH_TOKEN="your_pushplus_token_here"

然后重新加载配置:

source ~/.bashrc # 或 source ~/.zshrc
2. 临时配置

在运行命令前设置:

MSG_PUSH_TOKEN=your_token python your_script.py
3. 开发模式配置

在项目根目录创建 .env 文件:

echo "MSG_PUSH_TOKEN=your_token" > .env

静默模式配置

如果您不希望收到消息通知,可以启用静默模式:

1. 永久配置静默模式

~/.bashrc~/.zshrc 文件中添加:

# MultiTaskFlow 静默模式配置
export MTF_SILENT_MODE=true

然后重新加载配置:

source ~/.bashrc # 或 source ~/.zshrc
2. 临时配置静默模式

在运行命令前设置:

MTF_SILENT_MODE=true taskflow your_tasks.yaml
3. 开发模式配置静默模式

在项目根目录创建或编辑 .env 文件:

echo "MTF_SILENT_MODE=true" >> .env

获取 Token

  1. 访问 PushPlus 官网
  2. 注册并登录
  3. 在个人中心获取您的 token

(方法1)从PyPI安装

# 使用pip直接安装
pip install multitaskflow

(方法2)从源码安装

# 克隆仓库
git clone https://github.com/Polaris-F/MultiTaskFlow.git
cd MultiTaskFlow
# 方法1: 使用pip直接安装
pip install .
# 方法2: 开发模式安装
pip install -e .

(方法3)构建离线包方法

如果您想构建wheel包或源码分发包,可以使用以下命令:

# 安装构建工具
pip install build
# 构建分发包
python -m build
# 构建的包会在dist/目录下生成

使用方法

如果需要使用 消息接收功能,请访问 https://www.pushplus.plus/ 获取您的token

1. 创建任务配置文件

创建一个YAML格式的任务配置文件,定义您要执行的任务序列:

# tasks.yaml 示例
- name: "任务1-数据准备"
 command: "python scripts/prepare_data.py --input data/raw --output data/processed"
 status: "pending"
- name: "任务2-模型训练"
 command: "python scripts/train_model.py --data data/processed --epochs 10"
 status: "pending"
- name: "任务3-结果评估"
 command: "python scripts/evaluate.py --model-path models/latest.pt"
 status: "pending"

2. (方法一)使用Python API (推荐使用方法二、三)

在您的Python代码中使用MultiTaskFlow:

from multitaskflow import TaskFlow
# 创建任务流管理器
task_manager = TaskFlow("path/to/your/tasks.yaml")
# 启动任务执行
task_manager.run()
# 您也可以动态添加任务
task_manager.add_task_by_config(
 name="额外任务", 
 command="echo '这是一个动态添加的任务'"
)

2. (方法二)使用命令行工具【使用场景:不需要后台运行,可实时查看输出】

安装后,您可以直接使用taskflow命令行工具:

# 使用配置文件运行任务流
taskflow path/to/your/tasks.yaml
# 使用默认配置
# 如果不提供配置文件路径,将在examples/tasks.yaml创建示例配置
taskflow
# 查看帮助
taskflow --help

2. (方法三)使用sh脚本工具【使用场景:需要后台运行,通过log查看输出】

首先taskflowPro.sh修改脚本中 TASK_CONFIG为任务流yaml路径

chmod +x taskflowPro.sh
./taskflowPro.sh start # 开始运行
./taskflowPro.sh stop # 结束运行

效果展示

您可以运行我们提供的演示脚本,查看任务管理和消息接收的实际效果。演示脚本模拟了一个完整的深度学习工作流,包括数据预处理、模型训练、模型评估和数据归档等步骤。

运行演示脚本

# 安装完成后,直接运行示例脚本
python -m multitaskflow.examples.demo
# 或使用命令行工具
taskflow examples/tasks.yaml

演示内容

演示脚本将依次执行以下任务:

  1. 数据预处理 - 模拟数据集加载、清洗和处理过程
  2. 模型训练-阶段1 - 模拟第一阶段模型训练过程
  3. 模型评估-阶段1 - 模拟对第一阶段训练模型的评估
  4. 模型训练-阶段2 - 模拟基于第一阶段模型继续训练
  5. 模型评估-阶段2 - 模拟对第二阶段训练模型的评估
  6. 数据归档 - 模拟模型和结果数据的归档过程

每个任务都会显示详细的执行进度和模拟输出,让您直观了解MultiTaskFlow的任务管理能力。所有演示任务都是模拟执行,不会创建实际文件或占用大量资源。

期望效果

运行示例后,您将看到:

  • 任务管理器启动和初始化过程
  • 任务状态的实时更新(等待中→执行中→完成/失败)
  • 每个任务的详细输出和进度信息
  • 任务完成后的状态汇总

通过观察演示效果,您可以了解MultiTaskFlow如何帮助管理复杂的多步骤工作流程,以及它如何提供清晰的任务执行状态和结果反馈。

运行效果截图

任务管理和执行效果

实际运行时在控制台中会看到详细的输出,显示任务状态和进度信息

高级功能(TODO)

任务配置选项

任务配置文件支持以下选项:

- name: "示例任务"
 command: "python script.py"
 status: "pending" # pending, running, completed, failed
 retry: 3 # 失败后重试次数 (TODO)
 timeout: 3600 # 任务超时时间(秒)(TODO)
 depends_on: ["前置任务名称"] # 依赖的任务 (TODO)

静默模式

MultiTaskFlow 支持静默模式,在此模式下不会发送任何消息通知。这对于以下场景非常有用:

  • 生产环境部署:在生产环境中运行时,可能不需要消息通知
  • 调试阶段:开发和调试过程中避免频繁接收通知
  • 批量任务:执行大量批处理任务时,只关注最终结果而非每个任务
  • CI/CD 流程:在自动化构建流水线中使用,避免触发过多通知

启用静默模式

静默模式通过环境变量 MTF_SILENT_MODE 控制:

# 启用静默模式
export MTF_SILENT_MODE=true
# 临时启用
MTF_SILENT_MODE=true taskflow tasks.yaml

支持的值:

  • 设为 true, 1, yes, on 表示启用静默模式
  • 不设置或设为其他值表示禁用静默模式

静默模式的工作原理

当启用静默模式时:

  1. 所有任务执行完成后不会发送消息通知
  2. 任务管理器完成时不会发送总结报告
  3. 所有操作和结果仍会记录在日志文件中
  4. 控制台输出不受影响,仍然会显示所有信息

注意,静默模式只影响消息通知行为,不会改变任务的实际执行过程。

自定义通知

您可以配置系统在任务状态变更时发送通知:

from multitaskflow import TaskFlow, Msg_push
# 创建消息推送实例
notifier = Msg_push(
 webhook_url="your_webhook_url",
 channel="your_channel"
)
# 创建带通知功能的任务流管理器
task_manager = TaskFlow(
 "tasks.yaml",
 msg_push=notifier
)

自定义与扩展

MultiTaskFlow设计为可扩展的,您可以:

  • 自定义任务状态处理逻辑
  • 添加新的任务类型
  • 扩展监控和报告功能

自定义任务处理器示例

from multitaskflow import TaskFlow
class CustomTaskFlow(TaskFlow):
 def process_task_output(self, task, output):
 # 自定义输出处理逻辑
 print(f"处理任务 {task.name} 的输出: {output}")
 # 继续处理...
 super().process_task_output(task, output)

常见问题(FAQ)

Q: XXXX?

贡献指南

欢迎贡献代码、报告问题或提出新功能建议!

  1. Fork 这个仓库
  2. 创建您的特性分支 (git checkout -b feature/amazing-feature)
  3. 提交您的更改 (git commit -m 'Add some amazing feature')
  4. 推送到分支 (git push origin feature/amazing-feature)
  5. 打开一个 Pull Request

版本历史

  • 1.0.0 - 2024年03月15日
    • 首次发布
    • 基本任务管理功能
    • 命令行工具支持

许可证

本项目采用MIT许可证 - 详情请查看 LICENSE 文件

作者与致谢

  • 主要开发者: Polaris
  • 感谢所有贡献者和使用者的宝贵反馈

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python 84.2%
  • Shell 15.8%

AltStyle によって変換されたページ (->オリジナル) /