#
# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows
#
# Copyright (C) 2016 Claire Xenia Wolf <claire@yosyshq.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

import os, re, sys, signal, platform
if os.name == "posix":
    import resource, fcntl
import subprocess
from shutil import copyfile, copytree, rmtree
from select import select
from time import monotonic, localtime, sleep, strftime
from sby_design import SbyProperty, SbyModule, design_hierarchy

all_procs_running = []

def force_shutdown(signum, frame):
    print("SBY ---- Keyboard interrupt or external termination signal ----", flush=True)
    for proc in list(all_procs_running):
        proc.terminate()
    sys.exit(1)

if os.name == "posix":
    signal.signal(signal.SIGHUP, force_shutdown)
signal.signal(signal.SIGINT, force_shutdown)
signal.signal(signal.SIGTERM, force_shutdown)

def process_filename(filename):
    if filename.startswith("~/"):
        filename = os.environ['HOME'] + filename[1:]

    filename = os.path.expandvars(filename)

    return filename

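# SbyProc wraps a single flow subprocess: it tracks dependencies on other
# procs, forwards the child's output to the task log (and an optional
# per-proc logfile), and reports exit/error results back to its SbyTask.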
class SbyProc:
    def __init__(self, task, info, deps, cmdline, logfile=None, logstderr=True, silent=False):
        self.running = False
        self.finished = False
        self.terminated = False
        self.exited = False
        self.checkretcode = False
        self.retcodes = [0]
        self.task = task
        self.info = info
        self.deps = deps
        if os.name == "posix":
            self.cmdline = cmdline
        else:
            # Windows command interpreter equivalents for sequential
            # commands (; => &) and command grouping ({} => ()).
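            # Illustrative example (not taken from the original source): a
            # POSIX command line such as
            #   yosys -p 'opt; stat' ; echo done
            # is split on single quotes, only the unquoted parts are rewritten,
            # and the parts are re-joined with double quotes, giving roughly
            #   yosys -p "opt; stat" & echo done
            # for the Windows command interpreter.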
            replacements = {
                ";" : "&",
                "{" : "(",
                "}" : ")",
            }
            parts = cmdline.split("'")
            for i in range(len(parts)):
                if i % 2 == 0:
                    cmdline_copy = parts[i]
                    for u, w in replacements.items():
                        cmdline_copy = cmdline_copy.replace(u, w)
                    parts[i] = cmdline_copy
            self.cmdline = '"'.join(parts)
        self.logfile = logfile
        self.noprintregex = None
        self.notify = []
        self.linebuffer = ""
        self.logstderr = logstderr
        self.silent = silent

        self.task.update_proc_pending(self)

        for dep in self.deps:
            dep.register_dep(self)

        self.output_callback = None
        self.exit_callback = None
        self.error_callback = None

        if self.task.timeout_reached:
            self.terminate(True)

    def register_dep(self, next_proc):
        if self.finished:
            next_proc.poll()
        else:
            self.notify.append(next_proc)

    def log(self, line):
        if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)):
            if self.logfile is not None:
                print(line, file=self.logfile)
            self.task.log(f"{self.info}: {line}")

    def handle_output(self, line):
        if self.terminated or len(line) == 0:
            return
        if self.output_callback is not None:
            line = self.output_callback(line)
        self.log(line)

    def handle_exit(self, retcode):
        if self.terminated:
            return
        if self.logfile is not None:
            self.logfile.close()
        if self.exit_callback is not None:
            self.exit_callback(retcode)

    def handle_error(self, retcode):
        if self.terminated:
            return
        if self.logfile is not None:
            self.logfile.close()
        if self.error_callback is not None:
            self.error_callback(retcode)

    def terminate(self, timeout=False):
        if self.task.opt_wait and not timeout:
            return
        if self.running:
            if not self.silent:
                self.task.log(f"{self.info}: terminating process")
            if os.name == "posix":
                try:
                    os.killpg(self.p.pid, signal.SIGTERM)
                except PermissionError:
                    pass
            self.p.terminate()
            self.task.update_proc_stopped(self)
        elif not self.finished and not self.terminated and not self.exited:
            self.task.update_proc_canceled(self)
        self.terminated = True

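    # poll() drives the process life cycle: it waits for all dependencies to
    # finish, then launches the subprocess (in its own process group on POSIX,
    # with a non-blocking stdout pipe), and on later calls reads output and
    # handles the return code once the child has exited.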
    def poll(self, force_unchecked=False):
        if self.task.task_local_abort and not force_unchecked:
            try:
                self.poll(True)
            except SbyAbort:
                self.task.terminate(True)
            return
        if self.finished or self.terminated or self.exited:
            return

        if not self.running:
            for dep in self.deps:
                if not dep.finished:
                    return

            if not self.silent:
                self.task.log(f"{self.info}: starting process \"{self.cmdline}\"")

            if os.name == "posix":
                def preexec_fn():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                    os.setpgrp()

                self.p = subprocess.Popen(["/usr/bin/env", "bash", "-c", self.cmdline], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
                        stderr=(subprocess.STDOUT if self.logstderr else None), preexec_fn=preexec_fn)

                fl = fcntl.fcntl(self.p.stdout, fcntl.F_GETFL)
                fcntl.fcntl(self.p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

            else:
                self.p = subprocess.Popen(self.cmdline + " & exit !errorlevel!", shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
                        stderr=(subprocess.STDOUT if self.logstderr else None))

            self.task.update_proc_running(self)
            self.running = True
            return

        self.read_output()

        if self.p.poll() is not None:
            # The process might have written something since the last time we checked
            self.read_output()

            if not self.silent:
                self.task.log(f"{self.info}: finished (returncode={self.p.returncode})")
            self.task.update_proc_stopped(self)
            self.running = False
            self.exited = True

            if os.name == "nt":
                if self.p.returncode == 9009:
                    returncode = 127
                else:
                    returncode = self.p.returncode & 0xff
            else:
                returncode = self.p.returncode

            if returncode == 127:
                if not self.silent:
                    self.task.log(f"{self.info}: COMMAND NOT FOUND. ERROR.")
                self.handle_error(returncode)
                self.terminated = True
                self.task.proc_failed(self)
                return

            if self.checkretcode and returncode not in self.retcodes:
                if not self.silent:
                    self.task.log(f"{self.info}: task failed. ERROR.")
                self.handle_error(returncode)
                self.terminated = True
                self.task.proc_failed(self)
                return

            self.handle_exit(returncode)

            self.finished = True
            for next_proc in self.notify:
                next_proc.poll()
            return

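    # Reads as many complete lines as are currently available from the child's
    # stdout; a trailing partial line is kept in self.linebuffer until the rest
    # of it arrives.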
    def read_output(self):
        while True:
            outs = self.p.stdout.readline().decode("utf-8")
            if len(outs) == 0: break
            if outs[-1] != '\n':
                self.linebuffer += outs
                break
            outs = (self.linebuffer + outs).strip()
            self.linebuffer = ""
            self.handle_output(outs)


class SbyAbort(BaseException):
    pass


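# SbyConfig parses the .sby file format into options, engine lines, a Yosys
# script, and file lists. A minimal illustrative example of the format (not
# taken from the original source):
#
#   [options]
#   mode bmc
#   depth 10
#
#   [engines]
#   smtbmc
#
#   [script]
#   read -formal top.sv
#   prep -top top
#
#   [files]
#   top.sv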
class SbyConfig:
    def __init__(self):
        self.options = dict()
        self.engines = list()
        self.script = list()
        self.autotune_config = None
        self.files = dict()
        self.verbatim_files = dict()
        pass

    def parse_config(self, f):
        mode = None

        for line in f:
            raw_line = line
            if mode in ["options", "engines", "files", "autotune"]:
                line = re.sub(r"\s*(\s#.*)?$", "", line)
                if line == "" or line[0] == "#":
                    continue
            else:
                line = line.rstrip()
                # print(line)
            if mode is None and (len(line) == 0 or line[0] == "#"):
                continue
            match = re.match(r"^\s*\[(.*)\]\s*$", line)
            if match:
                entries = match.group(1).strip().split(maxsplit = 1)
                if len(entries) == 0:
                    self.error(f"sby file syntax error: Expected section header, got '{line}'")
                elif len(entries) == 1:
                    section, args = (*entries, None)
                else:
                    section, args = entries

                if section == "options":
                    mode = "options"
                    if len(self.options) != 0:
                        self.error(f"sby file syntax error: '[options]' section already defined")

                    if args is not None:
                        self.error(f"sby file syntax error: '[options]' section does not accept any arguments. got {args}")
                    continue

                if section == "engines":
                    mode = "engines"
                    if len(self.engines) != 0:
                        self.error(f"sby file syntax error: '[engines]' section already defined")
                    if args is not None:
                        self.error(f"sby file syntax error: '[engines]' section does not accept any arguments. got {args}")
                    continue

                if section == "script":
                    mode = "script"
                    if len(self.script) != 0:
                        self.error(f"sby file syntax error: '[script]' section already defined")
                    if args is not None:
                        self.error(f"sby file syntax error: '[script]' section does not accept any arguments. got {args}")

                    continue

                if section == "autotune":
                    mode = "autotune"
                    if self.autotune_config:
                        self.error(f"sby file syntax error: '[autotune]' section already defined")

                    import sby_autotune
                    self.autotune_config = sby_autotune.SbyAutotuneConfig()
                    continue

                if section == "file":
                    mode = "file"
                    if args is None:
                        self.error(f"sby file syntax error: '[file]' section expects a file name argument")

                    section_args = args.split()

                    if len(section_args) > 1:
                        self.error(f"sby file syntax error: '[file]' section expects exactly one file name argument, got {len(section_args)}")
                    current_verbatim_file = section_args[0]
                    if current_verbatim_file in self.verbatim_files:
                        self.error(f"duplicate file: {current_verbatim_file}")
                    self.verbatim_files[current_verbatim_file] = list()
                    continue

                if section == "files":
                    mode = "files"
                    if args is not None:
                        self.error(f"sby file syntax error: '[files]' section does not accept any arguments. got {args}")
                    continue

                self.error(f"sby file syntax error: unexpected section '{section}', expected one of 'options, engines, script, autotune, file, files'")

            if mode == "options":
                entries = line.strip().split(maxsplit = 1)
                if len(entries) != 2:
                    self.error(f"sby file syntax error: '[options]' section entry does not have an argument '{line}'")
                self.options[entries[0]] = entries[1]
                continue

            if mode == "autotune":
                self.autotune_config.config_line(self, line)
                continue

            if mode == "engines":
                entries = line.split()
                self.engines.append(entries)
                continue

            if mode == "script":
                self.script.append(line)
                continue

            if mode == "files":
                entries = line.split()
                if len(entries) < 1 or len(entries) > 2:
                    self.error(f"sby file syntax error: '[files]' section entry expects up to 2 arguments, {len(entries)} specified")

                if len(entries) == 1:
                    self.files[os.path.basename(entries[0])] = entries[0]
                elif len(entries) == 2:
                    self.files[entries[0]] = entries[1]

                continue

            if mode == "file":
                self.verbatim_files[current_verbatim_file].append(raw_line)
                continue

            self.error(f"sby file syntax error: In an incomprehensible mode '{mode}'")

    def error(self, logmessage):
        raise SbyAbort(logmessage)


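# SbyTaskloop is the cooperative scheduler: it polls pending procs until their
# dependencies are met, sleeps on select()/sleep() while children run, and
# calls each task's exit_callback once that task has no procs left.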
class SbyTaskloop:
    def __init__(self):
        self.procs_pending = []
        self.procs_running = []
        self.tasks = []
        self.poll_now = False

    def run(self):
        for proc in self.procs_pending:
            proc.poll()

        while len(self.procs_running) or self.poll_now:
            fds = []
            for proc in self.procs_running:
                if proc.running:
                    fds.append(proc.p.stdout)

            if not self.poll_now:
                if os.name == "posix":
                    try:
                        select(fds, [], [], 1.0) == ([], [], [])
                    except InterruptedError:
                        pass
                else:
                    sleep(0.1)
            self.poll_now = False

            for proc in self.procs_running:
                proc.poll()

            for proc in self.procs_pending:
                proc.poll()

            tasks = self.tasks
            self.tasks = []
            for task in tasks:
                task.check_timeout()
                if task.procs_pending or task.procs_running:
                    self.tasks.append(task)
                else:
                    task.exit_callback()

        for task in self.tasks:
            task.exit_callback()


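# SbyTask runs one verification task in its own work directory: it owns the
# parsed configuration, the generated Yosys models, the running procs, the
# logfile, and the final status/summary.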
class SbyTask(SbyConfig):
    def __init__(self, sbyconfig, workdir, early_logs, reusedir, taskloop=None, logfile=None):
        super().__init__()
        self.used_options = set()
        self.models = dict()
        self.workdir = workdir
        self.reusedir = reusedir
        self.status = "UNKNOWN"
        self.total_time = 0
        self.expect = list()
        self.design = None
        self.precise_prop_status = False
        self.timeout_reached = False
        self.task_local_abort = False

        yosys_program_prefix = "" ##yosys-program-prefix##
        self.exe_paths = {
            "yosys": os.getenv("YOSYS", yosys_program_prefix + "yosys"),
            "abc": os.getenv("ABC", yosys_program_prefix + "yosys-abc"),
            "smtbmc": os.getenv("SMTBMC", yosys_program_prefix + "yosys-smtbmc"),
            "suprove": os.getenv("SUPROVE", "suprove"),
            "aigbmc": os.getenv("AIGBMC", "aigbmc"),
            "avy": os.getenv("AVY", "avy"),
            "btormc": os.getenv("BTORMC", "btormc"),
            "pono": os.getenv("PONO", "pono"),
        }

        self.taskloop = taskloop or SbyTaskloop()
        self.taskloop.tasks.append(self)

        self.procs_running = []
        self.procs_pending = []

        self.start_clock_time = monotonic()

        if os.name == "posix":
            ru = resource.getrusage(resource.RUSAGE_CHILDREN)
            self.start_process_time = ru.ru_utime + ru.ru_stime

        self.summary = list()

        self.logfile = logfile or open(f"{workdir}/logfile.txt", "a")
        self.log_targets = [sys.stdout, self.logfile]

        for line in early_logs:
            print(line, file=self.logfile, flush=True)

        if not reusedir:
            with open(f"{workdir}/config.sby", "w") as f:
                for line in sbyconfig:
                    print(line, file=f)

    def engine_list(self):
        return list(enumerate(self.engines))

    def check_timeout(self):
        if self.opt_timeout is not None:
            total_clock_time = int(monotonic() - self.start_clock_time)
            if total_clock_time > self.opt_timeout:
                self.log(f"Reached TIMEOUT ({self.opt_timeout} seconds). Terminating all subprocesses.")
                self.status = "TIMEOUT"
                self.terminate(timeout=True)

    def update_proc_pending(self, proc):
        self.procs_pending.append(proc)
        self.taskloop.procs_pending.append(proc)

    def update_proc_running(self, proc):
        self.procs_pending.remove(proc)
        self.taskloop.procs_pending.remove(proc)

        self.procs_running.append(proc)
        self.taskloop.procs_running.append(proc)
        all_procs_running.append(proc)

    def update_proc_stopped(self, proc):
        self.procs_running.remove(proc)
        self.taskloop.procs_running.remove(proc)
        all_procs_running.remove(proc)

    def update_proc_canceled(self, proc):
        self.procs_pending.remove(proc)
        self.taskloop.procs_pending.remove(proc)

    def log(self, logmessage):
        tm = localtime()
        line = "SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage)
        for target in self.log_targets:
            print(line, file=target, flush=True)

    def error(self, logmessage):
        tm = localtime()
        self.log(f"ERROR: {logmessage}")
        self.status = "ERROR"
        if "ERROR" not in self.expect:
            self.retcode = 16
        else:
            self.retcode = 0
        self.terminate()
        with open(f"{self.workdir}/{self.status}", "w") as f:
            print(f"ERROR: {logmessage}", file=f)
        raise SbyAbort(logmessage)

    def makedirs(self, path):
        if self.reusedir and os.path.isdir(path):
            rmtree(path, ignore_errors=True)
        os.makedirs(path)

    def copy_src(self):
        os.makedirs(self.workdir + "/src")

        for dstfile, lines in self.verbatim_files.items():
            dstfile = self.workdir + "/src/" + dstfile
            self.log(f"Writing '{dstfile}'.")

            with open(dstfile, "w") as f:
                for line in lines:
                    f.write(line)

        for dstfile, srcfile in self.files.items():
            if dstfile.startswith("/") or dstfile.startswith("../") or ("/../" in dstfile):
                self.error(f"destination filename must be a relative path without /../: {dstfile}")
            dstfile = self.workdir + "/src/" + dstfile

            srcfile = process_filename(srcfile)

            basedir = os.path.dirname(dstfile)
            if basedir != "" and not os.path.exists(basedir):
                os.makedirs(basedir)

            self.log(f"Copy '{os.path.abspath(srcfile)}' to '{os.path.abspath(dstfile)}'.")
            if os.path.isdir(srcfile):
                copytree(srcfile, dstfile, dirs_exist_ok=True)
            else:
                copyfile(srcfile, dstfile)

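    # The handle_*_option() helpers turn entries of the [options] section into
    # opt_<name> attributes, e.g. "depth 20" becomes self.opt_depth == 20, and
    # record which options were actually consumed so unused ones can be
    # reported as errors.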
    def handle_str_option(self, option_name, default_value):
        if option_name in self.options:
            self.__dict__["opt_" + option_name] = self.options[option_name]
            self.used_options.add(option_name)
        else:
            self.__dict__["opt_" + option_name] = default_value

    def handle_int_option(self, option_name, default_value):
        if option_name in self.options:
            self.__dict__["opt_" + option_name] = int(self.options[option_name])
            self.used_options.add(option_name)
        else:
            self.__dict__["opt_" + option_name] = default_value

    def handle_bool_option(self, option_name, default_value):
        if option_name in self.options:
            if self.options[option_name] not in ["on", "off"]:
                self.error(f"Invalid value '{self.options[option_name]}' for boolean option {option_name}.")
            self.__dict__["opt_" + option_name] = self.options[option_name] == "on"
            self.used_options.add(option_name)
        else:
            self.__dict__["opt_" + option_name] = default_value

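    # make_model() writes a Yosys script into <workdir>/model/ and returns the
    # proc that runs it. Model names: "base" (elaborated design), "prep"
    # (formal preparation of the base model), "smt2*", "btor*" and "aig"
    # (engine-specific back-end formats); the starred variants encode the
    # _syn/_nomem/_stbv/_stdt flavors handled below.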
    def make_model(self, model_name):
        if not os.path.isdir(f"{self.workdir}/model"):
            os.makedirs(f"{self.workdir}/model")

        if model_name == "prep":
            with open(f"""{self.workdir}/model/design_prep.ys""", "w") as f:
                print(f"# running in {self.workdir}/model/", file=f)
                print(f"""read_ilang design.il""", file=f)
                print("scc -select; simplemap; select -clear", file=f)
                print("memory_nordff", file=f)
                if self.opt_multiclock:
                    print("clk2fflogic", file=f)
                else:
                    print("async2sync", file=f)
                    print("chformal -assume -early", file=f)
                print("formalff -clk2ff -ff2anyinit", file=f)
                if self.opt_mode in ["bmc", "prove"]:
                    print("chformal -live -fair -cover -remove", file=f)
                if self.opt_mode == "cover":
                    print("chformal -live -fair -remove", file=f)
                if self.opt_mode == "live":
                    print("chformal -assert2assume", file=f)
                    print("chformal -cover -remove", file=f)
                print("opt_clean", file=f)
                print("check", file=f) # can't detect undriven wires past this point
                print("setundef -undriven -anyseq", file=f)
                print("opt -fast", file=f)
                # running opt before the renames below results in fewer unnamed witness signals
                for celltype in ["anyconst", "anyseq", "anyinit", "allconst", "allseq"]:
                    print(f"rename -enumerate -pattern _sby_witness_{celltype}_% t:${celltype} %co w:* %i", file=f)
                print("opt_clean", file=f)
                print(f"""write_rtlil ../model/design_prep.il""", file=f)

            proc = SbyProc(
                self,
                model_name,
                self.model("base"),
                "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)
            )
            proc.checkretcode = True

            return [proc]

        if model_name == "base":
            with open(f"""{self.workdir}/model/design.ys""", "w") as f:
                print(f"# running in {self.workdir}/src/", file=f)
                for cmd in self.script:
                    print(cmd, file=f)
                # the user must designate a top module in [script]
                print("hierarchy -smtcheck", file=f)
                print(f"""write_jny -no-connections ../model/design.json""", file=f)
                print(f"""write_rtlil ../model/design.il""", file=f)

            proc = SbyProc(
                self,
                model_name,
                [],
                "cd {}/src; {} -ql ../model/design.log ../model/design.ys".format(self.workdir, self.exe_paths["yosys"])
            )
            proc.checkretcode = True

            def instance_hierarchy_callback(retcode):
                if self.design == None:
                    with open(f"{self.workdir}/model/design.json") as f:
                        self.design = design_hierarchy(f)

            def instance_hierarchy_error_callback(retcode):
                self.precise_prop_status = False

            proc.exit_callback = instance_hierarchy_callback
            proc.error_callback = instance_hierarchy_error_callback

            return [proc]

        if re.match(r"^smt2(_syn)?(_nomem)?(_stbv|_stdt)?$", model_name):
            with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f:
                print(f"# running in {self.workdir}/model/", file=f)
                print(f"""read_ilang design_prep.il""", file=f)
                print("hierarchy -smtcheck", file=f)
                if "_nomem" in model_name:
                    print("memory_map", file=f)
                if "_syn" in model_name:
                    print("techmap", file=f)
                    print("opt -fast", file=f)
                    print("abc", file=f)
                    print("opt_clean", file=f)
                print("dffunmap", file=f)
                print("stat", file=f)
                if "_stbv" in model_name:
                    print(f"write_smt2 -stbv -wires design_{model_name}.smt2", file=f)
                elif "_stdt" in model_name:
                    print(f"write_smt2 -stdt -wires design_{model_name}.smt2", file=f)
                else:
                    print(f"write_smt2 -wires design_{model_name}.smt2", file=f)

            proc = SbyProc(
                self,
                model_name,
                self.model("prep"),
                "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)
            )
            proc.checkretcode = True

            return [proc]

        if re.match(r"^btor(_syn)?(_nomem)?$", model_name):
            with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f:
                print(f"# running in {self.workdir}/model/", file=f)
                print(f"""read_ilang design_prep.il""", file=f)
                print("hierarchy -simcheck", file=f)
                if "_nomem" in model_name:
                    print("memory_map", file=f)
                print("flatten", file=f)
                print("setundef -undriven -anyseq", file=f)
                if "_syn" in model_name:
                    print("opt -full", file=f)
                    print("techmap", file=f)
                    print("opt -fast", file=f)
                    print("abc", file=f)
                    print("opt_clean", file=f)
                else:
                    print("opt -fast", file=f)
                print("delete -output", file=f)
                print("dffunmap", file=f)
                print("stat", file=f)
                print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)
                print("write_btor -s {}-i design_{m}_single.info design_{m}_single.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)

            proc = SbyProc(
                self,
                model_name,
                self.model("prep"),
                "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)
            )
            proc.checkretcode = True

            return [proc]

        if model_name == "aig":
            with open(f"{self.workdir}/model/design_aiger.ys", "w") as f:
                print(f"# running in {self.workdir}/model/", file=f)
                print("read_ilang design_prep.il", file=f)
                print("hierarchy -simcheck", file=f)
                print("memory_map", file=f)
                print("flatten", file=f)
                print("setundef -undriven -anyseq", file=f)
                print("setattr -unset keep", file=f)
                print("delete -output", file=f)
                print("opt -full", file=f)
                print("techmap", file=f)
                print("opt -fast", file=f)
                print("dffunmap", file=f)
                print("abc -g AND -fast", file=f)
                print("opt_clean", file=f)
                print("stat", file=f)
                print("write_aiger -I -B -zinit -no-startoffset -map design_aiger.aim design_aiger.aig", file=f)

            proc = SbyProc(
                self,
                "aig",
                self.model("prep"),
                f"""cd {self.workdir}/model; {self.exe_paths["yosys"]} -ql design_aiger.log design_aiger.ys"""
            )
            proc.checkretcode = True

            return [proc]

        assert False

    def model(self, model_name):
        if model_name not in self.models:
            self.models[model_name] = self.make_model(model_name)
        return self.models[model_name]

    def terminate(self, timeout=False):
        if timeout:
            self.timeout_reached = True
        for proc in list(self.procs_running):
            proc.terminate(timeout=timeout)
        for proc in list(self.procs_pending):
            proc.terminate(timeout=timeout)

    def proc_failed(self, proc):
        # proc parameter used by autotune override
        self.status = "ERROR"
        self.terminate()

    def update_status(self, new_status):
        assert new_status in ["PASS", "FAIL", "UNKNOWN", "ERROR"]

        if new_status == "UNKNOWN":
            return

        if self.status == "ERROR":
            return

        if new_status == "PASS":
            assert self.status != "FAIL"
            self.status = "PASS"

        elif new_status == "FAIL":
            assert self.status != "PASS"
            self.status = "FAIL"

        elif new_status == "ERROR":
            self.status = "ERROR"

        else:
            assert 0

    def run(self, setupmode):
        self.setup_procs(setupmode)
        if not setupmode:
            self.taskloop.run()
            self.write_summary_file()

    def handle_non_engine_options(self):
        with open(f"{self.workdir}/config.sby", "r") as f:
            self.parse_config(f)

        self.handle_str_option("mode", None)

        if self.opt_mode not in ["bmc", "prove", "cover", "live"]:
            self.error(f"Invalid mode: {self.opt_mode}")

        self.expect = ["PASS"]
        if "expect" in self.options:
            self.expect = self.options["expect"].upper().split(",")
            self.used_options.add("expect")

        for s in self.expect:
            if s not in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]:
                self.error(f"Invalid expect value: {s}")

        if self.opt_mode != "live":
            self.handle_int_option("depth", 20)

        self.handle_bool_option("multiclock", False)
        self.handle_bool_option("wait", False)
        self.handle_int_option("timeout", None)

        self.handle_str_option("smtc", None)
        self.handle_int_option("skip", None)
        self.handle_str_option("tbtop", None)

    def setup_procs(self, setupmode):
        self.handle_non_engine_options()
        if self.opt_smtc is not None:
            for engine_idx, engine in self.engine_list():
                if engine[0] != "smtbmc":
                    self.error("Option smtc is only valid for smtbmc engine.")

        if self.opt_skip is not None:
            if self.opt_skip == 0:
                self.opt_skip = None
            else:
                for engine_idx, engine in self.engine_list():
                    if engine[0] not in ["smtbmc", "btor"]:
                        self.error("Option skip is only valid for smtbmc and btor engines.")

        if len(self.engine_list()) == 0:
            self.error("Config file is lacking engine configuration.")

        if self.reusedir:
            rmtree(f"{self.workdir}/model", ignore_errors=True)
        else:
            self.copy_src()

        if setupmode:
            self.retcode = 0
            return

        if self.opt_mode == "bmc":
            import sby_mode_bmc
            sby_mode_bmc.run(self)

        elif self.opt_mode == "prove":
            import sby_mode_prove
            sby_mode_prove.run(self)

        elif self.opt_mode == "live":
            import sby_mode_live
            sby_mode_live.run(self)

        elif self.opt_mode == "cover":
            import sby_mode_cover
            sby_mode_cover.run(self)

        else:
            assert False

        for opt in self.options.keys():
            if opt not in self.used_options:
                self.error(f"Unused option: {opt}")

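    # summarize() computes the elapsed wall-clock and child-process time and
    # maps the final status to the exit code: 0 when the status matches
    # "expect", otherwise PASS=1, FAIL=2, UNKNOWN=4, TIMEOUT=8, ERROR=16.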
    def summarize(self):
        total_clock_time = int(monotonic() - self.start_clock_time)

        if os.name == "posix":
            ru = resource.getrusage(resource.RUSAGE_CHILDREN)
            total_process_time = int((ru.ru_utime + ru.ru_stime) - self.start_process_time)
            self.total_time = total_process_time

            self.summary = [
                "Elapsed clock time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format
                        (total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time),
                "Elapsed process time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format
                        (total_process_time // (60*60), (total_process_time // 60) % 60, total_process_time % 60, total_process_time),
            ] + self.summary
        else:
            self.summary = [
                "Elapsed clock time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format
                        (total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time),
                "Elapsed process time unavailable on Windows"
            ] + self.summary

        for line in self.summary:
            self.log(f"summary: {line}")

        assert self.status in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]

        if self.status in self.expect:
            self.retcode = 0
        else:
            if self.status == "PASS": self.retcode = 1
            if self.status == "FAIL": self.retcode = 2
            if self.status == "UNKNOWN": self.retcode = 4
            if self.status == "TIMEOUT": self.retcode = 8
            if self.status == "ERROR": self.retcode = 16

    def write_summary_file(self):
        with open(f"{self.workdir}/{self.status}", "w") as f:
            for line in self.summary:
                print(line, file=f)

    def exit_callback(self):
        self.summarize()

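    # Writes a JUnit XML report. With precise_prop_status each property of the
    # design becomes its own <testcase>; otherwise the whole task is reported
    # as a single test case. The task log is embedded in <system-out>.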
    def print_junit_result(self, f, junit_ts_name, junit_tc_name, junit_format_strict=False):
        junit_time = strftime('%Y-%m-%dT%H:%M:%S')
        if not self.design:
            self.precise_prop_status = False
        if self.precise_prop_status:
            checks = self.design.hierarchy.get_property_list()
            junit_tests = len(checks)
            junit_failures = 0
            junit_errors = 0
            junit_skipped = 0
            for check in checks:
                if check.status == "PASS":
                    pass
                elif check.status == "FAIL":
                    junit_failures += 1
                elif check.status == "UNKNOWN":
                    junit_skipped += 1
                else:
                    junit_errors += 1
            if self.retcode == 16:
                junit_errors += 1
            elif self.retcode != 0:
                junit_failures += 1
        else:
            junit_tests = 1
            junit_errors = 1 if self.retcode == 16 else 0
            junit_failures = 1 if self.retcode != 0 and junit_errors == 0 else 0
            junit_skipped = 0
        print(f'<?xml version="1.0" encoding="UTF-8"?>', file=f)
        print(f'<testsuites>', file=f)
        print(f'<testsuite timestamp="{junit_time}" hostname="{platform.node()}" package="{junit_ts_name}" id="0" name="{junit_tc_name}" tests="{junit_tests}" errors="{junit_errors}" failures="{junit_failures}" time="{self.total_time}" skipped="{junit_skipped}">', file=f)
        print(f'<properties>', file=f)
        print(f'<property name="os" value="{platform.system()}"/>', file=f)
        print(f'<property name="expect" value="{", ".join(self.expect)}"/>', file=f)
        print(f'<property name="status" value="{self.status}"/>', file=f)
        print(f'</properties>', file=f)
        if self.precise_prop_status:
            print(f'<testcase classname="{junit_tc_name}" name="build execution" time="0">', file=f)
            if self.retcode == 16:
                print(f'<error type="ERROR"/>', file=f) # type mandatory, message optional
            elif self.retcode != 0:
                if len(self.expect) > 1 or "PASS" not in self.expect:
                    expected = " ".join(self.expect)
                    print(f'<failure type="EXPECT" message="Task returned status {self.status}. Expected values were: {expected}" />', file=f)
                else:
                    print(f'<failure type="{self.status}" message="Task returned status {self.status}." />', file=f)
            print(f'</testcase>', file=f)

            for check in checks:
                if junit_format_strict:
                    detail_attrs = ''
                else:
                    detail_attrs = f' type="{check.type}" location="{check.location}" id="{check.name}"'
                    if check.tracefile:
                        detail_attrs += f' tracefile="{check.tracefile}"'
                if check.location:
                    junit_prop_name = f"Property {check.type} in {check.hierarchy} at {check.location}"
                else:
                    junit_prop_name = f"Property {check.type} {check.name} in {check.hierarchy}"
                print(f'<testcase classname="{junit_tc_name}" name="{junit_prop_name}" time="0"{detail_attrs}>', file=f)
                if check.status == "PASS":
                    pass
                elif check.status == "UNKNOWN":
                    print(f'<skipped />', file=f)
                elif check.status == "FAIL":
                    traceinfo = f' Trace file: {check.tracefile}' if check.type == check.Type.ASSERT else ''
                    print(f'<failure type="{check.type}" message="{junit_prop_name} failed.{traceinfo}" />', file=f)
                elif check.status == "ERROR":
                    print(f'<error type="ERROR"/>', file=f) # type mandatory, message optional
                print(f'</testcase>', file=f)
        else:
            print(f'<testcase classname="{junit_tc_name}" name="{junit_tc_name}" time="{self.total_time}">', file=f)
            if junit_errors:
                print(f'<error type="ERROR"/>', file=f) # type mandatory, message optional
            elif junit_failures:
                junit_type = "assert" if self.opt_mode in ["bmc", "prove"] else self.opt_mode
                print(f'<failure type="{junit_type}" message="{self.status}" />', file=f)
            print(f'</testcase>', file=f)
        print('<system-out>', end="", file=f)
        with open(f"{self.workdir}/logfile.txt", "r") as logf:
            for line in logf:
                print(line.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;"), end="", file=f)
        print('</system-out>', file=f)
        print('<system-err>', file=f)
        #TODO: can we handle errors and still output this file?
        print('</system-err>', file=f)
        print(f'</testsuite>', file=f)
        print(f'</testsuites>', file=f)