3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-04-05 22:14:08 +00:00

Decouple taskloop from task

This commit is contained in:
Jannis Harder 2022-04-21 16:22:32 +02:00
parent c944a9c99c
commit 0fe8c223cf
2 changed files with 119 additions and 48 deletions

View file

@ -458,13 +458,11 @@ def run_task(taskname):
for k, v in exe_paths.items():
task.exe_paths[k] = v
if throw_err:
try:
task.run(setupmode)
else:
try:
task.run(setupmode)
except SbyAbort:
pass
except SbyAbort:
if throw_err:
raise
if my_opt_tmpdir:
task.log(f"Removing directory '{my_workdir}'.")

View file

@ -51,6 +51,7 @@ class SbyProc:
self.running = False
self.finished = False
self.terminated = False
self.exited = False
self.checkretcode = False
self.retcodes = [0]
self.task = task
@ -81,7 +82,7 @@ class SbyProc:
self.logstderr = logstderr
self.silent = silent
self.task.procs_pending.append(self)
self.task.update_proc_pending(self)
for dep in self.deps:
dep.register_dep(self)
@ -90,6 +91,9 @@ class SbyProc:
self.exit_callback = None
self.error_callback = None
if self.task.timeout_reached:
self.terminate(True)
def register_dep(self, next_proc):
if self.finished:
next_proc.poll()
@ -137,12 +141,19 @@ class SbyProc:
except PermissionError:
pass
self.p.terminate()
self.task.procs_running.remove(self)
all_procs_running.remove(self)
self.task.update_proc_stopped(self)
elif not self.finished and not self.terminated and not self.exited:
self.task.update_proc_canceled(self)
self.terminated = True
def poll(self):
if self.finished or self.terminated:
def poll(self, force_unchecked=False):
if self.task.task_local_abort and not force_unchecked:
try:
self.poll(True)
except SbyAbort:
self.task.terminate(True)
return
if self.finished or self.terminated or self.exited:
return
if not self.running:
@ -168,9 +179,7 @@ class SbyProc:
self.p = subprocess.Popen(self.cmdline, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=(subprocess.STDOUT if self.logstderr else None))
self.task.procs_pending.remove(self)
self.task.procs_running.append(self)
all_procs_running.append(self)
self.task.update_proc_running(self)
self.running = True
return
@ -187,26 +196,24 @@ class SbyProc:
if self.p.poll() is not None:
if not self.silent:
self.task.log(f"{self.info}: finished (returncode={self.p.returncode})")
self.task.procs_running.remove(self)
all_procs_running.remove(self)
self.task.update_proc_stopped(self)
self.running = False
self.exited = True
if self.p.returncode == 127:
self.task.status = "ERROR"
if not self.silent:
self.task.log(f"{self.info}: COMMAND NOT FOUND. ERROR.")
self.handle_error(self.p.returncode)
self.terminated = True
self.task.terminate()
self.task.proc_failed(self)
return
if self.checkretcode and self.p.returncode not in self.retcodes:
self.task.status = "ERROR"
if not self.silent:
self.task.log(f"{self.info}: task failed. ERROR.")
self.handle_error(self.p.returncode)
self.terminated = True
self.task.terminate()
self.task.proc_failed(self)
return
self.handle_exit(self.p.returncode)
@ -321,8 +328,55 @@ class SbyConfig:
def error(self, logmessage):
raise SbyAbort(logmessage)
class SbyTaskloop:
def __init__(self):
self.procs_pending = []
self.procs_running = []
self.tasks = []
self.poll_now = False
def run(self):
for proc in self.procs_pending:
proc.poll()
while len(self.procs_running) or self.poll_now:
fds = []
for proc in self.procs_running:
if proc.running:
fds.append(proc.p.stdout)
if not self.poll_now:
if os.name == "posix":
try:
select(fds, [], [], 1.0) == ([], [], [])
except InterruptedError:
pass
else:
sleep(0.1)
self.poll_now = False
for proc in self.procs_running:
proc.poll()
for proc in self.procs_pending:
proc.poll()
tasks = self.tasks
self.tasks = []
for task in tasks:
task.check_timeout()
if task.procs_pending or task.procs_running:
self.tasks.append(task)
else:
task.exit_callback()
for task in self.tasks:
task.exit_callback()
class SbyTask(SbyConfig):
def __init__(self, sbyconfig, workdir, early_logs, reusedir):
def __init__(self, sbyconfig, workdir, early_logs, reusedir, taskloop=None):
super().__init__()
self.used_options = set()
self.models = dict()
@ -333,6 +387,8 @@ class SbyTask(SbyConfig):
self.expect = list()
self.design_hierarchy = None
self.precise_prop_status = False
self.timeout_reached = False
self.task_local_abort = False
yosys_program_prefix = "" ##yosys-program-prefix##
self.exe_paths = {
@ -346,6 +402,9 @@ class SbyTask(SbyConfig):
"pono": os.getenv("PONO", "pono"),
}
self.taskloop = taskloop or SbyTaskloop()
self.taskloop.tasks.append(self)
self.procs_running = []
self.procs_pending = []
@ -367,36 +426,34 @@ class SbyTask(SbyConfig):
for line in sbyconfig:
print(line, file=f)
def taskloop(self):
for proc in self.procs_pending:
proc.poll()
def check_timeout(self):
if self.opt_timeout is not None:
total_clock_time = int(monotonic() - self.start_clock_time)
if total_clock_time > self.opt_timeout:
self.log(f"Reached TIMEOUT ({self.opt_timeout} seconds). Terminating all subprocesses.")
self.status = "TIMEOUT"
self.terminate(timeout=True)
while len(self.procs_running):
fds = []
for proc in self.procs_running:
if proc.running:
fds.append(proc.p.stdout)
def update_proc_pending(self, proc):
self.procs_pending.append(proc)
self.taskloop.procs_pending.append(proc)
if os.name == "posix":
try:
select(fds, [], [], 1.0) == ([], [], [])
except InterruptedError:
pass
else:
sleep(0.1)
def update_proc_running(self, proc):
self.procs_pending.remove(proc)
self.taskloop.procs_pending.remove(proc)
for proc in self.procs_running:
proc.poll()
self.procs_running.append(proc)
self.taskloop.procs_running.append(proc)
all_procs_running.append(proc)
for proc in self.procs_pending:
proc.poll()
def update_proc_stopped(self, proc):
self.procs_running.remove(proc)
self.taskloop.procs_running.remove(proc)
all_procs_running.remove(proc)
if self.opt_timeout is not None:
total_clock_time = int(monotonic() - self.start_clock_time)
if total_clock_time > self.opt_timeout:
self.log(f"Reached TIMEOUT ({self.opt_timeout} seconds). Terminating all subprocesses.")
self.status = "TIMEOUT"
self.terminate(timeout=True)
def update_proc_canceled(self, proc):
self.procs_pending.remove(proc)
self.taskloop.procs_pending.remove(proc)
def log(self, logmessage):
tm = localtime()
@ -632,8 +689,17 @@ class SbyTask(SbyConfig):
return self.models[model_name]
def terminate(self, timeout=False):
if timeout:
self.timeout_reached = True
for proc in list(self.procs_running):
proc.terminate(timeout=timeout)
for proc in list(self.procs_pending):
proc.terminate(timeout=timeout)
def proc_failed(self, proc):
# proc parameter used by autotune override
self.status = "ERROR"
self.terminate()
def update_status(self, new_status):
assert new_status in ["PASS", "FAIL", "UNKNOWN", "ERROR"]
@ -659,6 +725,11 @@ class SbyTask(SbyConfig):
assert 0
def run(self, setupmode):
self.setup_procs(setupmode)
if not setupmode:
self.taskloop.run()
def setup_procs(self, setupmode):
with open(f"{self.workdir}/config.sby", "r") as f:
self.parse_config(f)
@ -732,8 +803,7 @@ class SbyTask(SbyConfig):
if opt not in self.used_options:
self.error(f"Unused option: {opt}")
self.taskloop()
def summarize(self):
total_clock_time = int(monotonic() - self.start_clock_time)
if os.name == "posix":
@ -772,6 +842,9 @@ class SbyTask(SbyConfig):
for line in self.summary:
print(line, file=f)
def exit_callback(self):
self.summarize()
def print_junit_result(self, f, junit_ts_name, junit_tc_name, junit_format_strict=False):
junit_time = strftime('%Y-%m-%dT%H:%M:%S')
if self.precise_prop_status: