Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 99e0932

Browse files
feat(pool): add a Singleton pool type
1 parent 3021828 commit 99e0932

File tree

4 files changed

+307
-0
lines changed

4 files changed

+307
-0
lines changed

‎Cargo.toml‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
4242
http-body-util = "0.1.0"
4343
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
4444
tokio-test = "0.4"
45+
tower-test = "0.4"
4546
pretty_env_logger = "0.5"
4647

4748
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
@@ -60,6 +61,7 @@ default = []
6061
full = [
6162
"client",
6263
"client-legacy",
64+
"client-pool",
6365
"client-proxy",
6466
"client-proxy-system",
6567
"server",
@@ -74,6 +76,7 @@ full = [
7476

7577
client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
7678
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
79+
client-pool = []
7780
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
7881
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
7982

‎src/client/mod.rs‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
#[cfg(feature = "client-legacy")]
55
pub mod legacy;
66

7+
#[cfg(feature = "client-pool")]
8+
pub mod pool;
9+
710
#[cfg(feature = "client-proxy")]
811
pub mod proxy;

‎src/client/pool/mod.rs‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//! Composable pool services
2+
3+
mod singleton;
4+
5+
pub use self::singleton::Singleton;

‎src/client/pool/singleton.rs‎

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::sync::{Arc, Mutex, Weak};
4+
use std::task::{self, Poll};
5+
6+
use futures_core::ready;
7+
use pin_project_lite::pin_project;
8+
use tokio::sync::oneshot;
9+
use tower_service::Service;
10+
11+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
12+
13+
/// A singleton pool over an inner service.
14+
#[derive(Clone, Debug)]
15+
pub struct Singleton<M, Dst>
16+
where
17+
M: Service<Dst>,
18+
{
19+
mk_svc: M,
20+
state: Arc<Mutex<State<M::Response>>>,
21+
}
22+
23+
pin_project! {
24+
#[project = SingletonFutureProj]
25+
pub enum SingletonFuture<F, S> {
26+
Driving {
27+
#[pin]
28+
future: F,
29+
singleton: DitchGuard<S>,
30+
},
31+
Waiting {
32+
rx: oneshot::Receiver<S>,
33+
},
34+
Made {
35+
svc: Option<S>,
36+
},
37+
}
38+
}
39+
40+
// XXX: pub because of the enum SingletonFuture
41+
pub struct DitchGuard<S>(Weak<Mutex<State<S>>>);
42+
43+
#[derive(Debug)]
44+
enum State<S> {
45+
Empty,
46+
Making(Vec<oneshot::Sender<S>>),
47+
Made(S),
48+
}
49+
50+
impl<M, Target> Singleton<M, Target>
51+
where
52+
M: Service<Target>,
53+
M::Response: Clone,
54+
{
55+
/// Create a new singleton pool over an inner make service.
56+
pub fn new(mk_svc: M) -> Self {
57+
Singleton {
58+
mk_svc,
59+
state: Arc::new(Mutex::new(State::Empty)),
60+
}
61+
}
62+
63+
// pub fn reset?
64+
// pub fn retain?
65+
}
66+
67+
impl<M, Target> Service<Target> for Singleton<M, Target>
68+
where
69+
M: Service<Target>,
70+
M::Response: Clone,
71+
M::Error: Into<BoxError>,
72+
{
73+
type Response = M::Response;
74+
type Error = SingletonError;
75+
type Future = SingletonFuture<M::Future, M::Response>;
76+
77+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
78+
if let State::Empty = *self.state.lock().unwrap() {
79+
return self
80+
.mk_svc
81+
.poll_ready(cx)
82+
.map_err(|e| SingletonError(e.into()));
83+
}
84+
Poll::Ready(Ok(()))
85+
}
86+
87+
fn call(&mut self, dst: Target) -> Self::Future {
88+
let mut locked = self.state.lock().unwrap();
89+
match *locked {
90+
State::Empty => {
91+
let fut = self.mk_svc.call(dst);
92+
*locked = State::Making(Vec::new());
93+
SingletonFuture::Driving {
94+
future: fut,
95+
singleton: DitchGuard(Arc::downgrade(&self.state)),
96+
}
97+
}
98+
State::Making(ref mut waiters) => {
99+
let (tx, rx) = oneshot::channel();
100+
waiters.push(tx);
101+
SingletonFuture::Waiting { rx }
102+
}
103+
State::Made(ref svc) => SingletonFuture::Made {
104+
svc: Some(svc.clone()),
105+
},
106+
}
107+
}
108+
}
109+
110+
impl<F, S, E> Future for SingletonFuture<F, S>
111+
where
112+
F: Future<Output = Result<S, E>>,
113+
E: Into<BoxError>,
114+
S: Clone,
115+
{
116+
type Output = Result<S, SingletonError>;
117+
118+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
119+
match self.project() {
120+
SingletonFutureProj::Driving { future, singleton } => {
121+
match ready!(future.poll(cx)) {
122+
Ok(svc) => {
123+
if let Some(state) = singleton.0.upgrade() {
124+
let mut locked = state.lock().unwrap();
125+
singleton.0 = Weak::new();
126+
match std::mem::replace(&mut *locked, State::Made(svc.clone())) {
127+
State::Making(waiters) => {
128+
for tx in waiters {
129+
let _ = tx.send(svc.clone());
130+
}
131+
}
132+
State::Empty | State::Made(_) => {
133+
// shouldn't happen!
134+
}
135+
}
136+
}
137+
Poll::Ready(Ok(svc))
138+
}
139+
Err(e) => {
140+
if let Some(state) = singleton.0.upgrade() {
141+
let mut locked = state.lock().unwrap();
142+
singleton.0 = Weak::new();
143+
*locked = State::Empty;
144+
}
145+
Poll::Ready(Err(SingletonError(e.into())))
146+
}
147+
}
148+
}
149+
SingletonFutureProj::Waiting { rx } => match ready!(Pin::new(rx).poll(cx)) {
150+
Ok(svc) => Poll::Ready(Ok(svc)),
151+
Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))),
152+
},
153+
SingletonFutureProj::Made { svc } => Poll::Ready(Ok(svc.take().unwrap())),
154+
}
155+
}
156+
}
157+
158+
impl<S> Drop for DitchGuard<S> {
159+
fn drop(&mut self) {
160+
if let Some(state) = self.0.upgrade() {
161+
if let Ok(mut locked) = state.lock() {
162+
*locked = State::Empty;
163+
}
164+
}
165+
}
166+
}
167+
168+
// An opaque error type. By not exposing the type, nor being specifically
169+
// Box<dyn Error>, we can _change_ the type once we no longer need the Canceled
170+
// error type. This will be possible with the refactor to baton passing.
171+
#[derive(Debug)]
172+
pub struct SingletonError(BoxError);
173+
174+
impl std::fmt::Display for SingletonError {
175+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176+
f.write_str("singleton connection error")
177+
}
178+
}
179+
180+
impl std::error::Error for SingletonError {
181+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
182+
Some(&*self.0)
183+
}
184+
}
185+
186+
#[derive(Debug)]
187+
struct Canceled;
188+
189+
impl std::fmt::Display for Canceled {
190+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191+
f.write_str("singleton connection canceled")
192+
}
193+
}
194+
195+
impl std::error::Error for Canceled {}
196+
197+
#[cfg(test)]
198+
mod tests {
199+
use std::future::Future;
200+
use std::pin::Pin;
201+
use std::task::Poll;
202+
203+
use tower_service::Service;
204+
205+
use super::Singleton;
206+
207+
#[tokio::test]
208+
async fn first_call_drives_subsequent_wait() {
209+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
210+
211+
let mut singleton = Singleton::new(mock_svc);
212+
213+
handle.allow(1);
214+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
215+
.await
216+
.unwrap();
217+
// First call: should go into Driving
218+
let fut1 = singleton.call(());
219+
// Second call: should go into Waiting
220+
let fut2 = singleton.call(());
221+
222+
// Expect exactly one request to the inner service
223+
let ((), send_response) = handle.next_request().await.unwrap();
224+
send_response.send_response("svc");
225+
226+
// Both futures should resolve to the same value
227+
assert_eq!(fut1.await.unwrap(), "svc");
228+
assert_eq!(fut2.await.unwrap(), "svc");
229+
}
230+
231+
#[tokio::test]
232+
async fn made_state_returns_immediately() {
233+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
234+
let mut singleton = Singleton::new(mock_svc);
235+
236+
handle.allow(1);
237+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
238+
.await
239+
.unwrap();
240+
// Drive first call to completion
241+
let fut1 = singleton.call(());
242+
let ((), send_response) = handle.next_request().await.unwrap();
243+
send_response.send_response("svc");
244+
assert_eq!(fut1.await.unwrap(), "svc");
245+
246+
// Second call should not hit inner service
247+
let res = singleton.call(()).await.unwrap();
248+
assert_eq!(res, "svc");
249+
}
250+
251+
#[tokio::test]
252+
async fn cancel_waiter_does_not_affect_others() {
253+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
254+
let mut singleton = Singleton::new(mock_svc);
255+
256+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
257+
.await
258+
.unwrap();
259+
let fut1 = singleton.call(());
260+
let fut2 = singleton.call(());
261+
drop(fut2); // cancel one waiter
262+
263+
let ((), send_response) = handle.next_request().await.unwrap();
264+
send_response.send_response("svc");
265+
266+
assert_eq!(fut1.await.unwrap(), "svc");
267+
}
268+
269+
// TODO: this should be able to be improved with a cooperative baton refactor
270+
#[tokio::test]
271+
async fn cancel_driver_cancels_all() {
272+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
273+
let mut singleton = Singleton::new(mock_svc);
274+
275+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
276+
.await
277+
.unwrap();
278+
let mut fut1 = singleton.call(());
279+
let fut2 = singleton.call(());
280+
281+
// poll driver just once, and then drop
282+
crate::common::future::poll_fn(move |cx| {
283+
let _ = Pin::new(&mut fut1).poll(cx);
284+
Poll::Ready(())
285+
})
286+
.await;
287+
288+
let ((), send_response) = handle.next_request().await.unwrap();
289+
send_response.send_response("svc");
290+
291+
assert_eq!(
292+
fut2.await.unwrap_err().0.to_string(),
293+
"singleton connection canceled"
294+
);
295+
}
296+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /