From de939e279a66236f91ae98d6e03e94483b0fb70a Mon Sep 17 00:00:00 2001 From: Jannis Harder Date: Wed, 6 Jul 2022 14:28:20 +0200 Subject: [PATCH] Run tasks in parallel --- sbysrc/sby.py | 99 +++++++++++++++++++++++++++++------------- sbysrc/sby_autotune.py | 9 +++- sbysrc/sby_core.py | 12 ++--- 3 files changed, 78 insertions(+), 42 deletions(-) diff --git a/sbysrc/sby.py b/sbysrc/sby.py index 0628c6d..7e55da8 100644 --- a/sbysrc/sby.py +++ b/sbysrc/sby.py @@ -19,7 +19,7 @@ import argparse, json, os, sys, shutil, tempfile, re ##yosys-sys-path## -from sby_core import SbyConfig, SbyTask, SbyAbort, process_filename +from sby_core import SbyConfig, SbyTask, SbyAbort, SbyTaskloop, process_filename import time, platform class DictAction(argparse.Action): @@ -401,7 +401,7 @@ if (workdir is not None) and (len(tasknames) != 1): print("ERROR: Exactly one task is required when workdir is specified. Specify the task or use --prefix instead of -d.", file=sys.stderr) sys.exit(1) -def run_task(taskname): +def start_task(taskloop, taskname): sbyconfig, _, _, _ = read_sbyconfig(sbydata, taskname) my_opt_tmpdir = opt_tmpdir @@ -463,48 +463,85 @@ def run_task(taskname): else: junit_filename = "junit" - task = SbyTask(sbyconfig, my_workdir, early_logmsgs, reusedir) + task = SbyTask(sbyconfig, my_workdir, early_logmsgs, reusedir, taskloop) for k, v in exe_paths.items(): task.exe_paths[k] = v - try: - if autotune: - import sby_autotune - sby_autotune.SbyAutotune(task, autotune_config).run() + def exit_callback(): + if not autotune and not setupmode: + task.summarize() + task.write_summary_file() + + if my_opt_tmpdir: + task.log(f"Removing directory '{my_workdir}'.") + shutil.rmtree(my_workdir, ignore_errors=True) + + if setupmode: + task.log(f"SETUP COMPLETE (rc={task.retcode})") else: - task.run(setupmode) - except SbyAbort: - if throw_err: - raise + task.log(f"DONE ({task.status}, rc={task.retcode})") + task.logfile.close() - if my_opt_tmpdir: - task.log(f"Removing directory '{my_workdir}'.") - shutil.rmtree(my_workdir, ignore_errors=True) + if not my_opt_tmpdir and not setupmode and not autotune: + with open("{}/{}.xml".format(task.workdir, junit_filename), "w") as f: + task.print_junit_result(f, junit_ts_name, junit_tc_name, junit_format_strict=False) - if setupmode: - task.log(f"SETUP COMPLETE (rc={task.retcode})") - else: - task.log(f"DONE ({task.status}, rc={task.retcode})") - task.logfile.close() + with open(f"{task.workdir}/status", "w") as f: + print(f"{task.status} {task.retcode} {task.total_time}", file=f) - if not my_opt_tmpdir and not setupmode and not autotune: - with open("{}/{}.xml".format(task.workdir, junit_filename), "w") as f: - task.print_junit_result(f, junit_ts_name, junit_tc_name, junit_format_strict=False) + task.exit_callback = exit_callback - with open(f"{task.workdir}/status", "w") as f: - print(f"{task.status} {task.retcode} {task.total_time}", file=f) - - return task.retcode + if not autotune: + task.setup_procs(setupmode) + task.task_local_abort = not throw_err + return task failed = [] retcode = 0 -for task in tasknames: - task_retcode = run_task(task) - retcode |= task_retcode - if task_retcode: - failed.append(task) + +# Autotune is already parallel, parallelizing it across tasks needs some more work +sequential = autotune # TODO selection between parallel/sequential + +if sequential: + for taskname in tasknames: + taskloop = SbyTaskloop() + try: + task = start_task(taskloop, taskname) + except SbyAbort: + if throw_err: + raise + sys.exit(1) + + if autotune: + from sby_autotune import SbyAutotune + SbyAutotune(task, autotune_config).run() + elif setupmode: + task.exit_callback() + else: + taskloop.run() + retcode |= task.retcode + if task.retcode: + failed.append(taskname) +else: + taskloop = SbyTaskloop() + + tasks = {} + for taskname in tasknames: + try: + tasks[taskname] = start_task(taskloop, taskname) + except SbyAbort: + if throw_err: + raise + sys.exit(1) + + taskloop.run() + + for taskname, task in tasks.items(): + retcode |= task.retcode + if task.retcode: + failed.append(taskname) if failed and (len(tasknames) > 1 or tasknames[0] is not None): tm = time.localtime() diff --git a/sbysrc/sby_autotune.py b/sbysrc/sby_autotune.py index c7d741c..771a9a0 100644 --- a/sbysrc/sby_autotune.py +++ b/sbysrc/sby_autotune.py @@ -168,6 +168,7 @@ class SbyAutotune: """Performs automatic engine selection for a given task. """ def __init__(self, task, config_file=None): + self.task_exit_callback = task.exit_callback task.exit_callback = lambda: None task.check_timeout = lambda: None task.status = "TIMEOUT" @@ -432,6 +433,8 @@ class SbyAutotune: self.task.status = "FAIL" self.task.retcode = 2 + self.task_exit_callback() + def next_candidate(self, peek=False): # peek=True is used to check whether we need to timeout running candidates to # give other candidates a chance. @@ -635,6 +638,8 @@ class SbyAutotuneTask(SbyTask): self.model_time = 0 self.model_requests = [] + self.exit_callback = self.autotune_exit_callback + def parse_config(self, f): super().parse_config(f) @@ -650,8 +655,8 @@ class SbyAutotuneTask(SbyTask): self.log(f"using model '{model_name}'") return self.autotune.model(self, model_name) - def exit_callback(self): - super().exit_callback() + def autotune_exit_callback(self): + self.summarize() self.candidate.total_adjusted_time = int(monotonic() - self.start_clock_time + self.model_time) self.candidate.engine_retcode = self.retcode diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 366817f..f49aad0 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -437,6 +437,7 @@ class SbyTask(SbyConfig): self.precise_prop_status = False self.timeout_reached = False self.task_local_abort = False + self.exit_callback = self.summarize yosys_program_prefix = "" ##yosys-program-prefix## self.exe_paths = { @@ -795,12 +796,6 @@ class SbyTask(SbyConfig): else: assert 0 - def run(self, setupmode): - self.setup_procs(setupmode) - if not setupmode: - self.taskloop.run() - self.write_summary_file() - def handle_non_engine_options(self): with open(f"{self.workdir}/config.sby", "r") as f: self.parse_config(f) @@ -897,6 +892,8 @@ class SbyTask(SbyConfig): total_process_time = int((ru.ru_utime + ru.ru_stime) - self.start_process_time) self.total_time = total_process_time + # TODO process time is incorrect when running in parallel + self.summary = [ "Elapsed clock time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format (total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time), @@ -929,9 +926,6 @@ class SbyTask(SbyConfig): for line in self.summary: print(line, file=f) - def exit_callback(self): - self.summarize() - def print_junit_result(self, f, junit_ts_name, junit_tc_name, junit_format_strict=False): junit_time = strftime('%Y-%m-%dT%H:%M:%S') if not self.design: