I have some code to interface with a wire protocol that requires data to be inserted into a stream at regular byte intervals. Every 8KB (or at some other definable interval), a small chunk will be inserted. To make this easy, I decided to create a transform stream that would take a flowing stream and write fixed chunk sizes. That is, this stream can be written to in any size (2KB here, 500KB there, 5 bytes next, etc.) and it will output chunks in 8KB of size every time.
var stream = require('stream');

function ChunkerTransformStream (chunkSize) {
    chunkSize = chunkSize || 8192;
    var buffer = new Buffer(0);

    var chunker = new stream.Transform({objectMode: true});

    chunker._transform = function (chunk, encoding, done) {
        buffer = Buffer.concat([buffer, chunk]);
        while (buffer.length >= chunkSize) {
            this.push(buffer.slice(0, chunkSize));
            buffer = buffer.slice(chunkSize);
        }
        done();
    }

    chunker._flush = function (done) {
        if (buffer.length) {
            this.push(buffer);
            done();
        }
    }

    return chunker;
}

module.exports = ChunkerTransformStream;
This transform stream will be used heavily in my code, with several megabits per second pushed through it. Is this the most efficient way to achieve what I want? I am most concerned about my buffer operations. It's my understanding that `Buffer.concat()` is very expensive, as it allocates an entirely new buffer and copies the source buffers into it.
Any feedback, on performance or otherwise, is welcomed.
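For reference, a minimal usage sketch (the `source` and `destination` streams and the `./chunker` path here are hypothetical placeholders, not part of my actual code):

var ChunkerTransformStream = require('./chunker');

// Hypothetical pipeline: re-chunk whatever sizes `source` emits into fixed 8 KB pieces.
var chunker = ChunkerTransformStream(8192);
source.pipe(chunker).pipe(destination);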
- Is there a version of this which works with StreamTransform in the browser? I tried to create one but could not find any documentation or examples for the same. – Samay, Aug 27, 2022 at 23:35
- I ended up creating one that works in the browser: gist.github.com/samaybhavsar/97d2674536c6f64de8b6d2c43085a347 – Samay, Aug 30, 2022 at 14:35
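For readers who don't follow the link, a minimal independent sketch of the same idea using the browser's WHATWG TransformStream API might look like this (this is not the linked gist; chunks are assumed to arrive as Uint8Arrays):

function createChunkerTransformStream(chunkSize = 8192) {
    let buffered = new Uint8Array(0);

    return new TransformStream({
        transform(chunk, controller) {
            // Append the incoming bytes to whatever is left over from earlier writes.
            const combined = new Uint8Array(buffered.length + chunk.length);
            combined.set(buffered, 0);
            combined.set(chunk, buffered.length);

            // Emit as many full chunkSize pieces as possible, keep the remainder.
            let offset = 0;
            while (combined.length - offset >= chunkSize) {
                controller.enqueue(combined.slice(offset, offset + chunkSize));
                offset += chunkSize;
            }
            buffered = combined.slice(offset);
        },
        flush(controller) {
            // Emit any trailing partial chunk when the source closes.
            if (buffered.length) {
                controller.enqueue(buffered);
            }
        }
    });
}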
3 Answers
You can actually do this without using `Buffer.concat()` and by only allocating a single `Buffer` of size `chunkSize`.
The basic flow for each chunk coming into the stream is:

- If we have data in `buffer` already and there's enough data in the incoming `chunk` to fill `buffer` up to `chunkSize`, copy the appropriate bytes from `chunk` to `buffer`, push `buffer`, and then clear it.
- If we have data in `buffer` already but there isn't enough data in the incoming `chunk` to fill `buffer` up to `chunkSize`, append `chunk` to `buffer`.
- If there's nothing in `buffer`, push `chunkSize`-sized chunks from `chunk` (that's a mouthful) and put any remaining bytes in `buffer`.
This greatly reduces the number of bytes that are copied around. Instead of each `chunk` being copied to `buffer`, only the bytes that are needed from `chunk` are copied. And, if `chunk` happens to exactly fit into `chunkSize` pieces, no bytes are copied at all.

I tried this out using your code as a base. I haven't tested it thoroughly, but it works well as a proof of concept:
var stream = require('stream');

// Helper function to push chunks of chunkSize from buffer to stream
// returns a Buffer containing the remaining bytes that weren't pushed
function pushChunks(buffer, stream, chunkSize) {
    var offset = 0,
        size = buffer.length;

    while (size >= chunkSize) {
        stream.push(buffer.slice(offset, offset + chunkSize));
        offset += chunkSize;
        size -= chunkSize;
    }

    return buffer.slice(offset, offset + size);
}

function ChunkerTransformStream (chunkSize) {
    chunkSize = chunkSize || 8192;
    var buffer = new Buffer(chunkSize),
        bufferOffset = 0;

    var chunker = new stream.Transform({objectMode: true});

    chunker._transform = function (chunk, encoding, done) {
        // If we have data in the buffer, try to fill it up to chunkSize.
        if (bufferOffset != 0) {
            var bytesNeeded = (chunkSize - bufferOffset);

            // If we have enough bytes in this chunk to get buffer up to chunkSize,
            // fill in buffer, push it, and reset its offset.
            // Otherwise, just copy the entire chunk in to buffer.
            if (chunk.length >= bytesNeeded) {
                chunk.copy(buffer, bufferOffset, 0, bytesNeeded);
                this.push(buffer);
                bufferOffset = 0;
                chunk = chunk.slice(0, chunk.length - bytesNeeded);
            } else {
                chunk.copy(buffer, bufferOffset);
                bufferOffset += chunk.length;
            }
        }

        // If there's nothing in the buffer, push the chunk.
        if (bufferOffset == 0) {
            var remainingChunk = pushChunks(chunk, this, chunkSize);

            // If there are bytes left over, put them in the buffer.
            if (remainingChunk.length) {
                remainingChunk.copy(buffer, bufferOffset);
                bufferOffset += remainingChunk.length;
            }
        }

        done();
    }

    chunker._flush = function (done) {
        if (bufferOffset) {
            this.push(buffer.slice(0, bufferOffset));
            done();
        }
    }

    return chunker;
}

module.exports = ChunkerTransformStream;
- Firstly, `chunk = chunk.slice(0, chunk.length - bytesNeeded)` is wrong; it should be `chunk = chunk.slice(bytesNeeded, chunk.length)` instead. Secondly, we need to make a copy of the buffer before we push it, otherwise we can end up with mixed data. – zag2art, Mar 29, 2019 at 8:08
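For anyone applying those fixes, a rough sketch of the corrected branch (untested; `Buffer.from(buffer)` is used here as one way to push a copy, since the chunker keeps reusing `buffer` and pushing it directly lets later writes overwrite data that is still queued downstream):

if (chunk.length >= bytesNeeded) {
    chunk.copy(buffer, bufferOffset, 0, bytesNeeded);
    // Push a copy of the internal buffer rather than the buffer itself.
    this.push(Buffer.from(buffer));
    bufferOffset = 0;
    // Keep only the bytes that were not consumed from the front of the chunk.
    chunk = chunk.slice(bytesNeeded, chunk.length);
} else {
    chunk.copy(buffer, bufferOffset);
    bufferOffset += chunk.length;
}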
First of all, your code looks great. It led me to study the internal workings of Node's stream API.
The piece that you're concerned with, and rightfully so, is:
buffer = Buffer.concat([buffer, chunk]);
That's the way to do it. There's no other way to do what you want without using something like bufferjs, which likely handles things the same way, or less efficiently.
Now, there is one optimization you can make here. `Buffer.concat()` accepts an optional second argument, `Buffer.concat(list, [totalLength])`. By passing `totalLength` you can improve efficiency by avoiding an extra loop inside the function, which would otherwise have to compute the length of the new buffer.
| Name | Description | Required? | Type |
| --- | --- | --- | --- |
| list | List of Buffer objects to concat | Required | array |
| totalLength | Total length of the buffers when concatenated | Optional | number |
If totalLength is not provided, it is read from the buffers in the list. However, this adds an additional loop to the function, so it is faster to provide the length explicitly. (Source: http://www.w3resource.com/node.js/nodejs-buffer.php)
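Applied to the line in question, the change is a one-liner (sketched here; the total is simply the two lengths added together):

// Pass the known combined length so Buffer.concat() doesn't have to compute it.
buffer = Buffer.concat([buffer, chunk], buffer.length + chunk.length);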
Not sure if this will help or not, but other than that, I don't see any possible optimizations.
- I'm glad that after you've helped me with node.js multiple times on Stack Overflow, I can try to give back. It's interesting that I was struggling with just learning Node a few months ago, and thanks to people like you, I can now study and write code at this level. – user73428, Jul 20, 2014 at 21:55
- +1, thanks for the feedback and I'm happy that I've been able to help you with questions in the past! – Brad, Jul 21, 2014 at 0:38
- Looks like I was wrong! – user73428, Aug 4, 2014 at 7:40
The `_flush` function is wrong. It will never close the stream if the buffer is empty.
chunker._flush = function (done) {
    if (buffer.length) {
        this.push(buffer);
    }
    done();
}