Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 6e23095

Browse files
committed
Auto merge of #140145 - Zoxc:job-server-proxy, r=SparrowLii
Add a jobserver proxy to ensure at least one token is always held This adds a jobserver proxy to ensure at least one token is always held by `rustc`. Currently with `-Z threads` `rustc` can temporarily give up all its tokens, causing `cargo` to spawn additional `rustc` instances beyond the job limit. The current behavior causes an issue with `cargo fix` which has a global lock preventing concurrent `rustc` instances, but it also holds a jobserver token, causing a deadlock when `rustc` gives up its token. That is fixed by this PR. Fixes #67385. Fixes #133873. Fixes #140093.
2 parents 0c33fe2 + 08b27ff commit 6e23095

File tree

10 files changed

+156
-29
lines changed

10 files changed

+156
-29
lines changed

‎compiler/rustc_data_structures/src/jobserver.rs‎

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use std::sync::{LazyLock, OnceLock};
1+
use std::sync::{Arc,LazyLock, OnceLock};
22

33
pub use jobserver_crate::{Acquired, Client, HelperThread};
44
use jobserver_crate::{FromEnv, FromEnvErrorKind};
5+
use parking_lot::{Condvar, Mutex};
56

67
// We can only call `from_env_ext` once per process
78

@@ -71,10 +72,93 @@ pub fn client() -> Client {
7172
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
7273
}
7374

