From b7f11011643f4e4ab5fc32c9ddab6aa6eb726060 Mon Sep 17 00:00:00 2001 From: Jacob Lifshay Date: Fri, 4 Oct 2024 17:03:51 -0700 Subject: [PATCH] reduce parallelism to fit within the number of available cpus even when running sby in prove mode (which likes to run 2 smt solvers in parallel) --- crates/fayalite/src/cli.rs | 182 +++++++++++++++++++++++++++++++++---- 1 file changed, 162 insertions(+), 20 deletions(-) diff --git a/crates/fayalite/src/cli.rs b/crates/fayalite/src/cli.rs index f1d69d2..fa1b247 100644 --- a/crates/fayalite/src/cli.rs +++ b/crates/fayalite/src/cli.rs @@ -17,8 +17,10 @@ use std::{ ffi::OsString, fmt::{self, Write}, fs, io, mem, + num::NonZeroUsize, path::{Path, PathBuf}, process, + sync::{Condvar, Mutex, OnceLock}, }; use tempfile::TempDir; @@ -46,9 +48,109 @@ impl From for CliError { } } +#[derive(Debug)] +pub struct AcquiredJob { + job_count: NonZeroUsize, +} + +impl Drop for AcquiredJob { + fn drop(&mut self) { + Self::change_job_count(Some(self.job_count), None); + } +} + +impl AcquiredJob { + pub fn max_available_job_count() -> NonZeroUsize { + static RETVAL: OnceLock = OnceLock::new(); + *RETVAL.get_or_init(|| { + std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap()) + }) + } + fn change_job_count(released: Option, acquired: Option) { + static AVAILABLE_JOB_COUNT: OnceLock> = OnceLock::new(); + static COND_VAR: Condvar = Condvar::new(); + let mut available_job_count_lock = AVAILABLE_JOB_COUNT + .get_or_init(|| Mutex::new(Self::max_available_job_count().get())) + .lock() + .unwrap(); + if let Some(released) = released { + *available_job_count_lock = available_job_count_lock + .checked_add(released.get()) + .expect("tried to release too many jobs"); + COND_VAR.notify_all(); + } + if let Some(acquired) = acquired { + loop { + match available_job_count_lock.checked_sub(acquired.get()) { + Some(jobs_left) => { + *available_job_count_lock = jobs_left; + break; + } + None => { + available_job_count_lock = COND_VAR.wait(available_job_count_lock).unwrap() + } + } + } + } + } + pub fn job_count(&self) -> NonZeroUsize { + self.job_count + } + pub fn increase_job_count( + &mut self, + new_minimum_count: NonZeroUsize, + f: impl FnOnce(&mut AcquiredJob) -> R, + ) -> R { + if new_minimum_count <= self.job_count { + return f(self); + } + struct ReleaseOnDrop<'a> { + acquired_job: &'a mut AcquiredJob, + old_job_count: NonZeroUsize, + } + impl Drop for ReleaseOnDrop<'_> { + fn drop(&mut self) { + AcquiredJob::change_job_count( + NonZeroUsize::new(self.acquired_job.job_count.get() - self.old_job_count.get()), + None, + ); + self.acquired_job.job_count = self.old_job_count; + } + } + let release_on_drop = ReleaseOnDrop { + old_job_count: self.job_count, + acquired_job: self, + }; + // release our current jobs when acquiring new jobs to avoid deadlock + Self::change_job_count(Some(release_on_drop.old_job_count), Some(new_minimum_count)); + release_on_drop.acquired_job.job_count = new_minimum_count; + let retval = f(release_on_drop.acquired_job); + drop(release_on_drop); + retval + } + pub fn acquire_jobs(job_count: NonZeroUsize) -> Self { + Self::change_job_count(None, Some(job_count)); + Self { job_count } + } + pub fn run_command( + &mut self, + mut cmd: std::process::Command, + f: impl FnOnce(&mut std::process::Command) -> std::io::Result, + ) -> std::io::Result { + // TODO: if we implement a make job server, add the proper env vars to cmd + f(&mut cmd) + } +} + pub trait RunPhase { type Output; - fn run(&self, arg: Arg) -> Result; + fn run(&self, arg: Arg) -> Result { + self.run_with_job( + arg, + &mut AcquiredJob::acquire_jobs(NonZeroUsize::new(1).unwrap()), + ) + } + fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result; } #[derive(Parser, Debug, Clone)] @@ -93,6 +195,7 @@ impl BaseArgs { /// handles possibly redirecting the command's output for Rust tests pub fn run_external_command( &self, + _acquired_job: &mut AcquiredJob, mut command: process::Command, ) -> io::Result { if self.redirect_output_for_rust_test { @@ -150,7 +253,11 @@ impl FirrtlOutput { } impl FirrtlArgs { - fn run_impl(&self, top_module: Module) -> Result { + fn run_impl( + &self, + top_module: Module, + _acquired_job: &mut AcquiredJob, + ) -> Result { let (file_backend, temp_dir) = self.base.make_firrtl_file_backend()?; let firrtl::FileBackend { top_fir_file_stem, @@ -170,15 +277,23 @@ impl FirrtlArgs { impl RunPhase> for FirrtlArgs { type Output = FirrtlOutput; - fn run(&self, top_module: Module) -> Result { - self.run_impl(top_module.canonical()) + fn run_with_job( + &self, + top_module: Module, + acquired_job: &mut AcquiredJob, + ) -> Result { + self.run_impl(top_module.canonical(), acquired_job) } } impl RunPhase>> for FirrtlArgs { type Output = FirrtlOutput; - fn run(&self, top_module: Interned>) -> Result { - self.run(*top_module) + fn run_with_job( + &self, + top_module: Interned>, + acquired_job: &mut AcquiredJob, + ) -> Result { + self.run_with_job(*top_module, acquired_job) } } @@ -298,7 +413,11 @@ impl VerilogArgs { } Ok(output) } - fn run_impl(&self, firrtl_output: FirrtlOutput) -> Result { + fn run_impl( + &self, + firrtl_output: FirrtlOutput, + acquired_job: &mut AcquiredJob, + ) -> Result { let Self { firrtl, firtool, @@ -323,7 +442,7 @@ impl VerilogArgs { } cmd.args(firtool_extra_args); cmd.current_dir(&output.firrtl.output_dir); - let status = firrtl.base.run_external_command(cmd)?; + let status = firrtl.base.run_external_command(acquired_job, cmd)?; if status.success() { self.process_unadjusted_verilog_file(output) } else { @@ -340,9 +459,9 @@ where FirrtlArgs: RunPhase, { type Output = VerilogOutput; - fn run(&self, arg: Arg) -> Result { - let firrtl_output = self.firrtl.run(arg)?; - self.run_impl(firrtl_output) + fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result { + let firrtl_output = self.firrtl.run_with_job(arg, acquired_job)?; + self.run_impl(firrtl_output, acquired_job) } } @@ -365,6 +484,14 @@ impl FormalMode { FormalMode::Cover => "cover", } } + fn needs_extra_job(self) -> bool { + match self { + FormalMode::BMC => false, + FormalMode::Prove => true, + FormalMode::Live => false, + FormalMode::Cover => false, + } + } } impl fmt::Display for FormalMode { @@ -508,7 +635,11 @@ impl FormalArgs { writeln!(retval, "prep -top {top_module}").unwrap(); Ok(retval) } - fn run_impl(&self, verilog_output: VerilogOutput) -> Result { + fn run_impl( + &self, + verilog_output: VerilogOutput, + acquired_job: &mut AcquiredJob, + ) -> Result { let output = FormalOutput { verilog: verilog_output, }; @@ -519,7 +650,18 @@ impl FormalArgs { cmd.arg(sby_file.file_name().unwrap()); cmd.args(&self.sby_extra_args); cmd.current_dir(&output.verilog.firrtl.output_dir); - let status = self.verilog.firrtl.base.run_external_command(cmd)?; + let new_minimum_count = if self.mode.needs_extra_job() { + NonZeroUsize::new(2).unwrap() + } else { + NonZeroUsize::new(1).unwrap() + }; + let new_minimum_count = AcquiredJob::max_available_job_count().min(new_minimum_count); + let status = acquired_job.increase_job_count(new_minimum_count, |acquired_job| { + self.verilog + .firrtl + .base + .run_external_command(acquired_job, cmd) + })?; if status.success() { Ok(output) } else { @@ -536,9 +678,9 @@ where VerilogArgs: RunPhase, { type Output = FormalOutput; - fn run(&self, arg: Arg) -> Result { - let verilog_output = self.verilog.run(arg)?; - self.run_impl(verilog_output) + fn run_with_job(&self, arg: Arg, acquired_job: &mut AcquiredJob) -> Result { + let verilog_output = self.verilog.run_with_job(arg, acquired_job)?; + self.run_impl(verilog_output, acquired_job) } } @@ -627,16 +769,16 @@ where FirrtlArgs: RunPhase, { type Output = (); - fn run(&self, arg: T) -> Result { + fn run_with_job(&self, arg: T, acquired_job: &mut AcquiredJob) -> Result { match &self.subcommand { CliCommand::Firrtl(c) => { - c.run(arg)?; + c.run_with_job(arg, acquired_job)?; } CliCommand::Verilog(c) => { - c.run(arg)?; + c.run_with_job(arg, acquired_job)?; } CliCommand::Formal(c) => { - c.run(arg)?; + c.run_with_job(arg, acquired_job)?; } } Ok(())