Problem: In order to use the arrow crate's infer_file_schema function with input piped to /dev/stdin, I need to manually implement the Seek trait on a Reader, as seeking isn't available for a piped /dev/stdin by default.

The function that uses that Reader reads n lines, infers the schema from these n lines, then seeks back to the start of the "file". So I don't need to buffer more than these n lines.
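Roughly, the access pattern I need to support looks like the following. This is a hypothetical stand-in (peek_lines is my own illustration, not arrow's actual code). It works on a Cursor, but with a File opened on a piped /dev/stdin the final seek fails:

```rust
use std::io::{self, BufRead, BufReader, Read, Seek, SeekFrom};

/// Hypothetical stand-in for the access pattern: read up to `n` lines,
/// then seek back to the start so the caller can read everything again.
fn peek_lines<R: Read + Seek>(reader: &mut R, n: usize) -> io::Result<Vec<String>> {
    // Note: BufReader may pull in more than `n` lines' worth of bytes.
    let lines = BufReader::new(&mut *reader)
        .lines()
        .take(n)
        .collect::<io::Result<Vec<String>>>()?;
    // This is the step that needs Seek.
    reader.seek(SeekFrom::Start(0))?;
    Ok(lines)
}
```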
This is what I wrote:
```rust
use std::io::SeekFrom;

struct SeekableReader<R: std::io::Read> {
    inner: R, // underlying reader
    buffer: Vec<u8>, // buffer for the first n lines
    buffer_size: usize, // size of the buffer in bytes
    pos: usize, // current position in the buffer
    seekable: bool, // whether seek is still possible
}

const BUFFER_SIZE: usize = 8192;

impl<R: std::io::Read> SeekableReader<R> {
    fn new(reader: R, lines_to_buffer: Option<usize>) -> Self {
        let mut inner = reader;
        let mut buffer = Vec::<u8>::with_capacity(BUFFER_SIZE);
        let mut lines = 0;
        let mut bytes_read = 0;
        let mut bytes_before;
        loop {
            bytes_before = bytes_read;
            buffer.append(&mut vec![0; BUFFER_SIZE - (buffer.len() - bytes_read)]);
            bytes_read += inner
                .read(&mut buffer[bytes_read..bytes_read + BUFFER_SIZE])
                .unwrap();
            lines += buffer[bytes_before..bytes_read]
                .iter()
                .filter(|&&x| x == 10)
                .count();
            if let Some(lines_to_buffer) = lines_to_buffer {
                // +1 because there may be a header
                if lines > lines_to_buffer + 1 {
                    break;
                }
            }
            if bytes_read == 0 {
                break;
            }
        }
        SeekableReader {
            inner,
            buffer,
            buffer_size: bytes_read,
            pos: 0,
            seekable: true,
        }
    }
}

impl<R: std::io::Read> std::io::Read for SeekableReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        let buf_len = buf.len();
        if self.pos < self.buffer_size {
            if self.buffer_size - self.pos < buf_len {
                buf[..self.buffer_size - self.pos]
                    .copy_from_slice(&self.buffer[self.pos..self.buffer_size]);
                let len_read = self.buffer_size - self.pos;
                self.pos = self.buffer_size;
                Ok(len_read)
            } else {
                buf.copy_from_slice(&self.buffer[self.pos..self.pos + buf_len]);
                self.pos += buf_len;
                Ok(buf_len)
            }
        } else {
            self.seekable = false;
            self.inner.read(buf)
        }
    }
}

impl<R: std::io::Read> std::io::Seek for SeekableReader<R> {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        let error = Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "Seeking outside of buffer, please report to https://github.com/domoritz/arrow-tools/issues/new".to_string(),
        ));
        if self.seekable {
            match pos {
                SeekFrom::Start(pos) => {
                    if pos >= self.buffer_size as u64 {
                        error
                    } else {
                        self.pos = pos as usize;
                        Ok(pos)
                    }
                }
                SeekFrom::Current(pos) => {
                    let new_pos = self.pos as i64 + pos;
                    if 0 <= new_pos && new_pos < self.buffer_size as i64 {
                        self.pos = new_pos as usize;
                        Ok(new_pos as u64)
                    } else {
                        error
                    }
                }
                SeekFrom::End(_) => error,
            }
        } else {
            error
        }
    }
}
```
You can see this struct in action here: https://github.com/domoritz/arrow-tools/pull/10/files
Answer:
I think you could adopt a simpler approach:
- Instead of using infer_file_schema, use infer_reader_schema, which doesn't require Seek.
- Wrap the File in a Tee struct that stores copies of everything read from the file in a Vec<u8>.
- After inferring the schema, .chain() that buffer and the file to get the whole file.
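Here's a rough sketch of that Tee idea. The Tee name, its copy field, and the into_replay method are my own hypothetical choices, and I haven't wired it up to arrow here:

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical `Tee` adapter: passes reads through to `inner`
/// and keeps a copy of every byte it hands out.
struct Tee<R> {
    inner: R,
    copy: Vec<u8>,
}

impl<R: Read> Tee<R> {
    fn new(inner: R) -> Self {
        Tee { inner, copy: Vec::new() }
    }

    /// Returns a reader that replays the recorded bytes first and then
    /// continues with whatever is left in the underlying reader.
    fn into_replay(self) -> io::Chain<Cursor<Vec<u8>>, R> {
        Cursor::new(self.copy).chain(self.inner)
    }
}

impl<R: Read> Read for Tee<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        // Record exactly the bytes that were handed to the caller.
        self.copy.extend_from_slice(&buf[..n]);
        Ok(n)
    }
}
```

You would pass &mut tee to infer_reader_schema (a mutable reference to a Read type is itself Read), then call into_replay() and hand the result to the CSV reader. Because Tee records everything the schema inference pulled through it, including any read-ahead from internal buffering, nothing is lost.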
Let's look at your current code:
```rust
struct SeekableReader<R: std::io::Read> {
```
The convention in Rust is typically to avoid putting trait bounds on the struct itself; put them on the impl blocks that need them instead.
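Something like this, as a sketch (the 4-byte prefix read in new is a placeholder so the example has something to read, not a proposal):

```rust
use std::io::{self, Read};

// No trait bound on the struct definition itself.
struct SeekableReader<R> {
    inner: R,
    buffer: Vec<u8>,
    pos: usize,
}

// Methods that don't read can live in an impl with no bounds at all.
impl<R> SeekableReader<R> {
    fn into_inner(self) -> R {
        self.inner
    }
}

// The `R: Read` bound appears only where reading actually happens.
impl<R: Read> SeekableReader<R> {
    fn new(mut inner: R) -> io::Result<Self> {
        let mut buffer = Vec::new();
        // Placeholder: buffer a fixed 4-byte prefix for illustration.
        inner.by_ref().take(4).read_to_end(&mut buffer)?;
        Ok(SeekableReader { inner, buffer, pos: 0 })
    }
}
```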
```rust
    inner: R, // underlying reader
    buffer: Vec<u8>, // buffer for the first n lines
    buffer_size: usize, // size of the buffer in bytes
```
A Vec already tracks both its capacity and its length, so it seems suspect that you need a separate buffer_size field.
```rust
    pos: usize, // current position in the buffer
    seekable: bool, // whether seek is still possible
}

const BUFFER_SIZE: usize = 8192;

impl<R: std::io::Read> SeekableReader<R> {
    fn new(reader: R, lines_to_buffer: Option<usize>) -> Self {
        let mut inner = reader;
        let mut buffer = Vec::<u8>::with_capacity(BUFFER_SIZE);
```
It might be an idea to consider using a BufReader: call its read_until method several times to get each line, use its buffer method to pull out the remaining buffered data, and then call into_inner to get back the original file.
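A sketch of that, assuming a hypothetical take_lines helper that returns the first n lines, the leftover buffered bytes, and the inner reader. Note that into_inner discards anything still buffered, so the leftover has to be copied out first:

```rust
use std::io::{self, BufRead, BufReader, Read};

/// Hypothetical helper: returns (bytes of the first `n` lines,
/// bytes buffered but not yet consumed, the original reader).
fn take_lines<R: Read>(reader: R, n: usize) -> io::Result<(Vec<u8>, Vec<u8>, R)> {
    let mut buf_reader = BufReader::new(reader);
    let mut lines = Vec::new();
    for _ in 0..n {
        // read_until appends everything up to and including b'\n'.
        if buf_reader.read_until(b'\n', &mut lines)? == 0 {
            break; // end of input
        }
    }
    // Copy the unconsumed buffered bytes before into_inner drops them.
    let leftover = buf_reader.buffer().to_vec();
    Ok((lines, leftover, buf_reader.into_inner()))
}
```

Reading the lines, then the leftover, then the inner reader gives back the complete stream.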
```rust
        let mut lines = 0;
        let mut bytes_read = 0;
        let mut bytes_before;
```
Just declare bytes_before with let inside the loop. There's no reason to have it outside of the loop.
```rust
        loop {
            bytes_before = bytes_read;
            buffer.append(&mut vec![0; BUFFER_SIZE - (buffer.len() - bytes_read)]);
            bytes_read += inner
                .read(&mut buffer[bytes_read..bytes_read + BUFFER_SIZE])
                .unwrap();
            lines += buffer[bytes_before..bytes_read]
                .iter()
                .filter(|&&x| x == 10)
                .count();
            if let Some(lines_to_buffer) = lines_to_buffer {
                // +1 because there may be a header
                if lines > lines_to_buffer + 1 {
                    break;
                }
            }
            if bytes_read == 0 {
                break;
            }
```
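I think this is incorrect, because it looks at the total bytes read. Once anything has been read, bytes_read never returns to zero, so at the end of the input (with no line limit, or with fewer lines than the limit) the loop never exits. You need to look at the bytes read by the last invocation of read to determine whether that was zero and thus there is nothing more to read.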
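A sketch of the loop with that check fixed. It stops when a single read call returns zero, declares bytes_before inside the loop, and uses buffer.len() instead of a separate counter. The fill_buffer name, and returning io::Result instead of unwrapping, are my own assumptions:

```rust
use std::io::{self, Read};

const BUFFER_SIZE: usize = 8192;

/// Hypothetical helper: buffers input until more than `lines_to_buffer + 1`
/// newlines have been seen, or until the reader reports end of input.
fn fill_buffer<R: Read>(inner: &mut R, lines_to_buffer: Option<usize>) -> io::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    let mut lines = 0;
    loop {
        let bytes_before = buffer.len();
        buffer.resize(bytes_before + BUFFER_SIZE, 0); // room for one chunk
        let n = inner.read(&mut buffer[bytes_before..])?;
        buffer.truncate(bytes_before + n); // drop the unfilled padding
        if n == 0 {
            break; // this call hit end of input
        }
        lines += buffer[bytes_before..].iter().filter(|&&b| b == b'\n').count();
        if let Some(limit) = lines_to_buffer {
            // +1 because there may be a header (as in the original code)
            if lines > limit + 1 {
                break;
            }
        }
    }
    Ok(buffer)
}
```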
```rust
        }
        SeekableReader {
            inner,
            buffer,
            buffer_size: bytes_read,
            pos: 0,
            seekable: true,
        }
    }
}

impl<R: std::io::Read> std::io::Read for SeekableReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
```
You could simplify your implementation by putting the buffer you assembled in a std::io::Cursor. Basically, you should be able to read from the cursor, and if it returns nothing, then read from the file.
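Roughly like this. It's a sketch that keeps only the two fields the idea needs, so the field layout is an assumption, not your struct as-is:

```rust
use std::io::{self, Cursor, Read};

/// Sketch: serve bytes from the buffered prefix first, then fall
/// through to `inner` once the cursor is exhausted.
struct SeekableReader<R> {
    inner: R,
    buffer: Cursor<Vec<u8>>,
}

impl<R: Read> Read for SeekableReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.buffer.read(buf)?;
        if n == 0 {
            // The buffered prefix is used up; read the rest from the source.
            self.inner.read(buf)
        } else {
            Ok(n)
        }
    }
}
```

Cursor also implements Seek, so rewinding becomes self.buffer.set_position(0), which is only valid while nothing has been read from inner yet.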
```rust
        let buf_len = buf.len();
```
I'm not sure why you copy the length into a local variable like this; calling buf.len() where you need it is just as cheap.
```rust
        if self.pos < self.buffer_size {
            if self.buffer_size - self.pos < buf_len {
                buf[..self.buffer_size - self.pos]
                    .copy_from_slice(&self.buffer[self.pos..self.buffer_size]);
                let len_read = self.buffer_size - self.pos;
                self.pos = self.buffer_size;
                Ok(len_read)
            } else {
                buf.copy_from_slice(&self.buffer[self.pos..self.pos + buf_len]);
                self.pos += buf_len;
                Ok(buf_len)
            }
```
I think you should be able to compute len_read as the minimum of the buffer length and the remaining bytes, and then combine these two cases into one.
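A sketch of the combined version, using a trimmed-down copy of your struct in which buffer.len() stands in for buffer_size:

```rust
use std::io::{self, Read};

// Trimmed-down copy of the question's struct; `buffer.len()`
// replaces the separate `buffer_size` field here.
struct SeekableReader<R> {
    inner: R,
    buffer: Vec<u8>,
    pos: usize,
    seekable: bool,
}

impl<R: Read> Read for SeekableReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.pos < self.buffer.len() {
            // One branch covers both cases: copy whichever is smaller,
            // the caller's slice or what's left of the buffer.
            let len_read = buf.len().min(self.buffer.len() - self.pos);
            buf[..len_read].copy_from_slice(&self.buffer[self.pos..self.pos + len_read]);
            self.pos += len_read;
            Ok(len_read)
        } else {
            self.seekable = false;
            self.inner.read(buf)
        }
    }
}
```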
```rust
        } else {
            self.seekable = false;
            self.inner.read(buf)
        }
    }
}

impl<R: std::io::Read> std::io::Seek for SeekableReader<R> {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
        let error = Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "Seeking outside of buffer, please report to https://github.com/domoritz/arrow-tools/issues/new".to_string(),
        ));
```
It's a little odd to build an error and then usually not return it. That's especially true since this allocates by constructing a String. Rust may optimize that out, but I wouldn't rely on it. I'd move it into a function that builds the error, and call that only in the places that actually return it.
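For example, with a hypothetical seek_error helper:

```rust
use std::io;

/// Hypothetical helper: builds the error only when a caller needs it,
/// so the success path never allocates the message.
fn seek_error() -> io::Error {
    io::Error::new(
        io::ErrorKind::InvalidInput,
        "Seeking outside of buffer, please report to https://github.com/domoritz/arrow-tools/issues/new",
    )
}
```

Each failing arm then becomes Err(seek_error()), and the allocation only happens on the error path.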
```rust
        if self.seekable {
            match pos {
```
It seems to me that in practice you only need to support the narrow case of seeking back to the start of the file, so I'd support just that precise case.
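A sketch of that narrower Seek, on a trimmed-down copy of your struct. I've also let SeekFrom::Current(0) through, since it only reports the position without moving. That arm is an assumption, in case the caller queries its offset before reading, so drop it if it isn't needed:

```rust
use std::io::{self, Seek, SeekFrom};

// Trimmed-down copy of the question's struct (Read impl omitted).
struct SeekableReader<R> {
    inner: R,
    buffer: Vec<u8>,
    pos: usize,
    seekable: bool,
}

impl<R> Seek for SeekableReader<R> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        match pos {
            // Rewinding is fine as long as nothing was read past the buffer.
            SeekFrom::Start(0) if self.seekable => {
                self.pos = 0;
                Ok(0)
            }
            // Assumed extra case: report the position without moving.
            SeekFrom::Current(0) if self.seekable => Ok(self.pos as u64),
            _ => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "only rewinding to the start is supported",
            )),
        }
    }
}
```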
```rust
                SeekFrom::Start(pos) => {
                    if pos >= self.buffer_size as u64 {
                        error
                    } else {
                        self.pos = pos as usize;
                        Ok(pos)
                    }
                }
                SeekFrom::Current(pos) => {
                    let new_pos = self.pos as i64 + pos;
                    if 0 <= new_pos && new_pos < self.buffer_size as i64 {
                        self.pos = new_pos as usize;
                        Ok(new_pos as u64)
                    } else {
                        error
                    }
                }
                SeekFrom::End(_) => error,
            }
        } else {
            error
        }
    }
}
```