3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-04-18 19:19:00 +00:00
This commit is contained in:
Ed Bordin 2021-07-07 20:04:09 +08:00 committed by GitHub
commit 3960fd3e8c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 129 additions and 99 deletions

View file

@ -20,6 +20,8 @@ import os, re, sys, signal
if os.name == "posix":
import resource, fcntl
import subprocess
import asyncio
from functools import partial
from shutil import copyfile, rmtree
from select import select
from time import time, localtime, sleep
@ -82,17 +84,12 @@ class SbyTask:
self.job.tasks_pending.append(self)
for dep in self.deps:
dep.register_dep(self)
if not dep.finished:
dep.notify.append(self)
self.output_callback = None
self.exit_callback = None
def register_dep(self, next_task):
if self.finished:
next_task.poll()
else:
self.notify.append(next_task)
def log(self, line):
if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)):
if self.logfile is not None:
@ -120,17 +117,39 @@ class SbyTask:
if self.running:
if not self.silent:
self.job.log("{}: terminating process".format(self.info))
if os.name == "posix":
if os.name != "posix":
# self.p.terminate does not actually terminate underlying
# processes on Windows, so send ctrl+break to the process
# group we created. This for some reason does
# not cause the associated future (self.fut) to complete
# until it is awaited on one last time.
os.kill(self.p.pid, signal.CTRL_BREAK_EVENT)
else:
try:
os.killpg(self.p.pid, signal.SIGTERM)
except PermissionError:
pass
self.p.terminate()
self.p.terminate()
self.job.tasks_running.remove(self)
self.job.tasks_retired.append(self)
all_tasks_running.remove(self)
self.terminated = True
def poll(self):
async def output(self):
while True:
outs = await self.p.stdout.readline()
await asyncio.sleep(0) # https://bugs.python.org/issue24532
outs = outs.decode("utf-8")
if len(outs) == 0: break
if outs[-1] != '\n':
self.linebuffer += outs
break
outs = (self.linebuffer + outs).strip()
self.linebuffer = ""
self.handle_output(outs)
async def maybe_spawn(self):
if self.finished or self.terminated:
return
@ -141,68 +160,54 @@ class SbyTask:
if not self.silent:
self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline))
if os.name == "posix":
def preexec_fn():
signal.signal(signal.SIGINT, signal.SIG_IGN)
os.setpgrp()
self.p = subprocess.Popen(["/usr/bin/env", "bash", "-c", self.cmdline], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=(subprocess.STDOUT if self.logstderr else None), preexec_fn=preexec_fn)
fl = fcntl.fcntl(self.p.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
subp_kwargs = { "preexec_fn" : preexec_fn }
else:
self.p = subprocess.Popen(self.cmdline, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=(subprocess.STDOUT if self.logstderr else None))
subp_kwargs = { "creationflags" : subprocess.CREATE_NEW_PROCESS_GROUP }
self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=(asyncio.subprocess.STDOUT if self.logstderr else None),
**subp_kwargs)
self.job.tasks_pending.remove(self)
self.job.tasks_running.append(self)
all_tasks_running.append(self)
self.running = True
return
asyncio.ensure_future(self.output())
self.fut = asyncio.ensure_future(self.p.wait())
while True:
outs = self.p.stdout.readline().decode("utf-8")
if len(outs) == 0: break
if outs[-1] != '\n':
self.linebuffer += outs
break
outs = (self.linebuffer + outs).strip()
self.linebuffer = ""
self.handle_output(outs)
async def shutdown_and_notify(self):
if not self.silent:
self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode))
self.job.tasks_running.remove(self)
self.job.tasks_retired.append(self)
self.running = False
if self.p.poll() is not None:
self.handle_exit(self.p.returncode)
if self.p.returncode == 127:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode))
self.job.tasks_running.remove(self)
all_tasks_running.remove(self)
self.running = False
if self.p.returncode == 127:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return
self.handle_exit(self.p.returncode)
if self.checkretcode and self.p.returncode != 0:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: job failed. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return
self.finished = True
for next_task in self.notify:
next_task.poll()
self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return
if self.checkretcode and self.p.returncode != 0:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: job failed. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return
self.finished = True
for next_task in self.notify:
await next_task.maybe_spawn()
return
class SbyAbort(BaseException):
pass
@ -237,6 +242,7 @@ class SbyJob:
self.tasks_running = []
self.tasks_pending = []
self.tasks_retired = []
self.start_clock_time = time()
@ -257,35 +263,59 @@ class SbyJob:
print(line, file=f)
def taskloop(self):
if os.name != "posix":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
poll_fut = asyncio.ensure_future(self.task_poller())
loop.run_until_complete(poll_fut)
async def timekeeper(self):
total_clock_time = int(time() - self.start_clock_time)
try:
while total_clock_time <= self.opt_timeout:
await asyncio.sleep(1)
total_clock_time = int(time() - self.start_clock_time)
except asyncio.CancelledError:
pass
def timeout(self, fut):
self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout))
self.status = "TIMEOUT"
self.terminate(timeout=True)
async def task_poller(self):
if self.opt_timeout is not None:
timer_fut = asyncio.ensure_future(self.timekeeper())
done_cb = partial(SbyJob.timeout, self)
timer_fut.add_done_callback(done_cb)
for task in self.tasks_pending:
task.poll()
await task.maybe_spawn()
while len(self.tasks_running):
fds = []
task_futs = []
for task in self.tasks_running:
if task.running:
fds.append(task.p.stdout)
if os.name == "posix":
try:
select(fds, [], [], 1.0) == ([], [], [])
except InterruptedError:
pass
else:
sleep(0.1)
task_futs.append(task.fut)
(done, pending) = await asyncio.wait(task_futs, return_when=asyncio.FIRST_COMPLETED)
for task in self.tasks_running:
task.poll()
if task.fut in done:
await task.shutdown_and_notify()
for task in self.tasks_pending:
task.poll()
if self.opt_timeout is not None:
timer_fut.remove_done_callback(done_cb)
timer_fut.cancel()
if self.opt_timeout is not None:
total_clock_time = int(time() - self.start_clock_time)
if total_clock_time > self.opt_timeout:
self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout))
self.status = "TIMEOUT"
self.terminate(timeout=True)
# Required on Windows. I am unsure why, but subprocesses that were
# terminated will not have their futures complete until awaited on
# one last time.
if os.name != "posix":
for t in self.tasks_retired:
if not t.fut.done():
await t.fut
def log(self, logmessage):
tm = localtime()
@ -641,19 +671,19 @@ class SbyJob:
if self.opt_mode == "bmc":
import sby_mode_bmc
sby_mode_bmc.run(self)
sby_mode_bmc.init(self)
elif self.opt_mode == "prove":
import sby_mode_prove
sby_mode_prove.run(self)
sby_mode_prove.init(self)
elif self.opt_mode == "live":
import sby_mode_live
sby_mode_live.run(self)
sby_mode_live.init(self)
elif self.opt_mode == "cover":
import sby_mode_cover
sby_mode_cover.run(self)
sby_mode_cover.init(self)
else:
assert False

