From ecc5ffd9eb455a953931af0cc96a3bbfc9cc0ae5 Mon Sep 17 00:00:00 2001 From: Jacob Lifshay Date: Wed, 24 Sep 2025 00:40:23 -0700 Subject: [PATCH] added automatically-added dependencies; added caching for external jobs --- Cargo.lock | 9 +- Cargo.toml | 1 + crates/fayalite/Cargo.toml | 1 + crates/fayalite/src/build.rs | 271 ++++++++------- crates/fayalite/src/build/external.rs | 475 ++++++++++++++++++++++++-- 5 files changed, 606 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0c32e9..221e10c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "allocator-api2" @@ -81,6 +81,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "basic-toml" version = "0.1.8" @@ -291,6 +297,7 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" name = "fayalite" version = "0.3.0" dependencies = [ + "base64", "bitvec", "blake3", "clap", diff --git a/Cargo.toml b/Cargo.toml index 5a792c6..88bdbbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ fayalite-proc-macros = { version = "=0.3.0", path = "crates/fayalite-proc-macros fayalite-proc-macros-impl = { version = "=0.3.0", path = "crates/fayalite-proc-macros-impl" } fayalite-visit-gen = { version = "=0.3.0", path = "crates/fayalite-visit-gen" } base16ct = "0.2.0" +base64 = "0.22.1" bitvec = { version = "1.0.1", features = ["serde"] } blake3 = { version = "1.5.4", features = ["serde"] } clap = { version = "4.5.9", features = ["derive", "env", "string"] } diff --git a/crates/fayalite/Cargo.toml 
b/crates/fayalite/Cargo.toml index 082e607..3fd6c6d 100644 --- a/crates/fayalite/Cargo.toml +++ b/crates/fayalite/Cargo.toml @@ -14,6 +14,7 @@ rust-version.workspace = true version.workspace = true [dependencies] +base64.workspace = true bitvec.workspace = true blake3.workspace = true clap.workspace = true diff --git a/crates/fayalite/src/build.rs b/crates/fayalite/src/build.rs index 7729a8f..d34d6e6 100644 --- a/crates/fayalite/src/build.rs +++ b/crates/fayalite/src/build.rs @@ -19,22 +19,18 @@ use std::{ any::{Any, TypeId}, borrow::Cow, cell::OnceCell, + cmp::Ordering, collections::{BTreeMap, BTreeSet, VecDeque}, fmt::{self, Write}, hash::{Hash, Hasher}, - iter, marker::PhantomData, - mem, panic, + panic, rc::Rc, sync::{Arc, OnceLock, RwLock, RwLockWriteGuard, mpsc}, thread::{self, ScopedJoinHandle}, }; -mod external; - -pub use external::{ - TemplateParseError, TemplatedExternalJob, TemplatedExternalJobKind, find_program, -}; +pub mod external; macro_rules! write_str { ($s:expr, $($rest:tt)*) => { @@ -63,11 +59,47 @@ pub enum JobItemName { File { path: Interned }, } +impl JobItemName { + fn as_ref(&self) -> JobItemNameRef<'_> { + match self { + JobItemName::Module { name } => JobItemNameRef::Module { name }, + JobItemName::File { path } => JobItemNameRef::File { path }, + } + } +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +enum JobItemNameRef<'a> { + Module { name: &'a str }, + File { path: &'a str }, +} + +/// ordered by string contents, not by `Interned` +impl PartialOrd for JobItemName { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// ordered by string contents, not by `Interned` +impl Ord for JobItemName { + fn cmp(&self, other: &Self) -> Ordering { + if self == other { + Ordering::Equal + } else { + self.as_ref().cmp(&other.as_ref()) + } + } +} + pub struct CommandLine {} pub trait JobKind: 'static + Send + Sync + Hash + Eq + fmt::Debug { type Job: 'static + Send + Sync + Hash + Eq + fmt::Debug; - fn 
inputs(&self, job: &Self::Job) -> Interned<[JobItemName]>; + fn inputs_and_direct_dependencies<'a>( + &'a self, + job: &'a Self::Job, + ) -> Cow<'a, BTreeMap>>; fn outputs(&self, job: &Self::Job) -> Interned<[JobItemName]>; /// gets the part of the command line that is common for all members of this job kind -- usually the executable name/path and any global options and/or subcommands fn command_line_prefix(&self) -> Interned<[Interned]>; @@ -155,14 +187,7 @@ impl DynJobKindTrait for T { matches: &mut clap::ArgMatches, ) -> clap::error::Result { let job = self.from_arg_matches(matches)?; - let inputs = self.inputs(&job); - let outputs = self.outputs(&job); - Ok(DynJob(Arc::new(inner::DynJob { - kind: self, - job, - inputs, - outputs, - }))) + Ok(DynJob::from_arc(self, job)) } fn parse_command_line_dyn( @@ -170,14 +195,7 @@ impl DynJobKindTrait for T { command_line: Interned<[Interned]>, ) -> clap::error::Result { let job = self.parse_command_line(command_line)?; - let inputs = self.inputs(&job); - let outputs = self.outputs(&job); - Ok(DynJob(Arc::new(inner::DynJob { - kind: self, - job, - inputs, - outputs, - }))) + Ok(DynJob::from_arc(self, job)) } } @@ -530,7 +548,7 @@ trait DynJobTrait: 'static + Send + Sync + fmt::Debug { fn hash_dyn(&self, state: &mut dyn Hasher); fn kind_type_id(&self) -> TypeId; fn kind(&self) -> DynJobKind; - fn inputs(&self) -> Interned<[JobItemName]>; + fn inputs_and_direct_dependencies<'a>(&'a self) -> &'a BTreeMap>; fn outputs(&self) -> Interned<[JobItemName]>; fn to_command_line(&self) -> Interned<[Interned]>; fn debug_name(&self) -> String; @@ -545,7 +563,7 @@ mod inner { pub(crate) struct DynJob { pub(crate) kind: Arc, pub(crate) job: T::Job, - pub(crate) inputs: Interned<[JobItemName]>, + pub(crate) inputs_and_direct_dependencies: BTreeMap>, pub(crate) outputs: Interned<[JobItemName]>, } } @@ -574,8 +592,8 @@ impl DynJobTrait for inner::DynJob { DynJobKind(self.kind.clone()) } - fn inputs(&self) -> Interned<[JobItemName]> { - 
self.inputs + fn inputs_and_direct_dependencies<'a>(&'a self) -> &'a BTreeMap> { + &self.inputs_and_direct_dependencies } fn outputs(&self) -> Interned<[JobItemName]> { @@ -603,20 +621,24 @@ impl DynJobTrait for inner::DynJob { pub struct DynJob(Arc); impl DynJob { + fn new_unchecked(job_kind: Arc, job: T::Job) -> Self { + let inputs_and_direct_dependencies = + job_kind.inputs_and_direct_dependencies(&job).into_owned(); + let outputs = job_kind.outputs(&job); + Self(Arc::new(inner::DynJob { + kind: job_kind, + job, + inputs_and_direct_dependencies, + outputs, + })) + } pub fn from_arc(job_kind: Arc, job: T::Job) -> Self { if TypeId::of::() == TypeId::of::() { ::downcast_ref::(&job) .expect("already checked type") .clone() } else { - let inputs = job_kind.inputs(&job); - let outputs = job_kind.outputs(&job); - Self(Arc::new(inner::DynJob { - kind: job_kind, - job, - inputs, - outputs, - })) + Self::new_unchecked(job_kind, job) } } pub fn new(job_kind: T, job: T::Job) -> Self { @@ -625,14 +647,7 @@ impl DynJob { .expect("already checked type") .clone() } else { - let inputs = job_kind.inputs(&job); - let outputs = job_kind.outputs(&job); - Self(Arc::new(inner::DynJob { - kind: Arc::new(job_kind), - job, - inputs, - outputs, - })) + Self::new_unchecked(Arc::new(job_kind), job) } } pub fn kind_type_id(&self) -> TypeId { @@ -645,8 +660,10 @@ impl DynJob { pub fn kind(&self) -> DynJobKind { DynJobTrait::kind(&*self.0) } - pub fn inputs(&self) -> Interned<[JobItemName]> { - DynJobTrait::inputs(&*self.0) + pub fn inputs_and_direct_dependencies<'a>( + &'a self, + ) -> &'a BTreeMap> { + DynJobTrait::inputs_and_direct_dependencies(&*self.0) } pub fn outputs(&self) -> Interned<[JobItemName]> { DynJobTrait::outputs(&*self.0) @@ -714,8 +731,11 @@ impl<'de> Deserialize<'de> for DynJob { impl JobKind for DynJobKind { type Job = DynJob; - fn inputs(&self, job: &Self::Job) -> Interned<[JobItemName]> { - job.inputs() + fn inputs_and_direct_dependencies<'a>( + &'a self, + job: &'a 
Self::Job, + ) -> Cow<'a, BTreeMap>> { + Cow::Borrowed(job.inputs_and_direct_dependencies()) } fn outputs(&self, job: &Self::Job) -> Interned<[JobItemName]> { @@ -759,7 +779,6 @@ impl JobKind for DynJobKind { enum JobGraphNode { Job(DynJob), Item { - #[allow(dead_code, reason = "used for Debug")] name: JobItemName, source_job: Option, }, @@ -1069,6 +1088,46 @@ impl JobGraph { pub fn new() -> Self { Self::default() } + fn try_add_item_node( + &mut self, + name: JobItemName, + new_source_job: Option, + new_nodes: &mut HashSet<::NodeId>, + ) -> Result<::NodeId, JobGraphError> { + match self.items.entry(name) { + Entry::Occupied(item_entry) => { + let node_id = *item_entry.get(); + let JobGraphNode::Item { + name: _, + source_job, + } = &mut self.graph[node_id] + else { + unreachable!("known to be an item"); + }; + if let Some(new_source_job) = new_source_job { + if let Some(source_job) = source_job { + return Err(JobGraphError::MultipleJobsCreateSameOutput { + output_item: item_entry.key().clone(), + existing_job: source_job.clone(), + new_job: new_source_job, + }); + } else { + *source_job = Some(new_source_job); + } + } + Ok(node_id) + } + Entry::Vacant(item_entry) => { + let node_id = self.graph.add_node(JobGraphNode::Item { + name, + source_job: new_source_job, + }); + new_nodes.insert(node_id); + item_entry.insert(node_id); + Ok(node_id) + } + } + } pub fn try_add_jobs>( &mut self, jobs: I, @@ -1091,60 +1150,24 @@ impl JobGraph { }; let new_nodes = &mut remove_new_nodes_on_error.new_nodes; let this = &mut *remove_new_nodes_on_error.this; - for job in jobs { - let Entry::Vacant(job_entry) = this.jobs.entry(job) else { + let mut worklist = Vec::from_iter(jobs); + while let Some(job) = worklist.pop() { + let Entry::Vacant(job_entry) = this.jobs.entry(job.clone()) else { continue; }; let job_node_id = this .graph .add_node(JobGraphNode::Job(job_entry.key().clone())); new_nodes.insert(job_node_id); - let job_entry = job_entry.insert_entry(job_node_id); - for (item, 
is_output) in job_entry - .key() - .inputs() - .iter() - .zip(iter::repeat(false)) - .chain(job_entry.key().outputs().iter().zip(iter::repeat(true))) - { - let item_node_id; - match this.items.entry(*item) { - Entry::Occupied(item_entry) => { - item_node_id = *item_entry.get(); - if is_output { - let JobGraphNode::Item { - name: _, - source_job, - } = &mut this.graph[item_node_id] - else { - unreachable!("known to be an item"); - }; - if let Some(source_job) = source_job { - return Err(JobGraphError::MultipleJobsCreateSameOutput { - output_item: item_entry.key().clone(), - existing_job: source_job.clone(), - new_job: job_entry.key().clone(), - }); - } else { - *source_job = Some(job_entry.key().clone()); - } - } - } - Entry::Vacant(item_entry) => { - item_node_id = this.graph.add_node(JobGraphNode::Item { - name: *item, - source_job: is_output.then(|| job_entry.key().clone()), - }); - new_nodes.insert(item_node_id); - item_entry.insert(item_node_id); - } - } - let mut source = item_node_id; - let mut dest = job_node_id; - if is_output { - mem::swap(&mut source, &mut dest); - } - this.graph.add_edge(source, dest, ()); + job_entry.insert(job_node_id); + for name in job.outputs() { + let item_node_id = this.try_add_item_node(name, Some(job.clone()), new_nodes)?; + this.graph.add_edge(job_node_id, item_node_id, ()); + } + for (&name, direct_dependency) in job.inputs_and_direct_dependencies() { + worklist.extend(direct_dependency.clone()); + let item_node_id = this.try_add_item_node(name, None, new_nodes)?; + this.graph.add_edge(item_node_id, job_node_id, ()); } } match toposort(&this.graph, Some(&mut this.space)) { @@ -1222,7 +1245,7 @@ impl JobGraph { } } retval.push_str(":"); - for input in job.inputs() { + for input in job.inputs_and_direct_dependencies().keys() { match input { JobItemName::Module { .. 
} => continue, JobItemName::File { path } => { @@ -1288,7 +1311,7 @@ impl JobGraph { struct WaitingJobState { job_node_id: ::NodeId, job: DynJob, - inputs: Vec>, + inputs: BTreeMap>, } let mut ready_jobs = VecDeque::new(); let mut item_name_to_waiting_jobs_map = HashMap::<_, Vec<_>>::default(); @@ -1296,21 +1319,24 @@ impl JobGraph { let JobGraphNode::Job(job) = &self.graph[node_id] else { continue; }; - let inputs = job.inputs(); let waiting_job = WaitingJobState { job_node_id: node_id, job: job.clone(), - inputs: inputs.iter().map(|_| OnceCell::new()).collect(), + inputs: job + .inputs_and_direct_dependencies() + .keys() + .map(|&name| (name, OnceCell::new())) + .collect(), }; - if inputs.is_empty() { + if waiting_job.inputs.is_empty() { ready_jobs.push_back(waiting_job); } else { let waiting_job = Rc::new(waiting_job); - for (input_index, input_item) in inputs.into_iter().enumerate() { + for &input_item in waiting_job.inputs.keys() { item_name_to_waiting_jobs_map .entry(input_item) .or_default() - .push((input_index, waiting_job.clone())); + .push(waiting_job.clone()); } } } @@ -1327,18 +1353,24 @@ impl JobGraph { unreachable!(); }; let output_items = thread.join().map_err(panic::resume_unwind)??; - let output_names = job.outputs(); - assert_eq!( - output_items.len(), - output_names.len(), - "job's run() method returned the wrong number of output items: {job:?}" + assert!( + output_items.iter().map(JobItem::name).eq(job.outputs()), + "job's run() method returned the wrong output items:\n\ + output items:\n\ + {output_items:?}\n\ + expected outputs:\n\ + {:?}\n\ + job:\n\ + {job:?}", + job.outputs(), ); - for (output_item, output_name) in output_items.into_iter().zip(output_names) { - for (input_index, waiting_job) in item_name_to_waiting_jobs_map - .remove(&output_name) + for output_item in output_items { + for waiting_job in item_name_to_waiting_jobs_map + .remove(&output_item.name()) .unwrap_or_default() { - let Ok(()) = 
waiting_job.inputs[input_index].set(output_item.clone()) + let Ok(()) = + waiting_job.inputs[&output_item.name()].set(output_item.clone()) else { unreachable!(); }; @@ -1377,7 +1409,7 @@ impl JobGraph { job: job.clone(), inputs: Vec::from_iter( inputs - .into_iter() + .into_values() .map(|input| input.into_inner().expect("was set earlier")), ), acquired_job: AcquiredJob::acquire(), @@ -1457,7 +1489,9 @@ pub fn program_name_for_internal_jobs() -> Interned { pub trait InternalJobTrait: clap::Args + 'static + fmt::Debug + Eq + Hash + Send + Sync { fn subcommand_name() -> Interned; fn to_args(&self) -> Vec>; - fn inputs(&self) -> Interned<[JobItemName]>; + fn inputs_and_direct_dependencies<'a>( + &'a self, + ) -> Cow<'a, BTreeMap>>; fn outputs(&self) -> Interned<[JobItemName]>; fn run(&self, inputs: &[JobItem], acquired_job: &mut AcquiredJob) -> eyre::Result>; @@ -1495,8 +1529,11 @@ impl fmt::Debug for InternalJobKind { impl JobKind for InternalJobKind { type Job = InternalJob; - fn inputs(&self, job: &Self::Job) -> Interned<[JobItemName]> { - job.0.inputs() + fn inputs_and_direct_dependencies<'a>( + &'a self, + job: &'a Self::Job, + ) -> Cow<'a, BTreeMap>> { + job.0.inputs_and_direct_dependencies() } fn outputs(&self, job: &Self::Job) -> Interned<[JobItemName]> { diff --git a/crates/fayalite/src/build/external.rs b/crates/fayalite/src/build/external.rs index 4ca4549..360ad6e 100644 --- a/crates/fayalite/src/build/external.rs +++ b/crates/fayalite/src/build/external.rs @@ -2,13 +2,17 @@ // See Notices.txt for copyright information use crate::{ - build::{EscapeForUnixShell, JobItem, JobItemName, JobKind}, + build::{DynJob, EscapeForUnixShell, JobItem, JobItemName, JobKind}, intern::{Intern, Interned}, - util::job_server::AcquiredJob, + util::{job_server::AcquiredJob, streaming_read_utf8::streaming_read_utf8}, }; +use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD}; use clap::builder::StyledStr; -use eyre::{Context, eyre}; +use eyre::{Context, ensure, eyre}; +use 
serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error}; use std::{ + borrow::Cow, + collections::BTreeMap, env, fmt::{self, Write}, mem, @@ -31,10 +35,326 @@ impl TemplateArg { } } +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize, Deserialize)] +#[non_exhaustive] +pub enum ExternalJobCacheVersion { + /// not used, used to be for `FormalCacheVersion` + V1, + V2, +} + +impl ExternalJobCacheVersion { + pub const CURRENT: Self = Self::V2; +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +#[non_exhaustive] +pub enum MaybeUtf8 { + Utf8(String), + Binary(Vec), +} + +impl MaybeUtf8 { + pub fn as_bytes(&self) -> &[u8] { + match self { + MaybeUtf8::Utf8(v) => v.as_bytes(), + MaybeUtf8::Binary(v) => v, + } + } +} + +#[derive(Serialize, Deserialize)] +#[serde(rename = "MaybeUtf8")] +enum MaybeUtf8Serde<'a> { + Utf8(Cow<'a, str>), + Binary(String), +} + +impl<'de> Deserialize<'de> for MaybeUtf8 { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + Ok(match MaybeUtf8Serde::deserialize(deserializer)? { + MaybeUtf8Serde::Utf8(v) => Self::Utf8(v.into_owned()), + MaybeUtf8Serde::Binary(v) => BASE64_URL_SAFE_NO_PAD + .decode(&*v) + .map_err(D::Error::custom)? 
+ .into(), + }) + } +} + +impl Serialize for MaybeUtf8 { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + MaybeUtf8::Utf8(v) => MaybeUtf8Serde::Utf8(Cow::Borrowed(v)), + MaybeUtf8::Binary(v) => MaybeUtf8Serde::Binary(BASE64_URL_SAFE_NO_PAD.encode(v)), + } + .serialize(serializer) + } +} + +impl From> for MaybeUtf8 { + fn from(value: Vec) -> Self { + match String::from_utf8(value) { + Ok(value) => Self::Utf8(value), + Err(e) => Self::Binary(e.into_bytes()), + } + } +} + +impl From for MaybeUtf8 { + fn from(value: String) -> Self { + Self::Utf8(value) + } +} + +#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub struct ExternalJobCache { + pub version: ExternalJobCacheVersion, + pub inputs_hash: blake3::Hash, + pub stdout_stderr: String, + pub result: Result, String>, +} + +impl ExternalJobCache { + fn read_from_file(cache_json_path: Interned) -> eyre::Result { + let cache_str = std::fs::read_to_string(&*cache_json_path) + .wrap_err_with(|| format!("can't read {cache_json_path}"))?; + serde_json::from_str(&cache_str).wrap_err_with(|| format!("can't decode {cache_json_path}")) + } + fn write_to_file(&self, cache_json_path: Interned) -> eyre::Result<()> { + let cache_str = serde_json::to_string_pretty(&self).expect("serialization can't fail"); + std::fs::write(&*cache_json_path, cache_str) + .wrap_err_with(|| format!("can't write {cache_json_path}")) + } +} + +#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] +pub struct ExternalJobCaching { + pub cache_json_path: Interned, + pub run_even_if_cached: bool, +} + +#[derive(Default)] +struct JobCacheHasher(blake3::Hasher); + +impl JobCacheHasher { + fn hash_size(&mut self, size: usize) { + self.0.update(&u64::to_le_bytes( + size.try_into().expect("size should fit in u64"), + )); + } + fn hash_sized_bytes(&mut self, bytes: &[u8]) { + self.hash_size(bytes.len()); + self.0.update(bytes); + } + fn hash_sized_str(&mut self, s: &str) { + 
self.hash_sized_bytes(s.as_bytes()); + } + fn hash_iter>( + &mut self, + iter: I, + mut f: F, + ) { + let iter = iter.into_iter(); + self.hash_size(iter.len()); + iter.for_each(|item| f(self, item)); + } + fn try_hash_iter< + F: FnMut(&mut Self, I::Item) -> Result<(), E>, + E, + I: IntoIterator, + >( + &mut self, + iter: I, + mut f: F, + ) -> Result<(), E> { + let mut iter = iter.into_iter(); + self.hash_size(iter.len()); + iter.try_for_each(|item| f(self, item)) + } +} + +impl ExternalJobCaching { + pub fn new(cache_json_path: Interned) -> Self { + Self { + cache_json_path, + run_even_if_cached: false, + } + } + #[track_caller] + pub fn from_path(cache_json_path: impl AsRef) -> Self { + let cache_json_path = cache_json_path.as_ref(); + let Some(cache_json_path) = cache_json_path.as_os_str().to_str() else { + panic!("non-UTF-8 path to cache json: {cache_json_path:?}"); + }; + Self::new(cache_json_path.intern()) + } + fn write_stdout_stderr(stdout_stderr: &str) { + if stdout_stderr == "" { + return; + } + // use print! 
so output goes to Rust test output capture if stdout_stderr.ends_with('\n') { print!("{stdout_stderr}"); } else { println!("{stdout_stderr}"); } } /// returns `Err(_)` if reading the cache failed, otherwise returns `Ok(_)` with the results from the cache fn run_from_cache( self, inputs_hash: blake3::Hash, output_file_paths: impl IntoIterator>, ) -> Result, ()> { if self.run_even_if_cached { return Err(()); } let Ok(ExternalJobCache { version: ExternalJobCacheVersion::CURRENT, inputs_hash: cached_inputs_hash, stdout_stderr, result, }) = ExternalJobCache::read_from_file(self.cache_json_path) else { return Err(()); }; if inputs_hash != cached_inputs_hash { return Err(()); } match result { Ok(outputs) => { for output_file_path in output_file_paths { let Some(output_data) = outputs.get(&*output_file_path) else { if let Ok(true) = std::fs::exists(&*output_file_path) { // assume the existing file is the correct one continue; } return Err(()); }; let Ok(()) = std::fs::write(&*output_file_path, output_data.as_bytes()) else { return Err(()); }; } Self::write_stdout_stderr(&stdout_stderr); Ok(Ok(())) } Err(error) => { Self::write_stdout_stderr(&stdout_stderr); Ok(Err(error)) } } } fn make_command( command_line: Interned<[Interned]>, ) -> eyre::Result { ensure!(!command_line.is_empty(), "command line must not be empty"); let mut cmd = std::process::Command::new(&*command_line[0]); cmd.args(command_line[1..].iter().map(|arg| &**arg)) .stdin(std::process::Stdio::null()); Ok(cmd) } pub fn run eyre::Result<()>>( self, command_line: Interned<[Interned]>, input_file_paths: impl IntoIterator>, output_file_paths: impl IntoIterator> + Clone, run_fn: F, ) -> eyre::Result<()> { let mut hasher = JobCacheHasher::default(); hasher.hash_iter(command_line.iter(), |hasher, arg| { hasher.hash_sized_str(arg) }); let mut input_file_paths = 
Vec::<&str>::from_iter(input_file_paths.into_iter().map(Interned::into_inner)); + input_file_paths.sort_unstable(); + input_file_paths.dedup(); + hasher.try_hash_iter( + &input_file_paths, + |hasher, input_file_path| -> eyre::Result<()> { + hasher.hash_sized_str(input_file_path); + hasher.hash_sized_bytes( + &std::fs::read(input_file_path).wrap_err_with(|| { + format!("can't read job input file: {input_file_path}") + })?, + ); + Ok(()) + }, + )?; + let inputs_hash = hasher.0.finalize(); + match self.run_from_cache(inputs_hash, output_file_paths.clone()) { + Ok(result) => return result.map_err(|e| eyre!(e)), + Err(()) => {} + } + let (pipe_reader, stdout, stderr) = std::io::pipe() + .and_then(|(r, w)| Ok((r, w.try_clone()?, w))) + .wrap_err_with(|| format!("when trying to create a pipe to run: {command_line:?}"))?; + let mut cmd = Self::make_command(command_line)?; + cmd.stdout(stdout).stderr(stderr); + let mut stdout_stderr = String::new(); + let result = std::thread::scope(|scope| { + std::thread::Builder::new() + .name(format!("stdout:{}", command_line[0])) + .spawn_scoped(scope, || { + let _ = streaming_read_utf8(std::io::BufReader::new(pipe_reader), |s| { + stdout_stderr.push_str(s); + // use print! 
so output goes to Rust test output capture + print!("{s}"); + std::io::Result::Ok(()) + }); + if !stdout_stderr.is_empty() && !stdout_stderr.ends_with('\n') { + println!(); + } + }) + .expect("spawn shouldn't fail"); + run_fn(cmd) + }); + ExternalJobCache { + version: ExternalJobCacheVersion::CURRENT, + inputs_hash, + stdout_stderr, + result: match &result { + Ok(()) => Ok(Result::from_iter(output_file_paths.into_iter().map( + |output_file_path: Interned| -> eyre::Result<_> { + let output_file_path = &*output_file_path; + Ok(( + String::from(output_file_path), + MaybeUtf8::from(std::fs::read(output_file_path).wrap_err_with( + || format!("can't read job output file: {output_file_path}"), + )?), + )) + }, + ))?), + Err(e) => Err(format!("{e:#}")), + }, + } + .write_to_file(self.cache_json_path)?; + result + } + pub fn run_maybe_cached eyre::Result<()>>( + this: Option, + command_line: Interned<[Interned]>, + input_file_paths: impl IntoIterator>, + output_file_paths: impl IntoIterator> + Clone, + run_fn: F, + ) -> eyre::Result<()> { + match this { + Some(this) => this.run(command_line, input_file_paths, output_file_paths, run_fn), + None => run_fn(Self::make_command(command_line)?), + } + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct TemplatedExternalJobKind { template: Interned<[TemplateArg]>, command_line_prefix: Interned<[Interned]>, + caching: Option, } #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -164,7 +484,11 @@ impl<'a> Parser<'a> { } Ok(()) } - fn finish(self, program_path: String) -> TemplatedExternalJobKind { + fn finish( + self, + program_path: String, + caching: Option, + ) -> TemplatedExternalJobKind { let Self { mut tokens, mut template, @@ -190,6 +514,7 @@ impl<'a> Parser<'a> { TemplatedExternalJobKind { template, command_line_prefix: Intern::intern_owned(command_line_prefix), + caching, } } } @@ -250,19 +575,26 @@ impl TemplatedExternalJobKind { default_program_name: &str, program_path_env_var: Option<&str>, args_template: &[&str], 
+ caching: Option, ) -> Result, TemplateParseError> { let mut parser = Parser::new(args_template); parser.parse()?; Ok(find_program(default_program_name, program_path_env_var) - .map(|program_path| parser.finish(program_path))) + .map(|program_path| parser.finish(program_path, caching))) } #[track_caller] pub fn new( default_program_name: &str, program_path_env_var: Option<&str>, args_template: &[&str], + caching: Option, ) -> eyre::Result { - match Self::try_new(default_program_name, program_path_env_var, args_template) { + match Self::try_new( + default_program_name, + program_path_env_var, + args_template, + caching, + ) { Ok(retval) => retval, Err(e) => panic!("{e}"), } @@ -312,8 +644,11 @@ impl TemplatedExternalJobKind { impl JobKind for TemplatedExternalJobKind { type Job = TemplatedExternalJob; - fn inputs(&self, job: &Self::Job) -> Interned<[JobItemName]> { - job.inputs + fn inputs_and_direct_dependencies<'a>( + &'a self, + job: &'a Self::Job, + ) -> Cow<'a, BTreeMap>> { + Cow::Borrowed(&job.inputs) } fn outputs(&self, job: &Self::Job) -> Interned<[JobItemName]> { @@ -342,7 +677,7 @@ impl JobKind for TemplatedExternalJobKind { &self, command_line: Interned<[Interned]>, ) -> clap::error::Result { - let mut inputs = Vec::new(); + let mut inputs = BTreeMap::new(); let mut outputs = Vec::new(); let mut command_line_iter = command_line.iter(); for template_arg in &self.template { @@ -372,7 +707,9 @@ impl JobKind for TemplatedExternalJobKind { ))); } } - TemplateArg::InputPath { before, after } => inputs.push(match_io(before, after)?), + TemplateArg::InputPath { before, after } => { + inputs.insert(match_io(before, after)?, None); + } TemplateArg::OutputPath { before, after } => outputs.push(match_io(before, after)?), } } @@ -381,7 +718,7 @@ impl JobKind for TemplatedExternalJobKind { } else { Ok(TemplatedExternalJob { command_line, - inputs: Intern::intern_owned(inputs), + inputs, outputs: Intern::intern_owned(outputs), }) } @@ -393,34 +730,106 @@ impl JobKind 
for TemplatedExternalJobKind { inputs: &[JobItem], acquired_job: &mut AcquiredJob, ) -> eyre::Result> { - assert!(inputs.iter().map(JobItem::name).eq(job.inputs)); - let mut cmd: std::process::Command = std::process::Command::new(&*job.command_line[0]); - cmd.args(job.command_line[1..].iter().map(|arg| &**arg)); - acquired_job - .run_command(cmd, |cmd: &mut std::process::Command| { - let status = cmd.status()?; - if status.success() { - Ok(()) - } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("process exited with status: {status}"), - )) - } - }) - .wrap_err_with(|| format!("when trying to run: {:?}", job.command_line))?; - Ok(Vec::from_iter(job.outputs.iter().map( - |&output| match output { - JobItemName::Module { .. } => unreachable!(), - JobItemName::File { path } => JobItem::File { path }, + assert!( + inputs + .iter() + .map(JobItem::name) + .eq(job.inputs.keys().copied()) + ); + let output_file_paths = job.outputs.iter().map(|&output| match output { + JobItemName::Module { .. } => unreachable!(), + JobItemName::File { path } => path, + }); + let input_file_paths = job.inputs.keys().map(|name| match name { + JobItemName::Module { .. 
} => unreachable!(), + JobItemName::File { path } => *path, + }); + ExternalJobCaching::run_maybe_cached( + self.caching, + job.command_line, + input_file_paths, + output_file_paths.clone(), + |cmd| { + acquired_job + .run_command(cmd, |cmd: &mut std::process::Command| { + let status = cmd.status()?; + if status.success() { + Ok(()) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("process exited with status: {status}"), + )) + } + }) + .wrap_err_with(|| format!("when trying to run: {:?}", job.command_line)) }, - ))) + )?; + Ok(Vec::from_iter( + output_file_paths.map(|path| JobItem::File { path }), + )) } } #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct TemplatedExternalJob { command_line: Interned<[Interned]>, - inputs: Interned<[JobItemName]>, + inputs: BTreeMap>, outputs: Interned<[JobItemName]>, } + +impl TemplatedExternalJob { + pub fn try_add_direct_dependency(&mut self, new_dependency: DynJob) -> eyre::Result<()> { + let mut added = false; + for output in new_dependency.outputs() { + if let Some(existing_dependency) = self.inputs.get_mut(&output) { + eyre::ensure!( + existing_dependency + .as_ref() + .is_none_or(|v| *v == new_dependency), + "job can't have the same output as some other job:\n\ + output: {output:?}\n\ + existing job:\n\ + {existing_dependency:?}\n\ + new job:\n\ + {new_dependency:?}", + ); + *existing_dependency = Some(new_dependency.clone()); + added = true; + } + } + eyre::ensure!( + added, + "job (that we'll call `A`) can't be added as a direct dependency of another\n\ + job (that we'll call `B`) when none of job `A`'s outputs are an input of job `B`\n\ + job `A`:\n\ + {new_dependency:?}\n\ + job `B`:\n\ + {self:?}" + ); + Ok(()) + } + pub fn try_add_direct_dependencies>( + &mut self, + dependencies: I, + ) -> eyre::Result<()> { + dependencies + .into_iter() + .try_for_each(|new_dependency| self.try_add_direct_dependency(new_dependency)) + } + #[track_caller] + pub fn add_direct_dependencies>(&mut self, 
dependencies: I) { + match self.try_add_direct_dependencies(dependencies) { + Ok(()) => {} + Err(e) => panic!("error adding dependencies: {e}"), + } + } + #[track_caller] + pub fn with_direct_dependencies>( + mut self, + dependencies: I, + ) -> Self { + self.add_direct_dependencies(dependencies); + self + } +}