From 42effd1132091e2f14af416c64412d4047c76aff Mon Sep 17 00:00:00 2001
From: Jacob Lifshay
Date: Tue, 15 Oct 2024 20:32:33 -0700
Subject: [PATCH] switch to using a make job server for managing test parallelism

---
 Cargo.lock                             |  60 ++++++++
 Cargo.toml                             |   2 +
 crates/fayalite/Cargo.toml             |   2 +
 crates/fayalite/src/cli.rs             | 129 +----------------
 crates/fayalite/src/util.rs            |   1 +
 crates/fayalite/src/util/job_server.rs | 193 +++++++++++++++++++++++++
 6 files changed, 265 insertions(+), 122 deletions(-)
 create mode 100644 crates/fayalite/src/util/job_server.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1e6f88c..7a2a2ac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -230,6 +230,27 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "ctor"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f"
+dependencies = [
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "derive_destructure2"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64b697ac90ff296f0fc031ee5a61c7ac31fb9fff50e3fb32873b09223613fc0c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "digest"
 version = "0.10.7"
@@ -285,10 +306,12 @@ dependencies = [
  "bitvec",
  "blake3",
  "clap",
+ "ctor",
  "eyre",
  "fayalite-proc-macros",
  "fayalite-visit-gen",
  "hashbrown",
+ "jobslot",
  "num-bigint",
  "num-traits",
  "os_pipe",
@@ -350,6 +373,17 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "getrandom"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "wasi",
+]
+
 [[package]]
 name = "glob"
 version = "0.3.1"
@@ -410,6 +444,20 @@ version = "1.0.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
 
+[[package]]
+name = "jobslot"
+version = "0.2.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fe10868679d7a24c2c67d862d0e64a342ce9aef7cdde9ce8019bd35d353d458d"
+dependencies = [
+ "cfg-if",
+ "derive_destructure2",
+ "getrandom",
+ "libc",
+ "scopeguard",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "libc"
 version = "0.2.153"
@@ -520,6 +568,12 @@ version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
 
+[[package]]
+name = "scopeguard"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+
 [[package]]
 name = "serde"
 version = "1.0.202"
@@ -672,6 +726,12 @@ version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
+[[package]]
+name = "wasi"
+version = "0.11.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
+
 [[package]]
 name = "which"
 version = "6.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 699d57f..da104c2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,9 +21,11 @@ base16ct = "0.2.0"
 bitvec = { version = "1.0.1", features = ["serde"] }
 blake3 = { version = "1.5.4", features = ["serde"] }
 clap = { version = "4.5.9", features = ["derive", "env", "string"] }
+ctor = "0.2.8"
 eyre = "0.6.12"
 hashbrown = "0.14.3"
 indexmap = { version = "2.2.6", features = ["serde"] }
+jobslot = "0.2.19"
 num-bigint = "0.4.4"
 num-traits = "0.2.16"
 os_pipe = "1.2.1"
diff --git a/crates/fayalite/Cargo.toml b/crates/fayalite/Cargo.toml
index 828d759..5724a80 100644
--- a/crates/fayalite/Cargo.toml
+++ b/crates/fayalite/Cargo.toml
@@ -17,9 +17,11 @@ version.workspace = true
 bitvec.workspace = true
 blake3.workspace = true
 clap.workspace = true
+ctor.workspace = true
 eyre.workspace = true
 fayalite-proc-macros.workspace = true
 hashbrown.workspace = true
+jobslot.workspace = true
 num-bigint.workspace = true
 num-traits.workspace = true
 os_pipe.workspace = true
diff --git a/crates/fayalite/src/cli.rs b/crates/fayalite/src/cli.rs
index 1dace37..a236c77 100644
--- a/crates/fayalite/src/cli.rs
+++ b/crates/fayalite/src/cli.rs
@@ -5,7 +5,7 @@ use crate::{
     firrtl::{self, ExportOptions},
     intern::Interned,
     module::Module,
-    util::streaming_read_utf8::streaming_read_utf8,
+    util::{job_server::AcquiredJob, streaming_read_utf8::streaming_read_utf8},
 };
 use clap::{
     builder::{OsStringValueParser, TypedValueParser},
@@ -18,10 +18,8 @@ use std::{
     ffi::OsString,
     fmt::{self, Write},
     fs, io, mem,
-    num::NonZeroUsize,
     path::{Path, PathBuf},
     process,
-    sync::{Condvar, Mutex, OnceLock},
 };
 use tempfile::TempDir;
 
@@ -49,107 +47,10 @@ impl From<io::Error> for CliError {
     }
 }
 
-#[derive(Debug)]
-pub struct AcquiredJob {
-    job_count: NonZeroUsize,
-}
-
-impl Drop for AcquiredJob {
-    fn drop(&mut self) {
-        Self::change_job_count(Some(self.job_count), None);
-    }
-}
-
-impl AcquiredJob {
-    pub fn max_available_job_count() -> NonZeroUsize {
-        static RETVAL: OnceLock<NonZeroUsize> = OnceLock::new();
-        *RETVAL.get_or_init(|| {
-            std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap())
-        })
-    }
-    fn change_job_count(released: Option<NonZeroUsize>, acquired: Option<NonZeroUsize>) {
-        static AVAILABLE_JOB_COUNT: OnceLock<Mutex<usize>> = OnceLock::new();
-        static COND_VAR: Condvar = Condvar::new();
-        let mut available_job_count_lock = AVAILABLE_JOB_COUNT
-            .get_or_init(|| Mutex::new(Self::max_available_job_count().get()))
-            .lock()
-            .unwrap();
-        if let Some(released) = released {
-            *available_job_count_lock = available_job_count_lock
-                .checked_add(released.get())
-                .expect("tried to release too many jobs");
-            COND_VAR.notify_all();
-        }
-        if let Some(acquired) = acquired {
-            loop {
-                match available_job_count_lock.checked_sub(acquired.get()) {
-                    Some(jobs_left) => {
-                        *available_job_count_lock = jobs_left;
-                        break;
-                    }
-                    None => {
-                        available_job_count_lock = COND_VAR.wait(available_job_count_lock).unwrap()
-                    }
-                }
-            }
-        }
-    }
-    pub fn job_count(&self) -> NonZeroUsize {
-        self.job_count
-    }
-    pub fn increase_job_count<R>(
-        &mut self,
-        new_minimum_count: NonZeroUsize,
-        f: impl FnOnce(&mut AcquiredJob) -> R,
-    ) -> R {
-        if new_minimum_count <= self.job_count {
-            return f(self);
-        }
-        struct ReleaseOnDrop<'a> {
-            acquired_job: &'a mut AcquiredJob,
-            old_job_count: NonZeroUsize,
-        }
-        impl Drop for ReleaseOnDrop<'_> {
-            fn drop(&mut self) {
-                AcquiredJob::change_job_count(
-                    NonZeroUsize::new(self.acquired_job.job_count.get() - self.old_job_count.get()),
-                    None,
-                );
-                self.acquired_job.job_count = self.old_job_count;
-            }
-        }
-        let release_on_drop = ReleaseOnDrop {
-            old_job_count: self.job_count,
-            acquired_job: self,
-        };
-        // release our current jobs when acquiring new jobs to avoid deadlock
-        Self::change_job_count(Some(release_on_drop.old_job_count), Some(new_minimum_count));
-        release_on_drop.acquired_job.job_count = new_minimum_count;
-        let retval = f(release_on_drop.acquired_job);
-        drop(release_on_drop);
-        retval
-    }
-    pub fn acquire_jobs(job_count: NonZeroUsize) -> Self {
-        Self::change_job_count(None, Some(job_count));
-        Self { job_count }
-    }
-    pub fn run_command<R>(
-        &mut self,
-        mut cmd: std::process::Command,
-        f: impl FnOnce(&mut std::process::Command) -> std::io::Result<R>,
-    ) -> std::io::Result<R> {
-        // TODO: if we implement a make job server, add the proper env vars to cmd
-        f(&mut cmd)
-    }
-}
-
 pub trait RunPhase<Arg> {
     type Output;
     fn run(&self, arg: Arg) -> Result<Self::Output> {
-        self.run_with_job(
-            arg,
-            &mut AcquiredJob::acquire_jobs(NonZeroUsize::new(1).unwrap()),
-        )
+        self.run_with_job(arg, &mut AcquiredJob::acquire())
     }
     fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result<Self::Output>;
 }
@@ -492,14 +393,6 @@ impl FormalMode {
             FormalMode::Cover => "cover",
         }
     }
-    fn needs_extra_job(self) -> bool {
-        match self {
-            FormalMode::BMC => false,
-            FormalMode::Prove => true,
-            FormalMode::Live => false,
-            FormalMode::Cover => false,
-        }
-    }
 }
 
 impl fmt::Display for FormalMode {
@@ -721,12 +614,6 @@ impl FormalArgs {
         cmd.arg(sby_file.file_name().unwrap());
         cmd.args(&self.sby_extra_args);
         cmd.current_dir(&output.verilog.firrtl.output_dir);
-        let new_minimum_count = if self.mode.needs_extra_job() {
-            NonZeroUsize::new(2).unwrap()
-        } else {
-            NonZeroUsize::new(1).unwrap()
-        };
-        let new_minimum_count = AcquiredJob::max_available_job_count().min(new_minimum_count);
         let mut captured_output = String::new();
         let cache_file = output.cache_file();
         let do_cache = if let Some(contents_hash) = contents_hash.filter(|_| self.cache_results) {
@@ -752,13 +639,11 @@ impl FormalArgs {
             false
         };
         let _ = fs::remove_file(&cache_file);
-        let status = acquired_job.increase_job_count(new_minimum_count, |acquired_job| {
-            self.verilog.firrtl.base.run_external_command(
-                acquired_job,
-                cmd,
-                do_cache.then_some(&mut captured_output),
-            )
-        })?;
+        let status = self.verilog.firrtl.base.run_external_command(
+            acquired_job,
+            cmd,
+            do_cache.then_some(&mut captured_output),
+        )?;
         let result = if status.success() {
             Ok(output)
         } else {
diff --git a/crates/fayalite/src/util.rs b/crates/fayalite/src/util.rs
index 95f5793..f66654f 100644
--- a/crates/fayalite/src/util.rs
+++ b/crates/fayalite/src/util.rs
@@ -27,4 +27,5 @@ pub use misc::{
     interned_bit, iter_eq_by, BitSliceWriteWithBase, DebugAsDisplay, DebugAsRawString,
     MakeMutSlice,
 };
+pub mod job_server;
 pub mod ready_valid;
diff --git a/crates/fayalite/src/util/job_server.rs b/crates/fayalite/src/util/job_server.rs
new file mode 100644
index 0000000..376ddc0
--- /dev/null
+++ b/crates/fayalite/src/util/job_server.rs
@@ -0,0 +1,193 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// See Notices.txt for copyright information
+
+use ctor::ctor;
+use jobslot::{Acquired, Client};
+use std::{
+    ffi::OsString,
+    mem,
+    num::NonZeroUsize,
+    sync::{Condvar, Mutex, Once, OnceLock},
+    thread::spawn,
+};
+
+fn get_or_make_client() -> &'static Client {
+    #[ctor]
+    static CLIENT: OnceLock<Client> = unsafe {
+        match Client::from_env() {
+            Some(client) => OnceLock::from(client),
+            None => OnceLock::new(),
+        }
+    };
+
+    CLIENT.get_or_init(|| {
+        let mut available_parallelism = None;
+        let mut args = std::env::args_os().skip(1);
+        while let Some(arg) = args.next() {
+            const TEST_THREADS_OPTION: &'static [u8] = b"--test-threads";
+            if arg.as_encoded_bytes().starts_with(TEST_THREADS_OPTION) {
+                match arg.as_encoded_bytes().get(TEST_THREADS_OPTION.len()) {
+                    Some(b'=') => {
+                        let mut arg = arg.into_encoded_bytes();
+                        arg.drain(..=TEST_THREADS_OPTION.len());
+                        available_parallelism = Some(arg);
+                        break;
+                    }
+                    None => {
+                        available_parallelism = args.next().map(OsString::into_encoded_bytes);
+                        break;
+                    }
+                    _ => {}
+                }
+            }
+        }
+        let available_parallelism = if let Some(available_parallelism) = available_parallelism
+            .as_deref()
+            .and_then(|v| std::str::from_utf8(v).ok())
+            .and_then(|v| v.parse().ok())
+        {
+            available_parallelism
+        } else if let Ok(available_parallelism) = std::thread::available_parallelism() {
+            available_parallelism
+        } else {
+            NonZeroUsize::new(1).unwrap()
+        };
+        Client::new_with_fifo(available_parallelism.get() - 1).expect("failed to create job server")
+    })
+}
+
+struct State {
+    waiting_count: usize,
+    available: Vec<Acquired>,
+    implicit_available: bool,
+}
+
+impl State {
+    fn total_available(&self) -> usize {
+        self.available.len() + self.implicit_available as usize
+    }
+    fn additional_waiting(&self) -> usize {
+        self.waiting_count.saturating_sub(self.total_available())
+    }
+}
+
+static STATE: Mutex<State> = Mutex::new(State {
+    waiting_count: 0,
+    available: Vec::new(),
+    implicit_available: true,
+});
+static COND_VAR: Condvar = Condvar::new();
+
+#[derive(Debug)]
+enum AcquiredJobInner {
+    FromJobServer(Acquired),
+    ImplicitJob,
+}
+
+#[derive(Debug)]
+pub struct AcquiredJob {
+    job: AcquiredJobInner,
+}
+
+impl AcquiredJob {
+    fn start_acquire_thread() {
+        static STARTED_THREAD: Once = Once::new();
+        STARTED_THREAD.call_once(|| {
+            spawn(|| {
+                let mut acquired = None;
+                let client = get_or_make_client();
+                let mut state = STATE.lock().unwrap();
+                loop {
+                    state = if state.additional_waiting() == 0 {
+                        if acquired.is_some() {
+                            drop(state);
+                            drop(acquired.take()); // drop Acquired outside of lock
+                            STATE.lock().unwrap()
+                        } else {
+                            COND_VAR.wait(state).unwrap()
+                        }
+                    } else if acquired.is_some() {
+                        // allocate space before moving Acquired to ensure we
+                        // drop Acquired outside of the lock on panic
+                        state.available.reserve(1);
+                        state.available.push(acquired.take().unwrap());
+                        COND_VAR.notify_all();
+                        state
+                    } else {
+                        drop(state);
+                        acquired = Some(
+                            client
+                                .acquire()
+                                .expect("can't acquire token from job server"),
+                        );
+                        STATE.lock().unwrap()
+                    };
+                }
+            });
+        });
+    }
+    fn acquire_inner(block: bool) -> Option<Self> {
+        Self::start_acquire_thread();
+        let mut state = STATE.lock().unwrap();
+        loop {
+            if let Some(acquired) = state.available.pop() {
+                return Some(Self {
+                    job: AcquiredJobInner::FromJobServer(acquired),
+                });
+            }
+            if state.implicit_available {
+                state.implicit_available = false;
+                return Some(Self {
+                    job: AcquiredJobInner::ImplicitJob,
+                });
+            }
+            if !block {
+                return None;
+            }
+            state.waiting_count += 1;
+            state = COND_VAR.wait(state).unwrap();
+            state.waiting_count -= 1;
+        }
+    }
+    pub fn try_acquire() -> Option<Self> {
+        Self::acquire_inner(false)
+    }
+    pub fn acquire() -> Self {
+        Self::acquire_inner(true).expect("failed to acquire token")
+    }
+    pub fn run_command<R>(
+        &mut self,
+        cmd: std::process::Command,
+        f: impl FnOnce(&mut std::process::Command) -> std::io::Result<R>,
+    ) -> std::io::Result<R> {
+        get_or_make_client().configure_make_and_run_with_fifo(cmd, f)
+    }
+}
+
+impl Drop for AcquiredJob {
+    fn drop(&mut self) {
+        let mut state = STATE.lock().unwrap();
+        match &self.job {
+            AcquiredJobInner::FromJobServer(_) => {
+                if state.waiting_count > state.available.len() + state.implicit_available as usize {
+                    // allocate space before moving Acquired to ensure we
+                    // drop Acquired outside of the lock on panic
+                    state.available.reserve(1);
+                    let AcquiredJobInner::FromJobServer(acquired) =
+                        mem::replace(&mut self.job, AcquiredJobInner::ImplicitJob)
+                    else {
+                        unreachable!()
+                    };
+                    state.available.push(acquired);
+                    COND_VAR.notify_all();
+                }
+            }
+            AcquiredJobInner::ImplicitJob => {
+                state.implicit_available = true;
+                if state.waiting_count > state.available.len() {
+                    COND_VAR.notify_all();
+                }
+            }
+        }
+    }
+}
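
Usage note (not part of the patch): a minimal sketch of how the new API in
crates/fayalite/src/util/job_server.rs is meant to be used by callers such as the
formal-verification CLI phase above. The wrapper function and the `sby --help`
invocation are hypothetical; only `AcquiredJob::acquire`, `try_acquire`, and
`run_command` come from the patch.

    use fayalite::util::job_server::AcquiredJob;
    use std::process::{Command, ExitStatus};

    fn run_one_external_tool() -> std::io::Result<ExitStatus> {
        // Blocks until the shared job server (or the process's implicit token)
        // grants a slot; the token is returned to the pool when `job` is dropped.
        let mut job = AcquiredJob::acquire();
        // Configures the jobserver FIFO/environment on the child command so the
        // spawned tool can itself run parallel work, then runs the closure.
        job.run_command(Command::new("sby"), |cmd| cmd.arg("--help").status())
    }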