3
0
Fork 0
mirror of https://github.com/Z3Prover/z3 synced 2026-06-27 10:58:48 +00:00

Memorize previous conflicts

This commit is contained in:
CEisenhofer 2026-06-25 15:40:53 +02:00
parent d0c0c1b573
commit 2aa1d1ee01
3 changed files with 261 additions and 72 deletions

View file

@ -1,14 +1,21 @@
#!/usr/bin/env python3
"""
Compare z3 string solver configurations, and optionally against an external ZIPT solver.
Compare z3 string solver configurations, and optionally against external solvers.
We always run three z3 configurations:
nseq_md monadic decomposition (parikh off, eager regex factorization)
nseq_pa parikh (parikh on, no regex factorization)
seq the old/baseline string solver
Optionally compare against external solvers:
zipt ZIPT solver (--zipt /path/to/zipt)
ostrich Ostrich solver (--ostrich /path/to/ostrich)
noodler z3-noodler solver (--noodler /path/to/z3noodler)
Usage:
python compare_seq_solvers.py <path-to-smtlib-files> --z3 /path/to/z3 [--zipt /path/to/zipt] [--ext .smt2]
python compare_seq_solvers.py <path-to-smtlib-files> --z3 /path/to/z3
[--zipt /path/to/zipt] [--ostrich /path/to/ostrich] [--noodler /path/to/noodler]
[--ext .smt2]
Finds all .smt2 files under the given path, runs the solvers with a configurable timeout,
and reports timeouts, divergences, and summary statistics.
@ -106,17 +113,21 @@ def run_z3(z3_bin: str, smt_file: Path, solver_args: list[str], timeout_s: int =
return f"error:{e}", elapsed
def run_zipt(zipt_bin: str, smt_file: Path, timeout_s: int = DEFAULT_TIMEOUT) -> tuple[str, float]:
"""Run ZIPT on a file. Returns (result, elapsed)."""
def run_external_solver(binary: str, smt_file: Path, timeout_s: int = DEFAULT_TIMEOUT,
extra_args: list[str] | None = None) -> tuple[str, float]:
"""Run an external solver on a file. Returns (result, elapsed).
Passes -t:{timeout_ms} as the first argument, matching the Z3/ZIPT convention.
Supply extra_args for any solver-specific flags that precede the file path.
"""
timeout_ms = timeout_s * 1000
cmd = [zipt_bin, f"-t:{timeout_ms}", str(smt_file)]
cmd = [binary, f"-t:{timeout_ms}"] + (extra_args or []) + [str(smt_file)]
start = time.monotonic()
try:
proc = subprocess.run(cmd, capture_output=True, text=True,
timeout=timeout_s + 5)
elapsed = time.monotonic() - start
out = proc.stdout.strip()
return _parse_result(out), elapsed
return _parse_result(proc.stdout.strip()), elapsed
except subprocess.TimeoutExpired:
elapsed = time.monotonic() - start
return "timeout", elapsed
@ -151,7 +162,11 @@ def classify(solver_results: dict[str, str]) -> str:
def process_file(z3_bin: str, smt_file: Path, timeout_s: int = DEFAULT_TIMEOUT,
zipt_bin: str | None = None) -> dict:
external_bins: dict[str, str] | None = None) -> dict:
"""Run all z3 configurations and any external solvers on smt_file.
external_bins maps solver label (e.g. 'zipt', 'ostrich', 'noodler') to binary path.
"""
solver_results: dict[str, str] = {}
solver_times: dict[str, float] = {}
for name in SOLVER_NAMES:
@ -162,21 +177,21 @@ def process_file(z3_bin: str, smt_file: Path, timeout_s: int = DEFAULT_TIMEOUT,
cat = classify(solver_results)
smtlib_status = read_smtlib_status(smt_file)
status = determine_status(solver_results, smtlib_status)
result = {
"file": smt_file,
"category": cat,
"smtlib_status": smtlib_status,
"status": status,
"zipt": None,
"t_zipt": None,
result: dict = {
"file": smt_file,
"category": cat,
"smtlib_status": smtlib_status,
"status": status,
}
for name in SOLVER_NAMES:
result[name] = solver_results[name]
result[f"t_{name}"] = solver_times[name]
if zipt_bin:
res_zipt, t_zipt = run_zipt(zipt_bin, smt_file, timeout_s)
result["zipt"] = res_zipt
result["t_zipt"] = t_zipt
for label, binary in (external_bins or {}).items():
res, t = run_external_solver(binary, smt_file, timeout_s)
result[label] = res
result[f"t_{label}"] = t
return result
@ -193,14 +208,24 @@ def main():
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, metavar="SEC",
help=f"Per-solver timeout in seconds (default: {DEFAULT_TIMEOUT})")
parser.add_argument("--zipt", metavar="PATH", default=None,
help="Path to ZIPT binary (optional; if omitted, ZIPT is not benchmarked)")
help="Path to ZIPT binary (optional)")
parser.add_argument("--ostrich", metavar="PATH", default=None,
help="Path to Ostrich binary (optional)")
parser.add_argument("--noodler", metavar="PATH", default=None,
help="Path to z3-noodler binary (optional)")
parser.add_argument("--csv", metavar="FILE", help="Also write results to a CSV file")
args = parser.parse_args()
z3_bin = args.z3
zipt_bin = args.zipt
timeout_s = args.timeout
# Collect external solvers in declaration order so output columns are stable.
external_bins: dict[str, str] = {}
if args.zipt: external_bins["zipt"] = args.zipt
if args.ostrich: external_bins["ostrich"] = args.ostrich
if args.noodler: external_bins["noodler"] = args.noodler
ext_names = list(external_bins.keys())
root = Path(args.path)
if not root.exists():
print(f"Error: path does not exist: {root}", file=sys.stderr)
@ -225,14 +250,13 @@ def main():
print(f"Sampling: {len(files)} files selected "
f"(max {args.max_per_folder} per subfolder, {len(by_folder)} subfolder(s))")
solvers_label = ", ".join(SOLVER_NAMES)
if zipt_bin: solvers_label += ", zipt"
print(f"Found {len(files)} files. Solvers: {solvers_label}. "
all_solver_labels = SOLVER_NAMES + ext_names
print(f"Found {len(files)} files. Solvers: {', '.join(all_solver_labels)}. "
f"Workers: {args.jobs}, timeout: {timeout_s}s\n")
results = []
pool = ThreadPoolExecutor(max_workers=args.jobs)
futures = {pool.submit(process_file, z3_bin, f, timeout_s, zipt_bin): f for f in files}
futures = {pool.submit(process_file, z3_bin, f, timeout_s, external_bins): f for f in files}
done = 0
try:
for fut in as_completed(futures):
@ -240,10 +264,8 @@ def main():
r = fut.result()
results.append(r)
solver_part = " ".join(f"{name}={r[name]:8s} ({r[f't_{name}']:.1f}s)" for name in SOLVER_NAMES)
zipt_part = ""
if zipt_bin:
zipt_part = f" zipt={r['zipt']:8s} ({r['t_zipt']:.1f}s)"
print(f"[{done:4d}/{len(files)}] {r['category']:20s} {solver_part}{zipt_part} {r['file'].parent.name}/{r['file'].name}")
ext_part = "".join(f" {label}={r[label]:8s} ({r[f't_{label}']:.1f}s)" for label in ext_names)
print(f"[{done:4d}/{len(files)}] {r['category']:20s} {solver_part}{ext_part} {r['file'].parent.name}/{r['file'].name}")
except KeyboardInterrupt:
print("\nInterrupted — cancelling pending tasks.", file=sys.stderr)
pool.shutdown(wait=False, cancel_futures=True)
@ -283,10 +305,11 @@ def main():
timeouts = [r for r in results if r[name] == "timeout"]
if timeouts:
_print_file_list(f"{name.upper()} TIMES OUT", timeouts)
if zipt_bin:
zipt_timeouts = [r for r in results if r["zipt"] == "timeout"]
if zipt_timeouts:
_print_file_list("ZIPT TIMES OUT", zipt_timeouts)
for label in ext_names:
timeouts = [r for r in results if r[label] == "timeout"]
if timeouts:
_print_file_list(f"{label.upper()} TIMES OUT", timeouts)
all_to = categories["all_timeout"]
if all_to:
@ -294,20 +317,24 @@ def main():
invalid_models = categories.get("invalid_model", [])
if invalid_models:
_print_file_list("INVALID MODEL GENERATED", invalid_models)
if zipt_bin:
all_three_to = [r for r in results
if all(r[name] == "timeout" for name in SOLVER_NAMES) and r["zipt"] == "timeout"]
if all_three_to:
_print_file_list("ALL SOLVERS (INCL. ZIPT) TIME OUT", all_three_to)
if ext_names:
all_ext_to = [r for r in results
if all(r[name] == "timeout" for name in SOLVER_NAMES)
and all(r[label] == "timeout" for label in ext_names)]
if all_ext_to:
_print_file_list("ALL SOLVERS TIME OUT", all_ext_to)
if diverged:
_print_file_list("DIVERGE among z3 configurations (sat vs unsat)", diverged)
if zipt_bin:
definite = {"sat", "unsat"}
zipt_diverged = [r for r in results
if r["zipt"] in definite
and any(r[name] in definite and r[name] != r["zipt"] for name in SOLVER_NAMES)]
if zipt_diverged:
_print_file_list("DIVERGE involving ZIPT", zipt_diverged)
definite = {"sat", "unsat"}
for label in ext_names:
ext_diverged = [r for r in results
if r[label] in definite
and any(r[name] in definite and r[name] != r[label] for name in SOLVER_NAMES)]
if ext_diverged:
_print_file_list(f"DIVERGE involving {label.upper()}", ext_diverged)
print()
@ -331,8 +358,8 @@ def main():
csv_path = Path(args.csv)
fieldnames = ["file"] + SOLVER_NAMES + [f"t_{name}" for name in SOLVER_NAMES]
fieldnames.extend(["category", "smtlib_status", "status"])
if zipt_bin:
fieldnames.extend(["zipt", "t_zipt"])
for label in ext_names:
fieldnames.extend([label, f"t_{label}"])
with csv_path.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()