
I have some code to interface with a wire protocol that requires data to be inserted into a stream at regular byte intervals. Every 8KB (or at some other definable interval), a small chunk will be inserted. To make this easy, I decided to create a transform stream that takes a flowing stream and emits fixed-size chunks. That is, this stream can be written to in any size (2KB here, 500KB there, 5 bytes next, etc.) and it will always output chunks of 8KB in size.

var stream = require('stream');

function ChunkerTransformStream (chunkSize) {
  chunkSize = chunkSize || 8192;

  var buffer = new Buffer(0);
  var chunker = new stream.Transform({objectMode: true});

  chunker._transform = function (chunk, encoding, done) {
    buffer = Buffer.concat([buffer, chunk]);
    while (buffer.length >= chunkSize) {
      this.push(buffer.slice(0, chunkSize));
      buffer = buffer.slice(chunkSize);
    }
    done();
  }

  chunker._flush = function (done) {
    if (buffer.length) {
      this.push(buffer);
      done();
    }
  }

  return chunker;
}

module.exports = ChunkerTransformStream;
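For reference, here is a minimal usage sketch of how I intend to use it (the ./chunker path and the write sizes are just placeholders):

var ChunkerTransformStream = require('./chunker'); // assuming the module above is saved as chunker.js

var chunker = ChunkerTransformStream(8192);

chunker.on('data', function (chunk) {
  // always 8192 bytes, except possibly the final chunk pushed on flush
  console.log('got ' + chunk.length + ' bytes');
});

chunker.write(new Buffer(2048));   // writes of arbitrary size...
chunker.write(new Buffer(500000));
chunker.write(new Buffer(5));
chunker.end();                     // ...come out the other side as fixed 8KB chunks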

This transform stream will be used heavily in my code, with several megabits per second pushed through it. Is this the most efficient way to achieve what I want? I am most concerned about my buffer operations. It's my understanding that Buffer.concat() is very expensive, as it allocates an entirely new buffer and copies both source buffers into it.
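A rough micro-benchmark sketch of that concern (iteration counts are arbitrary and results will vary by Node version):

var a = new Buffer(8192);
var b = new Buffer(4096);
var iterations = 100000;

console.time('Buffer.concat');
for (var i = 0; i < iterations; i++) {
  Buffer.concat([a, b]); // allocates a new buffer and copies both inputs every time
}
console.timeEnd('Buffer.concat');

console.time('copy into preallocated');
var target = new Buffer(a.length + b.length);
for (var j = 0; j < iterations; j++) {
  a.copy(target, 0);            // reuses one preallocated buffer
  b.copy(target, a.length);
}
console.timeEnd('copy into preallocated');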

Any feedback, on performance or otherwise, is welcomed.

asked Jul 20, 2014 at 2:05
  • Is there a version of this which works with TransformStream in the browser? I tried to create one but could not find any documentation or examples for it. Commented Aug 27, 2022 at 23:35
  • I ended up creating one that works in the browser: gist.github.com/samaybhavsar/97d2674536c6f64de8b6d2c43085a347 Commented Aug 30, 2022 at 14:35
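For anyone after the browser route, a rough equivalent can be sketched with the standard Web Streams TransformStream (an untested sketch, not the gist linked above, using Uint8Array in place of Buffer):

function chunkerTransformStream(chunkSize = 8192) {
  let buffer = new Uint8Array(0);
  return new TransformStream({
    transform(chunk, controller) {
      // Append the incoming chunk to the leftover bytes from the previous call.
      const combined = new Uint8Array(buffer.length + chunk.length);
      combined.set(buffer, 0);
      combined.set(chunk, buffer.length);
      buffer = combined;
      // Emit as many full chunkSize chunks as possible.
      while (buffer.length >= chunkSize) {
        controller.enqueue(buffer.slice(0, chunkSize));
        buffer = buffer.slice(chunkSize);
      }
    },
    flush(controller) {
      if (buffer.length) controller.enqueue(buffer);
    }
  });
}

// usage: someReadableStream.pipeThrough(chunkerTransformStream(8192))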

3 Answers


You can actually do this without using Buffer.concat() and by only allocating a single Buffer of size chunkSize.

The basic flow for each chunk coming in to the stream is:

  1. If we have data in buffer already and there's enough data in the incoming chunk to fill buffer up to chunkSize, copy the appropriate bytes from chunk to buffer, push buffer, and then clear it.
  2. If we have data in buffer already but there isn't enough data in the incoming chunk to fill buffer up to chunkSize, append chunk to buffer.
  3. If there's nothing in buffer, push chunkSize chunks from chunk (that's a mouthful) and put any remaining bytes in buffer.

This greatly reduces the number of bytes that are copied around. Instead of every chunk being copied into buffer, only the bytes that are needed from chunk are copied. And if the incoming data happens to line up exactly with chunkSize, no bytes are copied at all.

I tried this out using your code as a base. I haven't tested it thoroughly, but it works well as a proof of concept:

var stream = require('stream');

// Helper function to push chunks of chunkSize from buffer to stream.
// Returns a Buffer containing the remaining bytes that weren't pushed.
function pushChunks(buffer, stream, chunkSize) {
  var offset = 0,
      size = buffer.length;

  while (size >= chunkSize) {
    stream.push(buffer.slice(offset, offset + chunkSize));
    offset += chunkSize;
    size -= chunkSize;
  }

  return buffer.slice(offset, offset + size);
}

function ChunkerTransformStream (chunkSize) {
  chunkSize = chunkSize || 8192;

  var buffer = new Buffer(chunkSize),
      bufferOffset = 0;

  var chunker = new stream.Transform({objectMode: true});

  chunker._transform = function (chunk, encoding, done) {
    // If we have data in the buffer, try to fill it up to chunkSize.
    if (bufferOffset != 0) {
      var bytesNeeded = (chunkSize - bufferOffset);

      // If we have enough bytes in this chunk to get buffer up to chunkSize,
      // fill in buffer, push it, and reset its offset.
      // Otherwise, just copy the entire chunk in to buffer.
      if (chunk.length >= bytesNeeded) {
        chunk.copy(buffer, bufferOffset, 0, bytesNeeded);
        this.push(buffer);
        bufferOffset = 0;
        chunk = chunk.slice(0, chunk.length - bytesNeeded);
      } else {
        chunk.copy(buffer, bufferOffset);
        bufferOffset += chunk.length;
      }
    }

    // If there's nothing in the buffer, push the chunk.
    if (bufferOffset == 0) {
      var remainingChunk = pushChunks(chunk, this, chunkSize);

      // If there are bytes left over, put them in the buffer.
      if (remainingChunk.length) {
        remainingChunk.copy(buffer, bufferOffset);
        bufferOffset += remainingChunk.length;
      }
    }

    done();
  }

  chunker._flush = function (done) {
    if (bufferOffset) {
      this.push(buffer.slice(0, bufferOffset));
      done();
    }
  }

  return chunker;
}

module.exports = ChunkerTransformStream;
answered Aug 3, 2014 at 1:22
  • Firstly, chunk = chunk.slice(0, chunk.length - bytesNeeded) is wrong; it should be chunk = chunk.slice(bytesNeeded, chunk.length) instead. Secondly, we need to make a copy of the buffer before we push it, otherwise we can end up with mixed data. Commented Mar 29, 2019 at 8:08
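In other words, a sketch of the relevant part of _transform with both fixes applied, keeping the answer's variable names (Buffer.from is available on Node versions newer than this answer):

// Once the incoming chunk can fill `buffer` up to chunkSize:
chunk.copy(buffer, bufferOffset, 0, bytesNeeded);
this.push(Buffer.from(buffer)); // push a copy, so refilling `buffer` later can't corrupt the pushed chunk
bufferOffset = 0;
chunk = chunk.slice(bytesNeeded); // drop the bytes just consumed from the front of the chunk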

First of all, your code looks great. It brought me to study the internal workings of Node's stream API.

The piece that you're concerned with, and rightfully so, is:

buffer = Buffer.concat([buffer, chunk]);

That's the way to do it. There's no other way to achieve what you want short of using something like bufferjs, which most likely handles things the same way internally, or less efficiently.

Now, there is one optimization you can make here. By passing the totalLength parameter to Buffer.concat (its signature is Buffer.concat(list, [totalLength])), you can avoid an extra loop inside the function that would otherwise have to compute the combined length of the buffers.

 Name         Description                                      Required?  Type
 list         List of Buffer objects to concat                 Required   array
 totalLength  Total length of the buffers when concatenated    Optional   number

If totalLength is not provided, it is read from the buffers in the list. However, this adds an additional loop to the function, so it is faster to provide the length explicitly. - See more at: http://www.w3resource.com/node.js/nodejs-buffer.php#sthash.qISHPlCO.dpuf
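Applied to the line above, the combined length is already known at the call site, so it could look something like this:

// Both lengths are known up front, so concat doesn't have to loop over the list to compute them
buffer = Buffer.concat([buffer, chunk], buffer.length + chunk.length);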

Not sure if this will help or not, but other than that, I don't see any possible optimizations.

answered Jul 20, 2014 at 21:53
  • I'm glad that after you've helped me with node.js multiple times on Stack Overflow, I can try to give back. It's interesting that I was struggling with just learning Node a few months ago, and thanks to people like you, I can now study and write code at this level. Commented Jul 20, 2014 at 21:55
  • +1, thanks for the feedback and I'm happy that I've been able to help you with questions in the past! Commented Jul 21, 2014 at 0:38
  • Looks like I was wrong! Commented Aug 4, 2014 at 7:40

The _flush function is wrong: done() is only called inside the if block, so the stream will never finish if the buffer happens to be empty when the input ends.

chunker._flush = function (done) {
  if (buffer.length) {
    this.push(buffer);
  }
  done();
}
mdfst13
answered Oct 1, 2016 at 19:56
