I wrote this code for a take-home task and received some negative feedback. I really appreciate the reviewer, but I disagree with the comments that I use auto inconsistently and that the algorithm is not optimal (I know it's not optimal, but the code they expected runs about 5 times slower than mine, so that is really hard to accept).
There are two big problems I see when reviewing my code again:
- I did not handle the case of more than 1024 chunks, which is usually the maximum number of open files on Linux. I was aware of that and mentioned it when I submitted the code instead of handling it, since the system limit can be increased (see the sketch after this list).
- There is a bug in how memory is accounted for when storing strings in vectors. With GCC, when a vector runs out of capacity it reallocates roughly 2x the current memory and copies the elements to the new location, so we can use more memory than we expected. Interestingly, no one pointed this out.
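Below is a minimal sketch of how the open-file limit could be raised from inside the program on Linux; the helper name raiseOpenFileLimit and the target of 4096 are only illustrative, and the soft limit cannot go above the hard limit without privileges.

#include <sys/resource.h>
#include <cstdio>

// Try to raise the soft limit on open file descriptors (RLIMIT_NOFILE).
bool raiseOpenFileLimit(rlim_t wanted) {
  rlimit rl{};
  if (getrlimit(RLIMIT_NOFILE, &rl) != 0) return false;
  if (rl.rlim_cur >= wanted) return true;  // already high enough
  rl.rlim_cur = (wanted <= rl.rlim_max) ? wanted : rl.rlim_max;
  return setrlimit(RLIMIT_NOFILE, &rl) == 0;
}

int main() {
  if (!raiseOpenFileLimit(4096))
    std::perror("setrlimit");
}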
I would appreciate any comments on where I should improve so I can do better next time. Below is the reviewer's feedback:
- His application could not run with 1.6GB; this error might come from the limit on reading files simultaneously. I think he did not check with a huge file and could not avoid the I/O problem.
{{{
./main ~/PycharmProjects/serp_improvement/data_restore/ch_result_20.csv out_ch_result_20.csv 100000
Sort chunks in async mode
Intermediate files 1021 are missing!
Considering remove all temp files with "rm -r *.tmp"
}}}
- His algorithm is not good enough:
  - To read the data line by line, he used the seek function to go back to the latest line to avoid exceeding the limit. This method is quite complex.
  - He merges all chunk files into the output file at the same time. This method faces I/O blocking; on my machine, the OS allows reading 1024 files simultaneously.
- His coding is quite complex and hard to read: for example, sometimes he used auto, sometimes not.
My code:
#include <iostream>
#include <string>
#include <vector>
#include <fstream>
#include <future>
#include <memory>
#include <algorithm>
#include <locale>
#include <codecvt>
#include <deque>
#include <cstdio>
#include <queue>
// we will use async read & write, so make the stream shareable
typedef std::shared_ptr<std::ifstream> istream_pointer;
typedef std::shared_ptr<std::ofstream> ostream_pointer;
// string comparison, sort by < by default
// uncomment the line below to sort with std::locale
// #define ENABLE_LOCALE
#ifdef ENABLE_LOCALE
bool strComp(const std::string& a, const std::string& b) {
// en rules works quite well for latin characters
static std::locale comp("en_US.UTF-8");
return comp(a, b);
}
#else
bool strComp(const std::string& a, const std::string& b) { return a < b; }
#endif
// read from a stream no more than `limit` bytes
std::vector<std::string> readFromStream(const istream_pointer& fin,
size_t limit) {
std::vector<std::string> ret;
std::string line;
size_t sz = 0; // total bytes of contents
size_t len = fin->tellg(); // for back track
while (std::getline(*fin, line)) {
if (sz + line.size() > limit) {
// put line back to stream
fin->seekg(len, std::ios_base::beg);
break;
}
sz += line.size();
len += line.size() + 1; // +1 for newline
ret.push_back(std::move(line));
}
// assume copy elision
return ret;
}
// write a vector of string to a stream line by line
int writeToStream(const ostream_pointer& fout,
const std::vector<std::string>& chunks) {
for (auto& v : chunks) {
*fout << v << '\n';
}
return 1;
}
// split the file into chunks and sort each chunk
size_t sortChunks(const istream_pointer& fin, size_t nbytes) {
// pre-read some lines
std::vector<std::string> cur;
const size_t sz = nbytes;
readFromStream(fin, sz).swap(cur);
size_t n_chunks = 0;
while (cur.size() > 0) {
// sort current chunk
std::sort(cur.begin(), cur.end(), strComp);
// write the chunk to fout
ostream_pointer fout =
std::make_shared<std::ofstream>(std::to_string(n_chunks++) + ".tmp");
writeToStream(fout, cur);
// read new chunks
cur.clear();
readFromStream(fin, sz).swap(cur);
}
return n_chunks;
}
// split the file into chunks and sort each chunk - async read
size_t sortChunksAsync(const istream_pointer& fin, size_t nbytes) {
// pre-read some lines
std::vector<std::string> cur;
const size_t sz = nbytes / 2;
readFromStream(fin, sz).swap(cur);
int n_chunks = 0;
while (cur.size() > 0) {
// async point: read next chunk from stream async while process current
// chunk
std::future<std::vector<std::string>> nxt = std::async(
std::launch::async, readFromStream, fin, sz); // non-blocking
// sort current chunk
std::sort(cur.begin(), cur.end(), strComp);
// write the chunk to fout
ostream_pointer fout =
std::make_shared<std::ofstream>(std::to_string(n_chunks++) + ".tmp");
writeToStream(fout, cur);
// wait for reading nxt done
nxt.wait();
// async point: swap cur with next
nxt.get().swap(cur);
}
return n_chunks;
}
// we will use a priority queue to merge k buffers, but the actual strings should
// not be pushed to the queue. Instead, we use a vector iterator (which is
// essentially a pointer). We also need the identity of the buffer to handle
// reaching end()
typedef std::pair<std::vector<std::string>::iterator, size_t> vstring_iterator;
// Merge K streams and write to fout
void kWayMerge(const std::vector<istream_pointer>& streams,
const ostream_pointer& fout, size_t nbytes) {
const size_t n_chunks = streams.size();
std::cout << "Merging " << n_chunks << " streams\n";
// max size of chunks
const size_t sz = nbytes / (n_chunks + 1);
// buffer to store sorted chunks
std::vector<std::vector<std::string>> bufs(n_chunks);
// fill the buffer some lines
for (size_t i = 0; i < n_chunks; ++i) {
readFromStream(streams[i], sz).swap(bufs[i]);
}
// output buffers
std::vector<std::string> ret;
// comparator for priority queue
auto comp = [](const vstring_iterator& it0, const vstring_iterator& it1) {
return strComp(*it1.first, *it0.first); // min heap
};
std::priority_queue<vstring_iterator, std::vector<vstring_iterator>,
decltype(comp)> pq(comp);
  // push the beginning of each buffer to pq
for (size_t i = 0; i < n_chunks; ++i) {
if (bufs[i].size() > 0) {
pq.push({bufs[i].begin(), i});
}
else {
streams[i]->close(); // empty stream
}
}
size_t sz2 = 0; // keep track the size of output buffer
  // now run until we have nothing to push to pq
while (!pq.empty()) {
auto vit = pq.top();
auto it = vit.first; //current iterator
auto id = vit.second; // id of the buffer
pq.pop();
// std::cout << *it << std::endl;
if (sz2 + it->size() > sz) {
writeToStream(fout, ret);
sz2 = 0;
ret.clear();
}
sz2 += it->size();
auto nxt = it + 1; // next string in bufs[id]
ret.push_back(move(*it));
if (nxt == bufs[id].end()) { // reach end of buffer id
bufs[id].clear();
readFromStream(streams[id], sz).swap(bufs[id]);
if (bufs[id].size() > 0) {
nxt = bufs[id].begin();
pq.push({nxt, id});
} else {
        // if buf is empty, the stream has ended
streams[id]->close();
}
} else { // if not, just push to queue
pq.push({nxt, id});
}
}
// last write
writeToStream(fout, ret);
return;
}
// Merge K streams and write to fout - async read
void kWayMergeAsync(const std::vector<istream_pointer>& streams,
const ostream_pointer& fout, size_t nbytes) {
const size_t n_chunks = streams.size();
std::cout << "Merging " << n_chunks << " streams\n";
// max size of chunks
const size_t sz = nbytes / n_chunks;
// we only use half limit size of buffer for async read
const size_t bz = sz / 2;
// buffer to store strings in sorted chunks
std::vector<std::vector<std::string>> bufs(n_chunks);
// next buffers
std::vector<std::future<std::vector<std::string>>> nxt_bufs(n_chunks);
// fill the buffer some line
for (size_t i = 0; i < n_chunks; ++i) {
readFromStream(streams[i], bz).swap(bufs[i]);
}
// prefetch some next buffer
for (size_t i = 0; i < n_chunks; ++i) {
nxt_bufs[i] = std::async(readFromStream, streams[i], bz);
}
  // merged buffers
std::vector<std::string> ret;
std::future<int> pret = std::async(std::launch::async,writeToStream,fout,std::move(ret));
// comparator for priority queue
auto comp = [](vstring_iterator& it0, vstring_iterator& it1) {
return strComp(*it1.first, *it0.first); // min heap
};
std::priority_queue<vstring_iterator, std::vector<vstring_iterator>,
decltype(comp)> pq(comp);
  // push the beginning of each buffer to pq
for (size_t i = 0; i < n_chunks; ++i) {
if (bufs[i].size() > 0) {
pq.push({bufs[i].begin(), i});
}
else {
streams[i]->close(); // empty stream
}
}
size_t sz2 = 0; // keep track the size of merged buffer
// now run until we have nothing to push to pq
while (!pq.empty()) {
auto vit = pq.top();
auto it = vit.first; //current iterator
auto id = vit.second; // id of the buffer
pq.pop();
// std::cout << *it << std::endl;
if (sz2 + it->size() > bz) {
pret.wait();
pret = std::async(std::launch::async,writeToStream,fout,std::move(ret));
sz2 = 0;
}
sz2 += it->size();
auto nxt = it + 1; // next string in bufs[id]
ret.push_back(move(*it));
if (nxt == bufs[id].end()) { // reach end of buffer id
// wait for next buffer - expected no wait
nxt_bufs[id].wait();
// swap contents of current buffer with next buffer
nxt_bufs[id].get().swap(bufs[id]);
if (bufs[id].size() > 0) {
nxt = bufs[id].begin();
pq.push({nxt, id});
// prefetch next bufs[id]
nxt_bufs[id] = std::async(std::launch::async, readFromStream, streams[id], bz);
} else {
        // if buf is empty, the stream has ended
streams[id]->close();
}
} else { // if not, just push to queue
pq.push({nxt, id});
}
}
// last write
pret.wait();
writeToStream(fout, ret);
return;
  // what if we used k threads to push to a priority queue and one thread to pop
  // from the queue?
}
void cleanTmpFiles(size_t chunks) {
for (size_t i = 0; i < chunks; ++i) {
std::remove((std::to_string(i) + ".tmp").c_str());
}
}
// Our external sort function
void externalSort(const char* input_file, const char* output_file,
size_t limits, int async) {
// read input file
istream_pointer fin = std::make_shared<std::ifstream>(input_file);
if (!fin->is_open()) {
throw std::logic_error("Input file is missing");
}
// sort the stream
size_t n_chunks = 0;
if (async & 1) {
std::cout << "Sort chunks in async mode\n";
n_chunks = sortChunksAsync(fin, limits);
} else {
n_chunks = sortChunks(fin, limits);
}
fin->close();
// read temporary file
std::vector<istream_pointer> streams(n_chunks);
for (size_t i = 0; i < n_chunks; ++i) {
istream_pointer isptr =
std::make_shared<std::ifstream>(std::to_string(i) + ".tmp");
if (!isptr->is_open()) {
cleanTmpFiles(n_chunks);
throw std::logic_error("Itermediate files " + std::to_string(i) +
" are missing!");
}
streams[i] = std::move(isptr);
}
// stream to output file
ostream_pointer fout = std::make_shared<std::ofstream>(output_file);
// merge the streams
if (async & 2) {
std::cout << "Merge chunks in async mode\n";
kWayMergeAsync(streams, fout, limits);
} else {
kWayMerge(streams, fout, limits);
}
fout->close();
// clean tmp file
cleanTmpFiles(n_chunks);
std::cout << "Done!\n";
}
// our main application
int main(int argc, char* argv[]) {
const char* input_file = (argc > 1) ? argv[1] : "input.txt";
const char* output_file = (argc > 2) ? argv[2] : "output.txt";
size_t limits = (argc > 3) ? std::stoull(argv[3]) : 6000;
int async = (argc > 4) ? std::stoi(argv[4]) : 3;
try {
externalSort(input_file, output_file, limits, async);
}
  // should clean tmp files, but we don't know how many there are, so just
  // remind users to do it
catch (const std::exception& e) {
std::cerr << e.what() << "\n";
std::cerr << "Considering remove all temp files with \"rm -r *.tmp\"\n";
return -1;
} catch (...) {
std::cerr << "Exception caught during sorting!\nConsidering remove all temp "
"files with \"rm -r *.tmp\"";
return -1;
}
return 0;
}
UPDATE: I will summarise the bugs here and make a better version in my spare time.
Bugs found by user673679:
- Formatting is hard to read. A 2-space indent is fine for me, but I think it may be hard for others to read. I used clang-format, so it may make things worse.
- Bugs related to the minimum size of memory: we should handle the case where we have more chunks than the allowed number of open files. I think it can be done with incremental merging (merge 512 files at a time, for example) - see the sketch after this list.
- The code is over-complicated, especially when handling async I/O. He suggests an async_read and an async_write class that hold both the current and the next buffer and hide all the internal async details. I think this is a very nice idea.
- I should use runtime_error instead of logic_error.
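To make the incremental-merging idea concrete, here is a rough sketch of a multi-pass merge that never opens more than max_open chunk files at once. It assumes the kWayMerge function, the typedefs and the ".tmp" naming from the code above; the function name, the default of 512 and the re-queueing scheme are only an illustration, not part of the submitted code.

// Sketch: merge chunk files in passes so that at most `max_open` are open at once.
// Newly produced chunks join the queue and are merged again until the remaining
// chunks fit into a single final merge.
void mergeInPasses(size_t n_chunks, const char* output_file,
                   size_t nbytes, size_t max_open = 512) {
  std::deque<size_t> pending;            // ids of chunk files still to be merged
  for (size_t i = 0; i < n_chunks; ++i) pending.push_back(i);
  size_t next_id = n_chunks;             // id for chunks produced by merge passes

  while (pending.size() > max_open) {
    // take the first max_open pending chunks and merge them into one new chunk
    std::vector<istream_pointer> streams;
    std::vector<size_t> used;
    for (size_t k = 0; k < max_open; ++k) {
      used.push_back(pending.front());
      streams.push_back(std::make_shared<std::ifstream>(
          std::to_string(pending.front()) + ".tmp"));
      pending.pop_front();
    }
    ostream_pointer fout =
        std::make_shared<std::ofstream>(std::to_string(next_id) + ".tmp");
    kWayMerge(streams, fout, nbytes);
    fout->close();
    for (size_t id : used)               // the merged inputs are no longer needed
      std::remove((std::to_string(id) + ".tmp").c_str());
    pending.push_back(next_id++);        // the new chunk joins the queue
  }

  // final pass: merge whatever is left directly into the output file
  std::vector<istream_pointer> streams;
  for (size_t id : pending)
    streams.push_back(std::make_shared<std::ifstream>(std::to_string(id) + ".tmp"));
  ostream_pointer fout = std::make_shared<std::ofstream>(output_file);
  kWayMerge(streams, fout, nbytes);
  for (size_t id : pending)
    std::remove((std::to_string(id) + ".tmp").c_str());
}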
1 Answer
Formatting:
Please don't use two spaces for indent - it's really hard to visually match indentation levels. Either use 4 spaces, or an actual tab character (so that someone with bad eyes like me can set the spacing to whatever they need without having to reformat the code).
Some vertical spacing would also help readability.
Bugs:
I used a ~4MB random word list for input, and rapidly found the following:
- Windows (or more specifically, the C runtime library on Windows) limits the number of open files to 512 by default. We can increase this to 8192 with _setmaxstdio, but this may not be sufficient either. We probably need an algorithm that explicitly works with a specified file limit.
- If we fail to open a temporary file (e.g. because we hit the file limit), cleanTmpFiles is called. However, this will not clean up any temporary files that we already have open (i.e. all the files up to the file limit).
- If the line length is greater than the byte limit, we silently produce nonsense output. With the random word list and the default 6000 byte limit, we get limits that are too small to be practically used (sz = 3, bz = 1 in kWayMergeAsync). We should either emit an error if this happens, or accept lines longer than the limit when necessary (see the sketch after this list).
- writeToStream writes an empty line at the end of each temporary file. We should either avoid this, or change readFromStream to ignore empty lines.
- The sorted output contains some duplicate words where the input has no duplicates (I think it's a problem with the read function backtracking, though I haven't tracked it down exactly).
Buffering:
The OS does its own buffering for file I/O. I'm not convinced there's a benefit to doing manual buffering on top of that in the merge stage of the algorithm. (If we have strict memory limits, we can call setbuf / pubsetbuf to supply our own buffers).
I'd suggest starting with an unbuffered version, and examining how that performs before adding any extra complexity.
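For example, a minimal sketch of supplying our own buffer (the behaviour of pubsetbuf is implementation-defined; with libstdc++ it typically needs to happen before the file is opened to have any effect):

#include <fstream>
#include <string>
#include <vector>

int main() {
  std::vector<char> buf(1 << 20);  // 1 MiB buffer under our control
  std::ifstream fin;
  // supply our own buffer before opening the file
  fin.rdbuf()->pubsetbuf(buf.data(), static_cast<std::streamsize>(buf.size()));
  fin.open("input.txt");
  std::string line;
  while (std::getline(fin, line)) {
    // process line...
  }
}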
Complexity:
I agree with the reviewers that the code is over-complicated. I think mostly this is due to logic and state that should be hidden (in another function or object) being "inlined" into the higher-level algorithm. e.g. kWayMergeAsync
could look something like:
void kWayMergeAsync(const std::vector<istream_pointer> &streams,
const ostream_pointer &fout, size_t nbytes)
{
// ... declare limits as before ...
// class async_buffered_reader:
// holds a pair of buffers (current and future)
// hides the complexity of buffering and async reading
std::vector<async_buffered_reader> bufs;
for (size_t i = 0; i != n_chunks; ++i)
bufs.emplace_back(streams[i], bz);
// ... declare pq as before ...
for (size_t i = 0; i != n_chunks; ++i)
        pq.push({ bufs[i].next(), i }); // next() does everything we need in terms of waiting, swapping buffers, launching async reads, etc.
// class async_buffered_writer:
// holds the output buffer
// hides the complexity of buffering and async writing
async_buffered_writer abw_out(fout, sz);
while (!pq.empty())
{
auto [nxt, i] = pq.top();
pq.pop();
abw_out.push(std::move(nxt)); // push() does buffering / writing as necessary.
if (!bufs[i].is_complete())
            pq.push({ bufs[i].next(), i }); // again, next() does everything we need :)
}
// abw_out destructor will ensure we finish writing... so we don't need to do anything here!
}
Quite a rough sketch, but all the complexity that doesn't matter to the algorithm is hidden away in the async_buffered_reader
and async_buffered_writer
classes.
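One possible shape for such a reader, as a sketch only: it reuses readFromStream and istream_pointer from the posted code, while the member names and exact interface are illustrative.

// Holds the current block of lines plus a prefetched "next" block,
// hiding the buffering and async reading from the merge algorithm.
class async_buffered_reader {
public:
  async_buffered_reader(istream_pointer stream, size_t limit)
      : stream_(std::move(stream)), limit_(limit) {
    current_ = readFromStream(stream_, limit_);  // read the first block synchronously
    future_ = std::async(std::launch::async, readFromStream, stream_, limit_);
  }

  // true once every buffered string has been handed out and the stream is exhausted
  bool is_complete() const { return pos_ >= current_.size(); }

  // hand out the next string, refilling from the prefetched block when needed
  // precondition: !is_complete()
  std::string next() {
    std::string s = std::move(current_[pos_++]);
    if (pos_ == current_.size()) {
      current_ = future_.get();  // wait for the prefetch (ideally already done)
      pos_ = 0;
      if (!current_.empty())     // only prefetch again if the stream isn't finished
        future_ = std::async(std::launch::async, readFromStream, stream_, limit_);
    }
    return s;
  }

private:
  istream_pointer stream_;
  size_t limit_;
  std::vector<std::string> current_;
  std::future<std::vector<std::string>> future_;
  size_t pos_ = 0;
};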
We could even implement other reader
and writer
classes (e.g. without buffering or with different async implementations) with the same interface. And, if we make kWayMerge
a template function, we could then switch to those types without changing the merge algorithm at all!
Algorithm:
Beyond the bugs mentioned, I don't think there's too much wrong with the algorithm.
I/O from disk is really slow compared to sorting, so minimizing the amount of reading and writing to disk (which your algorithm does) is a good thing.
However, it also means that there's not much point in the async I/O (at least with the current implementation). With a fixed memory limit, we always have to wait for a chunk to be fully read or written. An HDD / SSD can only read or write one thing at a time (at least over SATA).
So improvements we could make would probably focus on:
- Working with specific hardware (do we have an M.2 SSD allowing multiple operations at once? do we have 8 drives to work with, or just one?).
- Allowing the user to specify multiple drives / locations for the temporary files.
- Creating one thread per location for I/O, instead of using std::async.
- Profiling the code (e.g. using minitrace and chrome://tracing) to optimize it for a specific scenario.
Misc.:
- std::logic_error is the wrong exception type to use here (std::runtime_error is probably what we want).
- We don't need to wrap the file streams in std::shared_ptr.
- writeToStream returns int for some reason? It could also use a const& when iterating (a sketch follows below).
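For instance, a sketch of the simpler writeToStream signature (plain reference parameter, void return, const& iteration):

#include <fstream>
#include <string>
#include <vector>

// write a vector of strings to a stream, one per line
void writeToStream(std::ofstream& fout, const std::vector<std::string>& chunk) {
  for (const auto& line : chunk)
    fout << line << '\n';
}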