Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e99eafe

Browse files
author
James Munns
authored
Merge pull request #132 from shekohex/stream-all-method
Stream::all implementation
2 parents a4d2cd1 + e517c60 commit e99eafe

File tree

1 file changed

+105
-0
lines changed

1 file changed

+105
-0
lines changed

‎src/stream/stream.rs‎

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use cfg_if::cfg_if;
2727

2828
use crate::future::Future;
2929
use crate::task::{Context, Poll};
30+
use std::marker::PhantomData;
3031

3132
cfg_if! {
3233
if #[cfg(feature = "docs")] {
@@ -111,6 +112,63 @@ pub trait Stream {
111112
remaining: n,
112113
}
113114
}
115+
116+
/// Tests if every element of the stream matches a predicate.
117+
///
118+
/// `all()` takes a closure that returns `true` or `false`. It applies
119+
/// this closure to each element of the stream, and if they all return
120+
/// `true`, then so does `all()`. If any of them return `false`, it
121+
/// returns `false`.
122+
///
123+
/// `all()` is short-circuiting; in other words, it will stop processing
124+
/// as soon as it finds a `false`, given that no matter what else happens,
125+
/// the result will also be `false`.
126+
///
127+
/// An empty stream returns `true`.
128+
///
129+
/// # Examples
130+
///
131+
/// Basic usage:
132+
///
133+
/// ```
134+
/// # fn main() { async_std::task::block_on(async {
135+
/// #
136+
/// use async_std::prelude::*;
137+
/// use async_std::stream;
138+
///
139+
/// let mut s = stream::repeat::<u32>(42).take(3);
140+
/// assert!(s.all(|x| x == 42).await);
141+
///
142+
/// #
143+
/// # }) }
144+
/// ```
145+
///
146+
/// Empty stream:
147+
///
148+
/// ```
149+
/// # fn main() { async_std::task::block_on(async {
150+
/// #
151+
/// use async_std::prelude::*;
152+
/// use async_std::stream;
153+
///
154+
/// let mut s = stream::empty::<u32>();
155+
/// assert!(s.all(|_| false).await);
156+
/// #
157+
/// # }) }
158+
/// ```
159+
#[inline]
160+
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item>
161+
where
162+
Self: Sized,
163+
F: FnMut(Self::Item) -> bool,
164+
{
165+
AllFuture {
166+
stream: self,
167+
result: true,
168+
__item: PhantomData,
169+
f,
170+
}
171+
}
114172
}
115173

116174
impl<T: futures::Stream + Unpin + ?Sized> Stream for T {
@@ -168,3 +226,50 @@ impl<S: futures::Stream> futures::Stream for Take<S> {
168226
}
169227
}
170228
}
229+
230+
#[derive(Debug)]
231+
pub struct AllFuture<'a, S, F, T>
232+
where
233+
F: FnMut(T) -> bool,
234+
{
235+
stream: &'a mut S,
236+
f: F,
237+
result: bool,
238+
__item: PhantomData<T>,
239+
}
240+
241+
impl<'a, S, F, T> AllFuture<'a, S, F, T>
242+
where
243+
F: FnMut(T) -> bool,
244+
{
245+
pin_utils::unsafe_pinned!(stream: &'a mut S);
246+
pin_utils::unsafe_unpinned!(result: bool);
247+
pin_utils::unsafe_unpinned!(f: F);
248+
}
249+
250+
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
251+
where
252+
S: futures::Stream + Unpin + Sized,
253+
F: FnMut(S::Item) -> bool,
254+
{
255+
type Output = bool;
256+
257+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
258+
use futures::Stream;
259+
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
260+
match next {
261+
Some(v) => {
262+
let result = (self.as_mut().f())(v);
263+
*self.as_mut().result() = result;
264+
if result {
265+
// don't forget to wake this task again to pull the next item from stream
266+
cx.waker().wake_by_ref();
267+
Poll::Pending
268+
} else {
269+
Poll::Ready(false)
270+
}
271+
}
272+
None => Poll::Ready(self.result),
273+
}
274+
}
275+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /