3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-04-22 21:05:30 +00:00

Initial import

This commit is contained in:
Clifford Wolf 2017-01-22 16:47:47 +01:00
commit 3a13b116a6
11 changed files with 1402 additions and 0 deletions

20
sbysrc/demo.sby Normal file
View file

@ -0,0 +1,20 @@
[options]
mode bmc
depth 10
[engines]
smtbmc -s yices
smtbmc -s boolector
smtbmc -s z3 --nomem
abc_bmc3
[script]
read_verilog -formal -norestrict -assume-asserts picorv32.v
read_verilog -formal axicheck.v
prep -nordff -top testbench
[files]
picorv32.v ~/Work/picorv32/picorv32.v
axicheck.v ~/Work/picorv32/scripts/smtbmc/axicheck.v

88
sbysrc/sby.py Normal file
View file

@ -0,0 +1,88 @@
#!/usr/bin/env python3
#
# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows
#
# Copyright (C) 2016 Clifford Wolf <clifford@clifford.at>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import os, sys, getopt, shutil
##yosys-sys-path##
from sby_core import SbyJob
sbyfile = None
workdir = None
opt_force = False
opt_backup = False
def usage():
print("""
sby [options] <jobname>.sby
-d <dirname>
set workdir name. default: <jobname> (without .sby)
-f
remove workdir if it already exists
-b
backup workdir if it already exists
""")
sys.exit(1)
try:
opts, args = getopt.getopt(sys.argv[1:], "d:bf", [])
except:
usage()
for o, a in opts:
if o == "-d":
workdir = a
elif o == "-f":
opt_force = True
elif o == "-b":
opt_backup = True
else:
usage()
if len(args) != 1:
usage()
sbyfile = args[0]
early_logmsgs = list()
def early_log(msg):
early_logmsgs.append("SBY [%s] %s" % (workdir, msg))
print(early_logmsgs[-1])
if workdir is None:
assert sbyfile.endswith(".sby")
workdir = sbyfile[:-4]
if opt_backup:
backup_idx = 0
while os.path.exists("%s.bak%03d" % (workdir, backup_idx)):
backup_idx += 1
early_log("Moving direcory '%s' to '%s'." % (workdir, "%s.bak%03d" % (workdir, backup_idx)))
shutil.move(workdir, "%s.bak%03d" % (workdir, backup_idx))
if opt_force:
early_log("Removing direcory '%s'." % (workdir))
shutil.rmtree(workdir, ignore_errors=True)
job = SbyJob(sbyfile, workdir, early_logmsgs)
job.run()
sys.exit(0 if job.status == "PASS" else 1)

167
sbysrc/sby_bmc.py Normal file
View file

