3
0
Fork 0
mirror of https://github.com/Z3Prover/z3 synced 2026-03-14 09:09:58 +00:00

run black and ruff on all skill scripts

This commit is contained in:
Angelica Moreira 2026-03-11 21:53:10 +00:00
parent 9d674404c8
commit 621638abb9
7 changed files with 103 additions and 60 deletions

View file

@ -42,13 +42,19 @@ def main():
for i in range(args.runs):
run_id = db.start_run("benchmark", formula)
result = run_z3(formula, z3_bin=args.z3, timeout=args.timeout,
args=["-st"], debug=args.debug)
result = run_z3(
formula,
z3_bin=args.z3,
timeout=args.timeout,
args=["-st"],
debug=args.debug,
)
stats = parse_stats(result["stdout"])
db.log_formula(run_id, formula, result["result"], stats=stats)
db.finish_run(run_id, result["result"], result["duration_ms"],
result["exit_code"])
db.finish_run(
run_id, result["result"], result["duration_ms"], result["exit_code"]
)
timings.append(result["duration_ms"])
if args.runs == 1:

View file

@ -15,7 +15,6 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "shared"))
from z3db import Z3DB, run_z3, setup_logging
VALIDATION_TIMEOUT = 5
@ -41,7 +40,10 @@ def validate(formula: str, z3_bin: str = None, debug: bool = False) -> dict:
Returns a dict with 'valid' (bool), 'errors' (list), and 'raw' output.
"""
result = run_z3(
formula, z3_bin=z3_bin, timeout=VALIDATION_TIMEOUT, debug=debug,
formula,
z3_bin=z3_bin,
timeout=VALIDATION_TIMEOUT,
debug=debug,
)
errors = find_errors(result["stdout"]) + find_errors(result["stderr"])
@ -72,8 +74,10 @@ def report_errors(errors: list, formula: str):
def write_output(formula: str, output_path: str, fmt: str):
"""Write the validated formula to a file or stdout."""
if fmt == "python":
print("python format output is generated by the agent, "
"not by this script", file=sys.stderr)
print(
"python format output is generated by the agent, " "not by this script",
file=sys.stderr,
)
sys.exit(1)
if output_path:
@ -131,7 +135,8 @@ def main():
for err in result["errors"]:
db.log_finding(run_id, "syntax", err, severity="error")
db.finish_run(
run_id, "error",
run_id,
"error",
result["raw"]["duration_ms"],
result["raw"]["exit_code"],
)

View file

@ -21,7 +21,7 @@ def detect_type(text: str) -> str:
return "model"
if "(error" in text:
return "error"
if re.search(r':\S+\s+[\d.]+', text):
if re.search(r":\S+\s+[\d.]+", text):
return "stats"
first = text.strip().split("\n")[0].strip()
if first == "unsat":
@ -86,8 +86,9 @@ def main():
parser = argparse.ArgumentParser(prog="explain")
parser.add_argument("--file")
parser.add_argument("--stdin", action="store_true")
parser.add_argument("--type", choices=["model", "core", "stats", "error", "auto"],
default="auto")
parser.add_argument(
"--type", choices=["model", "core", "stats", "error", "auto"], default="auto"
)
parser.add_argument("--db", default=None)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()

View file

@ -42,10 +42,8 @@ def main():
model = parse_model(result["stdout"]) if result["result"] == "sat" else None
db.log_formula(run_id, formula, result["result"],
str(model) if model else None)
db.finish_run(run_id, result["result"], result["duration_ms"],
result["exit_code"])
db.log_formula(run_id, formula, result["result"], str(model) if model else None)
db.finish_run(run_id, result["result"], result["duration_ms"], result["exit_code"])
print(result["result"])
if model:

View file

@ -27,7 +27,6 @@ import time
from pathlib import Path
from typing import Optional
SCHEMA_PATH = Path(__file__).parent / "schema.sql"
DEFAULT_DB_DIR = ".z3-agent"
DEFAULT_DB_NAME = "z3agent.db"
@ -37,8 +36,11 @@ logger = logging.getLogger("z3agent")
def setup_logging(debug: bool = False):
level = logging.DEBUG if debug else logging.INFO
fmt = "[%(levelname)s] %(message)s" if not debug else \
"[%(levelname)s %(asctime)s] %(message)s"
fmt = (
"[%(levelname)s] %(message)s"
if not debug
else "[%(levelname)s %(asctime)s] %(message)s"
)
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
@ -73,8 +75,9 @@ class Z3DB:
logger.debug("started run %d (skill=%s, hash=%s)", run_id, skill, input_hash)
return run_id
def finish_run(self, run_id: int, status: str, duration_ms: int,
exit_code: int = 0):
def finish_run(
self, run_id: int, status: str, duration_ms: int, exit_code: int = 0
):
self.conn.execute(
"UPDATE runs SET status=?, duration_ms=?, exit_code=? WHERE run_id=?",
(status, duration_ms, exit_code, run_id),
@ -82,25 +85,44 @@ class Z3DB:
self.conn.commit()
logger.debug("finished run %d: %s (%dms)", run_id, status, duration_ms)
def log_formula(self, run_id: int, smtlib2: str, result: str = None,
model: str = None, stats: dict = None) -> int:
def log_formula(
self,
run_id: int,
smtlib2: str,
result: str = None,
model: str = None,
stats: dict = None,
) -> int:
cur = self.conn.execute(
"INSERT INTO formulas (run_id, smtlib2, result, model, stats) "
"VALUES (?, ?, ?, ?, ?)",
(run_id, smtlib2, result, model,
json.dumps(stats) if stats else None),
(run_id, smtlib2, result, model, json.dumps(stats) if stats else None),
)
self.conn.commit()
return cur.lastrowid
def log_finding(self, run_id: int, category: str, message: str,
severity: str = None, file: str = None,
line: int = None, details: dict = None) -> int:
def log_finding(
self,
run_id: int,
category: str,
message: str,
severity: str = None,
file: str = None,
line: int = None,
details: dict = None,
) -> int:
cur = self.conn.execute(
"INSERT INTO findings (run_id, category, severity, file, line, "
"message, details) VALUES (?, ?, ?, ?, ?, ?, ?)",
(run_id, category, severity, file, line, message,
json.dumps(details) if details else None),
(
run_id,
category,
severity,
file,
line,
message,
json.dumps(details) if details else None,
),
)
self.conn.commit()
return cur.lastrowid
@ -109,8 +131,7 @@ class Z3DB:
"""Write to stderr and to the interaction_log table."""
getattr(logger, level, logger.info)(message)
self.conn.execute(
"INSERT INTO interaction_log (run_id, level, message) "
"VALUES (?, ?, ?)",
"INSERT INTO interaction_log (run_id, level, message) " "VALUES (?, ?, ?)",
(run_id, level, message),
)
self.conn.commit()
@ -144,11 +165,11 @@ class Z3DB:
if run_id:
return self.conn.execute(
"SELECT * FROM interaction_log WHERE run_id=? "
"ORDER BY log_id DESC LIMIT ?", (run_id, last)
"ORDER BY log_id DESC LIMIT ?",
(run_id, last),
).fetchall()
return self.conn.execute(
"SELECT * FROM interaction_log ORDER BY log_id DESC LIMIT ?",
(last,)
"SELECT * FROM interaction_log ORDER BY log_id DESC LIMIT ?", (last,)
).fetchall()
def query(self, sql: str):
@ -192,8 +213,13 @@ def _find_repo_root() -> Optional[Path]:
return None
def run_z3(formula: str, z3_bin: str = None, timeout: int = 30,
args: list = None, debug: bool = False) -> dict:
def run_z3(
formula: str,
z3_bin: str = None,
timeout: int = 30,
args: list = None,
debug: bool = False,
) -> dict:
"""Pipe an SMT-LIB2 formula into z3 -in, return parsed output."""
z3_path = find_z3(z3_bin)
cmd = [z3_path, "-in"] + (args or [])
@ -204,15 +230,21 @@ def run_z3(formula: str, z3_bin: str = None, timeout: int = 30,
start = time.monotonic()
try:
proc = subprocess.run(
cmd, input=formula, capture_output=True, text=True,
cmd,
input=formula,
capture_output=True,
text=True,
timeout=timeout,
)
except subprocess.TimeoutExpired:
duration_ms = int((time.monotonic() - start) * 1000)
logger.warning("z3 timed out after %dms", duration_ms)
return {
"stdout": "", "stderr": "timeout", "exit_code": -1,
"duration_ms": duration_ms, "result": "timeout",
"stdout": "",
"stderr": "timeout",
"exit_code": -1,
"duration_ms": duration_ms,
"result": "timeout",
}
duration_ms = int((time.monotonic() - start) * 1000)
@ -237,9 +269,7 @@ def run_z3(formula: str, z3_bin: str = None, timeout: int = 30,
def parse_model(stdout: str) -> Optional[dict]:
"""Pull define-fun entries from a (get-model) response."""
model = {}
for m in re.finditer(
r'\(define-fun\s+(\S+)\s+\(\)\s+\S+\s+(.+?)\)', stdout
):
for m in re.finditer(r"\(define-fun\s+(\S+)\s+\(\)\s+\S+\s+(.+?)\)", stdout):
model[m.group(1)] = m.group(2).strip()
return model if model else None
@ -247,9 +277,9 @@ def parse_model(stdout: str) -> Optional[dict]:
def parse_stats(stdout: str) -> Optional[dict]:
"""Parse :key value pairs from z3 -st output."""
stats = {}
for m in re.finditer(r':(\S+)\s+([\d.]+)', stdout):
for m in re.finditer(r":(\S+)\s+([\d.]+)", stdout):
key, val = m.group(1), m.group(2)
stats[key] = float(val) if '.' in val else int(val)
stats[key] = float(val) if "." in val else int(val)
return stats if stats else None
@ -275,7 +305,7 @@ def cli():
sub.add_parser("init", help="initialize the database")
status_p = sub.add_parser("status", help="show run summary")
sub.add_parser("status", help="show run summary")
log_p = sub.add_parser("log", help="show interaction log")
log_p.add_argument("--run-id", type=int, help="filter by run ID")
@ -298,21 +328,27 @@ def cli():
elif args.command == "status":
s = db.get_status()
print(f"Runs: {s['total']}"
f" | success: {s.get('success', 0)}"
f" | error: {s.get('error', 0)}"
f" | timeout: {s.get('timeout', 0)}"
f" | Last: {s['last_run'] or 'never'}")
print(
f"Runs: {s['total']}"
f" | success: {s.get('success', 0)}"
f" | error: {s.get('error', 0)}"
f" | timeout: {s.get('timeout', 0)}"
f" | Last: {s['last_run'] or 'never'}"
)
elif args.command == "log":
for row in db.get_logs(args.run_id, args.last):
print(f"[{row['level']}] {row['timestamp']} "
f"(run {row['run_id']}): {row['message']}")
print(
f"[{row['level']}] {row['timestamp']} "
f"(run {row['run_id']}): {row['message']}"
)
elif args.command == "runs":
for row in db.get_runs(args.skill, args.last):
print(f"#{row['run_id']} {row['skill']} {row['status']} "
f"{row['duration_ms']}ms @ {row['timestamp']}")
print(
f"#{row['run_id']} {row['skill']} {row['status']} "
f"{row['duration_ms']}ms @ {row['timestamp']}"
)
elif args.command == "query":
for row in db.query(args.sql):

View file

@ -14,7 +14,6 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "shared"))
from z3db import Z3DB, run_z3, setup_logging
DEFAULT_TACTICS = "simplify,propagate-values,ctx-simplify"

View file

@ -44,10 +44,8 @@ def main():
model = parse_model(result["stdout"]) if result["result"] == "sat" else None
core = parse_unsat_core(result["stdout"]) if result["result"] == "unsat" else None
db.log_formula(run_id, formula, result["result"],
str(model) if model else None)
db.finish_run(run_id, result["result"], result["duration_ms"],
result["exit_code"])
db.log_formula(run_id, formula, result["result"], str(model) if model else None)
db.finish_run(run_id, result["result"], result["duration_ms"], result["exit_code"])
print(result["result"])
if model: