3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-08-09 06:41:26 +00:00

Unified trace generation using yosys's sim across all engines

Currently opt-in using the `fst` or `vcd_sim` options.
This commit is contained in:
Jannis Harder 2023-01-10 15:33:18 +01:00
parent 4c44a10f72
commit 6d3b5aa960
15 changed files with 686 additions and 183 deletions

View file

@ -20,6 +20,9 @@ import os, re, sys, signal, platform, click
if os.name == "posix":
import resource, fcntl
import subprocess
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Optional
from shutil import copyfile, copytree, rmtree
from select import select
from time import monotonic, localtime, sleep, strftime
@ -100,7 +103,7 @@ class SbyProc:
dep.register_dep(self)
self.output_callback = None
self.exit_callback = None
self.exit_callbacks = []
self.error_callback = None
if self.task.timeout_reached:
@ -112,6 +115,9 @@ class SbyProc:
else:
self.notify.append(next_proc)
def register_exit_callback(self, callback):
self.exit_callbacks.append(callback)
def log(self, line):
if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)):
if self.logfile is not None:
@ -130,8 +136,8 @@ class SbyProc:
return
if self.logfile is not None:
self.logfile.close()
if self.exit_callback is not None:
self.exit_callback(retcode)
for callback in self.exit_callbacks:
callback(retcode)
def handle_error(self, retcode):
if self.terminated:
@ -602,6 +608,199 @@ class SbyTaskloop:
for task in self.tasks:
task.exit_callback()
@dataclass
class SbySummaryEvent:
engine_idx: int
trace: Optional[str] = field(default=None)
path: Optional[str] = field(default=None)
hdlname: Optional[str] = field(default=None)
type: Optional[str] = field(default=None)
src: Optional[str] = field(default=None)
step: Optional[int] = field(default=None)
prop: Optional[SbyProperty] = field(default=None)
engine_case: Optional[str] = field(default=None)
@property
def engine(self):
return f"engine_{self.engine_idx}"
@dataclass
class SbyTraceSummary:
trace: str
path: Optional[str] = field(default=None)
engine_case: Optional[str] = field(default=None)
events: dict = field(default_factory=lambda: defaultdict(lambda: defaultdict(list)))
@property
def kind(self):
if '$assert' in self.events:
kind = 'counterexample trace'
elif '$cover' in self.events:
kind = 'cover trace'
else:
kind = 'trace'
return kind
@dataclass
class SbyEngineSummary:
engine_idx: int
traces: dict = field(default_factory=dict)
status: Optional[str] = field(default=None)
unreached_covers: Optional[list] = field(default=None)
@property
def engine(self):
return f"engine_{self.engine_idx}"
class SbySummary:
def __init__(self, task):
self.task = task
self.timing = []
self.lines = []
self.engine_summaries = {}
self.traces = defaultdict(dict)
self.engine_status = {}
self.unreached_covers = None
def append(self, line):
self.lines.append(line)
def extend(self, lines):
self.lines.extend(lines)
def engine_summary(self, engine_idx):
if engine_idx not in self.engine_summaries:
self.engine_summaries[engine_idx] = SbyEngineSummary(engine_idx)
return self.engine_summaries[engine_idx]
def add_event(self, *args, **kwargs):
event = SbySummaryEvent(*args, **kwargs)
if event.prop:
if event.type == "$assert":
event.prop.status = "FAIL"
if event.path:
event.prop.tracefiles.append(event.path)
if event.prop:
if event.type == "$cover":
event.prop.status = "PASS"
if event.path:
event.prop.tracefiles.append(event.path)
engine = self.engine_summary(event.engine_idx)
if event.trace not in engine.traces:
engine.traces[event.trace] = SbyTraceSummary(event.trace, path=event.path, engine_case=event.engine_case)
if event.type:
by_type = engine.traces[event.trace].events[event.type]
if event.hdlname:
by_type[event.hdlname].append(event)
def set_engine_status(self, engine_idx, status, case=None):
engine_summary = self.engine_summary(engine_idx)
if case is None:
self.task.log(f"{click.style(f'engine_{engine_idx}', fg='magenta')}: Status returned by engine: {status}")
self.engine_summary(engine_idx).status = status
else:
self.task.log(f"{click.style(f'engine_{engine_idx}.{case}', fg='magenta')}: Status returned by engine for {case}: {status}")
if engine_summary.status is None:
engine_summary.status = {}
engine_summary.status[case] = status
def summarize(self, short):
omitted_excess = False
for line in self.timing:
yield line
for engine_idx, engine_cmd in self.task.engine_list():
engine_cmd = ' '.join(engine_cmd)
trace_limit = 5
prop_limit = 5
step_limit = 5
engine = self.engine_summary(engine_idx)
if isinstance(engine.status, dict):
for case, status in sorted(engine.status.items()):
yield f"{engine.engine} ({engine_cmd}) returned {status} for {case}"
elif engine.status:
yield f"{engine.engine} ({engine_cmd}) returned {engine.status}"
else:
yield f"{engine.engine} ({engine_cmd}) did not return a status"
produced_traces = False
for i, (trace_name, trace) in enumerate(sorted(engine.traces.items())):
if short and i == trace_limit:
excess = len(engine.traces) - trace_limit
omitted_excess = True
yield f"and {excess} further trace{'s' if excess != 1 else ''}"
break
case_suffix = f" [{trace.engine_case}]" if trace.engine_case else ""
if trace.path:
if short:
yield f"{trace.kind}{case_suffix}: {self.task.workdir}/{trace.path}"
else:
yield f"{trace.kind}{case_suffix}: {trace.path}"
else:
yield f"{trace.kind}{case_suffix}: <{trace.trace}>"
produced_traces = True
for event_type, events in sorted(trace.events.items()):
if event_type == '$assert':
desc = "failed assertion"
short_desc = 'assertion'
elif event_type == '$cover':
desc = "reached cover statement"
short_desc = 'cover statement'
elif event_type == '$assume':
desc = "violated assumption"
short_desc = 'assumption'
else:
continue
for j, (hdlname, same_events) in enumerate(sorted(events.items())):
if short and j == prop_limit:
excess = len(events) - prop_limit
yield f" and {excess} further {short_desc}{'s' if excess != 1 else ''}"
break
event = same_events[0]
steps = sorted(e.step for e in same_events)
if short and len(steps) > step_limit:
steps = [str(step) for step in steps[:step_limit]]
excess = len(steps) - step_limit
omitted_excess = True
steps[-1] += f" and {excess} further step{'s' if excess != 1 else ''}"
steps = f"step{'s' if len(steps) > 1 else ''} {', '.join(map(str, steps))}"
yield f" {desc} {event.hdlname} at {event.src} in {steps}"
if not produced_traces:
yield f"{engine.engine} did not produce any traces"
if self.unreached_covers is None and self.task.opt_mode == 'cover' and self.task.status != "PASS" and self.task.design:
self.unreached_covers = []
for prop in self.task.design.hierarchy:
if prop.type == prop.Type.COVER and prop.status == "UNKNOWN":
self.unreached_covers.append(prop)
if self.unreached_covers:
yield f"unreached cover statements:"
for j, prop in enumerate(self.unreached_covers):
if short and j == prop_limit:
excess = len(self.unreached_covers) - prop_limit
omitted_excess = True
yield f" and {excess} further propert{'ies' if excess != 1 else 'y'}"
break
yield f" {prop.hdlname} at {prop.location}"
for line in self.lines:
yield line
if omitted_excess:
yield f"see {self.task.workdir}/{self.task.status} for a complete summary"
def __iter__(self):
yield from self.summarize(True)
class SbyTask(SbyConfig):
def __init__(self, sbyconfig, workdir, early_logs, reusedir, taskloop=None, logfile=None):
@ -644,7 +843,7 @@ class SbyTask(SbyConfig):
ru = resource.getrusage(resource.RUSAGE_CHILDREN)
self.start_process_time = ru.ru_utime + ru.ru_stime
self.summary = list()
self.summary = SbySummary(self)
self.logfile = logfile or open(f"{workdir}/logfile.txt", "a")
self.log_targets = [sys.stdout, self.logfile]
@ -696,6 +895,15 @@ class SbyTask(SbyConfig):
for target in self.log_targets:
click.echo(line, file=target)
def log_prefix(self, prefix, message=None):
prefix = f"{click.style(prefix, fg='magenta')}: "
def log(message):
self.log(f"{prefix}{message}")
if message is None:
return log
else:
log(message)
def error(self, logmessage):
tm = localtime()
self.log(click.style(f"ERROR: {logmessage}", fg="red", bold=True))
@ -833,7 +1041,7 @@ class SbyTask(SbyConfig):
def instance_hierarchy_error_callback(retcode):
self.precise_prop_status = False
proc.exit_callback = instance_hierarchy_callback
proc.register_exit_callback(instance_hierarchy_callback)
proc.error_callback = instance_hierarchy_error_callback
return [proc]
@ -891,8 +1099,8 @@ class SbyTask(SbyConfig):
print("delete -output", file=f)
print("dffunmap", file=f)
print("stat", file=f)
print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)
print("write_btor -s {}-i design_{m}_single.info design_{m}_single.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)
print("write_btor {}-i design_{m}.info -ywmap design_btor.ywb design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)
print("write_btor -s {}-i design_{m}_single.info -ywmap design_btor_single.ywb design_{m}_single.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)
proc = SbyProc(
self,
@ -967,6 +1175,8 @@ class SbyTask(SbyConfig):
if new_status == "PASS":
assert self.status != "FAIL"
self.status = "PASS"
if self.opt_mode in ("bmc", "prove") and self.design:
self.design.pass_unknown_asserts()
elif new_status == "FAIL":
assert self.status != "PASS"
@ -1004,11 +1214,17 @@ class SbyTask(SbyConfig):
self.handle_int_option("timeout", None)
self.handle_bool_option("vcd", True)
self.handle_bool_option("vcd_sim", False)
self.handle_bool_option("fst", False)
self.handle_str_option("smtc", None)
self.handle_int_option("skip", None)
self.handle_str_option("tbtop", None)
if self.opt_mode != "live":
self.handle_int_option("append", 0)
self.handle_bool_option("append_assume", False)
self.handle_str_option("make_model", None)
def setup_procs(self, setupmode):
@ -1078,18 +1294,18 @@ class SbyTask(SbyConfig):
# TODO process time is incorrect when running in parallel
self.summary = [
self.summary.timing = [
"Elapsed clock time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format
(total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time),
"Elapsed process time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format
(total_process_time // (60*60), (total_process_time // 60) % 60, total_process_time % 60, total_process_time),
] + self.summary
]
else:
self.summary = [
self.summary.timing = [
"Elapsed clock time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format
(total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time),
"Elapsed process time unvailable on Windows"
] + self.summary
]
for line in self.summary:
if line.startswith("Elapsed"):
@ -1110,7 +1326,7 @@ class SbyTask(SbyConfig):
def write_summary_file(self):
with open(f"{self.workdir}/{self.status}", "w") as f:
for line in self.summary:
for line in self.summary.summarize(short=False):
click.echo(line, file=f)
def print_junit_result(self, f, junit_ts_name, junit_tc_name, junit_format_strict=False):