@ -0,0 +1,167 @@
#
# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows
#
# Copyright (C) 2016 Clifford Wolf <clifford@clifford.at>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import re, os, getopt
from sby_core import SbyTask
def run_smtbmc(job, engine_idx, engine):
smtbmc_opts = []
nomem_opt = False
syn_opt = False
opts, args = getopt.getopt(engine[1:], "s:", ["nomem", "syn"])
assert len(args) == 0
for o, a in opts:
if o == "-s":
smtbmc_opts += ["-s", a]
elif o == "--nomem":
nomem_opt = True
elif o == "--syn":
syn_opt = True
else:
assert False
model_name = "smt2"
if syn_opt: model_name += "_syn"
if nomem_opt: model_name += "_nomem"
task = SbyTask(job, "engine_%d" % engine_idx, job.model(model_name),
("cd %s; yosys-smtbmc --noprogress %s -t %d --dump-vcd engine_%d/trace.vcd --dump-vlogtb engine_%d/trace_tb.v " +
"--dump-smtc engine_%d/trace.smtc model/design_smt2.smt2") %
(job.workdir, " ".join(smtbmc_opts), job.opt_depth, engine_idx, engine_idx, engine_idx),
logfile=open("%s/engine_%d/logfile.txt" % (job.workdir, engine_idx), "w"))
task_status = None
def output_callback(line):
nonlocal task_status
match = re.match(r"^## [0-9: ]+ Status: FAILED", line)
if match: task_status = "FAIL"
match = re.match(r"^## [0-9: ]+ Status: PASSED", line)
if match: task_status = "PASS"
def exit_callback(retcode):
assert task_status is not None
job.status = task_status
job.log("engine_%d: Status returned by engine: %s" % (engine_idx, task_status))
job.summary.append("engine_%d (%s) returned %s" % (engine_idx, " ".join(engine), job.status))
if job.status == "FAIL":
job.summary.append("counterexample trace: %s/engine_%d/trace.vcd" % (job.workdir, engine_idx))
for t in job.engine_tasks:
if task is not t:
t.terminate()
task.output_callback = output_callback
task.exit_callback = exit_callback
job.engine_tasks.append(task)
def run_abc_bmc3(job, engine_idx, engine):
task = SbyTask(job, "engine_%d" % engine_idx, job.model("aig"),
("cd %s; yosys-abc -c 'read_aiger model/design_aiger.aig; fold; strash; bmc3 -F %d -v; " +
"undc -c; write_cex -n engine_%d/trace.cex'") % (job.workdir, job.opt_depth, engine_idx),
logfile=open("%s/engine_%d/logfile.txt" % (job.workdir, engine_idx), "w"))
task_status = None
def output_callback(line):
nonlocal task_status
match = re.match(r"^Output [0-9]+ of miter .* was asserted in frame [0-9]+.", line)
if match: task_status = "FAIL"
match = re.match(r"^No output asserted in [0-9]+ frames.", line)
if match: task_status = "PASS"
def exit_callback(retcode):
assert retcode == 0
assert task_status is not None
job.status = task_status
job.log("engine_%d: Status returned by engine: %s" % (engine_idx, task_status))
job.summary.append("engine_%d (%s) returned %s" % (engine_idx, " ".join(engine), job.status))
for t in job.engine_tasks:
if task is not t:
t.terminate()
if job.status == "FAIL":
task2 = SbyTask(job, "engine_%d" % engine_idx, job.model("smt2"),
("cd %s; yosys-smtbmc --noprogress %s -t %d --dump-vcd engine_%d/trace.vcd --dump-vlogtb engine_%d/trace_tb.v " +
"--dump-smtc engine_%d/trace.smtc --cex engine_%d/trace.cex model/design_smt2.smt2") %
(job.workdir, " ".join(engine[1:]), job.opt_depth, engine_idx, engine_idx, engine_idx, engine_idx),
logfile=open("%s/engine_%d/logfile2.txt" % (job.workdir, engine_idx), "w"))
task2_status = None
def output_callback2(line):
nonlocal task2_status
match = re.match(r"^## [0-9: ]+ Status: FAILED", line)
if match: task2_status = "FAIL"
match = re.match(r"^## [0-9: ]+ Status: PASSED", line)
if match: task2_status = "PASS"
def exit_callback2(line):
assert task2_status is not None
assert task2_status == "FAIL"
job.summary.append("counterexample trace: %s/engine_%d/trace.vcd" % (job.workdir, engine_idx))
task2.output_callback = output_callback2
task2.exit_callback = exit_callback2
task.output_callback = output_callback
task.exit_callback = exit_callback
job.engine_tasks.append(task)
def run(job):
job.opt_depth = 20
if "depth" in job.options:
job.opt_depth = int(job.options["depth"])
job.status = "UNKNOWN"
job.engine_tasks = list()
for engine_idx in range(len(job.engines)):
engine = job.engines[engine_idx]
assert len(engine) > 0
job.log("engine_%d: %s" % (engine_idx, " ".join(engine)))
os.makedirs("%s/engine_%d" % (job.workdir, engine_idx))
if engine[0] == "smtbmc":
run_smtbmc(job, engine_idx, engine)
elif engine[0] == "abc_bmc3":
run_abc_bmc3(job, engine_idx, engine)
else:
assert False
job.taskloop()

340
sbysrc/sby_core.py Normal file
View file

