Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 0d8bc16

Browse files
timursevimlitshemsedinov
authored andcommitted
Add pipeline usage example for queue on streams
1 parent af7f745 commit 0d8bc16

File tree

1 file changed

+88
-0
lines changed

1 file changed

+88
-0
lines changed

‎JavaScript/7-pipeline.js‎

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
'use strict';
2+
3+
const { Readable, Writable, Transform, pipeline } = require('node:stream');
4+
5+
class QueueStream extends Readable {
6+
constructor(concurrent) {
7+
super({ objectMode: true });
8+
this.concurrent = concurrent;
9+
this.count = 0;
10+
this.queue = [];
11+
}
12+
13+
static channels(concurrent) {
14+
return new QueueStream(concurrent);
15+
}
16+
17+
add(task) {
18+
this.queue.push(task);
19+
}
20+
21+
_read() {
22+
while (this.count < this.concurrent && this.queue.length > 0) {
23+
const task = this.queue.shift();
24+
this.count++;
25+
this.onProcess(task, (err, res) => {
26+
if (err) this.emit('error', err);
27+
this.push({ err, res });
28+
this.count--;
29+
});
30+
}
31+
if (this.queue.length === 0 && this.count === 0) {
32+
this.push(null);
33+
}
34+
}
35+
36+
process(listener) {
37+
this.onProcess = listener;
38+
return this;
39+
}
40+
}
41+
42+
// Usage
43+
44+
const fs = require('node:fs');
45+
46+
const fileWStream = fs.createWriteStream('./tasks.txt');
47+
48+
const stringifyStream = new Transform({
49+
readableObjectMode: true,
50+
writableObjectMode: true,
51+
write(data, encoding, next) {
52+
const result = JSON.stringify(data);
53+
this.push(result + '\n');
54+
next();
55+
},
56+
});
57+
58+
const writable = new Writable({
59+
objectMode: true,
60+
write(data, encoding, next) {
61+
console.log({ data });
62+
next();
63+
}
64+
});
65+
66+
const job = (task, next) => {
67+
setTimeout(next, task.interval, null, task);
68+
};
69+
70+
const queue = QueueStream.channels(3).process(job);
71+
72+
pipeline(queue, stringifyStream, fileWStream, (err) => {
73+
if (err) throw err;
74+
console.log('pipeline done');
75+
});
76+
77+
queue.pipe(writable);
78+
// queue.on('data', (data) => void console.log(data));
79+
80+
queue.on('end', () => void console.log('tasks end'));
81+
queue.on('close', () => void console.log('stream closed'));
82+
83+
writable.on('finish', () => void console.log('writable finished'));
84+
85+
for (let i = 0; i < 20; i++) {
86+
if (i < 10) queue.add({ name: `Task${i}`, interval: 1000 });
87+
else queue.add({ name: `Task${i}`, interval: i * 100 });
88+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /