3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-08-09 14:51:26 +00:00

Merge branch 'main' into krys/symlink

This commit is contained in:
KrystalDelusion 2025-07-09 10:01:30 +12:00 committed by GitHub
commit 1130847901
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 1016 additions and 324 deletions

View file

@ -20,6 +20,7 @@ import os, re, sys, signal, platform, click
if os.name == "posix":
import resource, fcntl
import subprocess
from pathlib import Path
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Optional
@ -631,6 +632,8 @@ class SbyTraceSummary:
path: Optional[str] = field(default=None)
engine_case: Optional[str] = field(default=None)
events: dict = field(default_factory=lambda: defaultdict(lambda: defaultdict(list)))
trace_ids: dict[str, int] = field(default_factory=lambda: dict())
last_ext: Optional[str] = field(default=None)
@property
def kind(self):
@ -682,42 +685,62 @@ class SbySummary:
if update_status:
status_metadata = dict(source="summary_event", engine=engine.engine)
if event.step:
status_metadata["step"] = event.step
add_trace = False
if event.prop:
if event.type == "$assert":
if event.type is None:
event.type = event.prop.celltype
elif event.type == "$assert":
event.prop.status = "FAIL"
if event.path:
event.prop.tracefiles.append(event.path)
if update_status:
self.task.status_db.add_task_property_data(
event.prop,
"trace",
data=dict(path=event.path, step=event.step, **status_metadata),
)
if event.prop:
if event.type == "$cover":
add_trace = True
elif event.type == "$cover":
event.prop.status = "PASS"
if event.path:
event.prop.tracefiles.append(event.path)
if update_status:
self.task.status_db.add_task_property_data(
event.prop,
"trace",
data=dict(path=event.path, step=event.step, **status_metadata),
)
add_trace = True
trace_id = None
trace_path = None
if event.trace:
# get or create trace summary
try:
trace_summary = engine.traces[event.trace]
except KeyError:
trace_summary = SbyTraceSummary(event.trace, path=event.path, engine_case=event.engine_case)
engine.traces[event.trace] = trace_summary
if event.path:
trace_path = Path(event.path)
trace_ext = trace_path.suffix
trace_summary.last_ext = trace_ext
try:
# use existing tracefile for this extension
trace_id = trace_summary.trace_ids[trace_ext]
except KeyError:
# add tracefile to database
trace_id = self.task.status_db.add_task_trace(event.trace, event.path, trace_ext[1:], event.engine_case)
trace_summary.trace_ids[trace_ext] = trace_id
elif trace_summary.path:
# use existing tracefile for last extension
trace_path = Path(trace_summary.path)
trace_ext = trace_summary.last_ext
trace_id = trace_summary.trace_ids[trace_ext]
if event.type:
by_type = trace_summary.events[event.type]
if event.hdlname:
by_type[event.hdlname].append(event)
if event.prop and update_status:
# update property status in database
self.task.status_db.set_task_property_status(
event.prop,
data=status_metadata
trace_id=trace_id,
data=status_metadata,
)
if event.trace not in engine.traces:
engine.traces[event.trace] = SbyTraceSummary(event.trace, path=event.path, engine_case=event.engine_case)
if event.type:
by_type = engine.traces[event.trace].events[event.type]
if event.hdlname:
by_type[event.hdlname].append(event)
if trace_path and add_trace:
event.prop.tracefiles.append(str(trace_path))
def set_engine_status(self, engine_idx, status, case=None):
engine_summary = self.engine_summary(engine_idx)
@ -759,10 +782,21 @@ class SbySummary:
break
case_suffix = f" [{trace.engine_case}]" if trace.engine_case else ""
if trace.path:
if short:
yield f"{trace.kind}{case_suffix}: {self.task.workdir}/{trace.path}"
else:
yield f"{trace.kind}{case_suffix}: {trace.path}"
# print single preferred trace
preferred_exts = [".fst", ".vcd"]
if trace.last_ext not in preferred_exts: preferred_exts.append(trace.last_ext)
for ext in trace.trace_ids.keys():
if ext not in preferred_exts: preferred_exts.append(ext)
for ext in preferred_exts:
if ext not in trace.trace_ids:
continue
if short:
path = Path(self.task.workdir) / trace.path
else:
path = Path(trace.path)
yield f"{trace.kind}{case_suffix}: {path.with_suffix(ext)}"
if short:
break
else:
yield f"{trace.kind}{case_suffix}: <{trace.trace}>"
produced_traces = True
@ -785,15 +819,18 @@ class SbySummary:
break
event = same_events[0]
steps = sorted(e.step for e in same_events)
# uniquify steps and ignore events with missing steps
steps = sorted(set(e.step for e in same_events if e.step))
if short and len(steps) > step_limit:
excess = len(steps) - step_limit
steps = [str(step) for step in steps[:step_limit]]
omitted_excess = True
steps[-1] += f" and {excess} further step{'s' if excess != 1 else ''}"
steps = f"step{'s' if len(steps) > 1 else ''} {', '.join(map(str, steps))}"
yield f" {desc} {event.hdlname} at {event.src} in {steps}"
event_string = f" {desc} {hdlname} at {event.src}"
if steps:
event_string += f" step{'s' if len(steps) > 1 else ''} {', '.join(map(str, steps))}"
yield event_string
if not produced_traces:
yield f"{engine.engine} did not produce any traces"
@ -801,7 +838,7 @@ class SbySummary:
if self.unreached_covers is None and self.task.opt_mode == 'cover' and self.task.status != "PASS" and self.task.design:
self.unreached_covers = []
for prop in self.task.design.hierarchy:
if prop.type == prop.Type.COVER and prop.status == "UNKNOWN":
if prop.type == prop.Type.COVER and prop.status in ["UNKNOWN", "FAIL"]:
self.unreached_covers.append(prop)
if self.unreached_covers:
@ -825,12 +862,14 @@ class SbySummary:
class SbyTask(SbyConfig):
def __init__(self, sbyconfig, workdir, early_logs, reusedir, taskloop=None, logfile=None):
def __init__(self, sbyconfig, workdir, early_logs, reusedir, taskloop=None, logfile=None, name=None, live_csv=False):
super().__init__()
self.used_options = set()
self.models = dict()
self.workdir = workdir
self.reusedir = reusedir
self.name = name
self.live_csv = live_csv
self.status = "UNKNOWN"
self.total_time = 0
self.expect = list()
@ -1027,7 +1066,10 @@ class SbyTask(SbyConfig):
if self.opt_mode in ["bmc", "prove"]:
print("chformal -live -fair -cover -remove", file=f)
if self.opt_mode == "cover":
print("chformal -live -fair -remove", file=f)
if self.opt_cover_assert:
print("chformal -live -fair -remove", file=f)
else:
print("chformal -live -fair -assert -remove", file=f)
if self.opt_mode == "live":
print("chformal -assert2assume", file=f)
print("chformal -cover -remove", file=f)
@ -1219,17 +1261,27 @@ class SbyTask(SbyConfig):
proc.terminate(timeout=timeout)
for proc in list(self.procs_pending):
proc.terminate(timeout=timeout)
if timeout:
self.update_unknown_props(dict(source="timeout"))
def proc_failed(self, proc):
# proc parameter used by autotune override
self.status = "ERROR"
self.terminate()
def update_unknown_props(self, data):
for prop in self.design.hierarchy:
if prop.status != "UNKNOWN":
continue
if ((prop.type == prop.Type.ASSERT and self.opt_mode in ["bmc", "prove"]) or
(prop.type == prop.Type.COVER and self.opt_mode == "cover")):
self.status_db.set_task_property_status(prop, data=data)
def pass_unknown_asserts(self, data):
for prop in self.design.pass_unknown_asserts():
self.status_db.set_task_property_status(prop, data=data)
def update_status(self, new_status):
def update_status(self, new_status, step = None):
assert new_status in ["PASS", "FAIL", "UNKNOWN", "ERROR"]
self.status_db.set_task_status(new_status)
@ -1243,7 +1295,10 @@ class SbyTask(SbyConfig):
assert self.status != "FAIL"
self.status = "PASS"
if self.opt_mode in ("bmc", "prove") and self.design:
self.pass_unknown_asserts(dict(source="task_status"))
data = {"source": "task_status"}
if step:
data["step"] = step
self.pass_unknown_asserts(data)
elif new_status == "FAIL":
assert self.status != "PASS"
@ -1301,6 +1356,9 @@ class SbyTask(SbyConfig):
self.handle_bool_option("skip_prep", False)
self.handle_bool_option("assume_early", True)
if self.opt_mode == "cover":
self.handle_bool_option("cover_assert", False)
def setup_status_db(self, status_path=None):
if hasattr(self, 'status_db'):
@ -1313,7 +1371,7 @@ class SbyTask(SbyConfig):
except FileNotFoundError:
status_path = f"{self.workdir}/status.sqlite"
self.status_db = SbyStatusDb(status_path, self)
self.status_db = SbyStatusDb(status_path, self, live_csv=self.live_csv)
def setup_procs(self, setupmode, linkmode=False):
self.handle_non_engine_options()