View file

@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask
def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
abc_opts, abc_command = getopt.getopt(engine[1:], "", [])
if len(abc_command) == 0:

View file

@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask
def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
opts, solver_args = getopt.getopt(engine[1:], "", [])
if len(solver_args) == 0:

View file

@ -20,7 +20,7 @@ import re, os, getopt
from types import SimpleNamespace
from sby_core import SbyTask
def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
random_seed = None
opts, solver_args = getopt.getopt(engine[1:], "", ["seed="])

View file

@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask
def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
smtbmc_opts = []
nomem_opt = False
presat_opt = True
@ -102,9 +102,9 @@ def run(mode, job, engine_idx, engine):
if mode == "prove":
if not induction_only:
run("prove_basecase", job, engine_idx, engine)
init("prove_basecase", job, engine_idx, engine)
if not basecase_only:
run("prove_induction", job, engine_idx, engine)
init("prove_induction", job, engine_idx, engine)
return
taskname = "engine_{}".format(engine_idx)

View file

@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask
def run(job):
def init(job):
job.handle_int_option("depth", 20)
job.handle_int_option("append", 0)
job.handle_str_option("aigsmt", "yices")
@ -33,15 +33,15 @@ def run(job):
if engine[0] == "smtbmc":
import sby_engine_smtbmc
sby_engine_smtbmc.run("bmc", job, engine_idx, engine)
sby_engine_smtbmc.init("bmc", job, engine_idx, engine)
elif engine[0] == "abc":
import sby_engine_abc
sby_engine_abc.run("bmc", job, engine_idx, engine)
sby_engine_abc.init("bmc", job, engine_idx, engine)
elif engine[0] == "btor":
import sby_engine_btor
sby_engine_btor.run("bmc", job, engine_idx, engine)
sby_engine_btor.init("bmc", job, engine_idx, engine)
else:
job.error("Invalid engine '{}' for bmc mode.".format(engine[0]))

View file

@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask
def run(job):
def init(job):
job.handle_int_option("depth", 20)
job.handle_int_option("append", 0)
@ -32,7 +32,7 @@ def run(job):
if engine[0] == "smtbmc":
import sby_engine_smtbmc
sby_engine_smtbmc.run("cover", job, engine_idx, engine)
sby_engine_smtbmc.init("cover", job, engine_idx, engine)
elif engine[0] == "btor":
import sby_engine_btor

View file

@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask
def run(job):
def init(job):
job.handle_str_option("aigsmt", "yices")
job.status = "UNKNOWN"
@ -33,7 +33,7 @@ def run(job):
if engine[0] == "aiger":
import sby_engine_aiger
sby_engine_aiger.run("live", job, engine_idx, engine)
sby_engine_aiger.init("live", job, engine_idx, engine)
else:
job.error("Invalid engine '{}' for live mode.".format(engine[0]))

View file

@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask
def run(job):
def init(job):
job.handle_int_option("depth", 20)
job.handle_int_option("append", 0)
job.handle_str_option("aigsmt", "yices")
@ -40,15 +40,15 @@ def run(job):
if engine[0] == "smtbmc":
import sby_engine_smtbmc
sby_engine_smtbmc.run("prove", job, engine_idx, engine)
sby_engine_smtbmc.init("prove", job, engine_idx, engine)
elif engine[0] == "aiger":
import sby_engine_aiger
sby_engine_aiger.run("prove", job, engine_idx, engine)
sby_engine_aiger.init("prove", job, engine_idx, engine)
elif engine[0] == "abc":
import sby_engine_abc
sby_engine_abc.run("prove", job, engine_idx, engine)
sby_engine_abc.init("prove", job, engine_idx, engine)
else:
job.error("Invalid engine '{}' for prove mode.".format(engine[0]))