Am experimenting with the IAsyncEnumerable interface and the PipeReader class. Have come up with the following to decode a stream of bytes into chunks of characters. All of my test files decode successfully, but that doesn't mean one has thought of every edge case.
Of main concern to me is the final call to Convert, which is intended to flush any remaining characters in the decoder. As far as I can tell this call does absolutely nothing with my test data, and I don't know enough to contrive an example that exercises it. My code is loosely based on the example provided here but has been altered, because it seems incorrect to use charsRead - charIndex when the latter variable is set via charIndex += charsUsed. If one understands the example correctly, charsRead represents the number of bytes that were read while charsUsed actually represents the number of characters that were written. 1 byte is not always equivalent to 1 char, unless I'm mistaken, so one adjusted things accordingly.
Am not sure if my logic is totally sound and am hoping one of you can enlighten me...
Code:
public static async IAsyncEnumerable<ReadOnlyMemory<char>> DecodeAsync(this PipeReader pipeReader, Decoder decoder) {
    var decodedBlock = new ArrayBufferWriter<char>();
    var readResult = await pipeReader.ReadAsync();
    if (!readResult.IsCompleted) {
        do {
            var encodedBlock = readResult.Buffer;
            try {
                var isDecodingCompleted = false;
                if (!isDecodingCompleted) {
                    do {
                        decoder.Convert(
                            bytes: in encodedBlock,
                            charsUsed: out _,
                            completed: out isDecodingCompleted,
                            flush: false,
                            writer: decodedBlock
                        );
                        yield return decodedBlock.WrittenMemory;
                        decodedBlock.Clear();
                    } while (!isDecodingCompleted);
                }
            }
            finally {
                pipeReader.AdvanceTo(encodedBlock.End);
            }
            readResult = await pipeReader.ReadAsync();
        } while (!readResult.IsCompleted);
        decoder.Convert(
            bytes: in ReadOnlySequence<byte>.Empty,
            charsUsed: out var numberOfCharsUsed,
            completed: out _,
            flush: true,
            writer: decodedBlock
        );
        if (0 < numberOfCharsUsed) {
            yield return decodedBlock.WrittenMemory;
        }
    }
}
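On the question of how to contrive input for which the final flushing call does something: one possibility is a stream that ends in the middle of a multi-byte sequence. The sketch below assumes UTF-8 with the default replacement fallback and feeds the decoder only the first two bytes of a three-byte character; the variable names are illustrative and not part of the code above. The first call should buffer the incomplete bytes inside the decoder, and the flushing call should then be the one that emits a character for them. Other encodings or fallback settings may behave differently.

```csharp
using System;
using System.Buffers;
using System.Text;

// Assumption: UTF-8 with the default replacement fallback.
Decoder decoder = Encoding.UTF8.GetDecoder();
var decodedBlock = new ArrayBufferWriter<char>();

// The euro sign is encoded as E2 82 AC; here the input stops after two bytes.
var truncated = new ReadOnlySequence<byte>(new byte[] { 0xE2, 0x82 });

// Without flush the decoder keeps the incomplete sequence as internal state.
decoder.Convert(
    bytes: in truncated,
    writer: decodedBlock,
    flush: false,
    charsUsed: out long charsBeforeFlush,
    completed: out _
);

// With flush and no further input the decoder has to resolve that state.
decoder.Convert(
    bytes: in ReadOnlySequence<byte>.Empty,
    writer: decodedBlock,
    flush: true,
    charsUsed: out long charsFromFlush,
    completed: out _
);

Console.WriteLine($"before flush: {charsBeforeFlush}, from flush: {charsFromFlush}");
```

Well-formed test files never leave a partial sequence behind at the end of the stream, which would explain why the flushing call appears to do nothing with them.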
Example Usage:
public static async IAsyncEnumerable<ReadOnlyMemory<char>> ReadLinesAsync(this PipeReader pipeReader, Decoder decoder) {
    var isLineFeedCharacterHandlingEnabled = true;
    var stringBuilder = new StringBuilder();
    await foreach (var chunk in pipeReader.DecodeAsync(decoder)) {
        var newLineIndex = chunk.Span.IndexOfAny('\n', '\r');
        var offset = 0;
        if (-1 != newLineIndex) {
            do {
                if (isLineFeedCharacterHandlingEnabled) {
                    if (0 == stringBuilder.Length) {
                        yield return chunk.Slice(offset, newLineIndex);
                    }
                    else {
                        stringBuilder.Append(chunk.Slice(offset, newLineIndex));
                        yield return stringBuilder.ToString().AsMemory();
                        stringBuilder.Clear();
                    }
                }
                isLineFeedCharacterHandlingEnabled = ('\r' != chunk.Span[offset..][newLineIndex]);
                offset += (newLineIndex + 1);
                newLineIndex = chunk.Span[offset..].IndexOfAny('\n', '\r');
            } while (-1 != newLineIndex);
        }
        stringBuilder.Append(chunk[offset..]);
    }
    if (0 < stringBuilder.Length) {
        yield return stringBuilder.ToString().AsMemory();
    }
}
1 Answer
Whenever you see 3+ levels of indentation, it is a good sign that some refactoring is due :D
After making some simple transformations I ended up with this version:
public static async IAsyncEnumerable<ReadOnlyMemory<char>> DecodeAsync(this PipeReader pipeReader, Decoder decoder)
{
    var decodedBlock = new ArrayBufferWriter<char>();
    ReadResult readResult;
    while (!(readResult = await pipeReader.ReadAsync()).IsCompleted)
    {
        var encodedBlock = readResult.Buffer;
        try
        {
            var isDecodingCompleted = false;
            do
            {
                decoder.Convert(bytes: in encodedBlock, charsUsed: out _,
                    completed: out isDecodingCompleted, flush: false, writer: decodedBlock);
                yield return decodedBlock.WrittenMemory;
                decodedBlock.Clear();
            } while (!isDecodingCompleted);
        }
        finally
        {
            pipeReader.AdvanceTo(encodedBlock.End);
        }
    }
    decoder.Convert(bytes: in ReadOnlySequence<byte>.Empty, charsUsed: out var numberOfCharsUsed,
        completed: out _, flush: true, writer: decodedBlock);
    if (0 < numberOfCharsUsed)
    {
        yield return decodedBlock.WrittenMemory;
    }
}
- Rather than having a do-while where the last statement is a new ReadAsync, I suggest converting it to a simple while loop.
- With this approach you don't need the outermost guard: if (!readResult.IsCompleted)
- Your innermost guard, if (!isDecodingCompleted), just does not make sense: you have declared the variable as false on the previous line, so the guard expression will always be true.
You can apply the same transformations to ReadLinesAsync as well.
As I've indicated in the comment section, I don't see why you would want to advance the pipeReader if you have failed to decode.
- Thanks for the feedback! The silly outer guards are loop inversions that have been done by hand; it is a bad habit one picked up that is likely unnecessary in .NET 6. As for the AdvanceTo call, I addressed that above. Don't like it myself, but it seems to be what MS expects us to do. – Kittoes0124, Sep 24, 2021 at 15:43
- @Kittoes0124 I've never encountered the loop inversion concept before. To me it seems like a micro-optimization for this use case. Your method is I/O and memory bound, so optimizing for CPU instructions seems a bit counter-productive to me. But this is just my opinion. – Peter Csala, Sep 27, 2021 at 7:12
- ... pipeReader.AdvanceTo in a finally block? Why do you want to advance in case of exception?
- **Always** call PipeReader.AdvanceTo after calling PipeReader.ReadAsync. This lets the PipeReader know when the caller is done with the memory so that it can be tracked.
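The guidance quoted above can be sketched as a minimal read loop. This is an illustration under stated assumptions rather than a drop-in replacement for DecodeAsync: an in-memory Pipe stands in for the real source, summing buffer lengths stands in for decoding, and the names bytesSeen and buffer are made up for the example. It also assumes the System.IO.Pipelines package is referenced. Note that the sketch handles the buffer first and only then checks IsCompleted, since a completed read can still carry data.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

// Assumption: an in-memory pipe that is written once and then completed.
var pipe = new Pipe();
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("hello"));
await pipe.Writer.CompleteAsync();

long bytesSeen = 0;
while (true)
{
    ReadResult readResult = await pipe.Reader.ReadAsync();
    ReadOnlySequence<byte> buffer = readResult.Buffer;
    try
    {
        // Placeholder for real processing such as decoding.
        bytesSeen += buffer.Length;
    }
    finally
    {
        // Every ReadAsync is paired with an AdvanceTo, even if processing throws.
        pipe.Reader.AdvanceTo(buffer.End);
    }
    // Checked after processing so that data delivered with the final read is not skipped.
    if (readResult.IsCompleted)
    {
        break;
    }
}
await pipe.Reader.CompleteAsync();
Console.WriteLine(bytesSeen);
```

Whether advancing to the end of the buffer is the right choice after a decoding failure is a separate design question; the sketch only shows the pairing of ReadAsync with AdvanceTo.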