3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-04-05 22:14:08 +00:00

Automatic engine selection

This commit is contained in:
Jannis Harder 2022-04-25 12:27:18 +02:00
parent 5014d74023
commit b4458d43d7
7 changed files with 731 additions and 21 deletions

View file

@ -46,6 +46,10 @@ parser.add_argument("-T", metavar="<taskname>", action="append", dest="tasknames
help="add taskname (useful when sby file is read from stdin)")
parser.add_argument("-E", action="store_true", dest="throw_err",
help="throw an exception (incl stack trace) for most errors")
parser.add_argument("--autotune", action="store_true", dest="autotune",
help="automatically find a well performing engine and engine configuration for each task")
parser.add_argument("--autotune-config", dest="autotune_config",
help="read an autotune configuration file (overrides the sby file's autotune options)")
parser.add_argument("--yosys", metavar="<path_to_executable>",
action=DictAction, dest="exe_paths")
@ -108,6 +112,8 @@ dump_taskinfo = args.dump_taskinfo
dump_files = args.dump_files
reusedir = False
setupmode = args.setupmode
autotune = args.autotune
autotune_config = args.autotune_config
init_config_file = args.init_config_file
if sbyfile is not None:
@ -462,7 +468,11 @@ def run_task(taskname):
task.exe_paths[k] = v
try:
task.run(setupmode)
if autotune:
import sby_autotune
sby_autotune.SbyAutotune(task, autotune_config).run()
else:
task.run(setupmode)
except SbyAbort:
if throw_err:
raise
@ -477,7 +487,7 @@ def run_task(taskname):
task.log(f"DONE ({task.status}, rc={task.retcode})")
task.logfile.close()
if not my_opt_tmpdir and not setupmode:
if not my_opt_tmpdir and not setupmode and not autotune:
with open("{}/{}.xml".format(task.workdir, junit_filename), "w") as f:
task.print_junit_result(f, junit_ts_name, junit_tc_name, junit_format_strict=False)

679
sbysrc/sby_autotune.py Normal file
View file

