#!/usr/bin/env tabbypy3
from __future__ import annotations

import asyncio
import json
import threading
import traceback
import argparse
import shutil
import shlex
import os
import sys
import urllib.parse
import random

from collections import Counter, defaultdict, deque
from pathlib import Path
from typing import Any, Awaitable, Iterable, Literal

try:
    import readline  # type: ignore # noqa
except ImportError:
    pass


libexec = Path(__file__).parent.resolve() / "libexec"

if libexec.exists():
    os.environb[b"PATH"] = bytes(libexec) + b":" + os.environb[b"PATH"]
    sys.path.insert(0, str(libexec))
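
# These imports have to happen after the sys.path setup above so that the
# libexec copy of yosys_mau is found; the "if True:" guard keeps automatic
# import sorting from moving them up.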
if True:
    import yosys_mau.task_loop.job_server as job
    from yosys_mau import task_loop as tl
    from yosys_mau.stable_set import StableSet
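

# Quote an arbitrary name as an SMT-LIB |...| symbol: quote_plus percent-encodes
# "|" and "\" so the quoted form stays well-formed, and its "+" escapes for
# spaces are mapped back to plain spaces for readability.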
def safe_smtlib_id(name: str) -> str:
    return f"|{urllib.parse.quote_plus(name).replace('+', ' ')}|"


def arg_parser():
    parser = argparse.ArgumentParser(
        prog="cexenum", usage="%(prog)s [options] <sby_workdir>"
    )

    parser.add_argument(
        "work_dir",
        metavar="<workdir>",
        help="existing SBY work directory",
        type=Path,
    )

    parser.add_argument(
        "--depth",
        type=int,
        metavar="N",
        help="BMC depth for the initial assertion failure (default: %(default)s)",
        default=100,
    )

    parser.add_argument(
        "--enum-depth",
        type=int,
        metavar="N",
        help="number of time steps to run enumeration for, starting with"
        " and including the time step of the first assertion failure"
        " (default: %(default)s)",
        default=10,
    )

    parser.add_argument(
        "--no-sim",
        dest="sim",
        action="store_false",
        help="do not run sim to obtain .fst traces for the enumerated counter-examples",
    )

    parser.add_argument(
        "--smtbmc-options",
        metavar='"..."',
        type=shlex.split,
        help='command line options to pass to smtbmc (default: "%(default)s")',
        default="-s yices --unroll",
    )

    parser.add_argument(
        "--callback",
        metavar='"..."',
        type=shlex.split,
        help="command that will receive enumerated traces on stdin and can control "
        "the enumeration via stdout (pass '-' to handle callbacks interactively)",
        default="",
    )

    parser.add_argument(
        "--track-assumes",
        help="track individual assumptions and, in case the assumptions cannot be "
        "satisfied, report a subset of used assumptions that are in conflict",
        action="store_true",
    )
    parser.add_argument(
        "--minimize-assumes",
        help="when using --track-assumes, solve for a minimal set of sufficient "
        "assumptions",
        action="store_true",
    )
    parser.add_argument(
        "--satisfy-assumes",
        help="when minimizing counter-examples, keep design assumptions satisfied",
        action="store_true",
    )
    parser.add_argument(
        "--diversify",
        help="produce a more diverse set of counter-examples early on",
        action="store_true",
    )

    parser.add_argument(
        "--diversify-seed",
        metavar="<N>",
        help="random seed used for --diversify",
        type=int,
        default=1,
    )
    parser.add_argument(
        "--diversify-depth",
        help="stop diversification after enumerating the given number of traces",
        type=int,
    )

    parser.add_argument(
        "--max-per-step",
        metavar="<N>",
        help="advance to the next step after enumerating N traces",
        type=int,
    )

    parser.add_argument("--debug", action="store_true", help="enable debug logging")
    parser.add_argument(
        "--debug-events", action="store_true", help="enable debug event logging"
    )

    parser.add_argument(
        "-j",
        metavar="<N>",
        type=int,
        dest="jobs",
        help="maximum number of processes to run in parallel",
        default=None,
    )

    return parser
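

# Join the arguments into a single string of "\n"-terminated lines (used to
# write Yosys scripts).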
def lines(*args: Any):
    return "".join(f"{line}\n" for line in args)
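

# Task-loop context holding the parsed command line options (copied in by
# main()) together with the derived working and cache directory paths.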
@tl.task_context
class App:
    raw_args: argparse.Namespace

    debug: bool = False
    debug_events: bool = False

    track_assumes: bool = False
    minimize_assumes: bool = False
    satisfy_assumes: bool = False

    depth: int
    enum_depth: int
    sim: bool
    diversify: bool
    diversify_seed: int
    diversify_depth: int | None = None
    max_per_step: int = 0

    callback: list[str]

    smtbmc_options: list[str]

    work_dir: Path

    work_subdir: Path
    trace_dir_full: Path
    trace_dir_min: Path
    cache_dir: Path


def main() -> None:
    args = arg_parser().parse_args()

    job.global_client(args.jobs)

    # Move command line arguments into the App context
    for name in dir(args):
        if name in type(App).__mro__[1].__annotations__:
            setattr(App, name, getattr(args, name))

    App.raw_args = args

    try:
        tl.run_task_loop(task_loop_main)
    except tl.TaskCancelled:
        exit(1)
    except BaseException as e:
        if App.debug or App.debug_events:
            traceback.print_exc()
        tl.log_exception(e, raise_error=False)  # Automatically avoids double logging
        exit(1)


def setup_logging():
    tl.LogContext.app_name = "CEXENUM"
    tl.logging.start_logging()

    if App.debug_events:
        tl.logging.start_debug_event_logging()
    if App.debug:
        tl.LogContext.level = "debug"

    def error_handler(err: BaseException):
        if isinstance(err, tl.TaskCancelled):
            return
        tl.log_exception(err, raise_error=True)

    tl.current_task().set_error_handler(None, error_handler)
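

# Await the given awaitables in order, skipping None entries, and return the
# result of the last one. Callers use this to sequence already-issued solver
# commands without losing the final result.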
async def batch(*args: Awaitable[Any]):
    result = None
    for arg in args:
        if arg is not None:
            result = await arg
    return result


async def task_loop_main() -> None:
    setup_logging()

    cached = False

    App.cache_dir = App.work_dir / "cexenum_cache"
    try:
        App.cache_dir.mkdir()
    except FileExistsError:
        if (App.cache_dir / "done").exists():
            cached = True
        else:
            shutil.rmtree(App.cache_dir)
            App.cache_dir.mkdir()

    App.work_subdir = App.work_dir / "cexenum"
    try:
        App.work_subdir.mkdir()
    except FileExistsError:
        shutil.rmtree(App.work_subdir)
        App.work_subdir.mkdir()

    App.trace_dir_full = App.work_subdir / "full"
    App.trace_dir_full.mkdir()
    App.trace_dir_min = App.work_subdir / "min"
    App.trace_dir_min.mkdir()

    if cached:
        tl.log("Reusing cached AIGER model")
        aig_model = tl.Task()
    else:
        aig_model = AigModel()

    Enumeration(aig_model)
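

# Converts the prepared RTLIL design into an AIGER model (plus the .aim/.ywa
# map files used for witness conversion) via a generated Yosys script. An empty
# "done" marker file makes the cached result reusable on later runs.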
class AigModel(tl.process.Process):
    def __init__(self):
        self[tl.LogContext].scope = "aiger"
        (App.cache_dir / "design_aiger.ys").write_text(
            lines(
                "read_rtlil ../model/design_prep.il",
                "hierarchy -simcheck",
                "flatten",
                "setundef -undriven -anyseq",
                "setattr -set keep 1 w:\\*",
                "delete -output",
                "opt -full",
                "techmap",
                "opt -fast",
                "memory_map -formal",
                "formalff -clk2ff -ff2anyinit",
                "simplemap",
                "dffunmap",
                "abc -g AND -fast",
                "opt_clean",
                "stat",
                "write_rtlil design_aiger.il",
                "write_aiger -I -B -zinit"
                " -map design_aiger.aim -ywmap design_aiger.ywa design_aiger.aig",
            )
        )
        super().__init__(
            ["yosys", "-ql", "design_aiger.log", "design_aiger.ys"], cwd=App.cache_dir
        )
        self.name = "aiger"
        self.log_output()

    async def on_run(self) -> None:
        await super().on_run()
        (App.cache_dir / "done").write_text("")
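

# Minimization pipeline for a single counter-example: yw2aiw converts the .yw
# trace to an AIGER witness, aigcexmin minimizes it, and aiw2yw converts the
# result back (once with and once without x-bits skipped); SimTrace renders
# .fst traces for both the full and the minimized witness.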
class MinimizeTrace(tl.Task):
    def __init__(self, trace_name: str, aig_model: tl.Task):
        super().__init__()
        self.trace_name = trace_name

        full_yw = App.trace_dir_full / self.trace_name
        min_yw = App.trace_dir_min / self.trace_name
        min_x_yw = min_yw.with_suffix(".x.yw")

        stem = full_yw.stem

        full_aiw = full_yw.with_suffix(".aiw")
        min_aiw = min_yw.with_suffix(".aiw")

        yw2aiw = YosysWitness(
            "yw2aiw",
            full_yw,
            App.cache_dir / "design_aiger.ywa",
            full_aiw,
            cwd=App.trace_dir_full,
        )
        yw2aiw.depends_on(aig_model)
        yw2aiw[tl.LogContext].scope = f"yw2aiw[{stem}]"

        aigcexmin = AigCexMin(
            App.cache_dir / "design_aiger.aig",
            full_aiw,
            min_aiw,
            cwd=App.trace_dir_min,
        )
        aigcexmin.depends_on(yw2aiw)
        aigcexmin[tl.LogContext].scope = f"aigcexmin[{stem}]"

        self.aiw2yw = aiw2yw = YosysWitness(
            "aiw2yw",
            min_aiw,
            App.cache_dir / "design_aiger.ywa",
            min_yw,
            cwd=App.trace_dir_min,
            options=("--skip-x", "--present-only"),
        )
        aiw2yw[tl.LogContext].scope = f"aiw2yw[{stem}]"
        aiw2yw.depends_on(aigcexmin)

        self.aiw2yw_x = aiw2yw_x = YosysWitness(
            "aiw2yw",
            min_aiw,
            App.cache_dir / "design_aiger.ywa",
            min_x_yw,
            cwd=App.trace_dir_min,
            options=("--present-only",),
        )
        aiw2yw_x[tl.LogContext].scope = f"aiw2yw[{stem}(x)]"
        aiw2yw_x.depends_on(aigcexmin)

        sim = SimTrace(
            App.cache_dir / "design_aiger.il",
            full_yw,
            full_yw.with_suffix(".fst"),
            cwd=App.trace_dir_full,
            script_only=not App.sim,
        )
        sim.depends_on(aig_model)
        sim[tl.LogContext].scope = f"full-sim[{stem}]"

        sim = SimTrace(
            App.cache_dir / "design_aiger.il",
            min_x_yw,
            min_yw.with_suffix(".fst"),
            cwd=App.trace_dir_min,
            script_only=not App.sim,
        )

        sim[tl.LogContext].scope = f"sim[{stem}]"
        sim.depends_on(aiw2yw_x)
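

# Express target relative to cwd (prefixing ".." as needed), but only when
# target is located below the current directory or the SBY work directory;
# other paths are returned unchanged as absolute paths.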
def relative_to(target: Path, cwd: Path) -> Path:
    prefix = Path("")
    target = target.resolve()
    cwd = cwd.resolve()

    ok = False
    for limit in (Path.cwd(), App.work_dir):
        limit = limit.resolve()
        try:
            target.relative_to(limit)
            ok = True
        except ValueError:
            pass
    if not ok:
        return target

    while True:
        try:
            return prefix / (target.relative_to(cwd))
        except ValueError:
            prefix = prefix / ".."
            if cwd == cwd.parent:
                return target
            cwd = cwd.parent


class YosysWitness(tl.process.Process):
    def __init__(
        self,
        mode: Literal["yw2aiw", "aiw2yw"],
        input: Path,
        mapfile: Path,
        output: Path,
        cwd: Path,
        options: Iterable[str] = (),
    ):
        super().__init__(
            [
                "yosys-witness",
                mode,
                *(options or []),
                str(relative_to(input, cwd)),
                str(relative_to(mapfile, cwd)),
                str(relative_to(output, cwd)),
            ],
            cwd=cwd,
        )

        def handler(event: tl.process.OutputEvent):
            tl.log_debug(event.output.rstrip("\n"))

        self.sync_handle_events(tl.process.OutputEvent, handler)


class AigCexMin(tl.process.Process):
    def __init__(self, design_aig: Path, input_aiw: Path, output_aiw: Path, cwd: Path):
        aigcexmin_args = []
        if App.satisfy_assumes:
            aigcexmin_args.append("--satisfy-assumptions")
        super().__init__(
            [
                "aigcexmin",
                *aigcexmin_args,
                str(relative_to(design_aig, cwd)),
                str(relative_to(input_aiw, cwd)),
                str(relative_to(output_aiw, cwd)),
            ],
            cwd=cwd,
        )

        self.log_path = output_aiw.with_suffix(".log")
        self.log_file = None

        def handler(event: tl.process.OutputEvent):
            if self.log_file is None:
                self.log_file = self.log_path.open("w")
            self.log_file.write(event.output)
            self.log_file.flush()
            tl.log_debug(event.output.rstrip("\n"))

        self.sync_handle_events(tl.process.OutputEvent, handler)

    def on_cleanup(self):
        if self.log_file is not None:
            self.log_file.close()
        super().on_cleanup()


class SimTrace(tl.process.Process):
    def __init__(
        self,
        design_il: Path,
        input_yw: Path,
        output_fst: Path,
        cwd: Path,
        script_only: bool = False,
    ):
        self[tl.LogContext].scope = "sim"
        self._script_only = script_only

        script_file = output_fst.with_suffix(".fst.ys")
        log_file = output_fst.with_suffix(".fst.log")

        script_file.write_text(
            lines(
                f"read_rtlil {relative_to(design_il, cwd)}",
                "logger -nowarn"
                ' "Yosys witness trace has an unexpected value for the clock input"',
                f"sim -zinit -r {relative_to(input_yw, cwd)} -hdlname"
                f" -fst {relative_to(output_fst, cwd)}",
            )
        )
        super().__init__(
            [
                "yosys",
                "-ql",
                str(relative_to(log_file, cwd)),
                str(relative_to(script_file, cwd)),
            ],
            cwd=cwd,
        )
        self.name = "sim"
        self.log_output()

    async def on_run(self):
        if self._script_only:
            tl.log(f"manually run {self.shell_command} to generate a full trace")
        else:
            await super().on_run()
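

# Callback protocol: cexenum writes one JSON object per event, e.g.
#   {"event": "step", "step": 4, "enabled": [...]}
#   {"event": "trace", "step": 4, "trace_path": "...", "enabled": [...]}
#   {"event": "unsat", "step": 4, "enabled": [...]}
# and reads one JSON object per response, which may combine
#   {"action": "search"}                  keep searching in the current step
#   {"action": "advance"}                 move on to the next time step
#   {"action": "next"}                    search without prompting until the next trace
#   {"block_yw": "<path>", "name": ...}   block a .yw trace (optionally named)
#   {"block_aiw": "<path>", "name": ...}  block an AIGER witness trace
#   {"enable": "<name>"} / {"disable": "<name>"}  toggle a named blocking assumption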
class Callback(tl.TaskGroup):
    recover_from_errors = False

    def __init__(self, enumeration: Enumeration):
        super().__init__()
        self[tl.LogContext].scope = "callback"
        self.search_next = False
        self.enumeration = enumeration
        self.tmp_counter = 0

    async def step_callback(self, step: int) -> Literal["advance", "search"]:
        with self.as_current_task():
            return await self._callback(step=step)

    async def unsat_callback(self, step: int) -> Literal["advance", "search"]:
        with self.as_current_task():
            return await self._callback(step=step, unsat=True)

    async def trace_callback(
        self, step: int, path: Path
    ) -> Literal["advance", "search"]:
        with self.as_current_task():
            return await self._callback(step=step, trace_path=path)

    def exit(self):
        pass

    async def _callback(
        self,
        step: int,
        trace_path: Path | None = None,
        unsat: bool = False,
    ) -> Literal["advance", "search"]:
        if not self.is_active():
            if unsat:
                return "advance"
            return "search"
        if trace_path is None and self.search_next:
            if unsat:
                return "advance"
            return "search"
        self.search_next = False

        info = dict(step=step, enabled=sorted(self.enumeration.active_assumptions))

        if trace_path:
            self.callback_write(
                {**info, "event": "trace", "trace_path": str(trace_path)}
            )
        elif unsat:
            self.callback_write({**info, "event": "unsat"})
        else:
            self.callback_write({**info, "event": "step"})

        while True:
            try:
                response: dict[Any, Any] | Any = await self.callback_read()
                if not isinstance(response, (Exception, dict)):
                    raise ValueError(
                        f"expected JSON object, got: {json.dumps(response)}"
                    )
                if isinstance(response, Exception):
                    raise response

                did_something = False

                if "block_yw" in response:
                    did_something = True
                    block_yw: Any = response["block_yw"]

                    if not isinstance(block_yw, str):
                        raise ValueError(
                            "'block_yw' must be a string containing a file path, "
                            f"got: {json.dumps(block_yw)}"
                        )
                    name: Any = response.get("name")
                    if name is not None and not isinstance(name, str):
                        raise ValueError(
                            "optional 'name' must be a string when present, "
                            f"got: {json.dumps(name)}"
                        )
                    self.enumeration.block_trace(Path(block_yw), name=name)

                if "block_aiw" in response:
                    did_something = True
                    block_aiw: Any = response["block_aiw"]
                    if not isinstance(block_aiw, str):
                        raise ValueError(
                            "'block_aiw' must be a string containing a file path, "
                            f"got: {json.dumps(block_aiw)}"
                        )
                    name: Any = response.get("name")
                    if name is not None and not isinstance(name, str):
                        raise ValueError(
                            "optional 'name' must be a string when present, "
                            f"got: {json.dumps(name)}"
                        )

                    tmpdir = App.work_subdir / "tmp"
                    tmpdir.mkdir(exist_ok=True)
                    self.tmp_counter += 1
                    block_yw = tmpdir / f"callback_{self.tmp_counter}.yw"

                    aiw2yw = YosysWitness(
                        "aiw2yw",
                        Path(block_aiw),
                        App.cache_dir / "design_aiger.ywa",
                        Path(block_yw),
                        cwd=tmpdir,
                        options=("--skip-x", "--present-only"),
                    )
                    aiw2yw[tl.LogContext].scope = f"aiw2yw[callback_{self.tmp_counter}]"

                    await aiw2yw.finished
                    self.enumeration.block_trace(Path(block_yw), name=name)

                if "disable" in response:
                    did_something = True
                    name = response["disable"]
                    if not isinstance(name, str):
                        raise ValueError(
                            "'disable' must be a string representing an assumption, "
                            f"got: {json.dumps(name)}"
                        )
                    self.enumeration.disable_assumption(name)
                if "enable" in response:
                    did_something = True
                    name = response["enable"]
                    if not isinstance(name, str):
                        raise ValueError(
                            "'enable' must be a string representing an assumption, "
                            f"got: {json.dumps(name)}"
                        )
                    self.enumeration.enable_assumption(name)
                action: Any = response.get("action")
                if action == "next":
                    did_something = True
                    self.search_next = True
                    action = "search"
                if action in ("search", "advance"):
                    return action
                if not did_something:
                    raise ValueError(
                        f"could not interpret callback response: {response}"
                    )
            except Exception as e:
                tl.log_exception(e, raise_error=False)
                if not self.recover_from_errors:
                    raise

    def is_active(self) -> bool:
        return False

    def callback_write(self, data: Any):
        return

    async def callback_read(self) -> Any:
        raise NotImplementedError("must be implemented in Callback subclass")
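

# Interactive callback driven from the terminal; stdin is read on a daemon
# thread so the task loop stays responsive, and single-letter shortcuts expand
# to the corresponding JSON actions.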
class InteractiveCallback(Callback):
    recover_from_errors = True

    interactive_shortcuts = {
        "n": '{"action": "next"}',
        "s": '{"action": "search"}',
        "a": '{"action": "advance"}',
    }

    def __init__(self, enumeration: Enumeration):
        super().__init__(enumeration)
        self.__eof_reached = False

    def is_active(self) -> bool:
        return not self.__eof_reached

    def callback_write(self, data: Any):
        print(f"callback: {json.dumps(data)}")

    async def callback_read(self) -> Any:
        future: asyncio.Future[Any] = asyncio.Future()

        loop = asyncio.get_event_loop()

        def blocking_read():
            try:
                try:
                    result = ""
                    while not result:
                        result = input(
                            "callback> " if sys.stdout.isatty() else ""
                        ).strip()
                    result = self.interactive_shortcuts.get(result, result)
                    result_data = json.loads(result)
                except EOFError:
                    print()
                    self.__eof_reached = True
                    result_data = dict(action="next")
                loop.call_soon_threadsafe(lambda: future.set_result(result_data))
            except Exception as exc:
                exception = exc
                loop.call_soon_threadsafe(lambda: future.set_exception(exception))

        thread = threading.Thread(target=blocking_read, daemon=True)
        thread.start()

        return await future


class ProcessCallback(Callback):
    def __init__(self, enumeration: Enumeration, command: list[str]):
        super().__init__(enumeration)
        self[tl.LogContext].scope = "callback"
        self.__eof_reached = False
        self._command = command
        self._lines: list[asyncio.Future[str]] = [asyncio.Future()]

    def exit(self):
        self.process.close_stdin()

    async def on_prepare(self) -> None:
        self.process = tl.Process(self._command, cwd=Path.cwd(), interact=True)
        self.process.use_lease = False

        future_line: asyncio.Future[str] = self._lines[-1]

        def stdout_handler(event: tl.process.StdoutEvent):
            nonlocal future_line
            future_line.set_result(event.output)
            future_line = asyncio.Future()
            self._lines.append(future_line)

        self.process.sync_handle_events(tl.process.StdoutEvent, stdout_handler)

        def stderr_handler(event: tl.process.StderrEvent):
            tl.log(event.output)

        self.process.sync_handle_events(tl.process.StderrEvent, stderr_handler)

        def exit_handler(event: tl.process.ExitEvent):
            future_line.set_exception(EOFError("callback process exited"))
            # Mark the exception as retrieved
            future_line.exception()

        self.process.sync_handle_events(tl.process.ExitEvent, exit_handler)

    def is_active(self) -> bool:
        return not self.__eof_reached

    def callback_write(self, data: Any):
        if not self.process.is_finished:
            self.process.write(json.dumps(data) + "\n")

    async def callback_read(self) -> Any:
        future_line = self._lines.pop(0)
        try:
            data = json.loads(await future_line)
            tl.log_debug(f"callback action: {data}")
            return data
        except EOFError:
            self._lines.insert(0, future_line)
            self.__eof_reached = True
            return dict(action="next")


def enter_task(fn):
    # TODO move into mau
    import functools

    @functools.wraps(fn)
    def wrapped_fn(self, *args, **kwargs):
        with self.as_current_task():
            return fn(self, *args, **kwargs)

    return wrapped_fn


def inv_signal(sig):
    return (*sig[:-1], "0" if sig[-1] != "0" else "1")
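

# Implements the --diversify heuristic. Signals are (step, (path..., bit),
# value) tuples taken from minimized traces; every new trace contributes its
# inverted bits as a target set, and diversify() greedily picks bits shared by
# many targets to assume in the next solver query. Assumption sets the solver
# refuted are recorded so no refuted set is ever selected as a whole again.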
class Diversifier(tl.TaskGroup):
    def __init__(self, seed, enumeration_depth):
        super().__init__()
        self._rng = random.Random(seed)
        self._failed: StableSet[Any] = StableSet()
        self._failed_sets: dict[Any, list[StableSet[Any]]] = defaultdict(list)
        self._active: StableSet[Any] = StableSet()

        self._current_id = 0

        self._targets: dict[int, StableSet[Any]] = {}

        self._sig_index: dict[Any, StableSet[int]] = defaultdict(StableSet)

        self._depth_limit = 0

        self._enumeration_depth_left = enumeration_depth

        self._unsat_counter = 0

        self[tl.LogContext].scope = "diversifier"
        # self[tl.LogContext].level = "debug"

    def _new_id(self):
        self._current_id += 1
        return self._current_id

    @enter_task
    def new_step(self):
        self._failed = StableSet()
        self._failed_sets = defaultdict(list)
        self._active = StableSet()
        self._depth_limit = 0
        self._unsat_counter = 0

    @enter_task
    def diversify(self):
        if self._enumeration_depth_left is not None:
            if self._enumeration_depth_left <= 0:
                return StableSet()

        selected = StableSet()
        blocked = StableSet()

        pending = list(self._targets.values())

        tl.log_debug(f"{len(pending)=} initial")

        depth = 0

        deferred = []
        while pending or (deferred and depth < self._depth_limit):
            if not pending:
                for subspace in deferred:
                    new_subspace = StableSet(
                        sig for sig in subspace if sig[:-1] not in blocked
                    )
                    if new_subspace:
                        pending.append(new_subspace)
                deferred = []
                depth += 1
                continue

            counter = Counter()

            for subspace in self._shuffle_list(pending):
                counter.update(self._shuffle_list(subspace))

            sig, sig_count = max(counter.items(), key=lambda item: item[1])

            tl.log_debug(f"{sig=} {sig_count=}")

            selected.add(sig)
            blocked.add(sig[:-1])

            failed = False

            for candidate in self._failed_sets[sig]:
                if candidate.issubset(selected):
                    failed = True
                    tl.log_debug(f"failed {candidate=}")
                    break

            if not failed:
                for candidate in self._failed_sets[sig]:
                    target = next(s for s in candidate if s not in selected)
                    self._failed_sets[target].append(candidate)

                self._failed_sets.pop(sig, None)

            if failed:
                selected.remove(sig)
                sig = None

            new_pending = []

            for subspace in pending:
                new_subspace = StableSet(
                    sig for sig in subspace if sig[:-1] not in blocked
                )
                if not new_subspace:
                    continue
                if sig in subspace:
                    deferred.append(new_subspace)
                else:
                    new_pending.append(new_subspace)
            pending = new_pending

        tl.log_debug(f"{len(pending)=} {selected=}")

        return selected

    def _add_target(self, signals):
        signals.difference_update(self._failed)
        if signals:
            id = self._new_id()
            self._targets[id] = signals
            for sig in signals:
                self._sig_index[sig].add(id)

    def _remove_target(self, id):
        signals = self._targets.pop(id)
        for sig in signals:
            self._sig_index[sig].remove(id)

    def _remove_target_sig(self, id, sig):
        self._targets[id].remove(sig)
        self._sig_index[sig].remove(id)

    @enter_task
    def update(self, signals):
        if self._enumeration_depth_left is not None:
            if self._enumeration_depth_left > 0:
                self._enumeration_depth_left -= 1
                if self._enumeration_depth_left == 0:
                    tl.log(
                        "reached diversification depth, "
                        "disabling diversification for future traces"
                    )
        self._unsat_counter = 0
        inv_signals = StableSet(map(inv_signal, signals))
        self._add_target(inv_signals)
        self._depth_limit += 1
        tl.log_debug(f"increased {self._depth_limit=}")

    @enter_task
    def failed(self, failed):
        self._unsat_counter += 1
        self._depth_limit = max(0, self._depth_limit - 1)
        tl.log_debug(f"decreased {self._depth_limit=}")

        failed_sig = next(iter(failed))
        self._failed_sets[failed_sig].append(failed)

        if len(failed) == 1:
            self._failed.add(failed_sig)

            for id in list(self._sig_index[failed_sig]):
                self._remove_target_sig(id, failed_sig)

            tl.log(f"found {len(self._failed)} settled inputs")

    def should_minimize(self):
        return self._unsat_counter > 2

    def _shuffle_set(self, items):
        return StableSet(self._shuffle_list(items))

    def _shuffle_list(self, items):
        as_list = list(items)
        self._rng.shuffle(as_list)
        return as_list


def yw_to_dicts(yw_trace):
    """Convert a .yw JSON object to a list of dicts.

    Every item of the list corresponds to one step. For every step it contains a dict
    mapping (*path, index) pairs to the bit value for bits that are present.
    """

    signal_bits = []
    for signal in yw_trace["signals"]:
        chunk_offset = signal["offset"]
        path = tuple(signal["path"])
        for i in range(signal["width"]):
            signal_bits.append((*path, chunk_offset + i))

    return [
        {
            signal: value
            for signal, value in zip(signal_bits, step["bits"][::-1])
            if value in "01"
        }
        for step in yw_trace["steps"]
    ]


class Enumeration(tl.Task):
    callback_mode: Literal["off", "interactive", "process"]
    callback_auto_search: bool = False

    def __init__(self, aig_model: tl.Task):
        self.aig_model = aig_model
        self._pending_blocks: list[tuple[str | None, Path, bool]] = []
        self.named_assumptions: StableSet[str] = StableSet()
        self.active_assumptions: StableSet[str] = StableSet()

        self._probes: dict[Any, str] = {}
        self._probe_defs: dict[str, Any] = {}
        if App.diversify:
            self._diversifier = Diversifier(App.diversify_seed, App.diversify_depth)

        self._last_id: int = 0

        super().__init__()

    def _new_id(self) -> int:
        self._last_id += 1
        return self._last_id

    async def on_prepare(self) -> None:
        if App.callback:
            if App.callback == ["-"]:
                self.callback_task = InteractiveCallback(self)
            else:
                self.callback_task = ProcessCallback(self, App.callback)
        else:
            self.callback_task = Callback(self)

    async def on_run(self) -> None:
        self.smtbmc = smtbmc = Smtbmc(App.work_dir / "model" / "design_smt2.smt2")
        self._push_level = 0

        await smtbmc.ping()

        try:
            await self._bmc_loop()
        finally:
            smtbmc.close_stdin()
            self.callback_task.exit()
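
    # Main BMC driver loop. "checked" tracks the solver state for the current
    # step: "sat" means another counter-example exists (write, minimize and
    # block it), "unsat" means the step is exhausted, and "skip" advances to
    # the next time step. The first assertion failure fixes the enumeration
    # window according to --enum-depth.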
    async def _bmc_loop(self) -> None:
        smtbmc = self.smtbmc
        i = -1
        limit = App.depth
        first_failure = None

        checked = "skip"

        counter = 0

        while i <= limit or limit < 0:
            if checked != "skip":
                checked = await self._search_counter_example(i)

            if checked == "unsat":
                if i >= 0:
                    action = await self.callback_task.unsat_callback(i)
                    if action == "search":
                        continue
                checked = "skip"
            if checked == "skip":
                checked = "unsat"
                i += 1
                if App.diversify:
                    self._diversifier.new_step()
                if i > limit and limit >= 0:
                    break
                action = await self.callback_task.step_callback(i)

                pending = batch(
                    self._top(),
                    smtbmc.bmc_step(
                        i,
                        initial=i == 0,
                        assertions=False,
                        pred=i - 1 if i else None,
                    ),
                )

                if action == "advance":
                    tl.log(f"Skipping step {i}")
                    await batch(pending, smtbmc.assertions(i))
                    checked = "skip"
                    continue
                assert action == "search"

                tl.log(f"Checking assumptions in step {i}..")
                presat_checked = await batch(
                    pending,
                    smtbmc.check(),
                )
                if presat_checked != "sat":
                    if App.track_assumes:
                        tl.log("No further counter-examples are reachable")

                        tl.log("Conflicting assumptions:")
                        unsat_assumptions = await smtbmc.get_unsat_assumptions(
                            App.minimize_assumes
                        )

                        for key in unsat_assumptions:
                            desc = await smtbmc.incremental_command(
                                cmd="get_design_assume", key=key
                            )
                            if desc is None:
                                tl.log(f"  Callback blocked trace {key[1]!r}")
                                continue
                            expr, path, info = desc
                            tl.log(f"  In {path}: {info}")

                        if not self.active_assumptions:
                            return
                    if first_failure is None and not self.active_assumptions:
                        tl.log_error("Assumptions are not satisfiable")
                    else:
                        tl.log("No further counter-examples are reachable")
                    if not self.active_assumptions:
                        return

                tl.log(f"Checking assertions in step {i}..")
                counter = 0
                continue
            elif checked == "sat":
                if first_failure is None:
                    first_failure = i
                    if App.enum_depth < 0:
                        limit = -1
                    else:
                        limit = i + App.enum_depth
                    tl.log("BMC failed! Enumerating counter-examples..")

                path = App.trace_dir_full / f"trace{i}_{counter}.yw"
                await smtbmc.incremental_command(cmd="write_yw_trace", path=str(path))
                tl.log(f"Written counter-example to {path.name}")

                minimize = MinimizeTrace(path.name, self.aig_model)
                minimize.depends_on(self.aig_model)

                await minimize.aiw2yw.finished

                min_path = App.trace_dir_min / f"trace{i}_{counter}.yw"

                action = await self.callback_task.trace_callback(i, min_path)

                if action == "search" and counter + 1 == App.max_per_step:
                    tl.log(f"Reached trace limit {App.max_per_step} for this step")
                    action = "advance"

                if action == "advance":
                    tl.log("Skipping remaining counter-examples for this step")
                    checked = "skip"
                    continue
                assert action == "search"

                self.block_trace(min_path, diversify=App.diversify)

                counter += 1
            else:
                tl.log_error(f"Unexpected solver result: {checked!r}")

    def block_trace(self, path: Path, name: str | None = None, diversify: bool = False):
        if name is not None:
            if name in self.named_assumptions:
                raise ValueError(f"an assumption with name {name} was already defined")
            self.named_assumptions.add(name)
            self.active_assumptions.add(name)

        self._pending_blocks.append((name, path.absolute(), diversify))

    def enable_assumption(self, name: str):
        if name not in self.named_assumptions:
            raise ValueError(f"unknown assumption {name!r}")
        self.active_assumptions.add(name)

    def disable_assumption(self, name: str):
        if name not in self.named_assumptions:
            raise ValueError(f"unknown assumption {name!r}")
        self.active_assumptions.discard(name)
        if App.diversify:
            self._diversifier.new_step()

    def _top(self) -> Awaitable[Any]:
        return batch(*(self._pop() for _ in range(self._push_level)))

    def _pop(self) -> Awaitable[Any]:
        self._push_level -= 1
        tl.log_debug(f"pop to {self._push_level}")
        return self.smtbmc.pop()

    def _push(self) -> Awaitable[Any]:
        self._push_level += 1
        tl.log_debug(f"push to {self._push_level}")
        return self.smtbmc.push()
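
    # Assert all pending trace blocks, then check for another counter-example
    # in the given step. With --diversify, extra probe assumptions bias the
    # solver towards unseen input patterns; probes appearing in the unsat core
    # are reported as failed and the check is retried without them.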
    async def _search_counter_example(self, step: int) -> str:
        smtbmc = self.smtbmc
        pending_blocks, self._pending_blocks = self._pending_blocks, []

        pending = self._top()

        add_to_pending = []

        for name, block_path, diversify in pending_blocks:
            result = smtbmc.incremental_command(
                cmd="read_yw_trace",
                name="last",
                path=str(block_path),
                skip_x=True,
            )

            if diversify:
                signals = StableSet(
                    (s, step_signal, value)
                    for s, step_signals in enumerate(
                        yw_to_dicts(json.loads(block_path.read_text()))
                    )
                    for step_signal, value in step_signals.items()
                )

                self._diversifier.update(signals)

            async def check_yw_trace_len():
                last_step = (await result).get("last_step", step)
                if last_step > step:
                    tl.log_warning(
                        f"Ignoring future time steps "
                        f"{step + 1} to {last_step} of "
                        f"{relative_to(block_path, Path.cwd())}"
                    )
                return last_step

            expr = [
                "not",
                ["and", *(["yw", "last", k] for k in range(step + 1))],
            ]

            if name is not None:
                name_id = safe_smtlib_id(f"cexenum trace {name}")
                add_to_pending.append(smtbmc.smtlib(f"(declare-const {name_id} Bool)"))
                expr = ["=", expr, ["smtlib", name_id, "Bool"]]

            add_to_pending.append(check_yw_trace_len())
            add_to_pending.append(smtbmc.assert_(expr))

        active_diversifications = StableSet()

        try:
            while True:
                diversifications = StableSet()
                if App.diversify:
                    diversifications = StableSet(self._diversifier.diversify())

                    for signal_def in active_diversifications.difference(
                        diversifications
                    ):
                        add_to_pending.append(
                            smtbmc.incremental_command(
                                cmd="update_assumptions",
                                key=("cexenum probe", self._probes[signal_def]),
                                expr=None,
                            )
                        )

                    new_probes = False

                    for signal_def in diversifications:
                        if signal_def in self._probes:
                            continue

                        sig_step, (*path, offset), value = signal_def
                        probe_result = smtbmc.incremental_command(
                            cmd="define",
                            expr=[
                                "=",
                                ["yw_sig", sig_step, path, offset],
                                ["bv", value],
                            ],
                        )

                        async def store_probe(signal_def, probe_result):
                            defined = await probe_result
                            self._probes[signal_def] = defined["name"]
                            self._probe_defs[defined["name"]] = signal_def

                        add_to_pending.append(probe_result)
                        add_to_pending.append(store_probe(signal_def, probe_result))
                        new_probes = True

                    if new_probes:
                        await batch(pending, *add_to_pending)
                        pending, add_to_pending = None, []

                    for signal_def in diversifications.difference(
                        active_diversifications
                    ):
                        add_to_pending.append(
                            smtbmc.incremental_command(
                                cmd="update_assumptions",
                                key=("cexenum probe", self._probes[signal_def]),
                                expr=["def", self._probes[signal_def]],
                            )
                        )

                    active_diversifications = diversifications

                add_to_pending.append(self._push())
                add_to_pending.append(smtbmc.assertions(step, False))

                for name in self.active_assumptions:
                    name_id = safe_smtlib_id(f"cexenum trace {name}")
                    expr = ["smtlib", name_id, "Bool"]
                    if App.track_assumes:
                        add_to_pending.append(
                            smtbmc.incremental_command(
                                cmd="update_assumptions",
                                key=("cexenum trace", name),
                                expr=expr,
                            )
                        )
                    else:
                        add_to_pending.append(smtbmc.assert_(expr))

                pending = batch(pending, *add_to_pending)

                result = await batch(
                    pending,
                    smtbmc.check(),
                )
                pending, add_to_pending = None, []

                if result == "sat" or not active_diversifications:
                    return result

                failed_assumptions = StableSet(
                    map(
                        tuple,
                        await smtbmc.get_unsat_assumptions(
                            minimize=self._diversifier.should_minimize()
                        ),
                    )
                )

                failed_diversifications = StableSet()

                for key, expr in failed_assumptions:
                    if key == "cexenum probe":
                        failed_diversifications.add(self._probe_defs[expr])

                if failed_diversifications:
                    self._diversifier.failed(failed_diversifications)
                else:
                    return result

                add_to_pending.append(self._top())
        finally:
            for signal_def in active_diversifications:
                add_to_pending.append(
                    smtbmc.incremental_command(
                        cmd="update_assumptions",
                        key=("cexenum probe", self._probes[signal_def]),
                        expr=None,
                    )
                )

            await batch(pending, *add_to_pending)
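

# Async wrapper around "yosys-smtbmc --incremental". Every incremental_command()
# call writes one JSON command line and returns a future that is resolved (or
# failed) by the matching "ok"/"err" response line, in FIFO order.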
class Smtbmc(tl.process.Process):
    def __init__(self, smt2_model: Path):
        self[tl.LogContext].scope = "smtbmc"

        smtbmc_options = []
        if App.track_assumes:
            smtbmc_options += ["--track-assumes"]
        if App.minimize_assumes:
            smtbmc_options += ["--minimize-assumes"]
        if App.diversify:
            smtbmc_options += ["--smt2-option", ":produce-unsat-assumptions=true"]

        smtbmc_options += App.smtbmc_options
        super().__init__(
            [
                "yosys-smtbmc",
                "--incremental",
                "--noprogress",
                *smtbmc_options,
                str(smt2_model),
            ],
            interact=True,
        )
        self.name = "smtbmc"

        self.expected_results: list[asyncio.Future[Any]] = []

    async def on_run(self) -> None:
        def output_handler(event: tl.process.StdoutEvent):
            line = event.output.strip()
            if line.startswith("{"):
                result = json.loads(event.output)
            else:
                result = dict(msg=line)
            tl.log_debug(f"smtbmc > {result!r}")
            if "err" in result:
                exception = tl.logging.LoggedError(
                    tl.log_error(result["err"], raise_error=False)
                )
                self.expected_results.pop(0).set_exception(exception)
            if "msg" in result:
                tl.log(result["msg"])
            if "ok" in result:
                assert self.expected_results
                self.expected_results.pop(0).set_result(result["ok"])

        self.sync_handle_events(tl.process.StdoutEvent, output_handler)

        return await super().on_run()

    def ping(self) -> Awaitable[None]:
        return self.incremental_command(cmd="ping")

    def incremental_command(self, **command: Any) -> Awaitable[Any]:
        tl.log_debug(f"smtbmc < {command!r}")
        self.write(json.dumps(command))
        self.write("\n")
        result: asyncio.Future[Any] = asyncio.Future()
        self.expected_results.append(result)

        return result

    def new_step(self, step: int) -> Awaitable[None]:
        return self.incremental_command(cmd="new_step", step=step)

    def push(self) -> Awaitable[None]:
        return self.incremental_command(cmd="push")

    def pop(self) -> Awaitable[None]:
        return self.incremental_command(cmd="pop")

    def check(self) -> Awaitable[str]:
        return self.incremental_command(cmd="check")

    def smtlib(self, command: str, response=False) -> Awaitable[str]:
        return self.incremental_command(
            cmd="smtlib", command=command, response=response
        )

    def assert_antecedent(self, expr: Any) -> Awaitable[None]:
        return self.incremental_command(cmd="assert_antecedent", expr=expr)

    def assert_consequent(self, expr: Any) -> Awaitable[None]:
        return self.incremental_command(cmd="assert_consequent", expr=expr)

    def assert_(self, expr: Any) -> Awaitable[None]:
        return self.incremental_command(cmd="assert", expr=expr)

    def hierarchy(self, step: int) -> Awaitable[None]:
        return self.assert_antecedent(["mod_h", ["step", step]])

    def assert_design_assumes(self, step: int) -> Awaitable[None]:
        return self.incremental_command(cmd="assert_design_assumes", step=step)

    def get_unsat_assumptions(self, minimize: bool) -> Awaitable[list[Any]]:
        return self.incremental_command(cmd="get_unsat_assumptions", minimize=minimize)

    def assumptions(self, step: int, valid: bool = True) -> Awaitable[None]:
        if valid:
            return self.assert_design_assumes(step)

        return self.assert_consequent(["not", ["mod_u", ["step", step]]])

    def assertions(self, step: int, valid: bool = True) -> Awaitable[None]:
        expr = ["mod_a", ["step", step]]
        if not valid:
            expr = ["not", expr]
        return self.assert_(expr)

    def initial(self, step: int, initial: bool) -> Awaitable[None]:
        if initial:
            return batch(
                self.assert_antecedent(["mod_i", ["step", step]]),
                self.assert_antecedent(["mod_is", ["step", step]]),
            )
        else:
            return self.assert_antecedent(["not", ["mod_is", ["step", step]]])

    def transition(self, pred: int, succ: int) -> Awaitable[None]:
        return self.assert_antecedent(["mod_t", ["step", pred], ["step", succ]])

    def bmc_step(
        self,
        step: int,
        initial: bool = False,
        assertions: bool = True,
        pred: int | None = None,
        assumptions: bool = True,
    ) -> Awaitable[None]:
        futures: list[Awaitable[None]] = []
        futures.append(self.new_step(step))
        futures.append(self.hierarchy(step))
        if assumptions:
            futures.append(self.assumptions(step))
        futures.append(self.initial(step, initial))

        if pred is not None:
            futures.append(self.transition(pred, step))

        if assertions:
            futures.append(self.assertions(step))

        return batch(*futures)


if __name__ == "__main__":
    main()