I'm trying to read a quite big file with around 28 million rows in the following way:
var jobs = new ActionBlock<string[]>((jobs) =>
{
    //Some code
});
var pool = ArrayPool<string>.Shared;
var items = pool.Rent(1000);
using (FileStream fs = File.Open("test.csv", FileMode.Open))
using (BufferedStream bs = new BufferedStream(fs))
using (StreamReader sr = new StreamReader(bs, Encoding.UTF8, true, 1024))
{
    var count = 0;
    string s;
    while ((s = sr.ReadLine()) != null)
    {
        items[count] = s;
        if (count++ == 999)
        {
            count = 0;
            jobs.Post(items);
        }
    }
    jobs.Complete();
}
await jobs.Completion;
pool.Return(items);
I'm reading it with ReadLine and a pre-defined buffer. At the moment it takes around 17 seconds on average. Each line looks like this:
100.1.8.hah,2017-05-16,00:00:00,0.0,1054102.0,0001493152-17-005364,form8-k.htm,200.0,3767.0,0.0,0.0,0.0,9.0,0.0,
How can I tell whether this is slow and needs improvement, or whether it's already reasonable? If anyone has some clues, please let me know.
2 Answers
Generally, I/O operations are the slowest part. You also potentially have a bug in your code: an ActionBlock can be asynchronous or chained into other blocks, and because you post the same array on every call, its contents may change while a batch is still being processed. ArrayPool should typically only be used when you control the entire lifetime and usage of the array; once you push the array out of the method, you lose that control.
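One possible way (just a sketch) to keep the pool but hand each buffer's ownership to the block is to rent a new array per batch and return it inside the block. The (buffer, count) tuple payload and the batch size of 1000 below are illustrative, and it assumes the usual System.Buffers / System.Threading.Tasks.Dataflow usings:

// Sketch: each posted batch owns its own rented buffer,
// and the block returns it to the pool once the batch is processed.
var pool = ArrayPool<string>.Shared;

var jobs = new ActionBlock<(string[] Buffer, int Count)>(batch =>
{
    try
    {
        for (var i = 0; i < batch.Count; i++)
        {
            // process batch.Buffer[i]
        }
    }
    finally
    {
        pool.Return(batch.Buffer, clearArray: true);
    }
});

using (var sr = new StreamReader("test.csv"))
{
    var items = pool.Rent(1000);
    var count = 0;
    string line;
    while ((line = sr.ReadLine()) != null)
    {
        items[count++] = line;
        if (count == 1000)
        {
            jobs.Post((items, count));
            items = pool.Rent(1000); // rent a fresh buffer instead of reusing the posted one
            count = 0;
        }
    }
    if (count > 0)
    {
        jobs.Post((items, count)); // flush the last, partial batch
    }
    else
    {
        pool.Return(items); // nothing left to post, give the buffer back
    }
}
jobs.Complete();
await jobs.Completion;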
You can also replace all of the low-level FileStream/BufferedStream/StreamReader code with just File.ReadLines, and if you are on .NET 6 or later you can use Enumerable.Chunk to group the lines into batches.
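A minimal sketch of that simplification, assuming .NET 6+ and the same jobs block (each chunk is a freshly allocated array, so the shared-array problem goes away):

// File.ReadLines streams the file lazily; Chunk groups the lines into
// arrays of up to 1000 elements, each one a new allocation.
foreach (var batch in File.ReadLines("test.csv").Chunk(1000))
{
    jobs.Post(batch);
}
jobs.Complete();
await jobs.Completion;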
With large files the biggest concern is usually loading too much into memory at once. The IEnumerable returned by File.ReadLines only loads lines as they are consumed. If you want to avoid blocking on reads entirely, you could still use StreamReader.ReadLineAsync and expose the lines as an IAsyncEnumerable.
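A sketch of what such an async reader could look like; ReadLinesAsync is just an illustrative helper name here:

// Illustrative helper: stream lines asynchronously instead of blocking on reads.
static async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path, Encoding.UTF8);
    string? line;
    while ((line = await reader.ReadLineAsync()) != null)
    {
        yield return line;
    }
}

// Consume it with await foreach and batch/post to the block from there.
await foreach (var line in ReadLinesAsync("test.csv"))
{
    // collect into batches and Post/SendAsync them to the ActionBlock
}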
Here is an example of how changing the ActionBlock can compile but produce strange results at runtime. On my machine the two Console.WriteLine calls do not always print the same value:
var _count = 0; // shared counter so only the first posted batch is inspected
var jobs = new ActionBlock<string[]>(async batch =>
{
    var count = Interlocked.Increment(ref _count);
    if (count == 1)
    {
        Console.WriteLine(batch[0]); // first element as the batch arrives
    }
    await Task.Delay(TimeSpan.FromSeconds(2));
    if (count == 1)
    {
        Console.WriteLine(batch[0]); // may print a different line: the producer has overwritten the shared array
    }
});
Just to add my two cents: if the number of lines in your CSV file isn't evenly divisible by 1000, your code won't process the last batch, because those items are never passed to the ActionBlock. This can be fixed by posting items before jobs.Complete(), but you need to make sure you only pass the entries that were newly assigned: if, say, the last batch contains only 10 lines, you should pass only items[0]...items[9].
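A minimal sketch of that fix; copying the filled slots into a right-sized array is just one way to avoid handing the block stale entries left over from the previous batch (tail is an illustrative name):

// After the read loop: flush whatever is left in the shared buffer
// before completing the block.
if (count > 0)
{
    var tail = new string[count];   // holds only the partial batch
    Array.Copy(items, tail, count); // copy only the freshly assigned entries
    jobs.Post(tail);
}
jobs.Complete();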
Is the jobs execution included in the 17 seconds?