分享
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。
## 🧩 编程练习题:并发任务调度器(Concurrent Task Scheduler)
### 📌 题目描述:
编写一个并发任务调度器程序,它能够:
1. 接收一组任务(函数)。
2. 并发执行这些任务。
3. 支持超时控制和提前取消。
4. 等待所有任务完成或其中一个任务出错后立即返回。
5. 使用 `sync.WaitGroup` 控制并发流程。
6. 使用 `channel` 在任务之间通信结果。
7. 使用 `context.Context` 实现任务取消和超时机制。
---
### ✨ 具体要求:
#### 函数签名如下:
```go
func RunTasks(ctx context.Context, tasks []func() error, concurrency int, timeout time.Duration) error
```
- `ctx`: 上下文,用于外部主动取消任务。
- `tasks`: 一组要执行的任务,每个任务是一个返回 `error` 的函数。
- `concurrency`: 最大并发数(即最多同时运行多少个任务)。
- `timeout`: 超时时间,一旦超过该时间还未完成,则取消所有任务。
#### 功能要求:
- 使用带缓冲的 channel 控制并发数量(类似工作池模式)。
- 每个任务执行后通过 channel 返回错误信息。
- 如果任意一个任务返回错误,应取消所有正在运行的任务。
- 如果超时或上下文被取消,也要取消所有任务。
- 所有 goroutine 必须优雅退出,不能泄露。
---
### 🧪 代码示例:
```go
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
// RunTasks 并发执行一组任务,并支持取消、超时和错误快速返回
// 参数:
//
// ctx: 父上下文,用于控制整个任务的执行生命周期
// tasks: 需要执行的任务函数切片
// concurrency: 最大并发数限制
// timeout: 整个任务组的超时时间
//
// 返回值:
//
// error: 第一个遇到的错误或超时错误
func RunTasks(ctx context.Context, tasks []func() error, concurrency int, timeout time.Duration) error {
// 检查任务列表是否为空
if len(tasks) == 0 {
return nil
}
// 创建带超时的子上下文,timeout后会自动取消
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel() // 确保在函数退出时取消上下文,释放资源
// 创建可手动取消的子上下文,用于任务级别的控制
taskCtx, taskCancel := context.WithCancel(ctxWithTimeout)
defer taskCancel()
// 信号量模式控制最大并发数
semaphore := make(chan struct{}, concurrency)
// 错误通道,缓冲为1只需要第一个错误
errChan := make(chan error, 1)
var wg sync.WaitGroup // 用于等待所有任务完成
// 遍历所有任务
for _, task := range tasks {
// 检查上下文是否已取消或超时
select {
case <-taskCtx.Done():
return taskCtx.Err() // 直接返回取消原因
default:
}
wg.Add(1) // 增加等待计数器
semaphore <- struct{}{} // 获取一个并发槽位(如果已满会阻塞)
// 启动goroutine执行任务
go func(tsk func() error) {
defer func() {
<-semaphore // 释放并发槽位
wg.Done() // 标记任务完成
}()
// 执行实际任务
if err := tsk(); err != nil {
select {
case errChan <- err: // 尝试发送错误到通道
taskCancel() // 取消所有任务
default: // 如果错误通道已满(已有错误),则忽略后续错误
}
}
}(task)
}
// 启动goroutine等待所有任务完成
go func() {
wg.Wait() // 等待所有任务完成
close(errChan) // 关闭错误通道
}()
// 等待第一个错误或上下文取消
select {
case err := <-errChan: // 有任务出错
return err
case <-taskCtx.Done(): // 超时或被取消
return taskCtx.Err()
}
}
func main() {
// 定义测试任务组
tasks := []func() error{
func() error {
time.Sleep(1 * time.Second)
return nil
},
func() error {
time.Sleep(2 * time.Second)
return errors.New("task failed") // 模拟任务失败
},
func() error {
time.Sleep(3 * time.Second)
return nil
},
}
// 执行任务组: 最大并发2,超时5秒
err := RunTasks(context.Background(), tasks, 2, 5*time.Second)
if err != nil {
fmt.Println("Error:", err) // 输出: Error: task failed
}
}
```
---
### 💡 提示:
- 使用 `context.WithCancel` 或 `context.WithTimeout` 创建子上下文来传递取消信号。
- 使用 `sync.WaitGroup` 来等待所有任务启动完毕。
- 使用带缓冲的 channel 控制并发数量。
- 注意避免 goroutine 泄漏,确保在取消时所有协程都能退出。
---
🧠 整体逻辑目标回顾
这段代码的目标是:
并发执行一组任务函数
支持 最大并发数限制
超时自动取消
遇到错误立即返回并取消所有任务
使用标准库组件:context, channel, sync
📌 函数签名解析
func RunTasks(ctx context.Context, tasks []func() error, concurrency int, timeout time.Duration) error {
参数名 类型 含义
ctx context.Context 父上下文,用于控制整个任务组生命周期
tasks []func() error 要并发执行的任务列表
concurrency int 最大同时运行的任务数量
timeout time.Duration 整个任务组的总超时时间
返回值为第一个遇到的错误或上下文取消原因。
✅ 第一步:检查空任务
if len(tasks) == 0 {
return nil
}
如果没有任务要执行,直接返回。
防止后续不必要的操作。
🔁 创建两个嵌套的 context
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
taskCtx, taskCancel := context.WithCancel(ctxWithTimeout)
defer taskCancel()
🎯 目标:
创建一个组合上下文,支持两种取消方式:
外部主动取消(来自传入的 ctx)
内部主动取消(由某个任务出错触发)
超时自动取消
🧩 嵌套关系图解:
原始 ctx
↓
ctxWithTimeout (加了超时)
↓
taskCtx (加了手动取消)
ctxWithTimeout 是带超时的子上下文。
taskCtx 是基于 ctxWithTimeout 的可手动取消上下文。
所有任务都监听 taskCtx.Done() 来响应取消信号。
🚦 并发控制:使用 channel 实现"信号量"机制
semaphore := make(chan struct{}, concurrency)
这是一个 缓冲 channel,大小为 concurrency。
每次启动新任务前尝试往里面发送空结构体 struct{}。
如果已满(达到并发上限),就会阻塞,直到有其他 goroutine 释放槽位。
✅ 这是一种经典的 goroutine 并发控制模式,也叫"工作池"。
📡 错误通信:用 channel 传递第一个错误
errChan := make(chan error, 1)
缓冲为 1 的 channel,只保留第一个出现的错误。
其他任务即使也出错,也不会再写入(避免死锁)。
🧱 WaitGroup 控制任务完成状态
var wg sync.WaitGroup
用来等待所有任务执行完毕。
每个任务开始时调用 wg.Add(1),结束时调用 wg.Done()。
最后通过 wg.Wait() 判断是否全部完成。
🔁 遍历任务,启动 goroutine
for _, task := range tasks {
select {
case <-taskCtx.Done():
return taskCtx.Err()
default:
}
wg.Add(1)
semaphore <- struct{}{}
go func(tsk func() error) {
defer func() {
<-semaphore
wg.Done()
}()
if err := tsk(); err != nil {
select {
case errChan <- err:
taskCancel()
default:
}
}
}(task)
}
🔍 关键步骤详解:
1️⃣ 检查是否已取消/超时
select {
case <-taskCtx.Done():
return taskCtx.Err()
default:
}
在每次添加新任务前,先检查当前上下文是否已被取消。
如果已经被取消,就不再继续添加任务,直接返回错误。
2️⃣ 占用并发槽位
semaphore <- struct{}{}
成功发送表示获得了执行权限。
如果槽位满了,会阻塞,直到其他任务释放。
3️⃣ 启动 goroutine 执行任务
go func(tsk func() error) {
...
}(task)
把当前任务包装成闭包传入 goroutine 中执行。
4️⃣ defer 释放资源
defer func() {
<-semaphore
wg.Done()
}()
defer 保证无论任务正常还是异常结束,都会释放并发槽位和减少计数器。
wg.Done() 表示这个任务已完成。
5️⃣ 执行任务 & 错误处理
if err := tsk(); err != nil {
select {
case errChan <- err:
taskCancel()
default:
}
}
如果任务返回错误:
尝试将错误写入 errChan(非阻塞)
成功写入后调用 taskCancel() 取消所有任务
后续错误被忽略(因为 errChan 已满)
🧾 等待所有任务完成,并关闭 errChan
go func() {
wg.Wait()
close(errChan)
}()
启动一个新的 goroutine 等待所有任务完成。
完成后关闭 errChan,防止主 goroutine 死锁。
⏳ 主 goroutine 等待结果
select {
case err := <-errChan:
return err
case <-taskCtx.Done():
return taskCtx.Err()
}
选择最先发生的事件:
如果有任务出错,则从 errChan 收到错误,立即返回。
如果上下文被取消(可能是超时或手动取消),则返回取消原因。
🧪 示例 main 函数说明
func main() {
tasks := []func() error{
func() error {
time.Sleep(1 * time.Second)
return nil
},
func() error {
time.Sleep(2 * time.Second)
return errors.New("task failed") // 模拟任务失败
},
func() error {
time.Sleep(3 * time.Second)
return nil
},
}
err := RunTasks(context.Background(), tasks, 2, 5*time.Second)
if err != nil {
fmt.Println("Error:", err)
}
}
三个任务中,第二个最快失败。
主函数输出:
Error: task failed
🧠 总结:各个组件的作用
组件 作用
context 控制任务生命周期,实现取消与超时
channel 用于协程间通信,传递错误信息
WaitGroup 等待所有任务完成,确保优雅退出
goroutine 并发执行任务
buffered channel 控制最大并发数量(信号量模式)
📌 常见问题解答(FAQ)
❓ 为什么要用 taskCtx, taskCancel := context.WithCancel(ctxWithTimeout)?
这是为了在 超时之外还能手动取消任务。例如,如果某个任务出错,我们希望立刻取消所有正在运行的任务。
❓ 为什么 errChan 的缓冲是 1?
因为我们只需要第一个错误即可,后续错误可以忽略。缓冲为 1 可以防止 goroutine 因为无法写入而卡住。
❓ 为什么 semaphore 是带缓冲的 channel?
这模拟了"信号量"的行为,控制最多有多少个 goroutine 同时执行任务。
❓ 为什么 wg.Wait() 要放在另一个 goroutine 中?
因为主 goroutine 需要监听 errChan 和 taskCtx.Done(),不能自己阻塞等待所有任务完成。
有疑问加站长微信联系(非本文作者))
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
关注微信386 次点击 ∙ 1 赞
添加一条新回复
(您需要 后才能回复 没有账号 ?)
- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码` - 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传