- 
  Notifications
 You must be signed in to change notification settings 
- Fork 128
[Question] Parallelizing over a list of PyArrays with rayon #363
-
I have a simple extension which operates on a single PyReadonlyArray1<'_, f64> and now I want to let it take a list of such arrays and apply the same function to all of them, in parallel. The serial version of a simplified example (I'm not really implementing cumsum) is below:
#[pymodule] fn repro(_py: Python<'_>, m: &PyModule) -> PyResult<()> { fn cumsum(array: ArrayView1<'_, f64>) -> Array1<f64> { let mut total = 0.; Array1::from_iter( array .as_slice() .expect("input not contiguous") .iter() .map(|v| { total += v; total }) ) } #[pyfn(m)] #[pyo3(name = "cumsum_single")] fn cumsum_single_py<'py>( py: Python<'py>, array: PyReadonlyArray1<'_, f64>, ) -> &'py PyArray1<f64> { cumsum(array.as_array()).into_pyarray(py) } #[pyfn(m)] #[pyo3(name = "cumsum_many_sequential")] fn cumsum_many_sequential_py<'py>( py: Python<'py>, arrays: Vec<PyReadonlyArray1<'_, f64>>, ) -> Vec<&'py PyArray1<f64>> { arrays.into_iter().map(|arr| cumsum_single_py(py, arr)).collect() } Ok(()) }
The problem is when I try to use rayon and turn into_iter to into_par_iter. The compiler complains as follows:
error[E0599]: the method `into_par_iter` exists for struct `Vec<PyReadonlyArray<'_, f64, Dim<[usize; 1]>>>`, but its trait bounds were not satisfied
 --> src/lib.rs:46:16
 |
46 | arrays.into_par_iter().map(|arr| cumsum_single_py(py, arr)).collect()
 | ^^^^^^^^^^^^^ method cannot be called on `Vec<PyReadonlyArray<'_, f64, Dim<[usize; 1]>>>` due to unsatisfied trait bounds
 |
 = note: the following trait bounds were not satisfied:
 `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: Sized`
 which is required by `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: rayon::iter::IntoParallelIterator`
 `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: rayon::iter::ParallelIterator`
 which is required by `[PyReadonlyArray<'_, f64, Dim<[usize; 1]>>]: rayon::iter::IntoParallelIterator`
The constraints look a bit weird on the rayon side (to my best understanding, [T]: !Sized regardless of T), but perhaps there's a workaround? What I arrived at is below:
#[pyfn(m)] #[pyo3(name = "cumsum_many_rayon")] fn cumsum_many_rayon_py<'py>( py: Python<'py>, arrays: Vec<PyReadonlyArray1<'_, f64>>, ) -> Vec<&'py PyArray1<f64>> { let arrays: Vec<_> = arrays .iter() .map(|pa| pa.as_array()) .collect(); // first collect: for some reason cannot send PyReadonlyArray<_, _>, // with ArrayBase<ViewRepr<_>, _> it works. But they hold references // in a way that forces me to materialize a vector, instead of using // par_bridge() directly let results: Vec<_> = arrays .into_par_iter() .map(cumsum) .collect(); // second collect: need to turn the parallel iterator back to sequential // for into_pyarray results .into_iter() .map(|result| result.into_pyarray(py)) .collect() // third collect: to create the actual returned Python list }
This solution uses three individual collect calls. It is unclear to me how much overhead that is: are those structs all just holding references or am I copying big arrays of input data? Is there a better way to achieve my goal?
Beta Was this translation helpful? Give feedback.
All reactions
are those structs all just holding references or am I copying big arrays of input data?
You are just copying references, not the underlying data.
Is there a better way to achieve my goal?
The problem when using Rayon here is that PyReadonlyArray<'py, T, D>: !Send as it basically wraps a &'py PyArray<T, D> which is not Send either as it is locked to the lifetime 'py for which the GIL is held by the current thread.
At the moment I don't see a better way to achieve what you need. I suspect that the overhead of the three calls to collect should be manageable if what you are doing instead of cumsum is sufficiently expensive.
One alternative that would however require a different signature ...
Replies: 2 comments 1 reply
-
are those structs all just holding references or am I copying big arrays of input data?
You are just copying references, not the underlying data.
Is there a better way to achieve my goal?
The problem when using Rayon here is that PyReadonlyArray<'py, T, D>: !Send as it basically wraps a &'py PyArray<T, D> which is not Send either as it is locked to the lifetime 'py for which the GIL is held by the current thread.
At the moment I don't see a better way to achieve what you need. I suspect that the overhead of the three calls to collect should be manageable if what you are doing instead of cumsum is sufficiently expensive.
One alternative that would however require a different signature and would preclude the arrays from having different lengths would be to take a PyReadonlyArray2 instead and operate on that using e.g. Zip::from(arrays.rows()).par_map_collect(...) or even directly on the two-dimensional structure if that is possible.
Beta Was this translation helpful? Give feedback.
All reactions
-
One thing you could maybe do to merge the second and third calls to collect would be to temporarily release the GIL for the parallel part and acquire it from the thread pool to perform the conversion using from Array1<..> to Py<PyArray1<..>, e.g.
#[pyfn(m)] #[pyo3(name = "cumsum_many_rayon")] fn cumsum_many_rayon_py<'py>( py: Python<'py>, arrays: Vec<PyReadonlyArray1<'_, f64>>, ) -> Vec<Py<PyArray1<f64>>> { let arrays: Vec<_> = arrays.iter().map(|pa| pa.as_array()).collect(); py.allow_threads(|| { arrays .into_par_iter() .map(|array| { let array = cumsum(array); Python::with_gil(move |py| array.into_pyarray(py).into()) }) .collect() }) }
But honestly speaking, I think the approach using three calls to collect is preferable and I suspect is also faster as it avoids to bump the GIL around the thread pool.
Beta Was this translation helpful? Give feedback.
All reactions
-
Thanks a lot for your quick reply, it's good to know I have not strayed too far from what is reasonable :)
Beta Was this translation helpful? Give feedback.