74-
pub fn acquire_thread() {
75-
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok();
75+
struct ProxyData {
76+
/// The number of tokens assigned to threads.
77+
/// If this is 0, a single token is still assigned to this process, but is unused.
78+
used: u16,
79+
80+
/// The number of threads requesting a token
81+
pending: u16,
82+
}
83+
84+
/// This is a jobserver proxy used to ensure that we hold on to at least one token.
85+
pub struct Proxy {
86+
client: Client,
87+
data: Mutex<ProxyData>,
88+
89+
/// Threads which are waiting on a token will wait on this.
90+
wake_pending: Condvar,
91+
92+
helper: OnceLock<HelperThread>,
7693
}
7794

78-
pub fn release_thread() {
79-
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok();
95+
impl Proxy {
96+
pub fn new() -> Arc<Self> {
97+
let proxy = Arc::new(Proxy {
98+
client: client(),
99+
data: Mutex::new(ProxyData { used: 1, pending: 0 }),
100+
wake_pending: Condvar::new(),
101+
helper: OnceLock::new(),
102+
});
103+
let proxy_ = Arc::clone(&proxy);
104+
let helper = proxy
105+
.client
106+
.clone()
107+
.into_helper_thread(move |token| {
108+
if let Ok(token) = token {
109+
let mut data = proxy_.data.lock();
110+
if data.pending > 0 {
111+
// Give the token to a waiting thread
112+
token.drop_without_releasing();
113+
assert!(data.used > 0);
114+
data.used += 1;
115+
data.pending -= 1;
116+
proxy_.wake_pending.notify_one();
117+
} else {
118+
// The token is no longer needed, drop it.
119+
drop(data);
120+
drop(token);
121+
}
122+
}
123+
})
124+
.expect("failed to create helper thread");
125+
proxy.helper.set(helper).unwrap();
126+
proxy
127+
}
128+
129+
pub fn acquire_thread(&self) {
130+
let mut data = self.data.lock();
131+
132+
if data.used == 0 {
133+
// There was a free token around. This can
134+
// happen when all threads release their token.
135+
assert_eq!(data.pending, 0);
136+
data.used += 1;
137+
} else {
138+
// Request a token from the helper thread. We can't directly use `acquire_raw`
139+
// as we also need to be able to wait for the final token in the process which
140+
// does not get a corresponding `release_raw` call.
141+
self.helper.get().unwrap().request_token();
142+
data.pending += 1;
143+
self.wake_pending.wait(&mut data);
144+
}
145+
}
146+
147+
pub fn release_thread(&self) {
148+
let mut data = self.data.lock();
149+
150+
if data.pending > 0 {
151+
// Give the token to a waiting thread
152+
data.pending -= 1;
153+
self.wake_pending.notify_one();
154+
} else {
155+
data.used -= 1;
156+
157+
// Release the token unless it's the last one in the process
158+
if data.used > 0 {
159+
drop(data);
160+
self.client.release_raw().ok();
161+
}
162+
}
163+
}
80164
}

‎compiler/rustc_data_structures/src/marker.rs‎

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ macro_rules! already_send {
5959
// These structures are already `Send`.
6060
already_send!(
6161
[std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
62-
[rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
63-
[crate::owned_slice::OwnedSlice]
62+
[rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
63+
[crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
6464
);
6565

6666
macro_rules! impl_dyn_send {
@@ -134,8 +134,8 @@ macro_rules! already_sync {
134134
already_sync!(
135135
[std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
136136
[std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
137-
[jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
138-
[crate::owned_slice::OwnedSlice]
137+
[jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
138+
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
139139
);
140140

141141
// Use portable AtomicU64 for targets without native 64-bit atomics

‎compiler/rustc_interface/src/interface.rs‎

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55
use rustc_ast::{LitKind, MetaItemKind, token};
66
use rustc_codegen_ssa::traits::CodegenBackend;
77
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
8-
use rustc_data_structures::jobserver;
8+
use rustc_data_structures::jobserver::{self,Proxy};
99
use rustc_data_structures::stable_hasher::StableHasher;
1010
use rustc_errors::registry::Registry;
1111
use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
@@ -40,7 +40,12 @@ pub struct Compiler {
4040
pub sess: Session,
4141
pub codegen_backend: Box<dyn CodegenBackend>,
4242
pub(crate) override_queries: Option<fn(&Session, &mut Providers)>,
43+
44+
/// A reference to the current `GlobalCtxt` which we pass on to `GlobalCtxt`.
4345
pub(crate) current_gcx: CurrentGcx,
46+
47+
/// A jobserver reference which we pass on to `GlobalCtxt`.
48+
pub(crate) jobserver_proxy: Arc<Proxy>,
4449
}
4550

4651
/// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
@@ -415,7 +420,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
415420
config.opts.unstable_opts.threads,
416421
&config.extra_symbols,
417422
SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
418-
|current_gcx| {
423+
|current_gcx, jobserver_proxy| {
419424
// The previous `early_dcx` can't be reused here because it doesn't
420425
// impl `Send`. Creating a new one is fine.
421426
let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
@@ -511,6 +516,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
511516
codegen_backend,
512517
override_queries: config.override_queries,
513518
current_gcx,
519+
jobserver_proxy,
514520
};
515521

516522
// There are two paths out of `f`.

‎compiler/rustc_interface/src/passes.rs‎

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{env, fs, iter};
77

88
use rustc_ast as ast;
99
use rustc_codegen_ssa::traits::CodegenBackend;
10+
use rustc_data_structures::jobserver::Proxy;
1011
use rustc_data_structures::parallel;
1112
use rustc_data_structures::steal::Steal;
1213
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
@@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
841842
dyn for<'tcx> FnOnce(
842843
&'tcx Session,
843844
CurrentGcx,
845+
Arc<Proxy>,
844846
&'tcx OnceLock<GlobalCtxt<'tcx>>,
845847
&'tcx WorkerLocal<Arena<'tcx>>,
846848
&'tcx WorkerLocal<rustc_hir::Arena<'tcx>>,
847849
F,
848850
) -> T,
849-
> = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| {
851+
> = Box::new(move |sess, current_gcx, jobserver_proxy,gcx_cell, arena, hir_arena, f| {
850852
TyCtxt::create_global_ctxt(
851853
gcx_cell,
852854
sess,
@@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
865867
),
866868
providers.hooks,
867869
current_gcx,
870+
jobserver_proxy,
868871
|tcx| {
869872
let feed = tcx.create_crate_num(stable_crate_id).unwrap();
870873
assert_eq!(feed.key(), LOCAL_CRATE);
@@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
887890
)
888891
});
889892

890-
inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f)
893+
inner(
894+
&compiler.sess,
895+
compiler.current_gcx.clone(),
896+
Arc::clone(&compiler.jobserver_proxy),
897+
&gcx_cell,
898+
&arena,
899+
&hir_arena,
900+
f,
901+
)
891902
}
892903

893904
/// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses.

‎compiler/rustc_interface/src/util.rs‎

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
22
use std::path::{Path, PathBuf};
3-
use std::sync::OnceLock;
43
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::{Arc, OnceLock};
55
use std::{env, iter, thread};
66

77
use rustc_ast as ast;
88
use rustc_codegen_ssa::traits::CodegenBackend;
9+
use rustc_data_structures::jobserver::Proxy;
910
use rustc_data_structures::sync;
1011
use rustc_metadata::{DylibError, load_symbol_from_dylib};
1112
use rustc_middle::ty::CurrentGcx;
@@ -124,7 +125,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
124125
})
125126
}
126127

127-
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
128+
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx,Arc<Proxy>) -> R + Send, R: Send>(
128129
thread_stack_size: usize,
129130
edition: Edition,
130131
sm_inputs: SourceMapInputs,
@@ -150,7 +151,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
150151
edition,
151152
extra_symbols,
152153
Some(sm_inputs),
153-
|| f(CurrentGcx::new()),
154+
|| f(CurrentGcx::new(),Proxy::new()),
154155
)
155156
})
156157
.unwrap()
@@ -163,7 +164,10 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
163164
})
164165
}
165166

166-
pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
167+
pub(crate) fn run_in_thread_pool_with_globals<
168+
F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
169+
R: Send,
170+
>(
167171
thread_builder_diag: &EarlyDiagCtxt,
168172
edition: Edition,
169173
threads: usize,
@@ -173,8 +177,8 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
173177
) -> R {
174178
use std::process;
175179

180+
use rustc_data_structures::defer;
176181
use rustc_data_structures::sync::FromDyn;
177-
use rustc_data_structures::{defer, jobserver};
178182
use rustc_middle::ty::tls;
179183
use rustc_query_impl::QueryCtxt;
180184
use rustc_query_system::query::{QueryContext, break_query_cycles};
@@ -189,22 +193,26 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
189193
edition,
190194
sm_inputs,
191195
extra_symbols,
192-
|current_gcx| {
196+
|current_gcx, jobserver_proxy| {
193197
// Register the thread for use with the `WorkerLocal` type.
194198
registry.register();
195199

196-
f(current_gcx)
200+
f(current_gcx, jobserver_proxy)
197201
},
198202
);
199203
}
200204

201205
let current_gcx = FromDyn::from(CurrentGcx::new());
202206
let current_gcx2 = current_gcx.clone();
203207

208+
let proxy = Proxy::new();
209+
210+
let proxy_ = Arc::clone(&proxy);
211+
let proxy__ = Arc::clone(&proxy);
204212
let builder = rayon_core::ThreadPoolBuilder::new()
205213
.thread_name(|_| "rustc".to_string())
206-
.acquire_thread_handler(jobserver::acquire_thread)
207-
.release_thread_handler(jobserver::release_thread)
214+
.acquire_thread_handler(move || proxy_.acquire_thread())
215+
.release_thread_handler(move || proxy__.release_thread())
208216
.num_threads(threads)
209217
.deadlock_handler(move || {
210218
// On deadlock, creates a new thread and forwards information in thread
@@ -268,7 +276,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
268276
},
269277
// Run `f` on the first thread in the thread pool.
270278
move |pool: &rayon_core::ThreadPool| {
271-
pool.install(|| f(current_gcx.into_inner()))
279+
pool.install(|| f(current_gcx.into_inner(), proxy))
272280
},
273281
)
274282
.unwrap()

‎compiler/rustc_middle/src/ty/context.rs‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use rustc_data_structures::defer;
2121
use rustc_data_structures::fingerprint::Fingerprint;
2222
use rustc_data_structures::fx::FxHashMap;
2323
use rustc_data_structures::intern::Interned;
24+
use rustc_data_structures::jobserver::Proxy;
2425
use rustc_data_structures::profiling::SelfProfilerRef;
2526
use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
2627
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
@@ -1441,6 +1442,9 @@ pub struct GlobalCtxt<'tcx> {
14411442
pub(crate) alloc_map: interpret::AllocMap<'tcx>,
14421443

14431444
current_gcx: CurrentGcx,
1445+
1446+
/// A jobserver reference used to release then acquire a token while waiting on a query.
1447+
pub jobserver_proxy: Arc<Proxy>,
14441448
}
14451449

14461450
impl<'tcx> GlobalCtxt<'tcx> {
@@ -1645,6 +1649,7 @@ impl<'tcx> TyCtxt<'tcx> {
16451649
query_system: QuerySystem<'tcx>,
16461650
hooks: crate::hooks::Providers,
16471651
current_gcx: CurrentGcx,
1652+
jobserver_proxy: Arc<Proxy>,
16481653
f: impl FnOnce(TyCtxt<'tcx>) -> T,
16491654
) -> T {
16501655
let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
@@ -1679,6 +1684,7 @@ impl<'tcx> TyCtxt<'tcx> {
16791684
data_layout,
16801685
alloc_map: interpret::AllocMap::new(),
16811686
current_gcx,
1687+
jobserver_proxy,
16821688
});
16831689

16841690
// This is a separate function to work around a crash with parallel rustc (#135870)

‎compiler/rustc_query_impl/src/plumbing.rs‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
use std::num::NonZero;
66

7+
use rustc_data_structures::jobserver::Proxy;
78
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
89
use rustc_data_structures::sync::{DynSend, DynSync};
910
use rustc_data_structures::unord::UnordMap;
@@ -69,6 +70,11 @@ impl<'tcx> HasDepContext for QueryCtxt<'tcx> {
6970
impl<'tcx> QueryContext for QueryCtxt<'tcx> {
7071
type QueryInfo = QueryStackDeferred<'tcx>;
7172

73+
#[inline]
74+
fn jobserver_proxy(&self) -> &Proxy {
75+
&*self.jobserver_proxy
76+
}
77+
7278
#[inline]
7379
fn next_job_id(self) -> QueryJobId {
7480
QueryJobId(

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /