Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

一个纯Python实现的异步任务管理器,支持线程池和动态伸缩。

License

Notifications You must be signed in to change notification settings

fishzjp/FishAsyncTask

Repository files navigation

FishAsyncTask

一个纯Python实现的异步任务管理器,支持线程池和动态伸缩。

GitHub License Python

目录

特性

  • 🚀 纯Python实现:无需额外依赖,使用标准库实现
  • 🔄 动态伸缩:根据任务队列大小自动调整工作线程数量
  • 📊 任务状态追踪:实时查询任务执行状态和结果
  • 🧹 自动清理:自动清理过期的任务状态记录
  • 🔒 线程安全:使用锁机制保证线程安全
  • 🎯 单例模式:支持多实例管理,不同业务模块可使用独立实例
  • ⏱️ 任务超时:支持配置任务执行超时时间
  • 🚦 队列控制:支持阻塞和非阻塞两种任务提交模式

安装

从 PyPI 安装(推荐)

pip install fish-async-task

从 GitHub 安装

pip install git+https://github.com/fishzjp/FishAsyncTask.git

开发模式安装

git clone https://github.com/fishzjp/FishAsyncTask.git
cd FishAsyncTask
pip install -e ".[dev,performance]"
pre-commit install

开发依赖包括:

  • pytestpytest-covpytest-benchmark - 测试框架
  • blackisort - 代码格式化
  • mypyinterrogate - 代码质量检查
  • locust - 负载测试
  • psutilredishueydramatiq - 性能测试依赖

📖 详细的安装说明请参考 安装文档

快速开始

基本使用

from fish_async_task import TaskManager
import time
# 创建任务管理器实例(单例模式)
task_manager = TaskManager()
# 定义一个任务函数
def my_task(name: str, value: int):
 print(f"执行任务: {name}, 值: {value}")
 time.sleep(1) # 模拟耗时操作
 return f"任务完成: {name}"
# 提交任务(非阻塞模式)
task_id = task_manager.submit_task(my_task, "任务1", value=100)
print(f"任务ID: {task_id}")
# 等待任务完成并查询状态
while True:
 status = task_manager.get_task_status(task_id)
 if status:
 if status["status"] == "completed":
 print(f"任务完成,结果: {status.get('result')}")
 break
 elif status["status"] == "failed":
 print(f"任务失败: {status.get('error')}")
 break
 time.sleep(0.1)
# 关闭任务管理器
task_manager.shutdown()

阻塞模式提交任务

当队列已满时,可以使用阻塞模式等待队列有空间:

# 阻塞模式提交任务(等待队列有空间)
task_id = task_manager.submit_task(
 my_task, 
 "任务2", 
 value=200,
 block=True, # 启用阻塞模式
 timeout=10.0 # 最多等待10秒
)

多实例管理

不同业务模块可以使用独立的任务管理器实例:

# 默认实例
default_manager = TaskManager()
# 订单模块的独立实例
order_manager = TaskManager(instance_key="order")
# 支付模块的独立实例
payment_manager = TaskManager(instance_key="payment")

文档

详细的文档请参考以下链接:

使用场景

  • 📦 后台任务处理:异步处理耗时操作,不阻塞主流程
  • 🔄 批量数据处理:并发处理大量数据,提高处理效率
  • 📧 消息队列:作为轻量级消息队列使用
  • 🎯 任务调度:配合定时任务实现任务调度
  • 🔌 API异步处理:Web API中异步处理请求

许可证

MIT License

贡献

欢迎提交Issue和Pull Request!

About

一个纯Python实现的异步任务管理器,支持线程池和动态伸缩。

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 2

Languages

AltStyle によって変換されたページ (->オリジナル) /