reduce parallelism to fit within the number of available cpus even when running sby in prove mode (which likes to run 2 smt solvers in parallel)

This commit is contained in:
Jacob Lifshay 2024-10-04 17:03:51 -07:00
parent 487af07154
commit b7f1101164
Signed by: programmerjake
SSH key fingerprint: SHA256:B1iRVvUJkvd7upMIiMqn6OyxvD2SgJkAH3ZnUOj6z+c

View file

@ -17,8 +17,10 @@ use std::{
ffi::OsString, ffi::OsString,
fmt::{self, Write}, fmt::{self, Write},
fs, io, mem, fs, io, mem,
num::NonZeroUsize,
path::{Path, PathBuf}, path::{Path, PathBuf},
process, process,
sync::{Condvar, Mutex, OnceLock},
}; };
use tempfile::TempDir; use tempfile::TempDir;
@ -46,9 +48,109 @@ impl From<io::Error> for CliError {
} }
} }
#[derive(Debug)]
pub struct AcquiredJob {
job_count: NonZeroUsize,
}
impl Drop for AcquiredJob {
fn drop(&mut self) {
Self::change_job_count(Some(self.job_count), None);
}
}
impl AcquiredJob {
pub fn max_available_job_count() -> NonZeroUsize {
static RETVAL: OnceLock<NonZeroUsize> = OnceLock::new();
*RETVAL.get_or_init(|| {
std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap())
})
}
fn change_job_count(released: Option<NonZeroUsize>, acquired: Option<NonZeroUsize>) {
static AVAILABLE_JOB_COUNT: OnceLock<Mutex<usize>> = OnceLock::new();
static COND_VAR: Condvar = Condvar::new();
let mut available_job_count_lock = AVAILABLE_JOB_COUNT
.get_or_init(|| Mutex::new(Self::max_available_job_count().get()))
.lock()
.unwrap();
if let Some(released) = released {
*available_job_count_lock = available_job_count_lock
.checked_add(released.get())
.expect("tried to release too many jobs");
COND_VAR.notify_all();
}
if let Some(acquired) = acquired {
loop {
match available_job_count_lock.checked_sub(acquired.get()) {
Some(jobs_left) => {
*available_job_count_lock = jobs_left;
break;
}
None => {
available_job_count_lock = COND_VAR.wait(available_job_count_lock).unwrap()
}
}
}
}
}
pub fn job_count(&self) -> NonZeroUsize {
self.job_count
}
pub fn increase_job_count<R>(
&mut self,
new_minimum_count: NonZeroUsize,
f: impl FnOnce(&mut AcquiredJob) -> R,
) -> R {
if new_minimum_count <= self.job_count {
return f(self);
}
struct ReleaseOnDrop<'a> {
acquired_job: &'a mut AcquiredJob,
old_job_count: NonZeroUsize,
}
impl Drop for ReleaseOnDrop<'_> {
fn drop(&mut self) {
AcquiredJob::change_job_count(
NonZeroUsize::new(self.acquired_job.job_count.get() - self.old_job_count.get()),
None,
);
self.acquired_job.job_count = self.old_job_count;
}
}
let release_on_drop = ReleaseOnDrop {
old_job_count: self.job_count,
acquired_job: self,
};
// release our current jobs when acquiring new jobs to avoid deadlock
Self::change_job_count(Some(release_on_drop.old_job_count), Some(new_minimum_count));
release_on_drop.acquired_job.job_count = new_minimum_count;
let retval = f(release_on_drop.acquired_job);
drop(release_on_drop);
retval
}
pub fn acquire_jobs(job_count: NonZeroUsize) -> Self {
Self::change_job_count(None, Some(job_count));
Self { job_count }
}
pub fn run_command<R>(
&mut self,
mut cmd: std::process::Command,
f: impl FnOnce(&mut std::process::Command) -> std::io::Result<R>,
) -> std::io::Result<R> {
// TODO: if we implement a make job server, add the proper env vars to cmd
f(&mut cmd)
}
}
pub trait RunPhase<Arg> { pub trait RunPhase<Arg> {
type Output; type Output;
fn run(&self, arg: Arg) -> Result<Self::Output>; fn run(&self, arg: Arg) -> Result<Self::Output> {
self.run_with_job(
arg,
&mut AcquiredJob::acquire_jobs(NonZeroUsize::new(1).unwrap()),
)
}
fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result<Self::Output>;
} }
#[derive(Parser, Debug, Clone)] #[derive(Parser, Debug, Clone)]
@ -93,6 +195,7 @@ impl BaseArgs {
/// handles possibly redirecting the command's output for Rust tests /// handles possibly redirecting the command's output for Rust tests
pub fn run_external_command( pub fn run_external_command(
&self, &self,
_acquired_job: &mut AcquiredJob,
mut command: process::Command, mut command: process::Command,
) -> io::Result<process::ExitStatus> { ) -> io::Result<process::ExitStatus> {
if self.redirect_output_for_rust_test { if self.redirect_output_for_rust_test {
@ -150,7 +253,11 @@ impl FirrtlOutput {
} }
impl FirrtlArgs { impl FirrtlArgs {
fn run_impl(&self, top_module: Module<Bundle>) -> Result<FirrtlOutput> { fn run_impl(
&self,
top_module: Module<Bundle>,
_acquired_job: &mut AcquiredJob,
) -> Result<FirrtlOutput> {
let (file_backend, temp_dir) = self.base.make_firrtl_file_backend()?; let (file_backend, temp_dir) = self.base.make_firrtl_file_backend()?;
let firrtl::FileBackend { let firrtl::FileBackend {
top_fir_file_stem, top_fir_file_stem,
@ -170,15 +277,23 @@ impl FirrtlArgs {
impl<T: BundleType> RunPhase<Module<T>> for FirrtlArgs { impl<T: BundleType> RunPhase<Module<T>> for FirrtlArgs {
type Output = FirrtlOutput; type Output = FirrtlOutput;
fn run(&self, top_module: Module<T>) -> Result<Self::Output> { fn run_with_job(
self.run_impl(top_module.canonical()) &self,
top_module: Module<T>,
acquired_job: &mut AcquiredJob,
) -> Result<Self::Output> {
self.run_impl(top_module.canonical(), acquired_job)
} }
} }
impl<T: BundleType> RunPhase<Interned<Module<T>>> for FirrtlArgs { impl<T: BundleType> RunPhase<Interned<Module<T>>> for FirrtlArgs {
type Output = FirrtlOutput; type Output = FirrtlOutput;
fn run(&self, top_module: Interned<Module<T>>) -> Result<Self::Output> { fn run_with_job(
self.run(*top_module) &self,
top_module: Interned<Module<T>>,
acquired_job: &mut AcquiredJob,
) -> Result<Self::Output> {
self.run_with_job(*top_module, acquired_job)
} }
} }
@ -298,7 +413,11 @@ impl VerilogArgs {
} }
Ok(output) Ok(output)
} }
fn run_impl(&self, firrtl_output: FirrtlOutput) -> Result<VerilogOutput> { fn run_impl(
&self,
firrtl_output: FirrtlOutput,
acquired_job: &mut AcquiredJob,
) -> Result<VerilogOutput> {
let Self { let Self {
firrtl, firrtl,
firtool, firtool,
@ -323,7 +442,7 @@ impl VerilogArgs {
} }
cmd.args(firtool_extra_args); cmd.args(firtool_extra_args);
cmd.current_dir(&output.firrtl.output_dir); cmd.current_dir(&output.firrtl.output_dir);
let status = firrtl.base.run_external_command(cmd)?; let status = firrtl.base.run_external_command(acquired_job, cmd)?;
if status.success() { if status.success() {
self.process_unadjusted_verilog_file(output) self.process_unadjusted_verilog_file(output)
} else { } else {
@ -340,9 +459,9 @@ where
FirrtlArgs: RunPhase<Arg, Output = FirrtlOutput>, FirrtlArgs: RunPhase<Arg, Output = FirrtlOutput>,
{ {
type Output = VerilogOutput; type Output = VerilogOutput;
fn run(&self, arg: Arg) -> Result<Self::Output> { fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result<Self::Output> {
let firrtl_output = self.firrtl.run(arg)?; let firrtl_output = self.firrtl.run_with_job(arg, acquired_job)?;
self.run_impl(firrtl_output) self.run_impl(firrtl_output, acquired_job)
} }
} }
@ -365,6 +484,14 @@ impl FormalMode {
FormalMode::Cover => "cover", FormalMode::Cover => "cover",
} }
} }
fn needs_extra_job(self) -> bool {
match self {
FormalMode::BMC => false,
FormalMode::Prove => true,
FormalMode::Live => false,
FormalMode::Cover => false,
}
}
} }
impl fmt::Display for FormalMode { impl fmt::Display for FormalMode {
@ -508,7 +635,11 @@ impl FormalArgs {
writeln!(retval, "prep -top {top_module}").unwrap(); writeln!(retval, "prep -top {top_module}").unwrap();
Ok(retval) Ok(retval)
} }
fn run_impl(&self, verilog_output: VerilogOutput) -> Result<FormalOutput> { fn run_impl(
&self,
verilog_output: VerilogOutput,
acquired_job: &mut AcquiredJob,
) -> Result<FormalOutput> {
let output = FormalOutput { let output = FormalOutput {
verilog: verilog_output, verilog: verilog_output,
}; };
@ -519,7 +650,18 @@ impl FormalArgs {
cmd.arg(sby_file.file_name().unwrap()); cmd.arg(sby_file.file_name().unwrap());
cmd.args(&self.sby_extra_args); cmd.args(&self.sby_extra_args);
cmd.current_dir(&output.verilog.firrtl.output_dir); cmd.current_dir(&output.verilog.firrtl.output_dir);
let status = self.verilog.firrtl.base.run_external_command(cmd)?; let new_minimum_count = if self.mode.needs_extra_job() {
NonZeroUsize::new(2).unwrap()
} else {
NonZeroUsize::new(1).unwrap()
};
let new_minimum_count = AcquiredJob::max_available_job_count().min(new_minimum_count);
let status = acquired_job.increase_job_count(new_minimum_count, |acquired_job| {
self.verilog
.firrtl
.base
.run_external_command(acquired_job, cmd)
})?;
if status.success() { if status.success() {
Ok(output) Ok(output)
} else { } else {
@ -536,9 +678,9 @@ where
VerilogArgs: RunPhase<Arg, Output = VerilogOutput>, VerilogArgs: RunPhase<Arg, Output = VerilogOutput>,
{ {
type Output = FormalOutput; type Output = FormalOutput;
fn run(&self, arg: Arg) -> Result<Self::Output> { fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result<Self::Output> {
let verilog_output = self.verilog.run(arg)?; let verilog_output = self.verilog.run_with_job(arg, acquired_job)?;
self.run_impl(verilog_output) self.run_impl(verilog_output, acquired_job)
} }
} }
@ -627,16 +769,16 @@ where
FirrtlArgs: RunPhase<T, Output = FirrtlOutput>, FirrtlArgs: RunPhase<T, Output = FirrtlOutput>,
{ {
type Output = (); type Output = ();
fn run(&self, arg: T) -> Result<Self::Output> { fn run_with_job(&self, arg: T, acquired_job: &mut AcquiredJob) -> Result<Self::Output> {
match &self.subcommand { match &self.subcommand {
CliCommand::Firrtl(c) => { CliCommand::Firrtl(c) => {
c.run(arg)?; c.run_with_job(arg, acquired_job)?;
} }
CliCommand::Verilog(c) => { CliCommand::Verilog(c) => {
c.run(arg)?; c.run_with_job(arg, acquired_job)?;
} }
CliCommand::Formal(c) => { CliCommand::Formal(c) => {
c.run(arg)?; c.run_with_job(arg, acquired_job)?;
} }
} }
Ok(()) Ok(())