一个纯Python实现的异步任务管理器,支持线程池和动态伸缩。
- 🚀 纯Python实现:无需额外依赖,使用标准库实现
- 🔄 动态伸缩:根据任务队列大小自动调整工作线程数量
- 📊 任务状态追踪:实时查询任务执行状态和结果
- 🧹 自动清理:自动清理过期的任务状态记录
- 🔒 线程安全:使用锁机制保证线程安全
- 🎯 单例模式:支持多实例管理,不同业务模块可使用独立实例
- ⏱️ 任务超时:支持配置任务执行超时时间
- 🚦 队列控制:支持阻塞和非阻塞两种任务提交模式
pip install fish-async-task
pip install git+https://github.com/fishzjp/FishAsyncTask.git
git clone https://github.com/fishzjp/FishAsyncTask.git cd FishAsyncTask pip install -e ".[dev,performance]" pre-commit install
开发依赖包括:
pytest、pytest-cov、pytest-benchmark- 测试框架black、isort- 代码格式化mypy、interrogate- 代码质量检查locust- 负载测试psutil、redis、huey、dramatiq- 性能测试依赖
📖 详细的安装说明请参考 安装文档
from fish_async_task import TaskManager import time # 创建任务管理器实例(单例模式) task_manager = TaskManager() # 定义一个任务函数 def my_task(name: str, value: int): print(f"执行任务: {name}, 值: {value}") time.sleep(1) # 模拟耗时操作 return f"任务完成: {name}" # 提交任务(非阻塞模式) task_id = task_manager.submit_task(my_task, "任务1", value=100) print(f"任务ID: {task_id}") # 等待任务完成并查询状态 while True: status = task_manager.get_task_status(task_id) if status: if status["status"] == "completed": print(f"任务完成,结果: {status.get('result')}") break elif status["status"] == "failed": print(f"任务失败: {status.get('error')}") break time.sleep(0.1) # 关闭任务管理器 task_manager.shutdown()
当队列已满时,可以使用阻塞模式等待队列有空间:
# 阻塞模式提交任务(等待队列有空间) task_id = task_manager.submit_task( my_task, "任务2", value=200, block=True, # 启用阻塞模式 timeout=10.0 # 最多等待10秒 )
不同业务模块可以使用独立的任务管理器实例:
# 默认实例 default_manager = TaskManager() # 订单模块的独立实例 order_manager = TaskManager(instance_key="order") # 支付模块的独立实例 payment_manager = TaskManager(instance_key="payment")
详细的文档请参考以下链接:
- 📖 安装文档 - 详细的安装说明和开发环境设置
- 📚 API 文档 - 完整的 API 参考
- ⚙️ 配置文档 - 环境变量和配置说明
- 💡 最佳实践 - 使用建议和注意事项
- ❓ 常见问题 - FAQ 和问题解答
- 📝 更新日志 - 版本更新记录
- 📦 后台任务处理:异步处理耗时操作,不阻塞主流程
- 🔄 批量数据处理:并发处理大量数据,提高处理效率
- 📧 消息队列:作为轻量级消息队列使用
- 🎯 任务调度:配合定时任务实现任务调度
- 🔌 API异步处理:Web API中异步处理请求
MIT License
欢迎提交Issue和Pull Request!