mirror of
https://github.com/YosysHQ/sby.git
synced 2025-04-05 22:14:08 +00:00
Merge pull request #196 from jix/parallel_jobserver
Run tasks in parallel and integrate with the make jobserver
This commit is contained in:
commit
bd88454d7d
120
sbysrc/sby.py
120
sbysrc/sby.py
|
@ -19,9 +19,12 @@
|
|||
|
||||
import argparse, json, os, sys, shutil, tempfile, re
|
||||
##yosys-sys-path##
|
||||
from sby_core import SbyConfig, SbyTask, SbyAbort, process_filename
|
||||
from sby_core import SbyConfig, SbyTask, SbyAbort, SbyTaskloop, process_filename
|
||||
from sby_jobserver import SbyJobClient, process_jobserver_environment
|
||||
import time, platform
|
||||
|
||||
process_jobserver_environment() # needs to be called early
|
||||
|
||||
class DictAction(argparse.Action):
|
||||
def __call__(self, parser, namespace, values, option_string=None):
|
||||
assert isinstance(getattr(namespace, self.dest), dict), f"Use ArgumentParser.set_defaults() to initialize {self.dest} to dict()"
|
||||
|
@ -46,6 +49,11 @@ parser.add_argument("-T", metavar="<taskname>", action="append", dest="tasknames
|
|||
help="add taskname (useful when sby file is read from stdin)")
|
||||
parser.add_argument("-E", action="store_true", dest="throw_err",
|
||||
help="throw an exception (incl stack trace) for most errors")
|
||||
parser.add_argument("-j", metavar="<N>", type=int, dest="jobcount",
|
||||
help="maximum number of processes to run in parallel")
|
||||
parser.add_argument("--sequential", action="store_true", dest="sequential",
|
||||
help="run tasks in sequence, not in parallel")
|
||||
|
||||
parser.add_argument("--autotune", action="store_true", dest="autotune",
|
||||
help="automatically find a well performing engine and engine configuration for each task")
|
||||
parser.add_argument("--autotune-config", dest="autotune_config",
|
||||
|
@ -114,6 +122,8 @@ reusedir = False
|
|||
setupmode = args.setupmode
|
||||
autotune = args.autotune
|
||||
autotune_config = args.autotune_config
|
||||
sequential = args.sequential
|
||||
jobcount = args.jobcount
|
||||
init_config_file = args.init_config_file
|
||||
|
||||
if sbyfile is not None:
|
||||
|
@ -401,7 +411,7 @@ if (workdir is not None) and (len(tasknames) != 1):
|
|||
print("ERROR: Exactly one task is required when workdir is specified. Specify the task or use --prefix instead of -d.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
def run_task(taskname):
|
||||
def start_task(taskloop, taskname):
|
||||
sbyconfig, _, _, _ = read_sbyconfig(sbydata, taskname)
|
||||
|
||||
my_opt_tmpdir = opt_tmpdir
|
||||
|
@ -463,48 +473,96 @@ def run_task(taskname):
|
|||
else:
|
||||
junit_filename = "junit"
|
||||
|
||||
task = SbyTask(sbyconfig, my_workdir, early_logmsgs, reusedir)
|
||||
task = SbyTask(sbyconfig, my_workdir, early_logmsgs, reusedir, taskloop)
|
||||
|
||||
for k, v in exe_paths.items():
|
||||
task.exe_paths[k] = v
|
||||
|
||||
try:
|
||||
if autotune:
|
||||
import sby_autotune
|
||||
sby_autotune.SbyAutotune(task, autotune_config).run()
|
||||
def exit_callback():
|
||||
if not autotune and not setupmode:
|
||||
task.summarize()
|
||||
task.write_summary_file()
|
||||
|
||||
if my_opt_tmpdir:
|
||||
task.log(f"Removing directory '{my_workdir}'.")
|
||||
shutil.rmtree(my_workdir, ignore_errors=True)
|
||||
|
||||
if setupmode:
|
||||
task.log(f"SETUP COMPLETE (rc={task.retcode})")
|
||||
else:
|
||||
task.run(setupmode)
|
||||
except SbyAbort:
|
||||
if throw_err:
|
||||
raise
|
||||
task.log(f"DONE ({task.status}, rc={task.retcode})")
|
||||
task.logfile.close()
|
||||
|
||||
if my_opt_tmpdir:
|
||||
task.log(f"Removing directory '{my_workdir}'.")
|
||||
shutil.rmtree(my_workdir, ignore_errors=True)
|
||||
if not my_opt_tmpdir and not setupmode and not autotune:
|
||||
with open("{}/{}.xml".format(task.workdir, junit_filename), "w") as f:
|
||||
task.print_junit_result(f, junit_ts_name, junit_tc_name, junit_format_strict=False)
|
||||
|
||||
if setupmode:
|
||||
task.log(f"SETUP COMPLETE (rc={task.retcode})")
|
||||
else:
|
||||
task.log(f"DONE ({task.status}, rc={task.retcode})")
|
||||
task.logfile.close()
|
||||
with open(f"{task.workdir}/status", "w") as f:
|
||||
print(f"{task.status} {task.retcode} {task.total_time}", file=f)
|
||||
|
||||
if not my_opt_tmpdir and not setupmode and not autotune:
|
||||
with open("{}/{}.xml".format(task.workdir, junit_filename), "w") as f:
|
||||
task.print_junit_result(f, junit_ts_name, junit_tc_name, junit_format_strict=False)
|
||||
task.exit_callback = exit_callback
|
||||
|
||||
with open(f"{task.workdir}/status", "w") as f:
|
||||
print(f"{task.status} {task.retcode} {task.total_time}", file=f)
|
||||
|
||||
return task.retcode
|
||||
if not autotune:
|
||||
task.setup_procs(setupmode)
|
||||
task.task_local_abort = not throw_err
|
||||
|
||||
return task
|
||||
|
||||
failed = []
|
||||
retcode = 0
|
||||
for task in tasknames:
|
||||
task_retcode = run_task(task)
|
||||
retcode |= task_retcode
|
||||
if task_retcode:
|
||||
failed.append(task)
|
||||
|
||||
if jobcount is not None and jobcount < 1:
|
||||
print("ERROR: The -j option requires a positive number as argument")
|
||||
sys.exit(1)
|
||||
|
||||
# Autotune is already parallel, parallelizing it across tasks needs some more work
|
||||
if autotune:
|
||||
sequential = True
|
||||
|
||||
if sequential:
|
||||
if autotune:
|
||||
jobclient = None # TODO make autotune use a jobclient
|
||||
else:
|
||||
jobclient = SbyJobClient(jobcount)
|
||||
|
||||
for taskname in tasknames:
|
||||
taskloop = SbyTaskloop(jobclient)
|
||||
try:
|
||||
task = start_task(taskloop, taskname)
|
||||
except SbyAbort:
|
||||
if throw_err:
|
||||
raise
|
||||
sys.exit(1)
|
||||
|
||||
if autotune:
|
||||
from sby_autotune import SbyAutotune
|
||||
SbyAutotune(task, autotune_config).run()
|
||||
elif setupmode:
|
||||
task.exit_callback()
|
||||
else:
|
||||
taskloop.run()
|
||||
retcode |= task.retcode
|
||||
if task.retcode:
|
||||
failed.append(taskname)
|
||||
else:
|
||||
jobclient = SbyJobClient(jobcount)
|
||||
taskloop = SbyTaskloop(jobclient)
|
||||
|
||||
tasks = {}
|
||||
for taskname in tasknames:
|
||||
try:
|
||||
tasks[taskname] = start_task(taskloop, taskname)
|
||||
except SbyAbort:
|
||||
if throw_err:
|
||||
raise
|
||||
sys.exit(1)
|
||||
|
||||
taskloop.run()
|
||||
|
||||
for taskname, task in tasks.items():
|
||||
retcode |= task.retcode
|
||||
if task.retcode:
|
||||
failed.append(taskname)
|
||||
|
||||
if failed and (len(tasknames) > 1 or tasknames[0] is not None):
|
||||
tm = time.localtime()
|
||||
|
|
|
@ -168,6 +168,7 @@ class SbyAutotune:
|
|||
"""Performs automatic engine selection for a given task.
|
||||
"""
|
||||
def __init__(self, task, config_file=None):
|
||||
self.task_exit_callback = task.exit_callback
|
||||
task.exit_callback = lambda: None
|
||||
task.check_timeout = lambda: None
|
||||
task.status = "TIMEOUT"
|
||||
|
@ -432,6 +433,8 @@ class SbyAutotune:
|
|||
self.task.status = "FAIL"
|
||||
self.task.retcode = 2
|
||||
|
||||
self.task_exit_callback()
|
||||
|
||||
def next_candidate(self, peek=False):
|
||||
# peek=True is used to check whether we need to timeout running candidates to
|
||||
# give other candidates a chance.
|
||||
|
@ -635,6 +638,8 @@ class SbyAutotuneTask(SbyTask):
|
|||
self.model_time = 0
|
||||
self.model_requests = []
|
||||
|
||||
self.exit_callback = self.autotune_exit_callback
|
||||
|
||||
|
||||
def parse_config(self, f):
|
||||
super().parse_config(f)
|
||||
|
@ -650,8 +655,8 @@ class SbyAutotuneTask(SbyTask):
|
|||
self.log(f"using model '{model_name}'")
|
||||
return self.autotune.model(self, model_name)
|
||||
|
||||
def exit_callback(self):
|
||||
super().exit_callback()
|
||||
def autotune_exit_callback(self):
|
||||
self.summarize()
|
||||
|
||||
self.candidate.total_adjusted_time = int(monotonic() - self.start_clock_time + self.model_time)
|
||||
self.candidate.engine_retcode = self.retcode
|
||||
|
|
|
@ -82,6 +82,7 @@ class SbyProc:
|
|||
self.logstderr = logstderr
|
||||
self.silent = silent
|
||||
self.wait = False
|
||||
self.job_lease = None
|
||||
|
||||
self.task.update_proc_pending(self)
|
||||
|
||||
|
@ -162,6 +163,13 @@ class SbyProc:
|
|||
if not dep.finished:
|
||||
return
|
||||
|
||||
if self.task.taskloop.jobclient:
|
||||
if self.job_lease is None:
|
||||
self.job_lease = self.task.taskloop.jobclient.request_lease()
|
||||
|
||||
if not self.job_lease.is_ready:
|
||||
return
|
||||
|
||||
if not self.silent:
|
||||
self.task.log(f"{self.info}: starting process \"{self.cmdline}\"")
|
||||
|
||||
|
@ -190,8 +198,12 @@ class SbyProc:
|
|||
# The process might have written something since the last time we checked
|
||||
self.read_output()
|
||||
|
||||
if self.job_lease:
|
||||
self.job_lease.done()
|
||||
|
||||
if not self.silent:
|
||||
self.task.log(f"{self.info}: finished (returncode={self.p.returncode})")
|
||||
|
||||
self.task.update_proc_stopped(self)
|
||||
self.running = False
|
||||
self.exited = True
|
||||
|
@ -515,18 +527,26 @@ class SbyConfig:
|
|||
|
||||
|
||||
class SbyTaskloop:
|
||||
def __init__(self):
|
||||
def __init__(self, jobclient=None):
|
||||
self.procs_pending = []
|
||||
self.procs_running = []
|
||||
self.tasks = []
|
||||
self.poll_now = False
|
||||
self.jobclient = jobclient
|
||||
|
||||
def run(self):
|
||||
for proc in self.procs_pending:
|
||||
proc.poll()
|
||||
|
||||
while len(self.procs_running) or self.poll_now:
|
||||
|
||||
waiting_for_jobslots = False
|
||||
if self.jobclient:
|
||||
waiting_for_jobslots = self.jobclient.has_pending_leases()
|
||||
|
||||
while self.procs_running or waiting_for_jobslots or self.poll_now:
|
||||
fds = []
|
||||
if self.jobclient:
|
||||
fds.extend(self.jobclient.poll_fds())
|
||||
for proc in self.procs_running:
|
||||
if proc.running:
|
||||
fds.append(proc.p.stdout)
|
||||
|
@ -541,12 +561,20 @@ class SbyTaskloop:
|
|||
sleep(0.1)
|
||||
self.poll_now = False
|
||||
|
||||
if self.jobclient:
|
||||
self.jobclient.poll()
|
||||
|
||||
self.procs_waiting = []
|
||||
|
||||
for proc in self.procs_running:
|
||||
proc.poll()
|
||||
|
||||
for proc in self.procs_pending:
|
||||
proc.poll()
|
||||
|
||||
if self.jobclient:
|
||||
waiting_for_jobslots = self.jobclient.has_pending_leases()
|
||||
|
||||
tasks = self.tasks
|
||||
self.tasks = []
|
||||
for task in tasks:
|
||||
|
@ -574,6 +602,7 @@ class SbyTask(SbyConfig):
|
|||
self.precise_prop_status = False
|
||||
self.timeout_reached = False
|
||||
self.task_local_abort = False
|
||||
self.exit_callback = self.summarize
|
||||
|
||||
yosys_program_prefix = "" ##yosys-program-prefix##
|
||||
self.exe_paths = {
|
||||
|
@ -932,12 +961,6 @@ class SbyTask(SbyConfig):
|
|||
else:
|
||||
assert 0
|
||||
|
||||
def run(self, setupmode):
|
||||
self.setup_procs(setupmode)
|
||||
if not setupmode:
|
||||
self.taskloop.run()
|
||||
self.write_summary_file()
|
||||
|
||||
def handle_non_engine_options(self):
|
||||
with open(f"{self.workdir}/config.sby", "r") as f:
|
||||
self.parse_config(f)
|
||||
|
@ -1034,6 +1057,8 @@ class SbyTask(SbyConfig):
|
|||
total_process_time = int((ru.ru_utime + ru.ru_stime) - self.start_process_time)
|
||||
self.total_time = total_process_time
|
||||
|
||||
# TODO process time is incorrect when running in parallel
|
||||
|
||||
self.summary = [
|
||||
"Elapsed clock time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format
|
||||
(total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time),
|
||||
|
@ -1066,9 +1091,6 @@ class SbyTask(SbyConfig):
|
|||
for line in self.summary:
|
||||
print(line, file=f)
|
||||
|
||||
def exit_callback(self):
|
||||
self.summarize()
|
||||
|
||||
def print_junit_result(self, f, junit_ts_name, junit_tc_name, junit_format_strict=False):
|
||||
junit_time = strftime('%Y-%m-%dT%H:%M:%S')
|
||||
if not self.design:
|
||||
|
|
314
sbysrc/sby_jobserver.py
Normal file
314
sbysrc/sby_jobserver.py
Normal file
|
@ -0,0 +1,314 @@
|
|||
#
|
||||
# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows
|
||||
#
|
||||
# Copyright (C) 2022 Jannis Harder <jix@yosyshq.com> <me@jix.one>
|
||||
#
|
||||
# Permission to use, copy, modify, and/or distribute this software for any
|
||||
# purpose with or without fee is hereby granted, provided that the above
|
||||
# copyright notice and this permission notice appear in all copies.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
#
|
||||
|
||||
import atexit
|
||||
import os
|
||||
import select
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
import weakref
|
||||
import signal
|
||||
|
||||
if os.name == "posix":
|
||||
import fcntl
|
||||
|
||||
inherited_jobcount = None
|
||||
inherited_jobserver_auth = None
|
||||
inherited_jobserver_auth_present = None
|
||||
|
||||
def process_jobserver_environment():
    """Process the environment looking for a make jobserver. This should be called
    early (when only inherited fds are present) to reliably detect whether the jobserver
    specified in the environment is accessible.

    Side effects: sets the module globals ``inherited_jobcount``,
    ``inherited_jobserver_auth`` and ``inherited_jobserver_auth_present``.
    When invoked as ``sby --jobserver-helper <fds...>`` it instead runs the
    helper loop and exits the process.
    """
    global inherited_jobcount
    global inherited_jobserver_auth
    global inherited_jobserver_auth_present

    # Re-invocation as the jobserver helper subprocess: run the helper loop
    # and terminate instead of the normal frontend.
    if len(sys.argv) >= 2 and sys.argv[1] == '--jobserver-helper':
        jobserver_helper(*map(int, sys.argv[2:]))
        # sys.exit instead of the site-injected exit(): the latter is not
        # guaranteed to exist (e.g. frozen executables, python -S).
        sys.exit(0)

    inherited_jobserver_auth_present = False

    for flag in shlex.split(os.environ.get("MAKEFLAGS", "")):
        if flag.startswith("-j"):
            if flag == "-j":
                # Bare "-j" means unlimited parallelism; represent as 0.
                inherited_jobcount = 0
            else:
                try:
                    inherited_jobcount = int(flag[2:])
                except ValueError:
                    pass
        elif flag.startswith("--jobserver-auth=") or flag.startswith("--jobserver-fds="):
            inherited_jobserver_auth_present = True
            if os.name == "posix":
                arg = flag.split("=", 1)[1].split(",")
                try:
                    jobserver_fds = int(arg[0]), int(arg[1])
                    for fd in jobserver_fds:
                        # Probe each fd; it is only usable if actually inherited.
                        fcntl.fcntl(fd, fcntl.F_GETFD)
                except (ValueError, OSError):
                    pass
                else:
                    inherited_jobserver_auth = jobserver_fds
|
||||
|
||||
|
||||
def jobserver_helper(jobserver_read_fd, jobserver_write_fd, request_fd, response_fd):
    """Helper process to handle blocking jobserver pipes.

    Counts request bytes arriving on ``request_fd``, acquires one jobserver
    token per request from ``jobserver_read_fd`` and forwards each token over
    ``response_fd``.  EOF on ``request_fd`` is the shutdown signal.
    """
    # The parent handles SIGINT; the helper must survive it so no job slot
    # is leaked while a token is in flight.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pending = 0
    while True:
        try:
            new_pending = len(os.read(request_fd, 1024))
            if new_pending == 0:
                # Request pipe closed: the client is exiting, drop all
                # outstanding requests and shut down.
                pending = 0
                break
            else:
                pending += new_pending
                continue
        except BlockingIOError:
            if pending == 0:
                # Nothing requested: wait for the next request byte.
                select.select([request_fd], [], [])
                continue
            # pending > 0: fall through and try to acquire a token.

        if pending > 0:
            try:
                # Depending on the make version (4.3 vs 4.2) this is blocking or
                # non-blocking. As this is an attribute of the pipe not the fd, we
                # cannot change it without affecting other processes. Older versions of
                # gnu make require this to be blocking, and produce errors if it is
                # non-blocking. Newer versions of gnu make set this non-blocking, both,
                # as client and as server. The documentation still says it is blocking.
                # This leaves us no choice but to handle both cases, which is the reason
                # we have this helper process in the first place.
                token = os.read(jobserver_read_fd, 1)
            except BlockingIOError:
                select.select([jobserver_read_fd], [], [])
                continue

            pending -= 1

            try:
                os.write(response_fd, token)
            except BaseException:
                # Was a bare "except:"; made explicit. Give the token back to
                # the jobserver before propagating so the slot is not leaked.
                os.write(jobserver_write_fd, token)
                raise
    os.close(jobserver_write_fd)
|
||||
|
||||
|
||||
class SbyJobLease:
    """One requested job slot handed out by an SbyJobClient.

    A lease starts out not ready; once a slot becomes available the client
    flips ``is_ready``.  The holder must call ``done()`` (also triggered from
    ``__del__`` as a safety net) to hand the slot back.
    """

    def __init__(self, client):
        self.client = client
        self.is_ready = False
        self.is_done = False

    def done(self):
        # Idempotent: only a ready lease that was not already finished
        # actually returns its slot to the client.
        if self.is_done:
            return
        if self.is_ready:
            self.client.return_lease()
        self.is_done = True

    def __repr__(self):
        return f"{self.is_ready=} {self.is_done=}"

    def __del__(self):
        # Safety net: a dropped lease must not leak a job slot.
        self.done()
|
||||
|
||||
|
||||
class SbyJobServer:
    """Owns the pipe pair implementing a make-compatible jobserver."""

    def __init__(self, jobcount):
        assert jobcount >= 1
        # TODO support unlimited parallelism?
        self.jobcount = jobcount

        if jobcount == 1:
            # A single job needs no token pipe; the client's implicit local
            # slot is sufficient and no MAKEFLAGS are advertised.
            self.read_fd = None
            self.write_fd = None
            self.makeflags = None
            return

        # jobcount > 1: create the token pipe, pre-filled with jobcount - 1
        # tokens (the remaining slot is every client's implicit local slot).
        self.read_fd, self.write_fd = os.pipe()
        if os.getenv('SBY_BLOCKING_JOBSERVER') != '1':
            os.set_blocking(self.read_fd, False)
        os.write(self.write_fd, b"*" * (jobcount - 1))
        # Advertise both the new and the legacy MAKEFLAGS spellings.
        self.makeflags = f"-j{jobcount} --jobserver-auth={self.read_fd},{self.write_fd} --jobserver-fds={self.read_fd},{self.write_fd}"
|
||||
|
||||
|
||||
class SbyJobClient:
    """Client side of the make jobserver protocol.

    Manages one implicit local slot plus tokens obtained from either an
    inherited make jobserver or a private SbyJobServer.  Slots are handed out
    as SbyJobLease objects; pending requests are tracked via weakrefs so an
    abandoned lease cannot pin a slot.
    """

    def __init__(self, fallback_jobcount=None):
        self.jobcount = None
        self.read_fd = self.write_fd = None
        self.helper_process = None

        # Every process implicitly owns one slot; tokens only gate extras.
        self.local_slots = 1
        self.acquired_slots = []
        self.pending_leases = []

        assert inherited_jobserver_auth_present is not None, "process_jobserver_environment was not called"

        have_jobserver = inherited_jobserver_auth_present

        if os.name == "nt" and inherited_jobserver_auth_present:
            # There are even more incompatible variants of the make jobserver on
            # windows, none of them are supported for now.
            print("WARNING: Found jobserver in MAKEFLAGS, this is not supported on windows.")
            have_jobserver = False

        if have_jobserver and inherited_jobserver_auth is None:
            print("WARNING: Could not connect to jobserver specified in MAKEFLAGS, disabling parallel execution.")
            have_jobserver = False
            fallback_jobcount = 1

        # Pick the job count: inherited -j > explicit fallback > CPU count.
        if have_jobserver:
            jobcount = inherited_jobcount
        elif fallback_jobcount is not None:
            jobcount = fallback_jobcount
        elif inherited_jobcount is not None and inherited_jobcount > 0:
            jobcount = inherited_jobcount
        else:
            try:
                jobcount = len(os.sched_getaffinity(0))
            except AttributeError:
                jobcount = os.cpu_count()

        if have_jobserver:
            self.read_fd, self.write_fd = inherited_jobserver_auth
        elif os.name == "nt":
            # On Windows, without a jobserver, use only local slots
            self.local_slots = jobcount
        else:
            self.sby_jobserver = SbyJobServer(jobcount)
            self.read_fd = self.sby_jobserver.read_fd
            self.write_fd = self.sby_jobserver.write_fd

        self.jobcount = jobcount

        if self.read_fd is not None:
            if os.get_blocking(self.read_fd):
                # Blocking jobserver pipe (older make): delegate the blocking
                # reads to a helper subprocess wired up via two extra pipes.
                request_read_fd, self.request_write_fd = os.pipe()
                self.response_read_fd, response_write_fd = os.pipe()
                os.set_blocking(self.response_read_fd, False)
                os.set_blocking(request_read_fd, False)

                pass_fds = [self.read_fd, self.write_fd, request_read_fd, response_write_fd]

                self.helper_process = subprocess.Popen(
                    [sys.executable, sys.modules['__main__'].__file__, '--jobserver-helper', *map(str, pass_fds)],
                    stdin=subprocess.DEVNULL,
                    pass_fds=pass_fds,
                )

                os.close(request_read_fd)
                os.close(response_write_fd)

                atexit.register(self.atexit_blocking)
            else:
                atexit.register(self.atexit_nonblocking)

    def atexit_nonblocking(self):
        # Return all slot tokens we are currently holding
        while self.acquired_slots:
            os.write(self.write_fd, self.acquired_slots.pop())

    def atexit_blocking(self):
        # Return all slot tokens we are currently holding
        while self.acquired_slots:
            os.write(self.write_fd, self.acquired_slots.pop())

        if self.helper_process:
            # Closing the request pipe signals the helper that we want to exit
            os.close(self.request_write_fd)

            # The helper might have been in the process of sending us some tokens, which
            # we still need to return
            while True:
                try:
                    token = os.read(self.response_read_fd, 1)
                except BlockingIOError:
                    select.select([self.response_read_fd], [], [])
                    continue
                if not token:
                    break
                os.write(self.write_fd, token)
            os.close(self.response_read_fd)

            # Wait for the helper to exit, should be immediate at this point
            self.helper_process.wait()

    def request_lease(self):
        """Request a job slot; the returned lease may not be ready yet."""
        pending = SbyJobLease(self)

        if self.local_slots > 0:
            # The implicit local slot is free: grant immediately.
            self.local_slots -= 1
            pending.is_ready = True
        else:
            self.pending_leases.append(weakref.ref(pending))
            if self.helper_process:
                # Ask the helper to fetch one token on our behalf.
                os.write(self.request_write_fd, b"!")

        return pending

    def return_lease(self):
        # Prefer giving an acquired token back to the jobserver pipe ...
        if self.acquired_slots:
            os.write(self.write_fd, self.acquired_slots.pop())
            return

        # ... unless a pending lease is waiting: hand the slot straight over.
        if self.activate_pending_lease():
            return

        # Otherwise the implicit local slot becomes free again.
        self.local_slots += 1

    def activate_pending_lease(self):
        """Make the oldest still-alive pending lease ready; return success."""
        while self.pending_leases:
            pending = self.pending_leases.pop(0)()
            if pending is None:
                # Lease object was garbage collected; skip the dead weakref.
                continue
            pending.is_ready = True
            return True
        return False

    def has_pending_leases(self):
        # Prune dead weakrefs from the tail before answering.
        while self.pending_leases and not self.pending_leases[-1]():
            self.pending_leases.pop()
        return bool(self.pending_leases)

    def poll_fds(self):
        """File descriptors the task loop should select() on for this client."""
        if self.helper_process:
            return [self.response_read_fd]
        elif self.read_fd is not None:
            return [self.read_fd]
        else:
            return []

    def poll(self):
        """Drain any tokens that have arrived and dispatch them to leases."""
        read_fd = self.response_read_fd if self.helper_process else self.read_fd
        if read_fd is None:
            return

        while self.helper_process or self.has_pending_leases():
            try:
                token = os.read(read_fd, 1)
            except BlockingIOError:
                # No token available right now; try again on the next poll.
                break

            self.got_token(token)

    def got_token(self, token):
        self.acquired_slots.append(token)

        if self.activate_pending_lease():
            return

        # Nobody was waiting: immediately give the token back.
        self.return_lease()
|
|
@ -59,7 +59,7 @@ if __name__ == "__main__":
|
|||
exit(noskip)
|
||||
|
||||
print(command, flush=True)
|
||||
exit(subprocess.call(command, shell=True))
|
||||
exit(subprocess.call(command, shell=True, close_fds=False))
|
||||
|
||||
found_tools = []
|
||||
check_tools = set()
|
||||
|
|
|
@ -82,7 +82,7 @@ with rules_file.open("w") as rules:
|
|||
command = f"cd {sby_dir_unix} && python3 $(SBY_MAIN) -f {sby_file.name} {task}"
|
||||
|
||||
print(
|
||||
f"\t@python3 make/required_tools.py run {target} {shlex.quote(command)} {shlex.join(required_tools)}",
|
||||
f"\t+@python3 make/required_tools.py run {target} {shlex.quote(command)} {shlex.join(required_tools)}",
|
||||
file=rules,
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue