基于Node.js的优先异步队列
发布于 6 年前 作者 Checkson 4755 次浏览 来自 分享

原文地址

前言

想不到我在日常开发中,竟然遇到"优先异步队列"的需求。github上有类似功能,并且集成redis的库有KueBull等。但是,对于追求"简、易、轻"的我来说,其实并不满意。根据"二八原则",我决定,自己来实现一个只有上述两者"两成"功能的轻量级开源库:priority-async-queue

开源库的命名为什么这么长呢?原因是,我认为开发者只需看一眼库名就知道其功能,一点都不含糊。但是,为了行文流畅,priority-async-queue下面统一简称为"paq"。

你可能不需要paq

按照 redux 作者的套路,首先,我们需要明确的是,你可能不需要 paq

只有遇到 N 个异步任务不能并行执行,并且只能顺序执行时,你才需要使用 paq。

简单来说,如果你需要执行的 N 个异步任务,并不存在资源争夺和占用、数据共享、严格的逻辑顺序、优先权对比等,paq 可能是没必要的,用了反而降低执行效率、影响程序性能。

paq 设计思路

paq 的设计思路非常简单,一共3个类:

  • Task 是描述每个待执行(异步/同步)任务的执行逻辑以及配置参数。
  • PriorityQueue 是控制每个待执行(异步/同步)任务的优先级队列、具有队列的基本属性和操作。
  • AsyncQueue 是控制每个待执行(异步/同步)任务能严格顺序地执行的队列。

下面是 paq 的程序流程图:

paq

paq 基本概念和API

1. addTask

addTask 是核心方法,它可以创建一个任务,并且添加到 paq 队列中。

paq.addTask([options, ]callback);

options 是一个可选对象,包含以下属性:

{
 id: undefined, // 任务id
 priority: 'normal', // 任务权重,例如: low, normal, mid, high, urgent
 context: null, // 执行任务的上下文
 start: (ctx, options) => {}, // 任务将要被执行的回调
 completed: (ctx, res) => {}, // 任务执行完成后的回调
 failed: (ctx, err) => {}, // 任务执行失败后的回调
 remove: (ctx, options) => {} // 任务被删除后的回调
}

callback 是一个描述执行任务的逻辑的函数,它包含两个参数:ctxoptions:

  • ctx 是任务所属的paq实例。
  • options 是此任务的options参数的最终值。
paq.addTask((ctx, options) => {
 console.log(ctx === paq); // true
});

2. removeTask

removeTask 方法是根据任务 id 来删除等待对列中的任务。

paq.removeTask(taskId);

如果成功删除任务,它将返回true。否则,它将返回false。

3. pause

pause 方法是暂停 paq 继续执行任务。

paq.pause();

注意: 但是,您无法暂停当前正在执行的任务,因为无法暂时检测到异步任务的进度。

4. isPause

isPause 属性,返回当前队列是否处于暂停状态。

paq.isPause; // return true or false.

5. resume

resume 方法,用来重新启动 paq 队列执行任务。

paq.resume();

6. clearTask

cleartTask 方法,用来清除 paq 等待队列中所有的任务。

paq.clearTask();

paq 用法

1. 基本用法

只要向 paq 添加任务,该任务就会自动被执行。

const PAQ = require('priority-async-queue');
const paq = new PAQ();
paq.addTask((ctx, options) => {
 console.log('This is a simple task!');
});
// This is a simple task!

2. 同步任务

你可以使用 paq 执行一系列同步任务,例如:

const syncTask = (n) => {
 for (let i = 0; i < n; i++) {
 paq.addTask(() => {
 console.log('Step', i, 'sync');
 return i;
 });
 }
};
syncTask(3);
// Step 0 sync
// Step 1 sync
// Step 2 sync

3. 异步任务

你还可以使用 paq 执行一系列的异步任务,例如:

const asyncTask = (n) => {
 for (let i = 0; i < n; i++) {
 paq.addTask(() => {
 return new Promise(resolve => {
 setTimeout(() => {
 console.log('Step', i, 'async');
 resolve(i);
 }, i * 1000);
 });
 });
 }
};
asyncTask(3);
// Step 0 async
// Step 1 async
// Step 2 async

4. 混合任务

你甚至可以使用 paq 执行一系列同步和异步交错的任务,例如:

const mixTask = (n) => {
 asyncTask(n);
 syncTask(n);
 asyncTask(n);
};
mixTask(2);
// Step 0 async
// Step 1 async
// Step 0 sync
// Step 1 sync
// Step 0 async
// Step 1 async

5. 绑定执行上下文

有时如果你需要指定上下文来执行任务,例如:

const contextTask = (n) => {
 var testObj = {
 name: 'foo',
 sayName: (name) => {
 console.log(name);
 }
 };
 for (let i = 0; i < n; i++) {
 paq.addTask({ context: testObj }, function () {
 this.sayName(this.name + i);
 });
 }
};
contextTask(3);
// foo0
// foo1
// foo2

注意: this 在箭头函数中并不存在,或者说它是指向其定义处的上下文。

6. 延迟执行

paq 还支持延迟执行任务,例如:

const delayTask = (n) => {
 for (let i = 0; i < n; i++) {
 paq.addTask({ delay: 1000 * i }, () => {
 console.log('Step', i, 'sync');
 return i;
 });
 }
};
delayTask(3);
// Step 0 sync
// Step 1 sync
// Step 2 sync

7. 优先权

如果需要执行的任务具有权重,例如:

const priorityTask = (n) => {
 for (let i = 0; i < n; i++) {
 paq.addTask({ priority: i === n - 1 ? 'high' : 'normal' }, () => {
 return new Promise(resolve => {
 setTimeout(() => {
 console.log('Step', i, 'async');
 resolve(i);
 }, i * 1000);
 });
 });
 }
};
priorityTask(5);
// Step 0 async
// Step 4 async
// Step 1 async
// Step 2 async
// Step 3 async

默认优先级映射如下:

{
 "low": 1,
 "normal": 0, // default
 "mid": -1,
 "high": -2,
 "urgent": -3
}

8. 回调函数

有时,你希望能够在任务的开始、完成、失败、被删除时做点事情,例如

const callbackTask = (n) => {
 for (let i = 0; i < n; i++) {
 paq.addTask({
 id: i,
 start: (ctx, options) => {
 console.log('start running task id is', options.id);
 },
 completed: (ctx, res) => {
 console.log('complete, result is', res);
 },
 failed: (ctx, err) => {
 console.log(err);
 }
 }, () => {
 if (i < n / 2) {
 throw new Error(i + ' is too small!');
 }
 return i;
 });
 }
};
callbackTask(5);
// start running task id is 0
// Error: 0 is too small!
// start running task id is 1
// Error: 1 is too small!
// start running task id is 2
// Error: 2 is too small!
// start running task id is 3
// complete, result is 3
// start running task id is 4
// complete, result is 4

9. 删除任务

有时,你需要删除一些任务,例如:

const removeTask = (n) => {
 for (let i = 0; i < n; i++) {
 paq.addTask({
 id: i,
 remove: (ctx, options) => {
 console.log('remove task id is', options.id);
 }
 }, () => {
 return new Promise(resolve => {
 setTimeout(() => {
 console.log('Step', i, 'async');
 resolve(i);
 }, i * 1000);
 });
 });
 }
 console.log(paq.removeTask(3));
 console.log(paq.removeTask(5));
};
removeTask(5);
// remove task id is 3
// true
// false
// Step 0 async
// Step 1 async
// Step 2 async
// Step 4 async

注意: 你必须在创建任务时分配id,并根据id删除任务。

paq 事件

如果需要监视 paq 队列的状态,paq 提供以下事件侦听器:

1. addTask

paq.on('addTask', (options) => {
 // Triggered when the queue adds a task.
});

2. startTask

paq.on('startTask', (options) => {
 // Triggered when a task in the queue is about to execute.
});

3. changeTask

paq.on('changeTask', (options) => {
 // Triggered when a task in the queue changes.
});

4. removeTask

paq.on('removeTask', (options) => {
 // Triggered when the queue remove a task.
});

5. completed

paq.on('completed', (options, result) => {
 // Triggered when the task execution in the queue is complete.
});

6. failed

paq.on('failed', (options, err) => {
 // Triggered when a task in the queue fails to execute.
});

最后,想补充的是:若遇到 Promise.allPromise.race 解决不了的需求,可以考虑一下 paq

回到顶部

AltStyle によって変換されたページ (->オリジナル) /