@ -0,0 +1,679 @@
#
# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows
#
# Copyright (C) 2022 Jannis Harder <jix@yosyshq.com> <me@jix.one>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import os
import re
import subprocess
from shutil import rmtree, which
from time import monotonic
from sby_core import SbyAbort, SbyTask
class SbyAutotuneConfig:
"""Autotune configuration parsed from the sby file or an external autotune config
file.
"""
def __init__(self):
self.timeout = None
self.soft_timeout = 60
self.wait_percentage = 50
self.wait_seconds = 10
self.parallel = "auto"
self.presat = None
self.incr = "auto"
self.incr_threshold = 20
self.mem = "auto"
self.mem_threshold = 10240
self.forall = "auto"
def config_line(self, log, line, file_kind="sby file"):
option, *arg = line.split(None, 1)
if not arg:
log.error(f"{file_kind} syntax error: {line}")
arg = arg[0].strip()
BOOL_OR_ANY = {"on": True, "off": False, "any": None}
BOOL_ANY_OR_AUTO = {"on": True, "off": False, "any": None, "auto": "auto"}
ON_ANY_OR_AUTO = {"on": True, "any": None, "auto": "auto"}
def enum_option(values):
if arg not in values:
values_str = ', '.join(repr(value) for value in sorted(values))
log.error(f"{file_kind}: invalid value '{arg}' for autotune option {option}, valid values are: {values_str}")
return values[arg]
def int_option():
try:
return int(arg)
except ValueError:
log.error(f"{file_kind}: invalid value '{arg}' for autotune option {option}, expected an integer value")
if option == "timeout":
self.timeout = "none" if arg == "none" else int_option()
elif option == "soft_timeout":
self.soft_timeout = int_option()
elif option == "wait":
self.wait_percentage = 0
self.wait_seconds = 0
for part in arg.split("+"):
part = part.strip()
if part.endswith("%"):
self.wait_percentage += int(part[:-1].strip())
else:
self.wait_seconds += int(part)
elif option == "parallel":
self.parallel = "auto" if arg == "auto" else int_option()
elif option == "presat":
self.presat = enum_option(BOOL_OR_ANY)
elif option == "incr":
self.incr = enum_option(BOOL_ANY_OR_AUTO)
elif option == "incr_threshold":
self.incr_threshold = int_option()
elif option == "mem":
self.mem = enum_option(ON_ANY_OR_AUTO)
elif option == "mem_threshold":
self.mem_threshold = int_option()
elif option == "forall":
self.forall = enum_option(ON_ANY_OR_AUTO)
else:
log.error(f"{file_kind} syntax error: {line}")
def parse_file(self, log, file):
for line in file:
line = re.sub(r"\s*(\s#.*)?$", "", line)
if line == "" or line[0] == "#":
continue
self.config_line(log, line.rstrip(), "autotune configuration file")
class SbyAutotuneCandidate:
"""An engine configuration to try and its current state during autotuning.
"""
def __init__(self, autotune, engine):
self.autotune = autotune
self.engine = engine
self.state = "pending"
self.engine_idx = None
self.info = f"{' '.join(self.engine)}:"
self.suspended = 0
self.suspend = 1
self.engine_retcode = None
self.engine_status = None
self.total_adjusted_time = None
self.soft_timeout = self.autotune.config.soft_timeout
if tuple(self.engine) not in self.autotune.candidate_engines:
self.autotune.active_candidates.append(self)
self.autotune.candidate_engines.add(tuple(self.engine))
def set_engine_idx(self, idx):
self.engine_idx = idx
self.info = f"engine_{idx} ({' '.join(self.engine)}):"
def set_running(self):
assert not self.suspended
assert self.state == "pending"
assert self in self.autotune.active_candidates
self.state = "running"
def retry_later(self):
assert self.state == "running"
assert self in self.autotune.active_candidates
self.state = "pending"
self.soft_timeout *= 2
self.suspended = self.suspend
def timed_out(self):
assert self.state == "running"
self.autotune.active_candidates.remove(self)
self.state = "timeout"
def failed(self):
assert self.state == "running"
self.autotune.active_candidates.remove(self)
self.autotune.failed_candidates.append(self)
self.state = "failed"
def finished(self):
assert self.state == "running"
self.autotune.active_candidates.remove(self)
self.autotune.finished_candidates.append(self)
self.state = "finished"
def threads(self):
if self.autotune.config.mode == "prove" and self.engine[0] == "smtbmc":
return 2
return 1
class SbyAutotune:
"""Performs automatic engine selection for a given task.
"""
def __init__(self, task, config_file=None):
task.exit_callback = lambda: None
task.check_timeout = lambda: None
task.status = "TIMEOUT"
task.retcode = 8
task.proc_failed = self.proc_failed
self.config = None
if config_file:
with open(config_file) as config:
self.config.parse_file(task, config)
self.task = task
self.done = False
self.threads_running = 0
self.next_engine_idx = 0
self.model_requests = {}
self.timeout = None
self.best_time = None
self.have_pending_candidates = False
self.active_candidates = []
self.finished_candidates = []
self.failed_candidates = []
self.candidate_engines = set()
def available(self, tool):
if not which(tool):
return False
if tool == "btorsim":
error_msg = subprocess.run(
["btorsim", "--vcd"],
capture_output=True,
text=True,
).stderr
if "invalid command line option" in error_msg:
self.log('found version of "btorsim" is too old and does not support the --vcd option')
return False
return True
def candidate(self, *engine):
flat_engine = []
def flatten(part):
if part is None:
return
elif isinstance(part, (tuple, list)):
for subpart in part:
flatten(subpart)
else:
flat_engine.append(part)
flatten(engine)
SbyAutotuneCandidate(self, flat_engine)
def configure(self):
self.config.mode = self.task.opt_mode
self.config.skip = self.task.opt_skip
if self.config.incr == "auto":
self.config.incr = None
if self.config.mode != "live":
steps = self.task.opt_depth - (self.config.skip or 0)
if steps > self.config.incr_threshold:
self.log(f"checking more than {self.config.incr_threshold} timesteps ({steps}), disabling nonincremental smtbmc")
self.config.incr = True
if self.config.mem == "auto":
self.config.mem = None
if self.task.design is None:
self.log("warning: unknown amount of memory bits in design")
elif self.task.design.memory_bits > self.config.mem_threshold:
self.log(
f"more than {self.config.mem_threshold} bits of memory in design ({self.task.design.memory_bits} bits), "
"disabling engines without native memory support"
)
self.config.mem = True
if self.config.forall == "auto":
self.config.forall = None
if self.task.design.forall:
self.log("design uses $allconst/$allseq, disabling engines without forall support")
self.config.forall = True
if self.config.mode not in ["bmc", "prove"]:
self.config.presat = None
if self.config.parallel == "auto":
try:
self.config.parallel = len(os.sched_getaffinity(0))
except AttributeError:
self.config.parallel = os.cpu_count() # TODO is this correct?
if self.config.timeout is None:
self.config.timeout = self.task.opt_timeout
elif self.config.timeout == "none":
self.config.timeout = None
def build_candidates(self):
if self.config.mode == "live":
# Not much point in autotuning here...
self.candidate("aiger", "suprove")
return
if self.config.presat is None:
presat_flags = [None, "--nopresat"]
elif self.config.presat:
presat_flags = [None]
else:
presat_flags = ["--nopresat"]
if self.config.incr is None:
noincr_flags = [None, ["--", "--noincr"]]
elif self.config.incr:
noincr_flags = [None]
else:
noincr_flags = [["--", "--noincr"]]
if self.config.forall:
self.log('disabling engines "smtbmc boolector" and "smtbmc bitwuzla" as they do not support forall')
else:
for solver in ["boolector", "bitwuzla"]:
if not self.available(solver):
self.log(f'disabling engine "smtbmc {solver}" as the solver "{solver}" was not found')
continue
for noincr in noincr_flags:
for presat in presat_flags:
self.candidate("smtbmc", presat, solver, noincr)
if not self.available("btorsim"):
self.log('disabling engine "btor" as the "btorsim" tool was not found')
elif self.config.forall:
self.log('disabling engine "btor" as it does not support forall')
else:
if self.config.mode in ["bmc", "cover"]:
if not self.available("btormc"):
self.log('disabling engine "btor btormc" as the "btormc" tool was not found')
elif self.config.presat:
self.log('disabling engine "btor btormc" as it does not support presat checking')
else:
self.candidate("btor", "btormc")
if self.config.mode == "bmc":
if not self.available("pono"):
self.log('disabling engine "btor btormc" as the "btormc" tool was not found')
elif self.config.presat:
self.log('disabling engine "btor pono" as it does not support presat checking')
elif self.config.skip:
self.log('disabling engine "btor pono" as it does not support the "skip" option')
else:
self.candidate("btor", "pono")
for solver in ["yices", "z3"]:
if not self.available(solver):
self.log(f'disabling engine "smtbmc {solver}" as the solver "{solver}" was not found')
continue
for unroll in ["--unroll", "--nounroll"]:
if solver == "yices" and self.config.forall:
self.log('disabling engine "smtbmc yices" due to limited forall support')
# TODO yices implicitly uses --noincr for forall problems and
# requires --stbv which does not play well with memory, still test it?
continue
stmode = "--stdt" if self.config.forall else None
for noincr in noincr_flags:
for presat in presat_flags:
self.candidate("smtbmc", presat, stmode, unroll, solver, noincr)
if self.config.mode not in ["bmc", "prove"]:
pass
elif self.config.presat:
self.log('disabling engines "abc" and "aiger" as they do not support presat checking')
elif self.config.forall:
self.log('disabling engines "abc" and "aiger" as they do not support forall')
elif self.config.mem:
self.log('disabling engines "abc" and "aiger" as they do not support memory')
elif self.config.skip:
self.log('disabling engines "abc" and "aiger" as they do not support the "skip" option')
elif self.config.mode == "bmc":
self.candidate("abc", "bmc3")
if not self.available("aigbmc"):
self.log('disabling engine "aiger aigbmc" as the "aigbmc" tool was not found')
else:
self.candidate("aiger", "aigbmc")
# abc sim3 will never finish
elif self.config.mode == "prove":
self.candidate("abc", "pdr")
if not self.available("suprove"):
self.log('disabling engine "aiger suprove" as the "suprove" tool was not found')
else:
self.candidate("aiger", "suprove")
# avy seems to crash in the presence of assumptions
def log(self, message):
self.task.log(message)
def run(self):
self.task.handle_non_engine_options()
self.config = self.task.autotune_config or SbyAutotuneConfig()
if "expect" not in self.task.options:
self.task.expect = ["PASS", "FAIL"]
# TODO check that solvers produce consistent results?
if "TIMEOUT" in self.task.expect:
self.task.error("cannot autotune a task with option 'expect timeout'")
if self.task.reusedir:
rmtree(f"{self.task.workdir}/model", ignore_errors=True)
else:
self.task.copy_src()
self.model(None, "base")
self.task.taskloop.run()
if self.task.status == "ERROR":
return
self.configure()
self.build_candidates()
if not self.active_candidates:
self.error("no supported engines found for the current configuration and design")
self.log(f"testing {len(self.active_candidates)} engine configurations...")
self.start_engines()
self.task.taskloop.run()
self.finished_candidates.sort(key=lambda candidate: candidate.total_adjusted_time)
if self.failed_candidates:
self.log("failed engines:")
for candidate in self.failed_candidates:
self.log(
f" engine_{candidate.engine_idx}: {' '.join(candidate.engine)}"
f" (returncode={candidate.engine_retcode} status={candidate.engine_status})"
)
if self.finished_candidates:
self.log("finished engines:")
for place, candidate in list(enumerate(self.finished_candidates, 1))[::-1]:
self.log(
f" #{place}: engine_{candidate.engine_idx}: {' '.join(candidate.engine)}"
f" ({candidate.total_adjusted_time} seconds, status={candidate.engine_status})"
)
if self.finished_candidates:
self.task.status = "AUTOTUNED"
self.task.retcode = 0
elif self.failed_candidates:
self.task.status = "FAIL"
self.task.retcode = 2
def next_candidate(self, peek=False):
# peek=True is used to check whether we need to timeout running candidates to
# give other candidates a chance.
can_retry = None
for candidate in self.active_candidates:
if candidate.state == "pending":
if not candidate.suspended:
return candidate
if can_retry is None or can_retry.suspended > candidate.suspended:
can_retry = candidate
if can_retry and not peek:
shift = can_retry.suspended
for candidate in self.active_candidates:
if candidate.state == "pending":
candidate.suspended -= shift
return can_retry
def start_engines(self):
self.task.taskloop.poll_now = True
while self.threads_running < self.config.parallel:
candidate = self.next_candidate()
if candidate is None:
self.have_pending_candidates = False
return
candidate_threads = candidate.threads()
if self.threads_running:
if self.threads_running + candidate_threads > self.config.parallel:
break
candidate.set_running()
candidate.set_engine_idx(self.next_engine_idx)
self.next_engine_idx += 1
try:
engine_task = SbyAutotuneTask(self, candidate)
pending = sum(c.state == "pending" for c in self.active_candidates)
self.log(f"{candidate.info} starting... ({pending} configurations pending)")
self.threads_running += candidate_threads
engine_task.setup_procs(False)
except SbyAbort:
pass
self.have_pending_candidates = bool(self.next_candidate(peek=True))
def engine_finished(self, engine_task):
self.threads_running -= engine_task.candidate.threads()
candidate = engine_task.candidate
time = candidate.total_adjusted_time
if engine_task.status == "TIMEOUT":
if self.timeout is None or time < self.timeout:
candidate.retry_later()
self.log(f"{candidate.info} timeout ({time} seconds, will be retried if necessary)")
else:
candidate.timed_out()
self.log(f"{candidate.info} timeout ({time} seconds)")
elif engine_task.retcode:
candidate.failed()
self.log(f"{candidate.info} failed (returncode={candidate.engine_retcode} status={candidate.engine_status})")
else:
candidate.finished()
self.log(f"{candidate.info} succeeded (status={candidate.engine_status})")
if self.best_time is None:
self.log(f"{candidate.info} took {time} seconds (first engine to finish)")
self.best_time = time
elif time < self.best_time:
self.log(f"{candidate.info} took {time} seconds (best candidate, previous best: {self.best_time} seconds)")
self.best_time = time
else:
self.log(f"{candidate.info} took {time} seconds")
new_timeout = int(time + self.config.wait_seconds + time * self.config.wait_percentage // 100)
if self.timeout is None or new_timeout < self.timeout:
self.timeout = new_timeout
self.start_engines()
def model(self, engine_task, name):
if self.task not in self.task.taskloop.tasks:
self.task.taskloop.tasks.append(self.task)
if name in self.model_requests:
request = self.model_requests[name]
else:
self.model_requests[name] = request = SbyModelRequest(self, name)
request.attach_engine_task(engine_task)
return request.procs
def proc_failed(self, proc):
for name, request in self.model_requests.items():
if proc in request.procs:
for task in request.engine_tasks:
task = task or self.task
task.status = "ERROR"
task.log(f"could not prepare model '{name}', see toplevel logfile")
task.terminate()
pass
class SbyModelRequest:
"""Handles sharing and canceling of model generation from several SbyAutotuneTask
instances.
"""
def __init__(self, autotune, name):
self.autotune = autotune
self.name = name
self.engine_tasks = []
autotune.log(f"model '{name}': preparing now...")
self.make_model()
def make_model(self):
self.start_time = monotonic()
self.total_time = None
self.min_time = 0
self.procs = self.autotune.task.model(self.name)
for proc in self.procs:
proc.register_dep(self)
def attach_engine_task(self, engine_task):
if self.total_time is None:
if engine_task:
if self.start_time is None:
model_time = 0
extra_time = self.min_time
else:
model_time = monotonic() - self.start_time
extra_time = max(0, self.min_time - model_time)
engine_task.model_time += model_time
engine_task.check_timeout(extra_time)
if self.start_time is None:
self.make_model()
self.engine_tasks.append(engine_task)
if engine_task:
engine_task.model_requests.append(self)
else:
if engine_task:
engine_task.model_time += self.total_time
def detach_engine_task(self, engine_task):
self.engine_tasks.remove(engine_task)
if not self.engine_tasks and self.total_time is None:
self.autotune.log(f"cancelled model '{self.name}'")
del self.autotune.task.models[self.name]
for proc in self.procs:
proc.terminate(True)
self.min_time = max(self.min_time, monotonic() - self.start_time)
self.start_time = None
self.procs = []
def poll(self):
if self.total_time is None and all(proc.finished for proc in self.procs):
self.autotune.log(f"prepared model '{self.name}'")
self.total_time = self.min_time = monotonic() - self.start_time
class SbyAutotuneTask(SbyTask):
"""Task that shares the workdir with a parent task, runs in parallel to other
autotune tasks and can be cancelled independent from other autotune tasks while
sharing model generation with other tasks.
"""
def __init__(self, autotune, candidate):
task = autotune.task
self.autotune = autotune
self.candidate = candidate
super().__init__(
sbyconfig=None,
workdir=task.workdir,
early_logs=[],
reusedir=True,
taskloop=task.taskloop,
logfile=open(f"{task.workdir}/engine_{candidate.engine_idx}_autotune.txt", "a"),
)
self.task_local_abort = True
self.log_targets = [self.logfile]
self.exe_paths = autotune.task.exe_paths
self.reusedir = False
self.design = autotune.task.design
self.model_time = 0
self.model_requests = []
def parse_config(self, f):
super().parse_config(f)
self.engines = []
def engine_list(self):
return [(self.candidate.engine_idx, self.candidate.engine)]
def copy_src(self):
pass
def model(self, model_name):
self.log(f"using model '{model_name}'")
return self.autotune.model(self, model_name)
def exit_callback(self):
super().exit_callback()
self.candidate.total_adjusted_time = int(monotonic() - self.start_clock_time + self.model_time)
self.candidate.engine_retcode = self.retcode
self.candidate.engine_status = self.status
self.autotune.engine_finished(self)
for request in self.model_requests:
request.detach_engine_task(self)
def check_timeout(self, extra_time=0):
model_time = self.model_time + extra_time
total_adjusted_time = int(monotonic() - self.start_clock_time + model_time)
if self.autotune.timeout is not None:
timeout = self.autotune.timeout
else:
if not self.autotune.have_pending_candidates:
return
timeout = self.candidate.soft_timeout
if not self.timeout_reached and total_adjusted_time >= timeout:
self.log(f"Reached autotune TIMEOUT ({timeout} seconds). Terminating all subprocesses.")
self.status = "TIMEOUT"
self.total_adjusted_time = total_adjusted_time
self.terminate(timeout=True)

View file

@ -233,6 +233,7 @@ class SbyConfig:
self.options = dict()
self.engines = list()
self.script = list()
self.autotune_config = None
self.files = dict()
self.verbatim_files = dict()
pass
@ -242,7 +243,7 @@ class SbyConfig:
for line in f:
raw_line = line
if mode in ["options", "engines", "files"]:
if mode in ["options", "engines", "files", "autotune"]:
line = re.sub(r"\s*(\s#.*)?$", "", line)
if line == "" or line[0] == "#":
continue
@ -275,6 +276,15 @@ class SbyConfig:
self.error(f"sby file syntax error: {line}")
continue
if entries[0] == "autotune":
mode = "autotune"
if self.autotune_config:
self.error(f"sby file syntax error: {line}")
import sby_autotune
self.autotune_config = sby_autotune.SbyAutotuneConfig()
continue
if entries[0] == "file":
mode = "file"
if len(entries) != 2:
@ -300,6 +310,10 @@ class SbyConfig:
self.options[entries[0]] = entries[1]
continue
if mode == "autotune":
self.autotune_config.config_line(self, line)
continue
if mode == "engines":
entries = line.split()
self.engines.append(entries)
@ -376,7 +390,7 @@ class SbyTaskloop:
class SbyTask(SbyConfig):
def __init__(self, sbyconfig, workdir, early_logs, reusedir, taskloop=None):
def __init__(self, sbyconfig, workdir, early_logs, reusedir, taskloop=None, logfile=None):
super().__init__()
self.used_options = set()
self.models = dict()
@ -416,7 +430,8 @@ class SbyTask(SbyConfig):
self.summary = list()
self.logfile = open(f"{workdir}/logfile.txt", "a")
self.logfile = logfile or open(f"{workdir}/logfile.txt", "a")
self.log_targets = [sys.stdout, self.logfile]
for line in early_logs:
print(line, file=self.logfile, flush=True)
@ -426,6 +441,9 @@ class SbyTask(SbyConfig):
for line in sbyconfig:
print(line, file=f)
def engine_list(self):
return list(enumerate(self.engines))
def check_timeout(self):
if self.opt_timeout is not None:
total_clock_time = int(monotonic() - self.start_clock_time)
@ -457,13 +475,13 @@ class SbyTask(SbyConfig):
def log(self, logmessage):
tm = localtime()
print("SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), flush=True)
print("SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), file=self.logfile, flush=True)
line = "SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage)
for target in self.log_targets:
print(line, file=target, flush=True)
def error(self, logmessage):
tm = localtime()
print("SBY {:2d}:{:02d}:{:02d} [{}] ERROR: {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), flush=True)
print("SBY {:2d}:{:02d}:{:02d} [{}] ERROR: {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), file=self.logfile, flush=True)
self.log(f"ERROR: {logmessage}")
self.status = "ERROR"
if "ERROR" not in self.expect:
self.retcode = 16
@ -728,8 +746,9 @@ class SbyTask(SbyConfig):
self.setup_procs(setupmode)
if not setupmode:
self.taskloop.run()
self.write_summary_file()
def setup_procs(self, setupmode):
def handle_non_engine_options(self):
with open(f"{self.workdir}/config.sby", "r") as f:
self.parse_config(f)
@ -747,6 +766,9 @@ class SbyTask(SbyConfig):
if s not in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]:
self.error(f"Invalid expect value: {s}")
if self.opt_mode != "live":
self.handle_int_option("depth", 20)
self.handle_bool_option("multiclock", False)
self.handle_bool_option("wait", False)
self.handle_int_option("timeout", None)
@ -755,8 +777,10 @@ class SbyTask(SbyConfig):
self.handle_int_option("skip", None)
self.handle_str_option("tbtop", None)
def setup_procs(self, setupmode):
self.handle_non_engine_options()
if self.opt_smtc is not None:
for engine in self.engines:
for engine_idx, engine in self.engine_list():
if engine[0] != "smtbmc":
self.error("Option smtc is only valid for smtbmc engine.")
@ -764,11 +788,11 @@ class SbyTask(SbyConfig):
if self.opt_skip == 0:
self.opt_skip = None
else:
for engine in self.engines:
for engine_idx, engine in self.engine_list():
if engine[0] not in ["smtbmc", "btor"]:
self.error("Option skip is only valid for smtbmc and btor engines.")
if len(self.engines) == 0:
if len(self.engine_list()) == 0:
self.error("Config file is lacking engine configuration.")
if self.reusedir:
@ -838,6 +862,7 @@ class SbyTask(SbyConfig):
if self.status == "TIMEOUT": self.retcode = 8
if self.status == "ERROR": self.retcode = 16
def write_summary_file(self):
with open(f"{self.workdir}/{self.status}", "w") as f:
for line in self.summary:
print(line, file=f)

View file

@ -24,8 +24,7 @@ def run(task):
task.handle_int_option("append", 0)
task.handle_str_option("aigsmt", "yices")
for engine_idx in range(len(task.engines)):
engine = task.engines[engine_idx]
for engine_idx, engine in task.engine_list():
assert len(engine) > 0
task.log(f"""engine_{engine_idx}: {" ".join(engine)}""")

View file

@ -23,8 +23,7 @@ def run(task):
task.handle_int_option("depth", 20)
task.handle_int_option("append", 0)
for engine_idx in range(len(task.engines)):
engine = task.engines[engine_idx]
for engine_idx, engine in task.engine_list():
assert len(engine) > 0
task.log(f"""engine_{engine_idx}: {" ".join(engine)}""")

View file

@ -24,8 +24,7 @@ def run(task):
task.status = "UNKNOWN"
for engine_idx in range(len(task.engines)):
engine = task.engines[engine_idx]
for engine_idx, engine in task.engine_list():
assert len(engine) > 0
task.log(f"""engine_{engine_idx}: {" ".join(engine)}""")

View file

@ -31,8 +31,7 @@ def run(task):
task.basecase_procs = list()
task.induction_procs = list()
for engine_idx in range(len(task.engines)):
engine = task.engines[engine_idx]
for engine_idx, engine in task.engine_list():
assert len(engine) > 0
task.log(f"""engine_{engine_idx}: {" ".join(engine)}""")