3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-10-26 02:34:37 +00:00
sby/sbysrc/sby_status.py
Krystine Sherwin cb81a97808
Add --taskstatus
Used for checking tasks in the status db.
Change `SBYStatusDB.all_tasks_status()` to use a `LEFT JOIN` to get a status of `UNKNOWN` for pending or aborted tasks (e.g. because they were ctrl+c'ed).
2025-07-09 10:06:56 +12:00

591 lines
20 KiB
Python

from __future__ import annotations
import sqlite3
import os
import time
import json
import click
import re
from collections import defaultdict
from functools import wraps
from pathlib import Path
from typing import Any, Callable, TypeVar, Optional, Iterable
from sby_design import SbyProperty, pretty_path
Fn = TypeVar("Fn", bound=Callable[..., Any])
SQLSCRIPT = """\
CREATE TABLE task (
id INTEGER PRIMARY KEY,
workdir TEXT,
name TEXT,
mode TEXT,
created REAL
);
CREATE TABLE task_status (
id INTEGER PRIMARY KEY,
task INTEGER,
status TEXT,
data TEXT,
created REAL,
FOREIGN KEY(task) REFERENCES task(id)
);
CREATE TABLE task_property (
id INTEGER PRIMARY KEY,
task INTEGER,
src TEXT,
name TEXT,
hdlname TEXT,
kind TEXT,
created REAL,
FOREIGN KEY(task) REFERENCES task(id)
);
CREATE TABLE task_property_status (
id INTEGER PRIMARY KEY,
task_property INTEGER,
task_trace INTEGER,
status TEXT,
data TEXT,
created REAL,
FOREIGN KEY(task_property) REFERENCES task_property(id),
FOREIGN KEY(task_trace) REFERENCES task_trace(id)
);
CREATE TABLE task_trace (
id INTEGER PRIMARY KEY,
task INTEGER,
trace TEXT,
path TEXT,
kind TEXT,
engine_case TEXT,
created REAL,
FOREIGN KEY(task) REFERENCES task(id)
);"""
def transaction(method: Fn) -> Fn:
@wraps(method)
def wrapper(self: SbyStatusDb, *args: Any, **kwargs: Any) -> Any:
if self.con.in_transaction:
return method(self, *args, **kwargs)
try:
with self.con:
self.log_debug(f"begin {method.__name__!r} transaction")
self.db.execute("begin")
result = method(self, *args, **kwargs)
except Exception as err:
self.log_debug(f"failed {method.__name__!r} transaction {err}")
if not isinstance(err, sqlite3.OperationalError):
raise
if re.match(r"table \w+ has no column named \w+", err.args[0]):
err.add_note("SBY status database can be reset with --statusreset")
raise
else:
self.log_debug(f"comitted {method.__name__!r} transaction")
return result
try:
with self.con:
self.log_debug(
f"retrying {method.__name__!r} transaction once in immediate mode"
)
self.db.execute("begin immediate")
result = method(self, *args, **kwargs)
except Exception as err:
self.log_debug(f"failed {method.__name__!r} transaction {err}")
raise
else:
self.log_debug(f"comitted {method.__name__!r} transaction")
return result
return wrapper # type: ignore
class FileInUseError(Exception):
def __init__(self, *args, file: Path|str = "file"):
super().__init__(f"Found {file}, try again later", *args)
class SbyStatusDb:
def __init__(self, path: Path, task, timeout: float = 5.0, live_csv = False):
self.debug = False
self.task = task
self.live_csv = live_csv
self.con = sqlite3.connect(path, isolation_level=None, timeout=timeout)
self.db = self.con.cursor()
self.db.row_factory = sqlite3.Row
err_count = 0
err_max = 3
while True:
try:
self.db.execute("PRAGMA journal_mode=WAL")
self.db.execute("PRAGMA synchronous=0")
self.db.execute("PRAGMA foreign_keys=ON")
except sqlite3.OperationalError as err:
if "database is locked" not in err.args[0]:
raise
err_count += 1
if err_count > err_max:
err.add_note(f"Failed to acquire lock after {err_count} attempts, aborting")
raise
backoff = err_count / 10.0
self.log_debug(f"Database locked, retrying in {backoff}s")
time.sleep(backoff)
else:
break
self._setup()
if task is not None:
self.start_time = time.time()
self.task_id = self.create_task(workdir=task.workdir, name=task.name, mode=task.opt_mode, now=self.start_time)
def log_debug(self, *args):
if self.debug:
if self.task:
self.task.log(" ".join(str(arg) for arg in args))
else:
print(*args)
@transaction
def _setup(self):
for statement in SQLSCRIPT.split(";\n"):
statement = statement.strip().replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS")
if statement:
self.db.execute(statement)
def test_schema(self) -> bool:
schema = self.db.execute("SELECT sql FROM sqlite_master;").fetchall()
schema_script = '\n'.join(str(sql[0] + ';') for sql in schema)
self._tables = re.findall(r"CREATE TABLE (\w+) \(", schema_script)
return schema_script != SQLSCRIPT
@transaction
def create_task(self, workdir: str, name: str, mode: str, now:float) -> int:
return self.db.execute(
"""
INSERT INTO task (workdir, name, mode, created)
VALUES (:workdir, :name, :mode, :now)
""",
dict(workdir=workdir, name=name, mode=mode, now=now),
).lastrowid
@transaction
def create_task_properties(
self, properties: Iterable[SbyProperty], *, task_id: Optional[int] = None
):
if task_id is None:
task_id = self.task_id
now = time.time()
self.db.executemany(
"""
INSERT INTO task_property (name, src, hdlname, task, kind, created)
VALUES (:name, :src, :hdlname, :task, :kind, :now)
""",
[
dict(
name=json.dumps(prop.path),
src=prop.location or "",
hdlname=prop.hdlname,
task=task_id,
kind=prop.kind,
now=now,
)
for prop in properties
],
)
@transaction
def set_task_status(
self,
status: Optional[str] = None,
data: Any = None,
):
if status is None:
status = property.status
now = time.time()
self.db.execute(
"""
INSERT INTO task_status (
task, status, data, created
)
VALUES (
:task, :status, :data, :now
)
""",
dict(
task=self.task_id,
status=status,
data=json.dumps(data),
now=now,
),
)
@transaction
def set_task_property_status(
self,
property: SbyProperty,
trace_id: Optional[int] = None,
data: Any = None,
):
now = time.time()
self.db.execute(
"""
INSERT INTO task_property_status (
task_property, task_trace, status, data, created
)
VALUES (
(SELECT id FROM task_property WHERE task = :task AND name = :name),
:trace_id, :status, :data, :now
)
""",
dict(
task=self.task_id,
trace_id=trace_id,
name=json.dumps(property.path),
status=property.status,
data=json.dumps(data),
now=now,
),
)
if self.live_csv:
row = self.get_status_data_joined(self.db.lastrowid)
csvline = format_status_data_csvline(row)
self.task.log(f"{click.style('csv', fg='yellow')}: {csvline}")
@transaction
def add_task_trace(
self,
trace: str,
path: str,
kind: str,
engine_case: Optional[str] = None,
task_id: Optional[int] = None,
):
if task_id is None:
task_id = self.task_id
now = time.time()
return self.db.execute(
"""
INSERT INTO task_trace (
trace, task, path, engine_case, kind, created
)
VALUES (
:trace, :task, :path, :engine_case, :kind, :now
)
""",
dict(
trace=trace,
task=task_id,
path=path,
engine_case=engine_case,
kind=kind,
now=now
)
).lastrowid
def all_tasks(self):
rows = self.db.execute(
"""
SELECT id, workdir, created FROM task
"""
).fetchall()
return {row["id"]: dict(row) for row in rows}
def all_tasks_status(self):
rows = self.db.execute(
"""
SELECT task.id, task.name, task.created,
task_status.status, task_status.created as 'status_created'
FROM task
LEFT JOIN task_status ON task_status.task=task.id
"""
).fetchall()
return {row["id"]: dict(row) for row in rows}
def all_task_properties(self):
rows = self.db.execute(
"""
SELECT id, task, src, name, created FROM task_property
"""
).fetchall()
def get_result(row):
row = dict(row)
row["name"] = tuple(json.loads(row.get("name", "[]")))
return row
return {row["id"]: get_result(row) for row in rows}
def all_task_property_statuses(self):
rows = self.db.execute(
"""
SELECT id, task_property, status, data, created
FROM task_property_status
"""
).fetchall()
def get_result(row):
row = dict(row)
row["data"] = json.loads(row.get("data", "null"))
return row
return {row["id"]: get_result(row) for row in rows}
def all_status_data(self):
return (
self.all_tasks(),
self.all_task_properties(),
self.all_task_property_statuses(),
)
@transaction
def _reset(self):
hard_reset = self.test_schema()
# table names can't be parameters, so we need to use f-strings
# but it is safe to use here because it comes from the regex "\w+"
for table in self._tables:
if hard_reset:
self.log_debug(f"dropping {table}")
self.db.execute(f"DROP TABLE {table}")
else:
self.log_debug(f"clearing {table}")
self.db.execute(f"DELETE FROM {table}")
if hard_reset:
self._setup()
def reset(self):
self.db.execute("PRAGMA foreign_keys=OFF")
self._reset()
self.db.execute("PRAGMA foreign_keys=ON")
def print_status_summary(self, latest: bool):
tasks, task_properties, task_property_statuses = self.all_status_data()
latest_task_ids = filter_latest_task_ids(tasks)
properties = defaultdict(set)
uniquify_paths = defaultdict(dict)
def add_status(task_property, status):
if latest and task_property["task"] not in latest_task_ids:
return
display_name = task_property["name"]
if display_name[-1].startswith("$"):
counters = uniquify_paths[task_property["src"]]
counter = counters.setdefault(display_name[-1], len(counters) + 1)
if task_property["src"]:
if counter < 2:
path_based = f"<unnamed at {task_property['src']}>"
else:
path_based = f"<unnamed #{counter} at {task_property['src']}>"
else:
path_based = f"<unnamed #{counter}>"
display_name = (*display_name[:-1], path_based)
properties[display_name].add(status)
for task_property in task_properties.values():
add_status(task_property, "UNKNOWN")
for status in task_property_statuses.values():
task_property = task_properties[status["task_property"]]
add_status(task_property, status["status"])
for display_name, statuses in sorted(properties.items()):
print(pretty_path(display_name), combine_statuses(statuses))
def print_task_summary(self):
tasks = self.all_tasks_status()
task_status = defaultdict(set)
for task in tasks.values():
task_status[task["name"]].add(task["status"] or "UNKNOWN")
for task_name, statuses in sorted(task_status.items()):
print(task_name, combine_statuses(statuses))
def get_status_data_joined(self, status_id: int):
row = self.db.execute(
"""
SELECT task.name as 'task_name', task.mode, task.workdir, task.created, task_property.kind,
task_property.src as 'location', task_property.name, task_property.hdlname, task_property_status.status,
task_property_status.data, task_property_status.created as 'status_created',
task_property_status.id, task_trace.path, task_trace.kind as trace_kind
FROM task
INNER JOIN task_property ON task_property.task=task.id
INNER JOIN task_property_status ON task_property_status.task_property=task_property.id
LEFT JOIN task_trace ON task_property_status.task_trace=task_trace.id
WHERE task_property_status.id=:status_id;
""",
dict(status_id=status_id)
).fetchone()
return parse_status_data_row(row)
def all_status_data_joined(self):
rows = self.db.execute(
"""
SELECT task.id as 'task_id', task.name as 'task_name', task.mode, task.workdir, task.created, task_property.kind,
task_property.src as 'location', task_property.name, task_property.hdlname, task_property_status.status,
task_property_status.data, task_property_status.created as 'status_created',
task_property_status.id, task_trace.path, task_trace.kind as trace_kind
FROM task
INNER JOIN task_property ON task_property.task=task.id
INNER JOIN task_property_status ON task_property_status.task_property=task_property.id
LEFT JOIN task_trace ON task_property_status.task_trace=task_trace.id;
"""
).fetchall()
return {row["id"]: parse_status_data_row(row) for row in rows}
def print_status_summary_csv(self, tasknames: list[str], latest: bool):
# get all statuses
all_properties = self.all_status_data_joined()
latest_task_ids = filter_latest_task_ids(self.all_tasks())
# print csv header
csvheader = format_status_data_csvline(None)
print(csvheader)
# find summary for each task/property combo
prop_map: dict[(str, str), dict[str, (int, int)]] = {}
for row, prop_status in all_properties.items():
if tasknames and prop_status['task_name'] not in tasknames:
continue
if latest and prop_status['task_id'] not in latest_task_ids:
continue
status = prop_status['status']
this_depth = prop_status['data'].get('step')
this_kind = prop_status['trace_kind']
key = (prop_status['task_name'], prop_status['hdlname'])
try:
prop_status_map = prop_map[key]
except KeyError:
prop_map[key] = prop_status_map = {}
current_depth, _ = prop_status_map.get(status, (None, None))
update_map = False
if current_depth is None:
update_map = True
elif this_depth is not None:
if status == 'FAIL' and this_depth < current_depth:
# earliest fail
update_map = True
elif status != 'FAIL' and this_depth > current_depth:
# latest non-FAIL
update_map = True
elif this_depth == current_depth and this_kind in ['fst', 'vcd']:
# prefer traces over witness files
update_map = True
if update_map:
prop_status_map[status] = (this_depth, row)
for prop in prop_map.values():
# ignore UNKNOWNs if there are other statuses
if len(prop) > 1 and "UNKNOWN" in prop:
del prop["UNKNOWN"]
for _, row in prop.values():
csvline = format_status_data_csvline(all_properties[row])
print(csvline)
def combine_statuses(statuses):
statuses = set(statuses)
if len(statuses) > 1:
statuses.discard("UNKNOWN")
return ",".join(sorted(statuses))
def parse_status_data_row(raw: sqlite3.Row):
row_dict = dict(raw)
row_dict["name"] = json.loads(row_dict.get("name", "null"))
row_dict["data"] = json.loads(row_dict.get("data") or "{}")
return row_dict
def format_status_data_csvline(row: dict|None) -> str:
if row is None:
csv_header = [
"time",
"task_name",
"mode",
"engine",
"name",
"location",
"kind",
"status",
"trace",
"depth",
]
return ','.join(csv_header)
else:
engine = row['data'].get('engine', row['data'].get('source'))
try:
time = row['status_created'] - row['created']
except TypeError:
time = 0
name = row['hdlname']
depth = row['data'].get('step')
try:
trace_path = Path(row['workdir']) / row['path']
except TypeError:
trace_path = None
csv_line = [
round(time, 2),
row['task_name'],
row['mode'],
engine,
name or pretty_path(row['name']),
row['location'],
row['kind'],
row['status'] or "UNKNOWN",
trace_path,
depth,
]
return ','.join("" if v is None else str(v) for v in csv_line)
def filter_latest_task_ids(all_tasks: dict[int, dict[str]]):
latest: dict[str, int] = {}
for task_id, task_dict in all_tasks.items():
latest[task_dict["workdir"]] = task_id
return list(latest.values())
def remove_db(path):
path = Path(path)
lock_exts = [".sqlite-wal", ".sqlite-shm"]
maybe_locked = False
for lock_file in [path.with_suffix(ext) for ext in lock_exts]:
if lock_file.exists():
# lock file may be a false positive if it wasn't cleaned up
maybe_locked = True
break
if not maybe_locked:
# safe to delete
os.remove(path)
return
# test database directly
with sqlite3.connect(path, isolation_level="EXCLUSIVE", timeout=1) as con:
cur = con.cursor()
# single result rows
cur.row_factory = lambda _, r: r[0]
# changing journal_mode is disallowed if there are multiple connections
try:
cur.execute("PRAGMA journal_mode=DELETE")
except sqlite3.OperationalError as err:
if "database is locked" in err.args[0]:
raise FileInUseError(file=path)
else:
raise
# no other connections, delete all tables
drop_script = cur.execute("SELECT name FROM sqlite_master WHERE type = 'table';").fetchall()
for table in drop_script:
print(table)
cur.execute(f"DROP TABLE {table}")