switch to using a make job server for managing test parallelism

Jacob Lifshay 2024-10-15 20:32:33 -07:00
parent 3d0f95cfe5
commit 42effd1132
Signed by: programmerjake
SSH key fingerprint: SHA256:B1iRVvUJkvd7upMIiMqn6OyxvD2SgJkAH3ZnUOj6z+c
6 changed files with 265 additions and 122 deletions
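A minimal sketch of how callers are expected to use the new API (AcquiredJob::acquire and run_command come from the new util::job_server module added below; the crate path and the external command here are assumptions for illustration):

use fayalite::util::job_server::AcquiredJob; // assumed crate path

fn run_one_external_check() -> std::io::Result<std::process::ExitStatus> {
    // Block until a job token is available (the process's implicit job or a
    // token from the shared jobserver); the token is released when job drops.
    let mut job = AcquiredJob::acquire();
    // Hypothetical external tool; run_command configures the child process so
    // it can participate in the same jobserver.
    let cmd = std::process::Command::new("some-tool");
    job.run_command(cmd, |cmd| cmd.status())
}

Callers that must not block can use AcquiredJob::try_acquire() instead, which returns None when no job is free.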


@@ -17,9 +17,11 @@ version.workspace = true
bitvec.workspace = true
blake3.workspace = true
clap.workspace = true
ctor.workspace = true
eyre.workspace = true
fayalite-proc-macros.workspace = true
hashbrown.workspace = true
jobslot.workspace = true
num-bigint.workspace = true
num-traits.workspace = true
os_pipe.workspace = true


@@ -5,7 +5,7 @@ use crate::{
firrtl::{self, ExportOptions},
intern::Interned,
module::Module,
util::streaming_read_utf8::streaming_read_utf8,
util::{job_server::AcquiredJob, streaming_read_utf8::streaming_read_utf8},
};
use clap::{
builder::{OsStringValueParser, TypedValueParser},
@@ -18,10 +18,8 @@ use std::{
ffi::OsString,
fmt::{self, Write},
fs, io, mem,
num::NonZeroUsize,
path::{Path, PathBuf},
process,
sync::{Condvar, Mutex, OnceLock},
};
use tempfile::TempDir;
@@ -49,107 +47,10 @@ impl From<io::Error> for CliError {
}
}
#[derive(Debug)]
pub struct AcquiredJob {
job_count: NonZeroUsize,
}
impl Drop for AcquiredJob {
fn drop(&mut self) {
Self::change_job_count(Some(self.job_count), None);
}
}
impl AcquiredJob {
pub fn max_available_job_count() -> NonZeroUsize {
static RETVAL: OnceLock<NonZeroUsize> = OnceLock::new();
*RETVAL.get_or_init(|| {
std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap())
})
}
fn change_job_count(released: Option<NonZeroUsize>, acquired: Option<NonZeroUsize>) {
static AVAILABLE_JOB_COUNT: OnceLock<Mutex<usize>> = OnceLock::new();
static COND_VAR: Condvar = Condvar::new();
let mut available_job_count_lock = AVAILABLE_JOB_COUNT
.get_or_init(|| Mutex::new(Self::max_available_job_count().get()))
.lock()
.unwrap();
if let Some(released) = released {
*available_job_count_lock = available_job_count_lock
.checked_add(released.get())
.expect("tried to release too many jobs");
COND_VAR.notify_all();
}
if let Some(acquired) = acquired {
loop {
match available_job_count_lock.checked_sub(acquired.get()) {
Some(jobs_left) => {
*available_job_count_lock = jobs_left;
break;
}
None => {
available_job_count_lock = COND_VAR.wait(available_job_count_lock).unwrap()
}
}
}
}
}
pub fn job_count(&self) -> NonZeroUsize {
self.job_count
}
pub fn increase_job_count<R>(
&mut self,
new_minimum_count: NonZeroUsize,
f: impl FnOnce(&mut AcquiredJob) -> R,
) -> R {
if new_minimum_count <= self.job_count {
return f(self);
}
struct ReleaseOnDrop<'a> {
acquired_job: &'a mut AcquiredJob,
old_job_count: NonZeroUsize,
}
impl Drop for ReleaseOnDrop<'_> {
fn drop(&mut self) {
AcquiredJob::change_job_count(
NonZeroUsize::new(self.acquired_job.job_count.get() - self.old_job_count.get()),
None,
);
self.acquired_job.job_count = self.old_job_count;
}
}
let release_on_drop = ReleaseOnDrop {
old_job_count: self.job_count,
acquired_job: self,
};
// release our current jobs when acquiring new jobs to avoid deadlock
Self::change_job_count(Some(release_on_drop.old_job_count), Some(new_minimum_count));
release_on_drop.acquired_job.job_count = new_minimum_count;
let retval = f(release_on_drop.acquired_job);
drop(release_on_drop);
retval
}
pub fn acquire_jobs(job_count: NonZeroUsize) -> Self {
Self::change_job_count(None, Some(job_count));
Self { job_count }
}
pub fn run_command<R>(
&mut self,
mut cmd: std::process::Command,
f: impl FnOnce(&mut std::process::Command) -> std::io::Result<R>,
) -> std::io::Result<R> {
// TODO: if we implement a make job server, add the proper env vars to cmd
f(&mut cmd)
}
}
pub trait RunPhase<Arg> {
type Output;
fn run(&self, arg: Arg) -> Result<Self::Output> {
self.run_with_job(
arg,
&mut AcquiredJob::acquire_jobs(NonZeroUsize::new(1).unwrap()),
)
self.run_with_job(arg, &mut AcquiredJob::acquire())
}
fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result<Self::Output>;
}
@@ -492,14 +393,6 @@ impl FormalMode {
FormalMode::Cover => "cover",
}
}
fn needs_extra_job(self) -> bool {
match self {
FormalMode::BMC => false,
FormalMode::Prove => true,
FormalMode::Live => false,
FormalMode::Cover => false,
}
}
}
impl fmt::Display for FormalMode {
@@ -721,12 +614,6 @@ impl FormalArgs {
cmd.arg(sby_file.file_name().unwrap());
cmd.args(&self.sby_extra_args);
cmd.current_dir(&output.verilog.firrtl.output_dir);
let new_minimum_count = if self.mode.needs_extra_job() {
NonZeroUsize::new(2).unwrap()
} else {
NonZeroUsize::new(1).unwrap()
};
let new_minimum_count = AcquiredJob::max_available_job_count().min(new_minimum_count);
let mut captured_output = String::new();
let cache_file = output.cache_file();
let do_cache = if let Some(contents_hash) = contents_hash.filter(|_| self.cache_results) {
@@ -752,13 +639,11 @@ impl FormalArgs {
false
};
let _ = fs::remove_file(&cache_file);
let status = acquired_job.increase_job_count(new_minimum_count, |acquired_job| {
self.verilog.firrtl.base.run_external_command(
acquired_job,
cmd,
do_cache.then_some(&mut captured_output),
)
})?;
let status = self.verilog.firrtl.base.run_external_command(
acquired_job,
cmd,
do_cache.then_some(&mut captured_output),
)?;
let result = if status.success() {
Ok(output)
} else {


@@ -27,4 +27,5 @@ pub use misc::{
interned_bit, iter_eq_by, BitSliceWriteWithBase, DebugAsDisplay, DebugAsRawString, MakeMutSlice,
};
pub mod job_server;
pub mod ready_valid;


@@ -0,0 +1,193 @@
// SPDX-License-Identifier: LGPL-3.0-or-later
// See Notices.txt for copyright information
use ctor::ctor;
use jobslot::{Acquired, Client};
use std::{
ffi::OsString,
mem,
num::NonZeroUsize,
sync::{Condvar, Mutex, Once, OnceLock},
thread::spawn,
};
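// Returns the process-wide jobserver client: prefer one inherited from the
// environment (e.g. when the test binary runs under make -j), otherwise lazily
// create a FIFO-based jobserver sized from libtest's --test-threads option or
// the available parallelism, minus one for the implicit job every client holds.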
fn get_or_make_client() -> &'static Client {
#[ctor]
static CLIENT: OnceLock<Client> = unsafe {
match Client::from_env() {
Some(client) => OnceLock::from(client),
None => OnceLock::new(),
}
};
CLIENT.get_or_init(|| {
let mut available_parallelism = None;
let mut args = std::env::args_os().skip(1);
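// Scan this test binary's own arguments for libtest's --test-threads option
// (either --test-threads=N or --test-threads N) so the jobserver size matches
// the parallelism the test harness was asked to use.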
while let Some(arg) = args.next() {
const TEST_THREADS_OPTION: &'static [u8] = b"--test-threads";
if arg.as_encoded_bytes().starts_with(TEST_THREADS_OPTION) {
match arg.as_encoded_bytes().get(TEST_THREADS_OPTION.len()) {
Some(b'=') => {
let mut arg = arg.into_encoded_bytes();
arg.drain(..=TEST_THREADS_OPTION.len());
available_parallelism = Some(arg);
break;
}
None => {
available_parallelism = args.next().map(OsString::into_encoded_bytes);
break;
}
_ => {}
}
}
}
let available_parallelism = if let Some(available_parallelism) = available_parallelism
.as_deref()
.and_then(|v| std::str::from_utf8(v).ok())
.and_then(|v| v.parse().ok())
{
available_parallelism
} else if let Ok(available_parallelism) = std::thread::available_parallelism() {
available_parallelism
} else {
NonZeroUsize::new(1).unwrap()
};
Client::new_with_fifo(available_parallelism.get() - 1).expect("failed to create job server")
})
}
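// Process-wide pool of job tokens: available holds tokens already acquired
// from the jobserver but not yet claimed by a waiter, and implicit_available
// tracks the one implicit job a jobserver client may use without a token.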
struct State {
waiting_count: usize,
available: Vec<Acquired>,
implicit_available: bool,
}
impl State {
fn total_available(&self) -> usize {
self.available.len() + self.implicit_available as usize
}
fn additional_waiting(&self) -> usize {
self.waiting_count.saturating_sub(self.total_available())
}
}
static STATE: Mutex<State> = Mutex::new(State {
waiting_count: 0,
available: Vec::new(),
implicit_available: true,
});
static COND_VAR: Condvar = Condvar::new();
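// A held job is either a real token from the jobserver or the process's
// single implicit job.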
#[derive(Debug)]
enum AcquiredJobInner {
FromJobServer(Acquired),
ImplicitJob,
}
#[derive(Debug)]
pub struct AcquiredJob {
job: AcquiredJobInner,
}
impl AcquiredJob {
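// Acquiring from the jobserver can block, so one background thread performs
// all blocking Client::acquire calls and parks the tokens in STATE; callers
// only ever wait on COND_VAR, which also keeps try_acquire non-blocking.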
fn start_acquire_thread() {
static STARTED_THREAD: Once = Once::new();
STARTED_THREAD.call_once(|| {
spawn(|| {
let mut acquired = None;
let client = get_or_make_client();
let mut state = STATE.lock().unwrap();
loop {
state = if state.additional_waiting() == 0 {
if acquired.is_some() {
drop(state);
drop(acquired.take()); // drop Acquired outside of lock
STATE.lock().unwrap()
} else {
COND_VAR.wait(state).unwrap()
}
} else if acquired.is_some() {
// allocate space before moving Acquired to ensure we
// drop Acquired outside of the lock on panic
state.available.reserve(1);
state.available.push(acquired.take().unwrap());
COND_VAR.notify_all();
state
} else {
drop(state);
acquired = Some(
client
.acquire()
.expect("can't acquire token from job server"),
);
STATE.lock().unwrap()
};
}
});
});
}
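// Take a job from the pool, preferring real jobserver tokens over the
// implicit job; if block is true, register as a waiter (which prompts the
// background thread to acquire more tokens) and wait on COND_VAR.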
fn acquire_inner(block: bool) -> Option<Self> {
Self::start_acquire_thread();
let mut state = STATE.lock().unwrap();
loop {
if let Some(acquired) = state.available.pop() {
return Some(Self {
job: AcquiredJobInner::FromJobServer(acquired),
});
}
if state.implicit_available {
state.implicit_available = false;
return Some(Self {
job: AcquiredJobInner::ImplicitJob,
});
}
if !block {
return None;
}
state.waiting_count += 1;
state = COND_VAR.wait(state).unwrap();
state.waiting_count -= 1;
}
}
pub fn try_acquire() -> Option<Self> {
Self::acquire_inner(false)
}
pub fn acquire() -> Self {
Self::acquire_inner(true).expect("failed to acquire token")
}
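// Run cmd via the jobserver client so the child process inherits the
// jobserver environment (FIFO / MAKEFLAGS-style) and shares the same token
// pool, e.g. when the child is sby or make.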
pub fn run_command<R>(
&mut self,
cmd: std::process::Command,
f: impl FnOnce(&mut std::process::Command) -> std::io::Result<R>,
) -> std::io::Result<R> {
get_or_make_client().configure_make_and_run_with_fifo(cmd, f)
}
}
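// Releasing a job: if another thread is waiting, hand it the token (or the
// implicit job) and wake it; otherwise a jobserver token is simply dropped,
// returning it to the jobserver itself.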
impl Drop for AcquiredJob {
fn drop(&mut self) {
let mut state = STATE.lock().unwrap();
match &self.job {
AcquiredJobInner::FromJobServer(_) => {
if state.waiting_count > state.available.len() + state.implicit_available as usize {
// allocate space before moving Acquired to ensure we
// drop Acquired outside of the lock on panic
state.available.reserve(1);
let AcquiredJobInner::FromJobServer(acquired) =
mem::replace(&mut self.job, AcquiredJobInner::ImplicitJob)
else {
unreachable!()
};
state.available.push(acquired);
COND_VAR.notify_all();
}
}
AcquiredJobInner::ImplicitJob => {
state.implicit_available = true;
if state.waiting_count > state.available.len() {
COND_VAR.notify_all();
}
}
}
}
}