I needed to do transformations on a data stream, not on an element-by-element basis, but on subsequences. This is what I've come up with: https://github.com/peterjoel/rust-iter-replace/blob/master/src/lib.rs
I am relatively new to Rust, and would love some feedback on any areas of the code: style, design, memory/performance no-nos. But performance (throughput) is particularly important in this application, as I'll be using it to "compress" very large (multi GB) pieces of data, in memory and on disc.
Design Overview
The central struct Replace
, keeps track of two buffers: buffer_out
and buffer_in
, a set of partial match candidates
and some other members for keeping track of state between invocations of next()
.
buffer_out
is data that is fully processed and ready to pass to the next iterator adapter - this will either contain unmatched data, or the full replacement sequence. buffer_in
contains data that may or may not match, and gets copied to buffer_out
as soon as it can be shown that it doesn't match and gets erased when it does. I chose a VecDeque
for buffer_out
because it generally gets written to at the back and read from at the start. As I write this, I realise that buffer_in
could have been just a Vec
. Maybe I'll change that.
The BTreeSet
, candidates
, keeps track of the index when the first item of a partial match occurred. As soon as a partial match no longer matches then it is discarded. I chose a BTreeSet
because I needed to access the smallest element to know when I can flush any part of buffer_in
. But actually its elements are added in size order - which I'm not taking advantage of here - so there could be a better data structure which can exploit that invariant.
The section of code where I remove elements from the candidates
set isn't great. I originally wrote a trait and had it as a method of BTreeSet
, but I had some errors (I forgot now) to do with the type signature, where I couldn't get it to match the expected type of the predicate used by filter
. I may revisit that.
Source
use std::collections::{BTreeSet};
use std::collections::VecDeque;
pub struct Replace <'a, I, T: 'a + Ord > {
iter: I,
buffer_out: VecDeque<T>,
buffer_in: VecDeque<T>,
replace_from: &'a [T],
replace_with: &'a [T],
candidates: BTreeSet<usize>,
index: usize,
flushed_index: usize,
}
impl <'a, I, T> Replace <'a, I, T> where
I: Iterator<Item = T>,
T: Eq + Ord + Copy {
pub fn adapt(iter: I, replace_from: &'a [T], replace_with: &'a [T]) -> Replace<'a, I, T> {
Replace {
iter: iter,
buffer_out: VecDeque::new(),
buffer_in: VecDeque::new(),
replace_from: replace_from,
replace_with: replace_with,
candidates: BTreeSet::new(),
index: 0,
flushed_index: 0,
}
}
fn fill_buffer(&mut self) {
'consume: while let Some(item) = self.iter.next() {
self.index += 1;
// buffer all incoming items
self.buffer_in.push_back(item);
// Prune existing partial match candidates that don't match the next item
let removes: Vec<_> = self.candidates.iter().cloned()
.filter(|start_index| {
self.replace_from[self.index - *start_index] != item
}).collect();
for r in removes {
self.candidates.remove(&r);
}
// Keep track of new partial match candidates
if self.replace_from[0] == item {
self.candidates.insert(self.index);
}
// if the length of the first match is the length of the replace sequence then it's a complete match
match self.candidates.iter().cloned()
.next()
.into_iter()
.find(|x| self.index - x + 1 == self.replace_from.len()) {
None => {
// We can flush the inbound buffer up to the first partial match
// (or the full buffer if there are no partial matches)
let flush_index = self.candidates.iter().next().map(|x| x - 1).unwrap_or(self.index);
if flush_index > self.flushed_index {
let mut flush: VecDeque<_> = self.buffer_in.drain(0 .. flush_index - self.flushed_index).collect();
self.buffer_out.append(&mut flush);
self.flushed_index = flush_index;
break 'consume;
}
},
Some(_) => {
// A match! So replace it and clear all the partial matches
self.candidates.clear();
for &x in self.replace_with.iter() {
self.buffer_out.push_back(x);
}
self.buffer_in.clear();
self.flushed_index = self.index;
break 'consume;
}
}
}
}
}
pub trait ReplaceIter<'a, I, T> where
I: Iterator<Item = T>,
T: Ord {
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T>;
}
impl <'a, I, T> ReplaceIter<'a, I, T> for I where
I: Iterator<Item = T>,
T: Eq + Ord + Copy {
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T> {
Replace::adapt(self, from, to)
}
}
impl <'a, I, T> Iterator for Replace <'a, I, T> where
I: Iterator<Item = T>,
T: Eq + Ord + Copy {
type Item = T;
fn next(&mut self) -> Option<T> {
if self.buffer_out.len() == 0 {
self.fill_buffer();
}
self.buffer_out.pop_front()
}
}
And tests, which show the usage:
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_replace_simple() {
let v: Vec<u32> = vec![1,2,3].into_iter().replace(&[2], &[10]).collect();
assert_eq!(v, vec![1,10,3]);
}
#[test]
pub fn test_replace_longer() {
let v: Vec<u32> = vec![3,4,5,6,7,8,9].into_iter().replace(&[4,5], &[100]).collect();
assert_eq!(v, vec![3,100,6,7,8,9]);
}
#[test]
pub fn test_replace_multi() {
let v: Vec<u32> = vec![3,4,5,6,4,5,9].into_iter().replace(&[4,5], &[100,200,300]).collect();
assert_eq!(v, vec![3,100,200,300,6,100,200,300,9]);
}
#[test]
pub fn test_nearly_match() {
let v: Vec<u32> = vec![3,4,5,6].into_iter().replace(&[4,5,1], &[100,200]).collect();
assert_eq!(v, vec![3,4,5,6]);
}
#[test]
pub fn test_replace_overlapping() {
let v: Vec<u32> = vec![3,4,5,4,5,4,9].into_iter().replace(&[4,5,4,5], &[100]).collect();
assert_eq!(v, vec![3,100,4,9]);
}
}
-
2\$\begingroup\$ This has been cross-posted to Reddit and received some answers there. \$\endgroup\$Veedrac– Veedrac2016年04月05日 10:11:49 +00:00Commented Apr 5, 2016 at 10:11
1 Answer 1
The 'consume
label is a bit strange; just return
instead.
This is a bit ugly:
match self.candidates.iter().cloned()
.next()
.into_iter()
.find(|x| self.index - x + 1 == self.replace_from.len()) {
None => {
...
},
Some(_) => {
...
}
}
You can simplify this with any
and splitting this onto a new line:
let complete_match = self.candidates.iter().cloned().next().into_iter()
.any(|x| self.index - x + 1 == self.replace_from.len());
if complete_match {
...
}
else {
...
}
The
.iter().cloned().next().into_iter().any(|x| self.index - x + 1 == self.replace_from.len())
part is ugly; using .into_iter().any()
on an Option
to test it like this is odd, and the cloned
is unneeded. How about
.first().map_or(false, |x| *index - x == replace_from.len() - 1);
The complete_match
case always returns, so we can drop the else
:
let full_match = self.candidates.first().map_or(false, |x|
*index - x == replace_from.len() - 1
);
if full_match {
// A match! So replace it and clear all the partial matches
self.candidates.clear();
for &x in self.replace_with.iter() {
self.buffer_out.push_back(x);
}
self.buffer_in.clear();
self.flushed_index = self.index;
return;
}
// We can flush the inbound buffer up to the first partial match
// (or the full buffer if there are no partial matches)
let flush_index = self.candidates.iter().next().map(|x| x - 1).unwrap_or(self.index);
if flush_index > self.flushed_index {
let mut flush: VecDeque<_> = self.buffer_in.drain(0 .. flush_index - self.flushed_index).collect();
self.buffer_out.append(&mut flush);
self.flushed_index = flush_index;
return;
}
Then
for &x in self.replace_with.iter() {
self.buffer_out.push_back(x);
}
is just
self.buffer_out.extend(self.replace_with);
Later
self.candidates.iter().next().map(|x| x - 1).unwrap_or(self.index)
is just
self.candidates.iter().next().map_or(self.index, |x| x - 1)
Idiomatic styling seems to be no spaces around ..
. So
self.buffer_in.drain(0 .. flush_index - self.flushed_index).collect()
should be
self.buffer_in.drain(..flush_index - self.flushed_index).collect()
That said, all of
let mut flush = self.buffer_in.drain(..flush_index - self.flushed_index).collect();
self.buffer_out.append(&mut flush);
is better as
let flush = self.buffer_in.drain(..flush_index - self.flushed_index);
self.buffer_out.extend(flush);
candidates
should be a Vec
, since you don't really use the fact it's a BTreeSet
usefully. The removal then looks like
self.candidates.retain(|start|
self.replace_from[self.index - *start] == item
);
...except for the fact that retain
borrows self
mutably so self.replace_from
doesn't work. This is fixable with a destructuring assignment.
{
let Replace { ref mut candidates, ref replace_from, index, .. } = *self;
candidates.retain(|start|
replace_from[index - *start] == item
);
}
Though at this point it might make sense to just do this at the start of the function. This adds a bit of length but simplifies things.
candidates
should really be inserted into before index
is incremented. This lets us work with more natural half-open ranges. We can also skip the conditional for candidates.push
by just reusing the if
in candidates.retain
.
let Replace {
ref mut iter,
ref mut buffer_out,
ref mut buffer_in,
replace_from,
replace_with,
ref mut candidates,
ref mut index,
ref mut flushed_index
} = *self;
for item in iter {
buffer_in.push_back(item);
candidates.push(*index);
candidates.retain(|&start| replace_from[*index - start] == item);
*index += 1;
let full_match = candidates.first().map_or(false, |start|
*index - start == replace_from.len()
);
if full_match {
// Replace it and clear all the partial matches
candidates.clear();
buffer_out.extend(replace_with);
buffer_in.clear();
*flushed_index = *index;
return;
}
// We can flush the inbound buffer up to the first partial match
// (or the full buffer if there are no partial matches)
let flush_index = candidates.iter().next().unwrap_or(index);
if flush_index > flushed_index {
let flush = buffer_in.drain(..flush_index - *flushed_index);
buffer_out.extend(flush);
*flushed_index = *flush_index;
return;
}
}
Later, we have
pub trait ReplaceIter<'a, I, T> where
I: Iterator<Item = T>,
T: Ord {
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T>;
}
impl <'a, I, T> ReplaceIter<'a, I, T> for I where
I: Iterator<Item = T>,
T: Eq + Ord + Copy {
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T> {
Replace::adapt(self, from, to)
}
}
which is nicer as
pub trait ReplaceIter<'a, I, T> {
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T>;
}
impl <'a, I, T> ReplaceIter<'a, I, T> for I where
I: Iterator<Item=T>,
T: Eq + Clone
{
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T> {
Replace::adapt(self, from, to)
}
}
Note how we no longer need Ord
and Copy
was changed to Clone
. This requires minor changes to the code; we need to do this in the right order:
candidates.push(*index);
candidates.retain(|&start| replace_from[*index - start] == item);
buffer_in.push_back(item);
(else item
is moved by push_back
before retain
can use it) and we need to extend
cloned values:
buffer_out.extend(replace_with.iter().cloned());
Here's the code in entirety
use std::collections::VecDeque;
pub struct Replace <'a, I, T: 'a> {
iter: I,
buffer_out: VecDeque<T>,
buffer_in: VecDeque<T>,
replace_from: &'a [T],
replace_with: &'a [T],
candidates: Vec<usize>,
index: usize,
flushed_index: usize,
}
impl <'a, I, T> Replace <'a, I, T> where
I: Iterator<Item = T>,
T: Eq + Clone
{
pub fn adapt(iter: I, replace_from: &'a [T], replace_with: &'a [T]) -> Replace<'a, I, T> {
Replace {
iter: iter,
buffer_out: VecDeque::new(),
buffer_in: VecDeque::new(),
replace_from: replace_from,
replace_with: replace_with,
candidates: Vec::new(),
index: 0,
flushed_index: 0,
}
}
fn fill_buffer(&mut self) {
let Replace {
ref mut iter,
ref mut buffer_out,
ref mut buffer_in,
replace_from,
replace_with,
ref mut candidates,
ref mut index,
ref mut flushed_index
} = *self;
for item in iter {
candidates.push(*index);
candidates.retain(|&start| replace_from[*index - start] == item);
buffer_in.push_back(item);
*index += 1;
let full_match = candidates.first().map_or(false, |start|
*index - start == replace_from.len()
);
if full_match {
// Replace it and clear all the partial matches
candidates.clear();
buffer_out.extend(replace_with.iter().cloned());
buffer_in.clear();
*flushed_index = *index;
return;
}
// We can flush the inbound buffer up to the first partial match
// (or the full buffer if there are no partial matches)
let flush_index = candidates.iter().next().unwrap_or(index);
if flush_index > flushed_index {
let flush = buffer_in.drain(..flush_index - *flushed_index);
buffer_out.extend(flush);
*flushed_index = *flush_index;
return;
}
}
}
}
pub trait ReplaceIter<'a, I, T> {
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T>;
}
impl<'a, I, T> ReplaceIter<'a, I, T> for I where
I: Iterator<Item=T>,
T: Eq + Clone
{
fn replace(self, from: &'a [T], to: &'a [T]) -> Replace<'a, I, T> {
Replace::adapt(self, from, to)
}
}
impl<'a, I, T> Iterator for Replace<'a, I, T> where
I: Iterator<Item=T>,
T: Eq + Clone
{
type Item = T;
fn next(&mut self) -> Option<T> {
if self.buffer_out.len() == 0 {
self.fill_buffer();
}
self.buffer_out.pop_front()
}
}