分享
  1. 首页
  2. 文章

并发任务调度器(Concurrent Task Scheduler)

SuperStupid1 · · 386 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

## 🧩 编程练习题:并发任务调度器(Concurrent Task Scheduler) ### 📌 题目描述: 编写一个并发任务调度器程序,它能够: 1. 接收一组任务(函数)。 2. 并发执行这些任务。 3. 支持超时控制和提前取消。 4. 等待所有任务完成或其中一个任务出错后立即返回。 5. 使用 `sync.WaitGroup` 控制并发流程。 6. 使用 `channel` 在任务之间通信结果。 7. 使用 `context.Context` 实现任务取消和超时机制。 --- ### ✨ 具体要求: #### 函数签名如下: ```go func RunTasks(ctx context.Context, tasks []func() error, concurrency int, timeout time.Duration) error ``` - `ctx`: 上下文,用于外部主动取消任务。 - `tasks`: 一组要执行的任务,每个任务是一个返回 `error` 的函数。 - `concurrency`: 最大并发数(即最多同时运行多少个任务)。 - `timeout`: 超时时间,一旦超过该时间还未完成,则取消所有任务。 #### 功能要求: - 使用带缓冲的 channel 控制并发数量(类似工作池模式)。 - 每个任务执行后通过 channel 返回错误信息。 - 如果任意一个任务返回错误,应取消所有正在运行的任务。 - 如果超时或上下文被取消,也要取消所有任务。 - 所有 goroutine 必须优雅退出,不能泄露。 --- ### 🧪 代码示例: ```go package main import ( "context" "errors" "fmt" "sync" "time" ) // RunTasks 并发执行一组任务,并支持取消、超时和错误快速返回 // 参数: // // ctx: 父上下文,用于控制整个任务的执行生命周期 // tasks: 需要执行的任务函数切片 // concurrency: 最大并发数限制 // timeout: 整个任务组的超时时间 // // 返回值: // // error: 第一个遇到的错误或超时错误 func RunTasks(ctx context.Context, tasks []func() error, concurrency int, timeout time.Duration) error { // 检查任务列表是否为空 if len(tasks) == 0 { return nil } // 创建带超时的子上下文,timeout后会自动取消 ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) defer cancel() // 确保在函数退出时取消上下文,释放资源 // 创建可手动取消的子上下文,用于任务级别的控制 taskCtx, taskCancel := context.WithCancel(ctxWithTimeout) defer taskCancel() // 信号量模式控制最大并发数 semaphore := make(chan struct{}, concurrency) // 错误通道,缓冲为1只需要第一个错误 errChan := make(chan error, 1) var wg sync.WaitGroup // 用于等待所有任务完成 // 遍历所有任务 for _, task := range tasks { // 检查上下文是否已取消或超时 select { case <-taskCtx.Done(): return taskCtx.Err() // 直接返回取消原因 default: } wg.Add(1) // 增加等待计数器 semaphore <- struct{}{} // 获取一个并发槽位(如果已满会阻塞) // 启动goroutine执行任务 go func(tsk func() error) { defer func() { <-semaphore // 释放并发槽位 wg.Done() // 标记任务完成 }() // 执行实际任务 if err := tsk(); err != nil { select { case errChan <- err: // 尝试发送错误到通道 taskCancel() // 取消所有任务 default: // 如果错误通道已满(已有错误),则忽略后续错误 } } }(task) } // 启动goroutine等待所有任务完成 go func() { wg.Wait() // 等待所有任务完成 close(errChan) // 关闭错误通道 }() // 等待第一个错误或上下文取消 select { case err := <-errChan: // 有任务出错 return err case <-taskCtx.Done(): // 超时或被取消 return taskCtx.Err() } } func main() { // 定义测试任务组 tasks := []func() error{ func() error { time.Sleep(1 * time.Second) return nil }, func() error { time.Sleep(2 * time.Second) return errors.New("task failed") // 模拟任务失败 }, func() error { time.Sleep(3 * time.Second) return nil }, } // 执行任务组: 最大并发2,超时5秒 err := RunTasks(context.Background(), tasks, 2, 5*time.Second) if err != nil { fmt.Println("Error:", err) // 输出: Error: task failed } } ``` --- ### 💡 提示: - 使用 `context.WithCancel` 或 `context.WithTimeout` 创建子上下文来传递取消信号。 - 使用 `sync.WaitGroup` 来等待所有任务启动完毕。 - 使用带缓冲的 channel 控制并发数量。 - 注意避免 goroutine 泄漏,确保在取消时所有协程都能退出。 --- 🧠 整体逻辑目标回顾 这段代码的目标是: 并发执行一组任务函数 支持 最大并发数限制 超时自动取消 遇到错误立即返回并取消所有任务 使用标准库组件:context, channel, sync 📌 函数签名解析 func RunTasks(ctx context.Context, tasks []func() error, concurrency int, timeout time.Duration) error { 参数名 类型 含义 ctx context.Context 父上下文,用于控制整个任务组生命周期 tasks []func() error 要并发执行的任务列表 concurrency int 最大同时运行的任务数量 timeout time.Duration 整个任务组的总超时时间 返回值为第一个遇到的错误或上下文取消原因。 ✅ 第一步:检查空任务 if len(tasks) == 0 { return nil } 如果没有任务要执行,直接返回。 防止后续不必要的操作。 🔁 创建两个嵌套的 context ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) defer cancel() taskCtx, taskCancel := context.WithCancel(ctxWithTimeout) defer taskCancel() 🎯 目标: 创建一个组合上下文,支持两种取消方式: 外部主动取消(来自传入的 ctx) 内部主动取消(由某个任务出错触发) 超时自动取消 🧩 嵌套关系图解: 原始 ctx ↓ ctxWithTimeout (加了超时) ↓ taskCtx (加了手动取消) ctxWithTimeout 是带超时的子上下文。 taskCtx 是基于 ctxWithTimeout 的可手动取消上下文。 所有任务都监听 taskCtx.Done() 来响应取消信号。 🚦 并发控制:使用 channel 实现"信号量"机制 semaphore := make(chan struct{}, concurrency) 这是一个 缓冲 channel,大小为 concurrency。 每次启动新任务前尝试往里面发送空结构体 struct{}。 如果已满(达到并发上限),就会阻塞,直到有其他 goroutine 释放槽位。 ✅ 这是一种经典的 goroutine 并发控制模式,也叫"工作池"。 📡 错误通信:用 channel 传递第一个错误 errChan := make(chan error, 1) 缓冲为 1 的 channel,只保留第一个出现的错误。 其他任务即使也出错,也不会再写入(避免死锁)。 🧱 WaitGroup 控制任务完成状态 var wg sync.WaitGroup 用来等待所有任务执行完毕。 每个任务开始时调用 wg.Add(1),结束时调用 wg.Done()。 最后通过 wg.Wait() 判断是否全部完成。 🔁 遍历任务,启动 goroutine for _, task := range tasks { select { case <-taskCtx.Done(): return taskCtx.Err() default: } wg.Add(1) semaphore <- struct{}{} go func(tsk func() error) { defer func() { <-semaphore wg.Done() }() if err := tsk(); err != nil { select { case errChan <- err: taskCancel() default: } } }(task) } 🔍 关键步骤详解: 1️⃣ 检查是否已取消/超时 select { case <-taskCtx.Done(): return taskCtx.Err() default: } 在每次添加新任务前,先检查当前上下文是否已被取消。 如果已经被取消,就不再继续添加任务,直接返回错误。 2️⃣ 占用并发槽位 semaphore <- struct{}{} 成功发送表示获得了执行权限。 如果槽位满了,会阻塞,直到其他任务释放。 3️⃣ 启动 goroutine 执行任务 go func(tsk func() error) { ... }(task) 把当前任务包装成闭包传入 goroutine 中执行。 4️⃣ defer 释放资源 defer func() { <-semaphore wg.Done() }() defer 保证无论任务正常还是异常结束,都会释放并发槽位和减少计数器。 wg.Done() 表示这个任务已完成。 5️⃣ 执行任务 & 错误处理 if err := tsk(); err != nil { select { case errChan <- err: taskCancel() default: } } 如果任务返回错误: 尝试将错误写入 errChan(非阻塞) 成功写入后调用 taskCancel() 取消所有任务 后续错误被忽略(因为 errChan 已满) 🧾 等待所有任务完成,并关闭 errChan go func() { wg.Wait() close(errChan) }() 启动一个新的 goroutine 等待所有任务完成。 完成后关闭 errChan,防止主 goroutine 死锁。 ⏳ 主 goroutine 等待结果 select { case err := <-errChan: return err case <-taskCtx.Done(): return taskCtx.Err() } 选择最先发生的事件: 如果有任务出错,则从 errChan 收到错误,立即返回。 如果上下文被取消(可能是超时或手动取消),则返回取消原因。 🧪 示例 main 函数说明 func main() { tasks := []func() error{ func() error { time.Sleep(1 * time.Second) return nil }, func() error { time.Sleep(2 * time.Second) return errors.New("task failed") // 模拟任务失败 }, func() error { time.Sleep(3 * time.Second) return nil }, } err := RunTasks(context.Background(), tasks, 2, 5*time.Second) if err != nil { fmt.Println("Error:", err) } } 三个任务中,第二个最快失败。 主函数输出: Error: task failed 🧠 总结:各个组件的作用 组件 作用 context 控制任务生命周期,实现取消与超时 channel 用于协程间通信,传递错误信息 WaitGroup 等待所有任务完成,确保优雅退出 goroutine 并发执行任务 buffered channel 控制最大并发数量(信号量模式) 📌 常见问题解答(FAQ) ❓ 为什么要用 taskCtx, taskCancel := context.WithCancel(ctxWithTimeout)? 这是为了在 超时之外还能手动取消任务。例如,如果某个任务出错,我们希望立刻取消所有正在运行的任务。 ❓ 为什么 errChan 的缓冲是 1? 因为我们只需要第一个错误即可,后续错误可以忽略。缓冲为 1 可以防止 goroutine 因为无法写入而卡住。 ❓ 为什么 semaphore 是带缓冲的 channel? 这模拟了"信号量"的行为,控制最多有多少个 goroutine 同时执行任务。 ❓ 为什么 wg.Wait() 要放在另一个 goroutine 中? 因为主 goroutine 需要监听 errChan 和 taskCtx.Done(),不能自己阻塞等待所有任务完成。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
386 次点击 ∙ 1 赞
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