3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-04-05 06:04:06 +00:00

Turn .format() strings into f-strings

This commit is contained in:
piegames 2021-06-25 00:02:22 +02:00
parent 1f6700f21d
commit 2d7d48885b
10 changed files with 219 additions and 172 deletions

View file

@ -24,7 +24,7 @@ from time import localtime
class DictAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
assert isinstance(getattr(namespace, self.dest), dict), "Use ArgumentParser.set_defaults() to initialize {} to dict()".format(self.dest)
assert isinstance(getattr(namespace, self.dest), dict), f"Use ArgumentParser.set_defaults() to initialize {self.dest} to dict()"
name = option_string.lstrip(parser.prefix_chars).replace("-", "_")
getattr(namespace, self.dest)[name] = values
@ -124,21 +124,21 @@ elif init_config_file is not None:
sv_file = init_config_file + ".sv"
sby_file = init_config_file + ".sby"
with open(sby_file, 'w') as config:
config.write("""[options]
config.write(f"""[options]
mode bmc
[engines]
smtbmc
[script]
read -formal {0}
read -formal {sv_file}
prep -top top
[files]
{0}
""".format(sv_file))
{sv_file}
""")
print("sby config written to {}".format(sby_file), file=sys.stderr)
print(f"sby config written to {sby_file}", file=sys.stderr)
sys.exit(0)
early_logmsgs = list()
@ -220,7 +220,7 @@ def read_sbyconfig(sbydata, taskname):
if len(task_tags_all) and not found_task_tag:
tokens = line.split()
if len(tokens) > 0 and tokens[0][0] == line[0] and tokens[0].endswith(":"):
print("ERROR: Invalid task specifier \"{}\".".format(tokens[0]), file=sys.stderr)
print(f"ERROR: Invalid task specifier \"{tokens[0]}\".", file=sys.stderr)
sys.exit(1)
if task_skip_line or task_skip_block:
@ -263,7 +263,7 @@ def read_sbyconfig(sbydata, taskname):
handle_line(line)
if taskname is not None and not task_matched:
print("ERROR: Task name '{}' didn't match any lines in [tasks].".format(taskname), file=sys.stderr)
print(f"ERROR: Task name '{taskname}' didn't match any lines in [tasks].", file=sys.stderr)
sys.exit(1)
return cfgdata, tasklist
@ -343,14 +343,14 @@ def run_job(taskname):
shutil.move(my_workdir, "{}.bak{:03d}".format(my_workdir, backup_idx))
if opt_force and not reusedir:
early_log(my_workdir, "Removing directory '{}'.".format(os.path.abspath(my_workdir)))
early_log(my_workdir, f"Removing directory '{os.path.abspath(my_workdir)}'.")
if sbyfile:
shutil.rmtree(my_workdir, ignore_errors=True)
if reusedir:
pass
elif os.path.isdir(my_workdir):
print("ERROR: Directory '{}' already exists.".format(my_workdir))
print(f"ERROR: Directory '{my_workdir}' already exists.")
sys.exit(1)
else:
os.makedirs(my_workdir)
@ -388,13 +388,13 @@ def run_job(taskname):
pass
if my_opt_tmpdir:
job.log("Removing directory '{}'.".format(my_workdir))
job.log(f"Removing directory '{my_workdir}'.")
shutil.rmtree(my_workdir, ignore_errors=True)
if setupmode:
job.log("SETUP COMPLETE (rc={})".format(job.retcode))
job.log(f"SETUP COMPLETE (rc={job.retcode})")
else:
job.log("DONE ({}, rc={})".format(job.status, job.retcode))
job.log(f"DONE ({job.status}, rc={job.retcode})")
job.logfile.close()
if not my_opt_tmpdir and not setupmode:
@ -402,23 +402,23 @@ def run_job(taskname):
junit_errors = 1 if job.retcode == 16 else 0
junit_failures = 1 if job.retcode != 0 and junit_errors == 0 else 0
print('<?xml version="1.0" encoding="UTF-8"?>', file=f)
print('<testsuites disabled="0" errors="{}" failures="{}" tests="1" time="{}">'.format(junit_errors, junit_failures, job.total_time), file=f)
print('<testsuite disabled="0" errors="{}" failures="{}" name="{}" skipped="0" tests="1" time="{}">'.format(junit_errors, junit_failures, junit_ts_name, job.total_time), file=f)
print(f'<testsuites disabled="0" errors="{junit_errors}" failures="{junit_failures}" tests="1" time="{job.total_time}">', file=f)
print(f'<testsuite disabled="0" errors="{junit_errors}" failures="{junit_failures}" name="{junit_ts_name}" skipped="0" tests="1" time="{job.total_time}">', file=f)
print('<properties>', file=f)
print('<property name="os" value="{}"/>'.format(os.name), file=f)
print(f'<property name="os" value="{os.name}"/>', file=f)
print('</properties>', file=f)
print('<testcase classname="{}" name="{}" status="{}" time="{}">'.format(junit_ts_name, junit_tc_name, job.status, job.total_time), file=f)
print(f'<testcase classname="{junit_ts_name}" name="{junit_tc_name}" status="{job.status}" time="{job.total_time}">', file=f)
if junit_errors:
print('<error message="{}" type="{}"/>'.format(job.status, job.status), file=f)
print(f'<error message="{job.status}" type="{job.status}"/>', file=f)
if junit_failures:
print('<failure message="{}" type="{}"/>'.format(job.status, job.status), file=f)
print(f'<failure message="{job.status}" type="{job.status}"/>', file=f)
print('<system-out>', end="", file=f)
with open("{}/logfile.txt".format(job.workdir), "r") as logf:
with open(f"{job.workdir}/logfile.txt", "r") as logf:
for line in logf:
print(line.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;"), end="", file=f)
print('</system-out></testcase></testsuite></testsuites>', file=f)
with open("{}/status".format(job.workdir), "w") as f:
print("{} {} {}".format(job.status, job.retcode, job.total_time), file=f)
with open(f"{job.workdir}/status", "w") as f:
print(f"{job.status} {job.retcode} {job.total_time}", file=f)
return job.retcode

View file

@ -97,7 +97,7 @@ class SbyTask:
if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)):
if self.logfile is not None:
print(line, file=self.logfile)
self.job.log("{}: {}".format(self.info, line))
self.job.log(f"{self.info}: {line}")
def handle_output(self, line):
if self.terminated or len(line) == 0:
@ -119,7 +119,7 @@ class SbyTask:
return
if self.running:
if not self.silent:
self.job.log("{}: terminating process".format(self.info))
self.job.log(f"{self.info}: terminating process")
if os.name == "posix":
try:
os.killpg(self.p.pid, signal.SIGTERM)
@ -140,7 +140,7 @@ class SbyTask:
return
if not self.silent:
self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline))
self.job.log(f"{self.info}: starting process \"{self.cmdline}\"")
if os.name == "posix":
def preexec_fn():
@ -175,7 +175,7 @@ class SbyTask:
if self.p.poll() is not None:
if not self.silent:
self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode))
self.job.log(f"{self.info}: finished (returncode={self.p.returncode})")
self.job.tasks_running.remove(self)
all_tasks_running.remove(self)
self.running = False
@ -183,7 +183,7 @@ class SbyTask:
if self.p.returncode == 127:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info))
self.job.log(f"{self.info}: COMMAND NOT FOUND. ERROR.")
self.terminated = True
self.job.terminate()
return
@ -193,7 +193,7 @@ class SbyTask:
if self.checkretcode and self.p.returncode != 0:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: job failed. ERROR.".format(self.info))
self.job.log(f"{self.info}: job failed. ERROR.")
self.terminated = True
self.job.terminate()
return
@ -246,13 +246,13 @@ class SbyJob:
self.summary = list()
self.logfile = open("{}/logfile.txt".format(workdir), "a")
self.logfile = open(f"{workdir}/logfile.txt", "a")
for line in early_logs:
print(line, file=self.logfile, flush=True)
if not reusedir:
with open("{}/config.sby".format(workdir), "w") as f:
with open(f"{workdir}/config.sby", "w") as f:
for line in sbyconfig:
print(line, file=f)
@ -283,7 +283,7 @@ class SbyJob:
if self.opt_timeout is not None:
total_clock_time = int(time() - self.start_clock_time)
if total_clock_time > self.opt_timeout:
self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout))
self.log(f"Reached TIMEOUT ({self.opt_timeout} seconds). Terminating all tasks.")
self.status = "TIMEOUT"
self.terminate(timeout=True)
@ -300,8 +300,8 @@ class SbyJob:
if "ERROR" not in self.expect:
self.retcode = 16
self.terminate()
with open("{}/{}".format(self.workdir, self.status), "w") as f:
print("ERROR: {}".format(logmessage), file=f)
with open(f"{self.workdir}/{self.status}", "w") as f:
print(f"ERROR: {logmessage}", file=f)
raise SbyAbort(logmessage)
def makedirs(self, path):
@ -314,7 +314,7 @@ class SbyJob:
for dstfile, lines in self.verbatim_files.items():
dstfile = self.workdir + "/src/" + dstfile
self.log("Writing '{}'.".format(dstfile))
self.log(f"Writing '{dstfile}'.")
with open(dstfile, "w") as f:
for line in lines:
@ -322,7 +322,7 @@ class SbyJob:
for dstfile, srcfile in self.files.items():
if dstfile.startswith("/") or dstfile.startswith("../") or ("/../" in dstfile):
self.error("destination filename must be a relative path without /../: {}".format(dstfile))
self.error(f"destination filename must be a relative path without /../: {dstfile}")
dstfile = self.workdir + "/src/" + dstfile
srcfile = process_filename(srcfile)
@ -331,7 +331,7 @@ class SbyJob:
if basedir != "" and not os.path.exists(basedir):
os.makedirs(basedir)
self.log("Copy '{}' to '{}'.".format(os.path.abspath(srcfile), os.path.abspath(dstfile)))
self.log(f"Copy '{os.path.abspath(srcfile)}' to '{os.path.abspath(dstfile)}'.")
copyfile(srcfile, dstfile)
def handle_str_option(self, option_name, default_value):
@ -351,19 +351,19 @@ class SbyJob:
def handle_bool_option(self, option_name, default_value):
if option_name in self.options:
if self.options[option_name] not in ["on", "off"]:
self.error("Invalid value '{}' for boolean option {}.".format(self.options[option_name], option_name))
self.error(f"Invalid value '{self.options[option_name]}' for boolean option {option_name}.")
self.__dict__["opt_" + option_name] = self.options[option_name] == "on"
self.used_options.add(option_name)
else:
self.__dict__["opt_" + option_name] = default_value
def make_model(self, model_name):
if not os.path.isdir("{}/model".format(self.workdir)):
os.makedirs("{}/model".format(self.workdir))
if not os.path.isdir(f"{self.workdir}/model"):
os.makedirs(f"{self.workdir}/model")
if model_name in ["base", "nomem"]:
with open("{}/model/design{}.ys".format(self.workdir, "" if model_name == "base" else "_nomem"), "w") as f:
print("# running in {}/src/".format(self.workdir), file=f)
with open(f"""{self.workdir}/model/design{"" if model_name == "base" else "_nomem"}.ys""", "w") as f:
print(f"# running in {self.workdir}/src/", file=f)
for cmd in self.script:
print(cmd, file=f)
if model_name == "base":
@ -387,19 +387,23 @@ class SbyJob:
print("opt -keepdc -fast", file=f)
print("check", file=f)
print("hierarchy -simcheck", file=f)
print("write_ilang ../model/design{}.il".format("" if model_name == "base" else "_nomem"), file=f)
print(f"""write_ilang ../model/design{"" if model_name == "base" else "_nomem"}.il""", file=f)
task = SbyTask(self, model_name, [],
"cd {}/src; {} -ql ../model/design{s}.log ../model/design{s}.ys".format(self.workdir, self.exe_paths["yosys"],
s="" if model_name == "base" else "_nomem"))
task = SbyTask(
self,
model_name,
[],
"cd {}/src; {} -ql ../model/design{s}.log ../model/design{s}.ys".format(self.workdir, self.exe_paths["yosys"],
s="" if model_name == "base" else "_nomem")
)
task.checkretcode = True
return [task]
if re.match(r"^smt2(_syn)?(_nomem)?(_stbv|_stdt)?$", model_name):
with open("{}/model/design_{}.ys".format(self.workdir, model_name), "w") as f:
print("# running in {}/model/".format(self.workdir), file=f)
print("read_ilang design{}.il".format("_nomem" if "_nomem" in model_name else ""), file=f)
with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f:
print(f"# running in {self.workdir}/model/", file=f)
print(f"""read_ilang design{"_nomem" if "_nomem" in model_name else ""}.il""", file=f)
if "_syn" in model_name:
print("techmap", file=f)
print("opt -fast", file=f)
@ -408,22 +412,26 @@ class SbyJob:
print("dffunmap", file=f)
print("stat", file=f)
if "_stbv" in model_name:
print("write_smt2 -stbv -wires design_{}.smt2".format(model_name), file=f)
print(f"write_smt2 -stbv -wires design_{model_name}.smt2", file=f)
elif "_stdt" in model_name:
print("write_smt2 -stdt -wires design_{}.smt2".format(model_name), file=f)
print(f"write_smt2 -stdt -wires design_{model_name}.smt2", file=f)
else:
print("write_smt2 -wires design_{}.smt2".format(model_name), file=f)
print(f"write_smt2 -wires design_{model_name}.smt2", file=f)
task = SbyTask(self, model_name, self.model("nomem" if "_nomem" in model_name else "base"),
"cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name))
task = SbyTask(
self,
model_name,
self.model("nomem" if "_nomem" in model_name else "base"),
"cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)
)
task.checkretcode = True
return [task]
if re.match(r"^btor(_syn)?(_nomem)?$", model_name):
with open("{}/model/design_{}.ys".format(self.workdir, model_name), "w") as f:
print("# running in {}/model/".format(self.workdir), file=f)
print("read_ilang design{}.il".format("_nomem" if "_nomem" in model_name else ""), file=f)
with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f:
print(f"# running in {self.workdir}/model/", file=f)
print(f"""read_ilang design{"_nomem" if "_nomem" in model_name else ""}.il""", file=f)
print("flatten", file=f)
print("setundef -undriven -anyseq", file=f)
if "_syn" in model_name:
@ -439,15 +447,19 @@ class SbyJob:
print("stat", file=f)
print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)
task = SbyTask(self, model_name, self.model("nomem" if "_nomem" in model_name else "base"),
"cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name))
task = SbyTask(
self,
model_name,
self.model("nomem" if "_nomem" in model_name else "base"),
"cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)
)
task.checkretcode = True
return [task]
if model_name == "aig":
with open("{}/model/design_aiger.ys".format(self.workdir), "w") as f:
print("# running in {}/model/".format(self.workdir), file=f)
with open(f"{self.workdir}/model/design_aiger.ys", "w") as f:
print(f"# running in {self.workdir}/model/", file=f)
print("read_ilang design_nomem.il", file=f)
print("flatten", file=f)
print("setundef -undriven -anyseq", file=f)
@ -462,8 +474,12 @@ class SbyJob:
print("stat", file=f)
print("write_aiger -I -B -zinit -map design_aiger.aim design_aiger.aig", file=f)
task = SbyTask(self, "aig", self.model("nomem"),
"cd {}/model; {} -ql design_aiger.log design_aiger.ys".format(self.workdir, self.exe_paths["yosys"]))
task = SbyTask(
self,
"aig",
self.model("nomem"),
f"""cd {self.workdir}/model; {self.exe_paths["yosys"]} -ql design_aiger.log design_aiger.ys"""
)
task.checkretcode = True
return [task]
@ -506,7 +522,7 @@ class SbyJob:
mode = None
key = None
with open("{}/config.sby".format(self.workdir), "r") as f:
with open(f"{self.workdir}/config.sby", "r") as f:
for line in f:
raw_line = line
if mode in ["options", "engines", "files"]:
@ -522,48 +538,48 @@ class SbyJob:
if match:
entries = match.group(1).split()
if len(entries) == 0:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
if entries[0] == "options":
mode = "options"
if len(self.options) != 0 or len(entries) != 1:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
continue
if entries[0] == "engines":
mode = "engines"
if len(self.engines) != 0 or len(entries) != 1:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
continue
if entries[0] == "script":
mode = "script"
if len(self.script) != 0 or len(entries) != 1:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
continue
if entries[0] == "file":
mode = "file"
if len(entries) != 2:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
current_verbatim_file = entries[1]
if current_verbatim_file in self.verbatim_files:
self.error("duplicate file: {}".format(entries[1]))
self.error(f"duplicate file: {entries[1]}")
self.verbatim_files[current_verbatim_file] = list()
continue
if entries[0] == "files":
mode = "files"
if len(entries) != 1:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
continue
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
if mode == "options":
entries = line.split()
if len(entries) != 2:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
self.options[entries[0]] = entries[1]
continue
@ -583,19 +599,19 @@ class SbyJob:
elif len(entries) == 2:
self.files[entries[0]] = entries[1]
else:
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
continue
if mode == "file":
self.verbatim_files[current_verbatim_file].append(raw_line)
continue
self.error("sby file syntax error: {}".format(line))
self.error(f"sby file syntax error: {line}")
self.handle_str_option("mode", None)
if self.opt_mode not in ["bmc", "prove", "cover", "live"]:
self.error("Invalid mode: {}".format(self.opt_mode))
self.error(f"Invalid mode: {self.opt_mode}")
self.expect = ["PASS"]
if "expect" in self.options:
@ -604,7 +620,7 @@ class SbyJob:
for s in self.expect:
if s not in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]:
self.error("Invalid expect value: {}".format(s))
self.error(f"Invalid expect value: {s}")
self.handle_bool_option("multiclock", False)
self.handle_bool_option("wait", False)
@ -631,7 +647,7 @@ class SbyJob:
self.error("Config file is lacking engine configuration.")
if self.reusedir:
rmtree("{}/model".format(self.workdir), ignore_errors=True)
rmtree(f"{self.workdir}/model", ignore_errors=True)
else:
self.copy_src()
@ -660,7 +676,7 @@ class SbyJob:
for opt in self.options.keys():
if opt not in self.used_options:
self.error("Unused option: {}".format(opt))
self.error(f"Unused option: {opt}")
self.taskloop()
@ -685,7 +701,7 @@ class SbyJob:
] + self.summary
for line in self.summary:
self.log("summary: {}".format(line))
self.log(f"summary: {line}")
assert self.status in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]
@ -698,6 +714,6 @@ class SbyJob:
if self.status == "TIMEOUT": self.retcode = 8
if self.status == "ERROR": self.retcode = 16
with open("{}/{}".format(self.workdir, self.status), "w") as f:
with open(f"{self.workdir}/{self.status}", "w") as f:
for line in self.summary:
print(line, file=f)

