Mirror of https://github.com/YosysHQ/sby.git
Add aigcexmin and cexenum.py tools
parent 9e35ec9948
commit 040b8deef2
11 changed files with 2223 additions and 0 deletions
584  tools/cexenum/cexenum.py  Executable file
@@ -0,0 +1,584 @@
#!/usr/bin/env tabbypy3
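# Enumerate counter-examples for an existing SBY bounded model check: drive
# yosys-smtbmc incrementally over the work directory's SMT2 model, write a
# Yosys witness trace for every reachable assertion failure, minimize each
# trace via yosys-witness and aigcexmin on an AIGER copy of the design, and
# optionally replay the minimized trace with Yosys `sim` to produce an .fst
# waveform.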
from __future__ import annotations
import asyncio

import json
import traceback
import argparse
import shutil
import shlex
import os
from pathlib import Path
from typing import Any, Awaitable, Literal

import yosys_mau.task_loop.job_server as job
from yosys_mau import task_loop as tl


libexec = Path(__file__).parent.resolve() / "libexec"

if libexec.exists():
    os.environb[b"PATH"] = bytes(libexec) + b":" + os.environb[b"PATH"]


def arg_parser():
    parser = argparse.ArgumentParser(
        prog="cexenum", usage="%(prog)s [options] <sby_workdir>"
    )

    parser.add_argument(
        "work_dir",
        metavar="<workdir>",
        help="existing SBY work directory",
        type=Path,
    )

    parser.add_argument(
        "--depth",
        type=int,
        metavar="N",
        help="BMC depth for the initial assertion failure (default: %(default)s)",
        default=100,
    )

    parser.add_argument(
        "--enum-depth",
        type=int,
        metavar="N",
        help="number of time steps to run enumeration for, starting with"
        " and including the time step of the first assertion failure"
        " (default: %(default)s)",
        default=10,
    )

    parser.add_argument(
        "--no-sim",
        dest="sim",
        action="store_false",
        help="do not run sim to obtain .fst traces for the enumerated counter examples",
    )

    parser.add_argument(
        "--smtbmc-options",
        metavar='"..."',
        type=shlex.split,
        help='command line options to pass to smtbmc (default: "%(default)s")',
        default="-s yices --unroll",
    )

    parser.add_argument("--debug", action="store_true", help="enable debug logging")
    parser.add_argument(
        "--debug-events", action="store_true", help="enable debug event logging"
    )

    parser.add_argument(
        "-j",
        metavar="<N>",
        type=int,
        dest="jobs",
        help="maximum number of processes to run in parallel",
        default=None,
    )

    return parser


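# Small helper used below to assemble Yosys script files: joins the given
# fragments into a single newline-terminated string.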
def lines(*args):
|
||||
return "".join(f"{line}\n" for line in args)
|
||||
|
||||
|
||||
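# Task-loop context holding the global configuration; main() copies the parsed
# command line arguments into the matching fields of this class.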
@tl.task_context
class App:
    raw_args: argparse.Namespace

    debug: bool = False
    debug_events: bool = False

    depth: int
    enum_depth: int
    sim: bool

    smtbmc_options: list[str]

    work_dir: Path

    work_subdir: Path
    trace_dir_full: Path
    trace_dir_min: Path
    cache_dir: Path


def main() -> None:
    args = arg_parser().parse_args()

    job.global_client(args.jobs)

    # Move command line arguments into the App context
    for name in dir(args):
        if name in type(App).__mro__[1].__annotations__:
            setattr(App, name, getattr(args, name))

    App.raw_args = args

    try:
        tl.run_task_loop(task_loop_main)
    except tl.TaskCancelled:
        exit(1)
    except BaseException as e:
        if App.debug or App.debug_events:
            traceback.print_exc()
        tl.log_exception(e, raise_error=False)  # Automatically avoids double logging
        exit(1)


def setup_logging():
    tl.LogContext.app_name = "CEXENUM"
    tl.logging.start_logging()

    if App.debug_events:
        tl.logging.start_debug_event_logging()
    if App.debug:
        tl.LogContext.level = "debug"

    def error_handler(err: BaseException):
        if isinstance(err, tl.TaskCancelled):
            return
        tl.log_exception(err, raise_error=True)

    tl.current_task().set_error_handler(None, error_handler)


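# Await the given awaitables in order and return the result of the last one;
# used below to issue a sequence of smtbmc commands as a single awaitable.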
async def batch(*args):
    result = None
    for arg in args:
        result = await arg
    return result


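# Top-level task: set up the cexenum_cache and cexenum work directories
# (reusing a cached AIGER model when a previous run left a "done" marker),
# then start the counter-example enumeration.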
async def task_loop_main() -> None:
    setup_logging()

    cached = False

    App.cache_dir = App.work_dir / "cexenum_cache"
    try:
        App.cache_dir.mkdir()
    except FileExistsError:
        if (App.cache_dir / "done").exists():
            cached = True
        else:
            shutil.rmtree(App.cache_dir)
            App.cache_dir.mkdir()

    App.work_subdir = App.work_dir / "cexenum"
    try:
        App.work_subdir.mkdir()
    except FileExistsError:
        shutil.rmtree(App.work_subdir)
        App.work_subdir.mkdir()

    App.trace_dir_full = App.work_subdir / "full"
    App.trace_dir_full.mkdir()
    App.trace_dir_min = App.work_subdir / "min"
    App.trace_dir_min.mkdir()

    if cached:
        tl.log("Reusing cached AIGER model")
        aig_model = tl.Task()
    else:
        aig_model = AigModel()

    Enumeration(aig_model)


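# Yosys process that converts the prepared SBY model (design_prep.il) into an
# AIGER netlist plus witness map files; the result is cached in cexenum_cache
# and marked with a "done" file so later runs can reuse it.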
class AigModel(tl.process.Process):
    def __init__(self):
        self[tl.LogContext].scope = "aiger"
        (App.cache_dir / "design_aiger.ys").write_text(
            lines(
                "read_ilang ../model/design_prep.il",
                "hierarchy -simcheck",
                "flatten",
                "setundef -undriven -anyseq",
                "setattr -set keep 1 w:\*",
                "delete -output",
                "opt -full",
                "techmap",
                "opt -fast",
                "memory_map -formal",
                "formalff -clk2ff -ff2anyinit",
                "simplemap",
                "dffunmap",
                "abc -g AND -fast",
                "opt_clean",
                "stat",
                "write_rtlil design_aiger.il",
                "write_aiger -I -B -zinit"
                " -map design_aiger.aim -ywmap design_aiger.ywa design_aiger.aig",
            )
        )
        super().__init__(
            ["yosys", "-ql", "design_aiger.log", "design_aiger.ys"], cwd=App.cache_dir
        )
        self.name = "aiger"
        self.log_output()

    async def on_run(self) -> None:
        await super().on_run()
        (App.cache_dir / "done").write_text("")


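# Per-trace minimization pipeline: convert the full .yw trace to an AIGER
# witness (yw2aiw), minimize it with aigcexmin, convert it back to a .yw trace
# (aiw2yw) and, unless --no-sim was given, replay it with `sim` to obtain an
# .fst waveform.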
class MinimizeTrace(tl.Task):
    def __init__(self, trace_name: str, aig_model: tl.Task):
        super().__init__()
        self.trace_name = trace_name

        full_yw = App.trace_dir_full / self.trace_name
        min_yw = App.trace_dir_min / self.trace_name

        stem = full_yw.stem

        full_aiw = full_yw.with_suffix(".aiw")
        min_aiw = min_yw.with_suffix(".aiw")

        yw2aiw = YosysWitness(
            "yw2aiw",
            full_yw,
            App.cache_dir / "design_aiger.ywa",
            full_aiw,
            cwd=App.trace_dir_full,
        )
        yw2aiw.depends_on(aig_model)
        yw2aiw[tl.LogContext].scope = f"yw2aiw[{stem}]"

        aigcexmin = AigCexMin(
            App.cache_dir / "design_aiger.aig",
            full_aiw,
            min_aiw,
            cwd=App.trace_dir_min,
        )
        aigcexmin.depends_on(yw2aiw)
        aigcexmin[tl.LogContext].scope = f"aigcexmin[{stem}]"

        self.aiw2yw = aiw2yw = YosysWitness(
            "aiw2yw",
            min_aiw,
            App.cache_dir / "design_aiger.ywa",
            min_yw,
            cwd=App.trace_dir_min,
        )
        aiw2yw[tl.LogContext].scope = f"aiw2yw[{stem}]"
        aiw2yw.depends_on(aigcexmin)

        if App.sim:
            sim = SimTrace(
                App.cache_dir / "design_aiger.il",
                min_yw,
                min_yw.with_suffix(".fst"),
                cwd=App.trace_dir_min,
            )

            sim[tl.LogContext].scope = f"sim[{stem}]"
            sim.depends_on(aiw2yw)


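# Express `target` relative to `cwd`, inserting ".." components where needed
# (similar to os.path.relpath); falls back to the absolute path when no common
# prefix is found.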
def relative_to(target: Path, cwd: Path) -> Path:
    prefix = Path("")
    target = target.resolve()
    cwd = cwd.resolve()
    while True:
        try:
            return prefix / (target.relative_to(cwd))
        except ValueError:
            prefix = prefix / ".."
            if cwd == cwd.parent:
                return target
            cwd = cwd.parent


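# Wrapper around `yosys-witness yw2aiw`/`aiw2yw` for converting between Yosys
# witness traces and AIGER witness files; its output is only logged at debug
# level.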
class YosysWitness(tl.process.Process):
    def __init__(
        self,
        mode: Literal["yw2aiw"] | Literal["aiw2yw"],
        input: Path,
        mapfile: Path,
        output: Path,
        cwd: Path,
    ):
        super().__init__(
            [
                "yosys-witness",
                mode,
                str(relative_to(input, cwd)),
                str(relative_to(mapfile, cwd)),
                str(relative_to(output, cwd)),
            ],
            cwd=cwd,
        )

        def handler(event: tl.process.OutputEvent):
            tl.log_debug(event.output.rstrip("\n"))

        self.sync_handle_events(tl.process.OutputEvent, handler)


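# Wrapper around the aigcexmin tool; its output is captured into a .log file
# next to the minimized witness and also logged at debug level.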
class AigCexMin(tl.process.Process):
    def __init__(self, design_aig: Path, input_aiw: Path, output_aiw: Path, cwd: Path):
        super().__init__(
            [
                "aigcexmin",
                str(relative_to(design_aig, cwd)),
                str(relative_to(input_aiw, cwd)),
                str(relative_to(output_aiw, cwd)),
            ],
            cwd=cwd,
        )

        self.log_path = output_aiw.with_suffix(".log")
        self.log_file = None

        def handler(event: tl.process.OutputEvent):
            if self.log_file is None:
                self.log_file = self.log_path.open("w")
            self.log_file.write(event.output)
            self.log_file.flush()
            tl.log_debug(event.output.rstrip("\n"))

        self.sync_handle_events(tl.process.OutputEvent, handler)

    def on_cleanup(self):
        if self.log_file is not None:
            self.log_file.close()
        super().on_cleanup()


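# Yosys process that replays a minimized witness trace on design_aiger.il
# using `sim -r` and writes the resulting .fst waveform.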
class SimTrace(tl.process.Process):
    def __init__(self, design_il: Path, input_yw: Path, output_fst: Path, cwd: Path):
        self[tl.LogContext].scope = "sim"

        script_file = output_fst.with_suffix(".fst.ys")
        log_file = output_fst.with_suffix(".fst.log")

        script_file.write_text(
            lines(
                f"read_rtlil {relative_to(design_il, cwd)}",
                "logger -nowarn"
                ' "Yosys witness trace has an unexpected value for the clock input"',
                f"sim -zinit -r {relative_to(input_yw, cwd)} -hdlname"
                f" -fst {relative_to(output_fst, cwd)}",
            )
        )
        super().__init__(
            [
                "yosys",
                "-ql",
                str(relative_to(log_file, cwd)),
                str(relative_to(script_file, cwd)),
            ],
            cwd=cwd,
        )
        self.name = "sim"
        self.log_output()


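# Main enumeration task: run BMC step by step; once an assertion failure is
# found, repeatedly dump a witness, minimize it, and block the minimized
# counter-example in the solver until the current step becomes unsat, then
# keep going for --enum-depth further steps.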
class Enumeration(tl.Task):
    def __init__(self, aig_model: tl.Task):
        self.aig_model = aig_model
        super().__init__()

    async def on_run(self) -> None:
        smtbmc = Smtbmc(App.work_dir / "model" / "design_smt2.smt2")

        await smtbmc.ping()

        pred = None

        i = 0
        limit = App.depth
        first_failure = None

        while i <= limit:
            tl.log(f"Checking assumptions in step {i}..")
            presat_checked = await batch(
                smtbmc.bmc_step(i, initial=i == 0, assertions=None, pred=pred),
                smtbmc.check(),
            )
            if presat_checked != "sat":
                if first_failure is None:
                    tl.log_error("Assumptions are not satisfiable")
                else:
                    tl.log("No further counter-examples are reachable")
                    return

            tl.log(f"Checking assertions in step {i}..")
            checked = await batch(
                smtbmc.push(),
                smtbmc.assertions(i, False),
                smtbmc.check(),
            )
            pred = i
            if checked != "unsat":
                if first_failure is None:
                    first_failure = i
                    limit = i + App.enum_depth
                tl.log("BMC failed! Enumerating counter-examples..")
                counter = 0

                assert checked == "sat"
                path = App.trace_dir_full / f"trace{i}_{counter}.yw"

                while checked == "sat":
                    await smtbmc.incremental_command(
                        cmd="write_yw_trace", path=str(path)
                    )
                    tl.log(f"Written counter-example to {path.name}")

                    minimize = MinimizeTrace(path.name, self.aig_model)
                    minimize.depends_on(self.aig_model)

                    await minimize.aiw2yw.finished

                    min_path = App.trace_dir_min / f"trace{i}_{counter}.yw"

                    checked = await batch(
                        smtbmc.incremental_command(
                            cmd="read_yw_trace",
                            name="last",
                            path=str(min_path),
                            skip_x=True,
                        ),
                        smtbmc.assert_(
                            ["not", ["and", *(["yw", "last", k] for k in range(i + 1))]]
                        ),
                        smtbmc.check(),
                    )

                    counter += 1
                    path = App.trace_dir_full / f"trace{i}_{counter}.yw"

            await batch(smtbmc.pop(), smtbmc.assertions(i))

            i += 1

        smtbmc.close_stdin()


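# Thin wrapper around `yosys-smtbmc --incremental`: commands are written as
# JSON lines to the process's stdin and each reply resolves the corresponding
# queued Future, so the methods below can be awaited individually or batched.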
class Smtbmc(tl.process.Process):
    def __init__(self, smt2_model: Path):
        self[tl.LogContext].scope = "smtbmc"
        super().__init__(
            [
                "yosys-smtbmc",
                "--incremental",
                *App.smtbmc_options,
                str(smt2_model),
            ],
            interact=True,
        )
        self.name = "smtbmc"

        self.expected_results = []

    async def on_run(self) -> None:
        def output_handler(event: tl.process.StdoutEvent):
            result = json.loads(event.output)
            tl.log_debug(f"smtbmc > {result!r}")
            if "err" in result:
                exception = tl.logging.LoggedError(
                    tl.log_error(result["err"], raise_error=False)
                )
                self.expected_results.pop(0).set_exception(exception)
            if "msg" in result:
                tl.log(result["msg"])
            if "ok" in result:
                assert self.expected_results
                self.expected_results.pop(0).set_result(result["ok"])

        self.sync_handle_events(tl.process.StdoutEvent, output_handler)

        return await super().on_run()

    def ping(self) -> Awaitable[None]:
        return self.incremental_command(cmd="ping")

    def incremental_command(self, **command: dict[Any]) -> Awaitable[Any]:
        tl.log_debug(f"smtbmc < {command!r}")
        self.write(json.dumps(command))
        self.write("\n")
        result = asyncio.Future()
        self.expected_results.append(result)

        return result

    def new_step(self, step: int) -> Awaitable[None]:
        return self.incremental_command(cmd="new_step", step=step)

    def push(self) -> Awaitable[None]:
        return self.incremental_command(cmd="push")

    def pop(self) -> Awaitable[None]:
        return self.incremental_command(cmd="pop")

    def check(self) -> Awaitable[str]:
        return self.incremental_command(cmd="check")

    def assert_antecedent(self, expr: Any) -> Awaitable[None]:
        return self.incremental_command(cmd="assert_antecedent", expr=expr)

    def assert_consequent(self, expr: Any) -> Awaitable[None]:
        return self.incremental_command(cmd="assert_consequent", expr=expr)

    def assert_(self, expr: Any) -> Awaitable[None]:
        return self.incremental_command(cmd="assert", expr=expr)

    def hierarchy(self, step: int) -> Awaitable[None]:
        return self.assert_antecedent(["mod_h", ["step", step]])

    def assumptions(self, step: int, valid: bool = True) -> Awaitable[None]:
        expr = ["mod_u", ["step", step]]
        if not valid:
            expr = ["not", expr]
        return self.assert_consequent(expr)

    def assertions(self, step: int, valid: bool = True) -> Awaitable[None]:
        expr = ["mod_a", ["step", step]]
        if not valid:
            expr = ["not", expr]
        return self.assert_(expr)

    def initial(self, step: int, initial: bool) -> Awaitable[None]:
        if initial:
            return batch(
                self.assert_antecedent(["mod_i", ["step", step]]),
                self.assert_antecedent(["mod_is", ["step", step]]),
            )
        else:
            return self.assert_antecedent(["not", ["mod_is", ["step", step]]])

    def transition(self, pred: int, succ: int) -> Awaitable[None]:
        return self.assert_antecedent(["mod_t", ["step", pred], ["step", succ]])

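    # Add one BMC time step: declare the step, assert the design hierarchy,
    # assumptions and initial-state constraints, connect it to the previous
    # step when given, and optionally assert (or negate) the step's assertions.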
    def bmc_step(
        self,
        step: int,
        initial: bool = False,
        assertions: bool | None = True,
        pred: int | None = None,
    ) -> Awaitable[None]:
        futures = []
        futures.append(self.new_step(step))
        futures.append(self.hierarchy(step))
        futures.append(self.assumptions(step))
        futures.append(self.initial(step, initial))

        if pred is not None:
            futures.append(self.transition(pred, step))

        if assertions is not None:
            futures.append(self.assertions(step, assertions))

        return batch(*futures)


if __name__ == "__main__":
    main()
0  tools/cexenum/examples/.gitignore  Normal file (vendored)
49  tools/cexenum/examples/factor.sby  Normal file
@@ -0,0 +1,49 @@
# Run using:
#
# sby -f factor.sby
# tabbypy3 cexenum.py factor --enum-depth=0
#
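# The design below multiplies an unconstrained 8-bit value `a` by a second
# 8-bit value supplied serially through `b_bit`, using shift-and-add, and then
# asserts that the accumulated product is never 100. Each counter-example that
# cexenum finds therefore effectively corresponds to one way of factoring 100.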
[options]
mode bmc
make_model prep,smt2
expect unknown

[engines]
none

[script]
read_verilog -sv top.sv
prep -top top

[file top.sv]
module top(input clk, input b_bit, output [15:0] acc);
    reg [7:0] a;
    reg [7:0] b_mask = 8'hff;


    reg [15:0] a_shift = 0;
    reg [15:0] acc = 0;


    always @(posedge clk) begin
        assume (!clk);
        if ($initstate) begin
            a_shift <= a;
            acc <= 0;
        end else begin

            if (b_bit) begin
                acc <= acc + a_shift;
            end
            a_shift <= a_shift << 1;
            b_mask <= b_mask >> 1;
        end

        if (b_mask == 0) begin
            a <= 0;
            assert (acc != 100);
        end;

    end

endmodule