From b0786aea434a7a5239c1f74bb898b6e2eb9064a4 Mon Sep 17 00:00:00 2001 From: Jannis Harder Date: Wed, 13 Jul 2022 15:51:26 +0200 Subject: [PATCH] Make jobserver integration Only implements the POSIX jobserver and will break on windows. Unbreaking it on windows will be done as a follow up. Not used for autotune, that needs some more changes. --- sbysrc/sby.py | 27 +++- sbysrc/sby_core.py | 32 +++- sbysrc/sby_jobserver.py | 303 +++++++++++++++++++++++++++++++++++ tests/make/required_tools.py | 2 +- tests/make/test_rules.py | 2 +- 5 files changed, 359 insertions(+), 7 deletions(-) create mode 100644 sbysrc/sby_jobserver.py diff --git a/sbysrc/sby.py b/sbysrc/sby.py index 7e55da8..63c9f33 100644 --- a/sbysrc/sby.py +++ b/sbysrc/sby.py @@ -20,8 +20,11 @@ import argparse, json, os, sys, shutil, tempfile, re ##yosys-sys-path## from sby_core import SbyConfig, SbyTask, SbyAbort, SbyTaskloop, process_filename +from sby_jobserver import SbyJobClient, process_jobserver_environment import time, platform +process_jobserver_environment() # needs to be called early + class DictAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): assert isinstance(getattr(namespace, self.dest), dict), f"Use ArgumentParser.set_defaults() to initialize {self.dest} to dict()" @@ -46,6 +49,11 @@ parser.add_argument("-T", metavar="", action="append", dest="tasknames help="add taskname (useful when sby file is read from stdin)") parser.add_argument("-E", action="store_true", dest="throw_err", help="throw an exception (incl stack trace) for most errors") +parser.add_argument("-j", metavar="", type=int, dest="jobcount", + help="maximum number of processes to run in parallel") +parser.add_argument("--sequential", action="store_true", dest="sequential", + help="run tasks in sequence, not in parallel") + parser.add_argument("--autotune", action="store_true", dest="autotune", help="automatically find a well performing engine and engine configuration for each task") parser.add_argument("--autotune-config", dest="autotune_config", @@ -114,6 +122,8 @@ reusedir = False setupmode = args.setupmode autotune = args.autotune autotune_config = args.autotune_config +sequential = args.sequential +jobcount = args.jobcount init_config_file = args.init_config_file if sbyfile is not None: @@ -501,12 +511,22 @@ def start_task(taskloop, taskname): failed = [] retcode = 0 +if jobcount is not None and jobcount < 1: + print("ERROR: The -j option requires a positive number as argument") + sys.exit(1) + # Autotune is already parallel, parallelizing it across tasks needs some more work -sequential = autotune # TODO selection between parallel/sequential +if autotune: + sequential = True if sequential: + if autotune: + jobclient = None # TODO make autotune use a jobclient + else: + jobclient = SbyJobClient(jobcount) + for taskname in tasknames: - taskloop = SbyTaskloop() + taskloop = SbyTaskloop(jobclient) try: task = start_task(taskloop, taskname) except SbyAbort: @@ -525,7 +545,8 @@ if sequential: if task.retcode: failed.append(taskname) else: - taskloop = SbyTaskloop() + jobclient = SbyJobClient(jobcount) + taskloop = SbyTaskloop(jobclient) tasks = {} for taskname in tasknames: diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index f49aad0..1f72f56 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -82,6 +82,7 @@ class SbyProc: self.logstderr = logstderr self.silent = silent self.wait = False + self.job_lease = None self.task.update_proc_pending(self) @@ -162,6 +163,13 @@ class SbyProc: if not dep.finished: return + if self.task.taskloop.jobclient: + if self.job_lease is None: + self.job_lease = self.task.taskloop.jobclient.request_lease() + + if not self.job_lease.is_ready: + return + if not self.silent: self.task.log(f"{self.info}: starting process \"{self.cmdline}\"") @@ -190,8 +198,12 @@ class SbyProc: # The process might have written something since the last time we checked self.read_output() + if self.job_lease: + self.job_lease.done() + if not self.silent: self.task.log(f"{self.info}: finished (returncode={self.p.returncode})") + self.task.update_proc_stopped(self) self.running = False self.exited = True @@ -378,18 +390,26 @@ class SbyConfig: class SbyTaskloop: - def __init__(self): + def __init__(self, jobclient=None): self.procs_pending = [] self.procs_running = [] self.tasks = [] self.poll_now = False + self.jobclient = jobclient def run(self): for proc in self.procs_pending: proc.poll() - while len(self.procs_running) or self.poll_now: + + waiting_for_jobslots = False + if self.jobclient: + waiting_for_jobslots = self.jobclient.has_pending_leases() + + while self.procs_running or waiting_for_jobslots or self.poll_now: fds = [] + if self.jobclient: + fds.extend(self.jobclient.poll_fds()) for proc in self.procs_running: if proc.running: fds.append(proc.p.stdout) @@ -404,12 +424,20 @@ class SbyTaskloop: sleep(0.1) self.poll_now = False + if self.jobclient: + self.jobclient.poll() + + self.procs_waiting = [] + for proc in self.procs_running: proc.poll() for proc in self.procs_pending: proc.poll() + if self.jobclient: + waiting_for_jobslots = self.jobclient.has_pending_leases() + tasks = self.tasks self.tasks = [] for task in tasks: diff --git a/sbysrc/sby_jobserver.py b/sbysrc/sby_jobserver.py new file mode 100644 index 0000000..0f359b8 --- /dev/null +++ b/sbysrc/sby_jobserver.py @@ -0,0 +1,303 @@ +# +# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows +# +# Copyright (C) 2022 Jannis Harder +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# + +import atexit +import fcntl +import os +import select +import shlex +import subprocess +import sys +import weakref +import signal + + +inherited_jobcount = None +inherited_jobserver_auth = None +inherited_jobserver_auth_present = None + +def process_jobserver_environment(): + """Process the environment looking for a make jobserver. This should be called + early (when only inherited fds are present) to reliably detect whether the jobserver + specified in the environment is accessible.""" + global inherited_jobcount + global inherited_jobserver_auth + global inherited_jobserver_auth_present + + if len(sys.argv) >= 2 and sys.argv[1] == '--jobserver-helper': + jobserver_helper(*map(int, sys.argv[2:])) + exit(0) + + inherited_jobserver_auth_present = False + + for flag in shlex.split(os.environ.get("MAKEFLAGS", "")): + if flag.startswith("-j"): + if flag == "-j": + inherited_jobcount = 0 + else: + try: + inherited_jobcount = int(flag[2:]) + except ValueError: + pass + elif flag.startswith("--jobserver-auth=") or flag.startswith("--jobserver-fds="): + inherited_jobserver_auth_present = True + arg = flag.split("=", 1)[1].split(",") + try: + jobserver_fds = int(arg[0]), int(arg[1]) + for fd in jobserver_fds: + fcntl.fcntl(fd, fcntl.F_GETFD) + except (ValueError, OSError): + pass + else: + inherited_jobserver_auth = jobserver_fds + + +def jobserver_helper(jobserver_read_fd, jobserver_write_fd, request_fd, response_fd): + """Helper process to handle blocking jobserver pipes.""" + signal.signal(signal.SIGINT, signal.SIG_IGN) + pending = 0 + while True: + try: + new_pending = len(os.read(request_fd, 1024)) + if new_pending == 0: + pending = 0 + break + else: + pending += new_pending + continue + except BlockingIOError: + if pending == 0: + select.select([request_fd], [], []) + continue + + if pending > 0: + try: + # Depending on the make version (4.3 vs 4.2) this is blocking or + # non-blocking. As this is an attribute of the pipe not the fd, we + # cannot change it without affecting other processes. Older versions of + # gnu make require this to be blocking, and produce errors if it is + # non-blocking. Newer versions of gnu make set this non-blocking, both, + # as client and as server. The documentation still says it is blocking. + # This leaves us no choice but to handle both cases, which is the reason + # we have this helper process in the first place. + token = os.read(jobserver_read_fd, 1) + except BlockingIOError: + select.select([jobserver_read_fd], [], []) + continue + + pending -= 1 + + try: + os.write(response_fd, token) + except: + os.write(jobserver_write_fd, token) + raise + os.close(jobserver_write_fd) + + +class SbyJobLease: + def __init__(self, client): + self.client = client + self.is_ready = False + self.is_done = False + + def done(self): + if self.is_ready and not self.is_done: + self.client.return_lease() + + self.is_done = True + + def __repr__(self): + return f"{self.is_ready=} {self.is_done=}" + + def __del__(self): + self.done() + + +class SbyJobServer: + def __init__(self, jobcount): + assert jobcount >= 1 + # TODO support unlimited parallelism? + self.jobcount = jobcount + if jobcount == 1: + self.read_fd, self.write_fd = None, None + self.makeflags = None + elif jobcount > 1: + self.read_fd, self.write_fd = os.pipe() + if os.getenv('SBY_BLOCKING_JOBSERVER') != '1': + os.set_blocking(self.read_fd, False) + os.write(self.write_fd, b"*" * (jobcount - 1)) + self.makeflags = f"-j{jobcount} --jobserver-auth={self.read_fd},{self.write_fd} --jobserver-fds={self.read_fd},{self.write_fd}" + + +class SbyJobClient: + def __init__(self, fallback_jobcount=None): + self.jobcount = None + self.read_fd = self.write_fd = None + self.helper_process = None + + self.local_slots = 1 + self.acquired_slots = [] + self.pending_leases = [] + + assert inherited_jobserver_auth_present is not None, "process_jobserver_environment was not called" + + have_jobserver = inherited_jobserver_auth_present + + if have_jobserver and inherited_jobserver_auth is None: + print("WARNING: Could not connect to jobserver specified in MAKEFLAGS, disabling parallel execution.") + have_jobserver = False + fallback_jobcount = 1 + + if have_jobserver: + jobcount = inherited_jobcount + elif fallback_jobcount is not None: + jobcount = fallback_jobcount + elif inherited_jobcount is not None and inherited_jobcount > 0: + jobcount = inherited_jobcount + else: + try: + jobcount = len(os.sched_getaffinity(0)) + except AttributeError: + jobcount = os.cpu_count() + + if have_jobserver: + self.read_fd, self.write_fd = inherited_jobserver_auth + else: + self.sby_jobserver = SbyJobServer(jobcount) + self.read_fd = self.sby_jobserver.read_fd + self.write_fd = self.sby_jobserver.write_fd + + self.jobcount = jobcount + + if self.read_fd is not None: + if os.get_blocking(self.read_fd): + request_read_fd, self.request_write_fd = os.pipe() + self.response_read_fd, response_write_fd = os.pipe() + os.set_blocking(self.response_read_fd, False) + os.set_blocking(request_read_fd, False) + + pass_fds = [self.read_fd, self.write_fd, request_read_fd, response_write_fd] + + self.helper_process = subprocess.Popen( + [sys.executable, sys.modules['__main__'].__file__, '--jobserver-helper', *map(str, pass_fds)], + stdin=subprocess.DEVNULL, + pass_fds=pass_fds, + ) + + os.close(request_read_fd) + os.close(response_write_fd) + + atexit.register(self.atexit_blocking) + else: + atexit.register(self.atexit_nonblocking) + + def atexit_nonblocking(self): + while self.acquired_slots: + os.write(self.write_fd, self.acquired_slots.pop()) + + def atexit_blocking(self): + # Return all slot tokens we are currently holding + while self.acquired_slots: + os.write(self.write_fd, self.acquired_slots.pop()) + + if self.helper_process: + # Closing the request pipe singals the helper that we want to exit + os.close(self.request_write_fd) + + # The helper might have been in the process of sending us some tokens, which + # we still need to return + while True: + try: + token = os.read(self.response_read_fd, 1) + except BlockingIOError: + select.select([self.response_read_fd], [], []) + continue + if not token: + break + os.write(self.write_fd, token) + os.close(self.response_read_fd) + + # Wait for the helper to exit, should be immediate at this point + self.helper_process.wait() + + def request_lease(self): + pending = SbyJobLease(self) + + if self.local_slots > 0: + self.local_slots -= 1 + pending.is_ready = True + else: + self.pending_leases.append(weakref.ref(pending)) + if self.helper_process: + os.write(self.request_write_fd, b"!") + + return pending + + def return_lease(self): + if self.acquired_slots: + os.write(self.write_fd, self.acquired_slots.pop()) + return + + if self.activate_pending_lease(): + return + + self.local_slots += 1 + + def activate_pending_lease(self): + while self.pending_leases: + pending = self.pending_leases.pop(0)() + if pending is None: + continue + pending.is_ready = True + return True + return False + + def has_pending_leases(self): + while self.pending_leases and not self.pending_leases[-1](): + self.pending_leases.pop() + return bool(self.pending_leases) + + def poll_fds(self): + if self.helper_process: + return [self.response_read_fd] + elif self.read_fd is not None: + return [self.read_fd] + else: + return [] + + def poll(self): + read_fd = self.response_read_fd if self.helper_process else self.read_fd + if read_fd is None: + return + + while self.helper_process or self.has_pending_leases(): + try: + token = os.read(read_fd, 1) + except BlockingIOError: + break + + self.got_token(token) + + def got_token(self, token): + self.acquired_slots.append(token) + + if self.activate_pending_lease(): + return + + self.return_lease() diff --git a/tests/make/required_tools.py b/tests/make/required_tools.py index ce33356..82b5f49 100644 --- a/tests/make/required_tools.py +++ b/tests/make/required_tools.py @@ -59,7 +59,7 @@ if __name__ == "__main__": exit(noskip) print(command, flush=True) - exit(subprocess.call(command, shell=True)) + exit(subprocess.call(command, shell=True, close_fds=False)) found_tools = [] check_tools = set() diff --git a/tests/make/test_rules.py b/tests/make/test_rules.py index 9607d81..64f80e1 100644 --- a/tests/make/test_rules.py +++ b/tests/make/test_rules.py @@ -82,7 +82,7 @@ with rules_file.open("w") as rules: command = f"cd {sby_dir_unix} && python3 $(SBY_MAIN) -f {sby_file.name} {task}" print( - f"\t@python3 make/required_tools.py run {target} {shlex.quote(command)} {shlex.join(required_tools)}", + f"\t+@python3 make/required_tools.py run {target} {shlex.quote(command)} {shlex.join(required_tools)}", file=rules, )