View file

@ -31,24 +31,27 @@ def run(mode, job, engine_idx, engine):
if abc_command[0] == "bmc3":
if mode != "bmc":
job.error("ABC command 'bmc3' is only valid in bmc mode.")
abc_command[0] += " -F {} -v".format(job.opt_depth)
abc_command[0] += f" -F {job.opt_depth} -v"
elif abc_command[0] == "sim3":
if mode != "bmc":
job.error("ABC command 'sim3' is only valid in bmc mode.")
abc_command[0] += " -F {} -v".format(job.opt_depth)
abc_command[0] += f" -F {job.opt_depth} -v"
elif abc_command[0] == "pdr":
if mode != "prove":
job.error("ABC command 'pdr' is only valid in prove mode.")
else:
job.error("Invalid ABC command {}.".format(abc_command[0]))
job.error(f"Invalid ABC command {abc_command[0]}.")
task = SbyTask(job, "engine_{}".format(engine_idx), job.model("aig"),
("cd {}; {} -c 'read_aiger model/design_aiger.aig; fold; strash; {}; write_cex -a engine_{}/trace.aiw'").format
(job.workdir, job.exe_paths["abc"], " ".join(abc_command), engine_idx),
logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w"))
task = SbyTask(
job,
f"engine_{engine_idx}",
job.model("aig"),
f"""cd {job.workdir}; {job.exe_paths["abc"]} -c 'read_aiger model/design_aiger.aig; fold; strash; {" ".join(abc_command)}; write_cex -a engine_{engine_idx}/trace.aiw'""",
logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile.txt", "w")
)
task.noprintregex = re.compile(r"^\.+$")
task_status = None
@ -78,19 +81,23 @@ def run(mode, job, engine_idx, engine):
assert task_status is not None
job.update_status(task_status)
job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status))
job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status))
job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}")
job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""")
job.terminate()
if task_status == "FAIL" and job.opt_aigsmt != "none":
task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"),
("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
task2 = SbyTask(
job,
f"engine_{engine_idx}",
job.model("smt2"),
("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
"--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw --aig-noheader model/design_smt2.smt2").format
(job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt,
"" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop),
"" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}",
job.opt_append, i=engine_idx),
logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w"))
logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
)
task2_status = None
@ -109,8 +116,8 @@ def run(mode, job, engine_idx, engine):
assert task2_status is not None
assert task2_status == "FAIL"
if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
task2.output_callback = output_callback2
task2.exit_callback = exit_callback2

View file

@ -40,16 +40,20 @@ def run(mode, job, engine_idx, engine):
solver_cmd = " ".join([job.exe_paths["aigbmc"]] + solver_args[1:])
else:
job.error("Invalid solver command {}.".format(solver_args[0]))
job.error(f"Invalid solver command {solver_args[0]}.")
task = SbyTask(job, "engine_{}".format(engine_idx), job.model("aig"),
"cd {}; {} model/design_aiger.aig".format(job.workdir, solver_cmd),
logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w"))
task = SbyTask(
job,
f"engine_{engine_idx}",
job.model("aig"),
f"cd {job.workdir}; {solver_cmd} model/design_aiger.aig",
logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile.txt", "w")
)
task_status = None
produced_cex = False
end_of_cex = False
aiw_file = open("{}/engine_{}/trace.aiw".format(job.workdir, engine_idx), "w")
aiw_file = open(f"{job.workdir}/engine_{engine_idx}/trace.aiw", "w")
def output_callback(line):
nonlocal task_status
@ -66,7 +70,7 @@ def run(mode, job, engine_idx, engine):
return None
if line.startswith("u"):
return "No CEX up to depth {}.".format(int(line[1:])-1)
return f"No CEX up to depth {int(line[1:])-1}."
if line in ["0", "1", "2"]:
print(line, file=aiw_file)
@ -84,29 +88,37 @@ def run(mode, job, engine_idx, engine):
aiw_file.close()
job.update_status(task_status)
job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status))
job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status))
job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}")
job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""")
job.terminate()
if task_status == "FAIL" and job.opt_aigsmt != "none":
if produced_cex:
if mode == "live":
task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"),
("cd {}; {} -g -s {}{} --noprogress --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
task2 = SbyTask(
job,
f"engine_{engine_idx}",
job.model("smt2"),
("cd {}; {} -g -s {}{} --noprogress --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
"--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw model/design_smt2.smt2").format
(job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt,
"" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop),
"" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}",
i=engine_idx),
logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w"))
logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
)
else:
task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"),
("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
task2 = SbyTask(
job,
f"engine_{engine_idx}",
job.model("smt2"),
("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
"--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw model/design_smt2.smt2").format
(job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt,
"" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop),
"" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}",
job.opt_append, i=engine_idx),
logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w"))
logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
)
task2_status = None
@ -128,14 +140,14 @@ def run(mode, job, engine_idx, engine):
else:
assert task2_status == "FAIL"
if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
task2.output_callback = output_callback2
task2.exit_callback = exit_callback2
else:
job.log("engine_{}: Engine did not produce a counter example.".format(engine_idx))
job.log(f"engine_{engine_idx}: Engine did not produce a counter example.")
task.output_callback = output_callback
task.exit_callback = exit_callback