@ -0,0 +1,340 @@
#
# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows
#
# Copyright (C) 2016 Clifford Wolf <clifford@clifford.at>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import os, re, resource
import subprocess, fcntl
from shutil import copyfile
from select import select
from time import time
class SbyTask:
def __init__(self, job, info, deps, cmdline, logfile=None):
self.running = False
self.finished = False
self.terminated = False
self.job = job
self.info = info
self.deps = deps
self.cmdline = cmdline
self.logfile = logfile
self.notify = []
for dep in self.deps:
dep.register_dep(self)
self.output_callback = None
self.exit_callback = None
self.job.tasks_all.append(self)
def register_dep(self, next_task):
if self.finished:
next_task.poll()
else:
self.notify.append(next_task)
def handle_output(self, line):
if self.terminated:
return
if self.logfile is not None:
print(line, file=self.logfile)
if self.output_callback is not None:
self.output_callback(line)
def handle_exit(self, retcode):
if self.terminated:
return
if self.logfile is not None:
self.logfile.close()
if self.exit_callback is not None:
self.exit_callback(retcode)
def terminate(self):
if self.running:
self.job.log("%s: terminating process" % self.info)
self.p.terminate()
self.job.tasks_running.remove(self)
self.terminated = True
def poll(self):
if self.finished or self.terminated:
return
if not self.running:
for dep in self.deps:
if not dep.finished:
return
self.job.log("%s: starting process \"%s\"" % (self.info, self.cmdline))
self.p = subprocess.Popen(self.cmdline, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
fl = fcntl.fcntl(self.p.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.job.tasks_running.append(self)
self.running = True
return
while True:
outs = self.p.stdout.readline().decode("utf-8")
if len(outs) == 0: break
self.job.log("%s: %s" % (self.info, outs.strip()))
self.handle_output(outs.strip())
if self.p.poll() is not None:
self.handle_exit(self.p.returncode)
self.job.log("%s: finished (returncode=%d)" % (self.info, self.p.returncode))
self.job.tasks_running.remove(self)
self.running = False
self.finished = True
for next_task in self.notify:
next_task.poll()
return
class SbyJob:
def __init__(self, filename, workdir, early_logs):
self.options = dict()
self.engines = list()
self.script = list()
self.files = dict()
self.models = dict()
self.workdir = workdir
self.tasks_running = []
self.tasks_all = []
self.start_clock_time = time()
ru = resource.getrusage(resource.RUSAGE_CHILDREN)
self.start_process_time = ru.ru_utime + ru.ru_stime
self.logprefix = "SBY [%s]" % self.workdir
self.summary = list()
os.makedirs(workdir)
self.logfile = open("%s/logfile.txt" % workdir, "w")
for line in early_logs:
print(line, file=self.logfile)
self.logfile.flush()
mode = None
key = None
with open(filename, "r") as f:
for line in f:
line = line.strip()
# print(line)
if line == "" or line[0] == "#":
continue
match = re.match(r"^\s*\[(.*)\]\s*$", line)
if match:
entries = match.group(1).split()
assert len(entries) > 0
if entries[0] == "options":
mode = "options"
assert len(self.options) == 0
assert len(entries) == 1
continue
if entries[0] == "engines":
mode = "engines"
assert len(self.engines) == 0
assert len(entries) == 1
continue
if entries[0] == "script":
mode = "script"
assert len(self.script) == 0
assert len(entries) == 1
continue
if entries[0] == "files":
mode = "files"
assert len(entries) == 1
continue
assert False
if mode == "options":
entries = line.split()
assert len(entries) == 2
self.options[entries[0]] = entries[1]
continue
if mode == "engines":
entries = line.split()
self.engines.append(entries)
continue
if mode == "script":
self.script.append(line)
continue
if mode == "files":
entries = line.split()
if len(entries) == 1:
self.files[os.path.basename(entries[0])] = entries[0]
elif len(entries) == 2:
self.files[entries[0]] = entries[1]
else:
assert False
continue
assert False
def taskloop(self):
for task in self.tasks_all:
task.poll()
while len(self.tasks_running):
fds = []
for task in self.tasks_running:
if task.running:
fds.append(task.p.stdout)
try:
while select(fds, [], [], 1.0) == ([], [], []):
pass
except:
pass
for task in self.tasks_running:
task.poll()
def log(self, logmessage):
print("%s %s" % (self.logprefix, logmessage))
print("%s %s" % (self.logprefix, logmessage), file=self.logfile)
self.logfile.flush()
def copy_src(self):
for dstfile, srcfile in self.files.items():
dstfile = self.workdir + "/src/" + dstfile
if srcfile.startswith("~/"):
srcfile = os.environ['HOME'] + srcfile[1:]
basedir = os.path.dirname(dstfile)
if basedir != "" and not os.path.exists(basedir):
os.makedirs(basedir)
self.log("Copy '%s' to '%s'." % (srcfile, dstfile))
copyfile(srcfile, dstfile)
def make_model(self, model_name):
if not os.path.exists("%s/model" % self.workdir):
os.makedirs("%s/model" % self.workdir)
if model_name == "ilang":
with open("%s/model/design.ys" % (self.workdir), "w") as f:
print("# running in %s/src/" % self.workdir, file=f)
for cmd in self.script:
print(cmd, file=f)
print("write_ilang ../model/design.il", file=f)
task = SbyTask(self, "script", [],
"cd %s/src; yosys -ql ../model/design.log ../model/design.ys" % (self.workdir))
return [task]
if model_name in ["smt2", "smt2_syn", "smt2_nomem", "smt2_syn_nomem"]:
with open("%s/model/design_%s.ys" % (self.workdir, model_name), "w") as f:
print("# running in %s/model/" % (self.workdir), file=f)
print("read_ilang design.il", file=f)
if model_name in ["smt2_nomem", "smt2_syn_nomem"]:
print("memory_map", file=f)
print("opt -keepdc -fast", file=f)
if model_name in ["smt2_syn", "smt2_syn_nomem"]:
print("techmap", file=f)
print("opt -fast", file=f)
print("abc", file=f)
print("opt_clean", file=f)
print("stat", file=f)
print("write_smt2 -wires design_%s.smt2" % model_name, file=f)
task = SbyTask(self, model_name, self.model("ilang"),
"cd %s/model; yosys -ql design_%s.log design_%s.ys" % (self.workdir, model_name, model_name))
return [task]
if model_name == "aig":
with open("%s/model/design_aiger.ys" % (self.workdir), "w") as f:
print("# running in %s/model/" % (self.workdir), file=f)
print("read_ilang design.il", file=f)
print("flatten", file=f)
print("setattr -unset keep", file=f)
print("delete -output", file=f)
print("memory_map", file=f)
print("opt -full", file=f)
print("techmap", file=f)
print("opt -fast", file=f)
print("abc -g AND -fast", file=f)
print("opt_clean", file=f)
print("stat", file=f)
print("write_aiger -zinit -map design_aiger.aim design_aiger.aig", file=f)
task = SbyTask(self, "aig", self.model("ilang"),
"cd %s/model; yosys -ql design_aiger.log design_aiger.ys" % (self.workdir))
return [task]
assert False
def model(self, model_name):
if model_name not in self.models:
self.models[model_name] = self.make_model(model_name)
return self.models[model_name]
def run(self):
assert "mode" in self.options
self.copy_src()
if self.options["mode"] == "bmc":
import sby_bmc
sby_bmc.run(self)
else:
assert False
total_clock_time = int(time() - self.start_clock_time)
ru = resource.getrusage(resource.RUSAGE_CHILDREN)
total_process_time = int((ru.ru_utime + ru.ru_stime) - self.start_process_time)
self.summary = [
"Elapsed clock time [H:MM:SS (secs)]: %d:%02d:%02d (%d)" %
(total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time),
"Elapsed process time [H:MM:SS (secs)]: %d:%02d:%02d (%d)" %
(total_process_time // (60*60), (total_process_time // 60) % 60, total_process_time % 60, total_process_time),
] + self.summary
for line in self.summary:
self.log("summary: %s" % line)
with open("%s/%s" % (self.workdir, self.status), "w") as f:
for line in self.summary:
print(line, file=f)
self.log("DONE (%s)" % self.status)
assert self.status in ["PASS", "FAIL"]