
A parallel map written in Rust. I am new to Rust and wondering whether anything could be done better or more efficiently.

```rust
use crossbeam_channel::unbounded;
use std::{thread, time};

fn parallel_map<T, U, F>(mut input_vec: Vec<T>, num_threads: usize, f: F) -> Vec<U>
where
    F: FnOnce(T) -> U + Send + Copy + 'static,
    T: Send + 'static,
    U: Send + 'static + Default,
{
    let mut output_vec: Vec<U> = Vec::with_capacity(input_vec.len());
    let mut threads = Vec::new();
    let (in_s, in_r) = unbounded::<(T, usize)>();
    let (out_s, out_r) = unbounded::<(U, usize)>();
    for _ in 0..num_threads {
        let in_r = in_r.clone();
        let out_s = out_s.clone();
        threads.push(thread::spawn(move || {
            while let Ok((value, index)) = in_r.recv() {
                let res = f(value);
                out_s.send((res, index)).expect("Failed to send");
            }
        }));
    }
    while let Some(val) = input_vec.pop() {
        in_s.send((val, input_vec.len())).expect("Failed to send");
    }
    drop(in_s);
    drop(out_s);
    let mut collect_results: Vec<(U, usize)> = Vec::with_capacity(output_vec.capacity());
    while let Ok(res) = out_r.recv() {
        collect_results.push(res);
    }
    collect_results.sort_by(|(_, a_index), (_, b_index)| a_index.partial_cmp(b_index).unwrap());
    output_vec.extend(collect_results.into_iter().map(|(val, _)| val));
    for thread in threads {
        thread.join().expect("Failed to join thread");
    }
    output_vec
}

fn main() {
    let v = vec![6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 12, 18, 11, 5, 20];
    let squares = parallel_map(v, 10, |num| {
        println!("{} squared is {}", num, num * num);
        thread::sleep(time::Duration::from_millis(500));
        num * num
    });
    println!("squares: {:?}", squares);
}
```
asked Jan 23, 2021 at 1:42

1 Answer


The rayon crate implements similar functionality:

```rust
use rayon::prelude::*;

fn main() {
    let par_iter = (0..5).into_par_iter().map(|x| x * x);
    let squares: Vec<i32> = par_iter.collect(); // input order is preserved
    println!("{:?}", squares);
}
```

In your solution, the multithreading part looks OK.

The part where you collect the results could be more efficient. Specifically, one allocation and one data copy (the intermediate collect_results vector) could be avoided with a little unsafe code:

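```rust
use std::mem;

// `initial_len` is `input_vec.len()`, captured before the input is drained.
let mut output_vec: Vec<U> = Vec::with_capacity(initial_len);
unsafe {
    // Exposes uninitialized slots; every one of them must be overwritten
    // below before it is read or dropped.
    output_vec.set_len(initial_len);
}
while let Ok((res, index)) = out_r.recv() {
    // Store the result at its index and skip the destructor of the
    // uninitialized placeholder that was there.
    let old = mem::replace(&mut output_vec[index], res);
    mem::forget(old);
}
```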

The same optimization could be written a little more succinctly with ptr::write. Further, you may need to assert that output_vec ends up with the correct length, and forget the vec beforehand in case the assertion fails. My concern is that the code proposed above is not protected against incorrect drops if U has a destructor and the code panics.
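One possible way to address the panic concern is to write each result into the vector's spare capacity and call set_len only after every slot is known to be filled. The sketch below is an illustration under assumptions, not the code from the question: it uses std::sync::mpsc instead of crossbeam_channel so that it is self-contained, and collect_in_order is a hypothetical helper name. It also spends a small Vec<bool> on bookkeeping so that duplicate or missing indices are caught before set_len runs.

```rust
use std::mem::MaybeUninit;
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical helper: gathers `(value, index)` pairs into a `Vec<U>`
/// ordered by index. Expects exactly one pair for each index in `0..len`.
fn collect_in_order<U>(results: Receiver<(U, usize)>, len: usize) -> Vec<U> {
    let mut output: Vec<U> = Vec::with_capacity(len);
    let mut filled = vec![false; len];
    let mut count = 0;
    {
        // The vector's length is still 0 here, so if anything below panics,
        // the values already written are leaked rather than dropped twice.
        let slots: &mut [MaybeUninit<U>] = output.spare_capacity_mut();
        while let Ok((value, index)) = results.recv() {
            assert!(index < len, "index {} is out of range", index);
            assert!(!filled[index], "index {} was received twice", index);
            slots[index].write(value);
            filled[index] = true;
            count += 1;
        }
    }
    assert_eq!(count, len, "some results are missing");
    // SAFETY: `count == len` with no duplicate indices means every slot in
    // `0..len` was initialized by `write` above, and `len <= capacity`.
    unsafe { output.set_len(len) };
    output
}

fn main() {
    let (tx, rx) = channel();
    // Results arrive out of order, tagged with their original index.
    for pair in vec![(9, 2), (1, 0), (4, 1)] {
        tx.send(pair).expect("receiver is alive");
    }
    drop(tx); // close the channel so `recv` eventually returns `Err`
    let ordered = collect_in_order(rx, 3);
    println!("{:?}", ordered);
}
```

If a check fails partway through, the results received so far are leaked instead of being dropped, which is wasteful but memory-safe.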

The Default bound on U appears to be unnecessary.
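To illustrate the point about Default, here is a minimal sketch of the same function with that bound removed, applied to a result type that does not implement Default. It is an assumption-laden variant rather than the original code: std::sync::mpsc stands in for crossbeam_channel so the example is self-contained (hence the mutex around the shared receiver), and Squared is a hypothetical type introduced only for the demonstration.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical result type that deliberately does not implement `Default`.
#[derive(Debug, PartialEq)]
struct Squared(u64);

fn parallel_map<T, U, F>(input_vec: Vec<T>, num_threads: usize, f: F) -> Vec<U>
where
    F: FnOnce(T) -> U + Send + Copy + 'static,
    T: Send + 'static,
    U: Send + 'static, // same bounds as the question, minus `Default`
{
    // Queue every (index, value) pair up front, then close the input channel.
    let (in_s, in_r) = channel::<(usize, T)>();
    for job in input_vec.into_iter().enumerate() {
        in_s.send(job).expect("Failed to send");
    }
    drop(in_s);

    // std's `Receiver` cannot be cloned, so the workers share it behind a mutex.
    let in_r = Arc::new(Mutex::new(in_r));
    let (out_s, out_r) = channel::<(usize, U)>();
    let mut threads = Vec::new();
    for _ in 0..num_threads {
        let in_r = Arc::clone(&in_r);
        let out_s = out_s.clone();
        threads.push(thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement,
            // so `f` runs without holding the mutex.
            let job = in_r.lock().expect("Mutex poisoned").recv();
            match job {
                Ok((index, value)) => out_s.send((index, f(value))).expect("Failed to send"),
                Err(_) => break, // the queue is empty and closed
            }
        }));
    }
    drop(out_s);

    // Gather until every worker has dropped its sender, then restore input order.
    let mut results: Vec<(usize, U)> = out_r.iter().collect();
    for handle in threads {
        handle.join().expect("Failed to join thread");
    }
    results.sort_by_key(|pair| pair.0);
    results.into_iter().map(|(_, value)| value).collect()
}

fn main() {
    let squares = parallel_map(vec![1u64, 2, 3, 4, 5], 3, |n| Squared(n * n));
    println!("{:?}", squares);
}
```

Nothing in the body ever constructs a placeholder U, which is why the bound can go.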

answered Jan 23, 2021 at 12:49