View file

@ -37,19 +37,19 @@ def run(mode, job, engine_idx, engine):
if solver_args[0] == "btormc":
solver_cmd = ""
if random_seed:
solver_cmd += "BTORSEED={} ".format(random_seed)
solver_cmd += job.exe_paths["btormc"] + " --stop-first {} -v 1 -kmax {}".format(0 if mode == "cover" else 1, job.opt_depth - 1)
solver_cmd += f"BTORSEED={random_seed} "
solver_cmd += job.exe_paths["btormc"] + f""" --stop-first {0 if mode == "cover" else 1} -v 1 -kmax {job.opt_depth - 1}"""
if job.opt_skip is not None:
solver_cmd += " -kmin {}".format(job.opt_skip)
solver_cmd += f" -kmin {job.opt_skip}"
solver_cmd += " ".join([""] + solver_args[1:])
elif solver_args[0] == "pono":
if random_seed:
job.error("Setting the random seed is not available for the pono solver.")
solver_cmd = job.exe_paths["pono"] + " -v 1 -e bmc -k {}".format(job.opt_depth - 1)
solver_cmd = job.exe_paths["pono"] + f" -v 1 -e bmc -k {job.opt_depth - 1}"
else:
job.error("Invalid solver command {}.".format(solver_args[0]))
job.error(f"Invalid solver command {solver_args[0]}.")
common_state = SimpleNamespace()
common_state.solver_status = None
@ -72,7 +72,7 @@ def run(mode, job, engine_idx, engine):
elif common_state.solver_status == "unsat":
task_status = "FAIL"
else:
job.error("engine_{}: Engine terminated without status.".format(engine_idx))
job.error(f"engine_{engine_idx}: Engine terminated without status.")
else:
if common_state.expected_cex == 0:
task_status = "pass"
@ -81,20 +81,20 @@ def run(mode, job, engine_idx, engine):
elif common_state.solver_status == "unsat":
task_status = "pass"
else:
job.error("engine_{}: Engine terminated without status.".format(engine_idx))
job.error(f"engine_{engine_idx}: Engine terminated without status.")
job.update_status(task_status.upper())
job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status))
job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status))
job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}")
job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""")
if len(common_state.produced_traces) == 0:
job.log("engine_{}: Engine did not produce a{}example.".format(engine_idx, " counter" if mode != "cover" else "n "))
job.log(f"""engine_{engine_idx}: Engine did not produce a{" counter" if mode != "cover" else "n "}example.""")
elif len(common_state.produced_traces) <= common_state.print_traces_max:
job.summary.extend(common_state.produced_traces)
else:
job.summary.extend(common_state.produced_traces[:common_state.print_traces_max])
excess_traces = len(common_state.produced_traces) - common_state.print_traces_max
job.summary.append("and {} further trace{}".format(excess_traces, "s" if excess_traces > 1 else ""))
job.summary.append(f"""and {excess_traces} further trace{"s" if excess_traces > 1 else ""}""")
job.terminate()
@ -112,9 +112,9 @@ def run(mode, job, engine_idx, engine):
def exit_callback2(retcode):
assert retcode == 0
vcdpath = "{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, suffix)
vcdpath = f"{job.workdir}/engine_{engine_idx}/trace{suffix}.vcd"
if os.path.exists(vcdpath):
common_state.produced_traces.append("{}trace: {}".format("" if mode == "cover" else "counterexample ", vcdpath))
common_state.produced_traces.append(f"""{"" if mode == "cover" else "counterexample "}trace: {vcdpath}""")
common_state.running_tasks -= 1
if (common_state.running_tasks == 0):
@ -131,14 +131,14 @@ def run(mode, job, engine_idx, engine):
assert common_state.produced_cex == 0
else:
job.error("engine_{}: BTOR solver '{}' is currently not supported in cover mode.".format(engine_idx, solver_args[0]))
job.error(f"engine_{engine_idx}: BTOR solver '{solver_args[0]}' is currently not supported in cover mode.")
if (common_state.produced_cex < common_state.expected_cex) and line == "sat":
assert common_state.wit_file == None
if common_state.expected_cex == 1:
common_state.wit_file = open("{}/engine_{}/trace.wit".format(job.workdir, engine_idx), "w")
common_state.wit_file = open(f"{job.workdir}/engine_{engine_idx}/trace.wit", "w")
else:
common_state.wit_file = open("{}/engine_{}/trace{}.wit".format(job.workdir, engine_idx, common_state.produced_cex), "w")
common_state.wit_file = open(f"""{job.workdir}/engine_{engine_idx}/trace{common_state.produced_cex}.wit""", "w")
if solver_args[0] != "btormc":
task.log("Found satisfiability witness.")
@ -149,9 +149,13 @@ def run(mode, job, engine_idx, engine):
suffix = ""
else:
suffix = common_state.produced_cex
task2 = SbyTask(job, "engine_{}_{}".format(engine_idx, common_state.produced_cex), job.model("btor"),
"cd {dir} ; btorsim -c --vcd engine_{idx}/trace{i}.vcd --hierarchical-symbols --info model/design_btor.info model/design_btor.btor engine_{idx}/trace{i}.wit".format(dir=job.workdir, idx=engine_idx, i=suffix),
logfile=open("{dir}/engine_{idx}/logfile2.txt".format(dir=job.workdir, idx=engine_idx), "w"))
task2 = SbyTask(
job,
f"engine_{engine_idx}_{common_state.produced_cex}",
job.model("btor"),
"cd {dir} ; btorsim -c --vcd engine_{idx}/trace{i}.vcd --hierarchical-symbols --info model/design_btor.info model/design_btor.btor engine_{idx}/trace{i}.wit".format(dir=job.workdir, idx=engine_idx, i=suffix),
logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
)
task2.output_callback = output_callback2
task2.exit_callback = make_exit_callback(suffix)
task2.checkretcode = True
@ -198,20 +202,23 @@ def run(mode, job, engine_idx, engine):
if common_state.solver_status == "unsat":
if common_state.expected_cex == 1:
with open("{}/engine_{}/trace.wit".format(job.workdir, engine_idx), "w") as wit_file:
with open(f"""{job.workdir}/engine_{engine_idx}/trace.wit""", "w") as wit_file:
print("unsat", file=wit_file)
else:
for i in range(common_state.produced_cex, common_state.expected_cex):
with open("{}/engine_{}/trace{}.wit".format(job.workdir, engine_idx, i), "w") as wit_file:
with open(f"{job.workdir}/engine_{engine_idx}/trace{i}.wit", "w") as wit_file:
print("unsat", file=wit_file)
common_state.running_tasks -= 1
if (common_state.running_tasks == 0):
print_traces_and_terminate()
task = SbyTask(job, "engine_{}".format(engine_idx), job.model("btor"),
"cd {}; {} model/design_btor.btor".format(job.workdir, solver_cmd),
logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w"))
task = SbyTask(
job,
f"engine_{engine_idx}", job.model("btor"),
f"cd {job.workdir}; {solver_cmd} model/design_btor.btor",
logfile=open("{job.workdir}/engine_{engine_idx}/logfile.txt", "w")
)
task.output_callback = output_callback
task.exit_callback = exit_callback

View file

@ -68,7 +68,7 @@ def run(mode, job, engine_idx, engine):
elif o == "--seed":
random_seed = a
else:
job.error("Invalid smtbmc options {}.".format(o))
job.error(f"Invalid smtbmc options {o}.")
xtra_opts = False
for i, a in enumerate(args):
@ -89,7 +89,7 @@ def run(mode, job, engine_idx, engine):
smtbmc_opts += ["--unroll"]
if job.opt_smtc is not None:
smtbmc_opts += ["--smtc", "src/{}".format(job.opt_smtc)]
smtbmc_opts += ["--smtc", f"src/{job.opt_smtc}"]
if job.opt_tbtop is not None:
smtbmc_opts += ["--vlogtb-top", job.opt_tbtop]
@ -107,9 +107,9 @@ def run(mode, job, engine_idx, engine):
run("prove_induction", job, engine_idx, engine)
return
taskname = "engine_{}".format(engine_idx)
trace_prefix = "engine_{}/trace".format(engine_idx)
logfile_prefix = "{}/engine_{}/logfile".format(job.workdir, engine_idx)
taskname = f"engine_{engine_idx}"
trace_prefix = f"engine_{engine_idx}/trace"
logfile_prefix = f"{job.workdir}/engine_{engine_idx}/logfile"
if mode == "prove_basecase":
taskname += ".basecase"
@ -137,10 +137,15 @@ def run(mode, job, engine_idx, engine):
else:
t_opt = "{}".format(job.opt_depth)
task = SbyTask(job, taskname, job.model(model_name),
"cd {}; {} {} -t {} {} --append {} --dump-vcd {p}.vcd --dump-vlogtb {p}_tb.v --dump-smtc {p}.smtc model/design_{}.smt2".format
(job.workdir, job.exe_paths["smtbmc"], " ".join(smtbmc_opts), t_opt, "--info \"(set-option :random-seed {})\"".format(random_seed) if random_seed else "", job.opt_append, model_name, p=trace_prefix),
logfile=open(logfile_prefix + ".txt", "w"), logstderr=(not progress))
random_seed = f"--info \"(set-option :random-seed {random_seed})\"" if random_seed else ""
task = SbyTask(
job,
taskname,
job.model(model_name),
f"""cd {job.workdir}; {job.exe_paths["smtbmc"]} {" ".join(smtbmc_opts)} -t {t_opt} {random_seed} --append {job.opt_append} --dump-vcd {trace_prefix}.vcd --dump-vlogtb {trace_prefix}_tb.v --dump-smtc {trace_prefix}.smtc model/design_{model_name}.smt2""",
logfile=open(logfile_prefix + ".txt", "w"),
logstderr=(not progress)
)
if mode == "prove_basecase":
job.basecase_tasks.append(task)
@ -177,37 +182,37 @@ def run(mode, job, engine_idx, engine):
def exit_callback(retcode):
if task_status is None:
job.error("engine_{}: Engine terminated without status.".format(engine_idx))
job.error(f"engine_{engine_idx}: Engine terminated without status.")
if mode == "bmc" or mode == "cover":
job.update_status(task_status)
task_status_lower = task_status.lower() if task_status == "PASS" else task_status
job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status_lower))
job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status_lower))
job.log(f"engine_{engine_idx}: Status returned by engine: {task_status_lower}")
job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status_lower}""")
if task_status == "FAIL" and mode != "cover":
if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
elif task_status == "PASS" and mode == "cover":
print_traces_max = 5
for i in range(print_traces_max):
if os.path.exists("{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, i)):
job.summary.append("trace: {}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, i))
if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace{i}.vcd"):
job.summary.append(f"trace: {job.workdir}/engine_{engine_idx}/trace{i}.vcd")
else:
break
else:
excess_traces = 0
while os.path.exists("{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, print_traces_max + excess_traces)):
while os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace{print_traces_max + excess_traces}.vcd"):
excess_traces += 1
if excess_traces > 0:
job.summary.append("and {} further trace{}".format(excess_traces, "s" if excess_traces > 1 else ""))
job.summary.append(f"""and {excess_traces} further trace{"s" if excess_traces > 1 else ""}""")
job.terminate()
elif mode in ["prove_basecase", "prove_induction"]:
task_status_lower = task_status.lower() if task_status == "PASS" else task_status
job.log("engine_{}: Status returned by engine for {}: {}".format(engine_idx, mode.split("_")[1], task_status_lower))
job.summary.append("engine_{} ({}) returned {} for {}".format(engine_idx, " ".join(engine), task_status_lower, mode.split("_")[1]))
job.log(f"""engine_{engine_idx}: Status returned by engine for {mode.split("_")[1]}: {task_status_lower}""")
job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status_lower} for {mode.split("_")[1]}""")
if mode == "prove_basecase":
for task in job.basecase_tasks:
@ -218,8 +223,8 @@ def run(mode, job, engine_idx, engine):
else:
job.update_status(task_status)
if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
job.terminate()
elif mode == "prove_induction":

View file

@ -28,8 +28,8 @@ def run(job):
engine = job.engines[engine_idx]
assert len(engine) > 0
job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "smtbmc":
import sby_engine_smtbmc
@ -44,4 +44,4 @@ def run(job):
sby_engine_btor.run("bmc", job, engine_idx, engine)
else:
job.error("Invalid engine '{}' for bmc mode.".format(engine[0]))
job.error(f"Invalid engine '{engine[0]}' for bmc mode.")

View file

@ -27,8 +27,8 @@ def run(job):
engine = job.engines[engine_idx]
assert len(engine) > 0
job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "smtbmc":
import sby_engine_smtbmc
@ -39,4 +39,4 @@ def run(job):
sby_engine_btor.run("cover", job, engine_idx, engine)
else:
job.error("Invalid engine '{}' for cover mode.".format(engine[0]))
job.error(f"Invalid engine '{engine[0]}' for cover mode.")

View file

@ -28,12 +28,12 @@ def run(job):
engine = job.engines[engine_idx]
assert len(engine) > 0
job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "aiger":
import sby_engine_aiger
sby_engine_aiger.run("live", job, engine_idx, engine)
else:
job.error("Invalid engine '{}' for live mode.".format(engine[0]))
job.error(f"Invalid engine '{engine[0]}' for live mode.")

View file

@ -35,8 +35,8 @@ def run(job):
engine = job.engines[engine_idx]
assert len(engine) > 0
job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "smtbmc":
import sby_engine_smtbmc
@ -51,4 +51,4 @@ def run(job):
sby_engine_abc.run("prove", job, engine_idx, engine)
else:
job.error("Invalid engine '{}' for prove mode.".format(engine[0]))
job.error(f"Invalid engine '{engine[0]}' for prove mode.")