@@ -34,6 +34,11 @@ pub(crate) struct Reactor {
34
34
poller : Poller ,
35
35
36
36
/// Ticker bumped before polling.
37
+ ///
38
+ /// This is useful for checking what is the current "round" of `ReactorLock::react()` when
39
+ /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
40
+ /// methods must make sure they don't receive stale I/O events - they only accept events from a
41
+ /// fresh "round" of `ReactorLock::react()`.
37
42
ticker : AtomicUsize ,
38
43
39
44
/// Registered sources.
@@ -191,6 +196,9 @@ impl Reactor {
191
196
192
197
// Try grabbing a lock on the reactor to wait on I/O.
193
198
if let Some ( mut reactor_lock) = Reactor :: get ( ) . try_lock ( ) {
199
+ // Record the instant at which the lock was grabbed.
200
+ let start = Instant :: now ( ) ;
201
+
194
202
loop {
195
203
// First let wakers know this parker is blocked on I/O.
196
204
IO_POLLING . with ( |io| io. set ( true ) ) ;
@@ -200,7 +208,8 @@ impl Reactor {
200
208
io_blocked. store ( false , Ordering :: SeqCst ) ;
201
209
} ) ;
202
210
203
- // Check if a notification has been received.
211
+ // Check if a notification has been received before `io_blocked` was updated
212
+ // because in that case the reactor won't receive a wakeup.
204
213
if p. park_timeout ( Duration :: from_secs ( 0 ) ) {
205
214
break ;
206
215
}
@@ -212,6 +221,23 @@ impl Reactor {
212
221
if p. park_timeout ( Duration :: from_secs ( 0 ) ) {
213
222
break ;
214
223
}
224
+
225
+ // Check if this thread been handling I/O events for a long time.
226
+ if start. elapsed ( ) > Duration :: from_micros ( 500 ) {
227
+ // This thread is clearly processing I/O events for some other threads
228
+ // because it didn't get a notification yet. It's best to stop hogging the
229
+ // reactor and give other threads a chance to process I/O events for
230
+ // themselves.
231
+ drop ( reactor_lock) ;
232
+
233
+ // Unpark the "async-io" thread in case no other thread is ready to start
234
+ // processing I/O events. This way we prevent a potential latency spike.
235
+ self . thread_unparker . unpark ( ) ;
236
+
237
+ // Wait for a notification.
238
+ p. park ( ) ;
239
+ break ;
240
+ }
215
241
}
216
242
} else {
217
243
// Wait for an actual notification.
0 commit comments