From de939e279a66236f91ae98d6e03e94483b0fb70a Mon Sep 17 00:00:00 2001 From: Jannis Harder Date: Wed, 6 Jul 2022 14:28:20 +0200 Subject: [PATCH 1/3] Run tasks in parallel --- sbysrc/sby.py | 99 +++++++++++++++++++++++++++++------------- sbysrc/sby_autotune.py | 9 +++- sbysrc/sby_core.py | 12 ++--- 3 files changed, 78 insertions(+), 42 deletions(-) diff --git a/sbysrc/sby.py b/sbysrc/sby.py index 0628c6d..7e55da8 100644 --- a/sbysrc/sby.py +++ b/sbysrc/sby.py @@ -19,7 +19,7 @@ import argparse, json, os, sys, shutil, tempfile, re ##yosys-sys-path## -from sby_core import SbyConfig, SbyTask, SbyAbort, process_filename +from sby_core import SbyConfig, SbyTask, SbyAbort, SbyTaskloop, process_filename import time, platform class DictAction(argparse.Action): @@ -401,7 +401,7 @@ if (workdir is not None) and (len(tasknames) != 1): print("ERROR: Exactly one task is required when workdir is specified. Specify the task or use --prefix instead of -d.", file=sys.stderr) sys.exit(1) -def run_task(taskname): +def start_task(taskloop, taskname): sbyconfig, _, _, _ = read_sbyconfig(sbydata, taskname) my_opt_tmpdir = opt_tmpdir @@ -463,48 +463,85 @@ def run_task(taskname): else: junit_filename = "junit" - task = SbyTask(sbyconfig, my_workdir, early_logmsgs, reusedir) + task = SbyTask(sbyconfig, my_workdir, early_logmsgs, reusedir, taskloop) for k, v in exe_paths.items(): task.exe_paths[k] = v - try: - if autotune: - import sby_autotune - sby_autotune.SbyAutotune(task, autotune_config).run() + def exit_callback(): + if not autotune and not setupmode: + task.summarize() + task.write_summary_file() + + if my_opt_tmpdir: + task.log(f"Removing directory '{my_workdir}'.") + shutil.rmtree(my_workdir, ignore_errors=True) + + if setupmode: + task.log(f"SETUP COMPLETE (rc={task.retcode})") else: - task.run(setupmode) - except SbyAbort: - if throw_err: - raise + task.log(f"DONE ({task.status}, rc={task.retcode})") + task.logfile.close() - if my_opt_tmpdir: - task.log(f"Removing directory 
'{my_workdir}'.") - shutil.rmtree(my_workdir, ignore_errors=True) + if not my_opt_tmpdir and not setupmode and not autotune: + with open("{}/{}.xml".format(task.workdir, junit_filename), "w") as f: + task.print_junit_result(f, junit_ts_name, junit_tc_name, junit_format_strict=False) - if setupmode: - task.log(f"SETUP COMPLETE (rc={task.retcode})") - else: - task.log(f"DONE ({task.status}, rc={task.retcode})") - task.logfile.close() + with open(f"{task.workdir}/status", "w") as f: + print(f"{task.status} {task.retcode} {task.total_time}", file=f) - if not my_opt_tmpdir and not setupmode and not autotune: - with open("{}/{}.xml".format(task.workdir, junit_filename), "w") as f: - task.print_junit_result(f, junit_ts_name, junit_tc_name, junit_format_strict=False) + task.exit_callback = exit_callback - with open(f"{task.workdir}/status", "w") as f: - print(f"{task.status} {task.retcode} {task.total_time}", file=f) - - return task.retcode + if not autotune: + task.setup_procs(setupmode) + task.task_local_abort = not throw_err + return task failed = [] retcode = 0 -for task in tasknames: - task_retcode = run_task(task) - retcode |= task_retcode - if task_retcode: - failed.append(task) + +# Autotune is already parallel, parallelizing it across tasks needs some more work +sequential = autotune # TODO selection between parallel/sequential + +if sequential: + for taskname in tasknames: + taskloop = SbyTaskloop() + try: + task = start_task(taskloop, taskname) + except SbyAbort: + if throw_err: + raise + sys.exit(1) + + if autotune: + from sby_autotune import SbyAutotune + SbyAutotune(task, autotune_config).run() + elif setupmode: + task.exit_callback() + else: + taskloop.run() + retcode |= task.retcode + if task.retcode: + failed.append(taskname) +else: + taskloop = SbyTaskloop() + + tasks = {} + for taskname in tasknames: + try: + tasks[taskname] = start_task(taskloop, taskname) + except SbyAbort: + if throw_err: + raise + sys.exit(1) + + taskloop.run() + + for taskname, task 
in tasks.items(): + retcode |= task.retcode + if task.retcode: + failed.append(taskname) if failed and (len(tasknames) > 1 or tasknames[0] is not None): tm = time.localtime() diff --git a/sbysrc/sby_autotune.py b/sbysrc/sby_autotune.py index c7d741c..771a9a0 100644 --- a/sbysrc/sby_autotune.py +++ b/sbysrc/sby_autotune.py @@ -168,6 +168,7 @@ class SbyAutotune: """Performs automatic engine selection for a given task. """ def __init__(self, task, config_file=None): + self.task_exit_callback = task.exit_callback task.exit_callback = lambda: None task.check_timeout = lambda: None task.status = "TIMEOUT" @@ -432,6 +433,8 @@ class SbyAutotune: self.task.status = "FAIL" self.task.retcode = 2 + self.task_exit_callback() + def next_candidate(self, peek=False): # peek=True is used to check whether we need to timeout running candidates to # give other candidates a chance. @@ -635,6 +638,8 @@ class SbyAutotuneTask(SbyTask): self.model_time = 0 self.model_requests = [] + self.exit_callback = self.autotune_exit_callback + def parse_config(self, f): super().parse_config(f) @@ -650,8 +655,8 @@ class SbyAutotuneTask(SbyTask): self.log(f"using model '{model_name}'") return self.autotune.model(self, model_name) - def exit_callback(self): - super().exit_callback() + def autotune_exit_callback(self): + self.summarize() self.candidate.total_adjusted_time = int(monotonic() - self.start_clock_time + self.model_time) self.candidate.engine_retcode = self.retcode diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 366817f..f49aad0 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -437,6 +437,7 @@ class SbyTask(SbyConfig): self.precise_prop_status = False self.timeout_reached = False self.task_local_abort = False + self.exit_callback = self.summarize yosys_program_prefix = "" ##yosys-program-prefix## self.exe_paths = { @@ -795,12 +796,6 @@ class SbyTask(SbyConfig): else: assert 0 - def run(self, setupmode): - self.setup_procs(setupmode) - if not setupmode: - 
self.taskloop.run() - self.write_summary_file() - def handle_non_engine_options(self): with open(f"{self.workdir}/config.sby", "r") as f: self.parse_config(f) @@ -897,6 +892,8 @@ class SbyTask(SbyConfig): total_process_time = int((ru.ru_utime + ru.ru_stime) - self.start_process_time) self.total_time = total_process_time + # TODO process time is incorrect when running in parallel + self.summary = [ "Elapsed clock time [H:MM:SS (secs)]: {}:{:02d}:{:02d} ({})".format (total_clock_time // (60*60), (total_clock_time // 60) % 60, total_clock_time % 60, total_clock_time), @@ -929,9 +926,6 @@ class SbyTask(SbyConfig): for line in self.summary: print(line, file=f) - def exit_callback(self): - self.summarize() - def print_junit_result(self, f, junit_ts_name, junit_tc_name, junit_format_strict=False): junit_time = strftime('%Y-%m-%dT%H:%M:%S') if not self.design: From b0786aea434a7a5239c1f74bb898b6e2eb9064a4 Mon Sep 17 00:00:00 2001 From: Jannis Harder Date: Wed, 13 Jul 2022 15:51:26 +0200 Subject: [PATCH 2/3] Make jobserver integration Only implements the POSIX jobserver and will break on windows. Unbreaking it on windows will be done as a follow up. Not used for autotune, that needs some more changes. 
--- sbysrc/sby.py | 27 +++- sbysrc/sby_core.py | 32 +++- sbysrc/sby_jobserver.py | 303 +++++++++++++++++++++++++++++++++++ tests/make/required_tools.py | 2 +- tests/make/test_rules.py | 2 +- 5 files changed, 359 insertions(+), 7 deletions(-) create mode 100644 sbysrc/sby_jobserver.py diff --git a/sbysrc/sby.py b/sbysrc/sby.py index 7e55da8..63c9f33 100644 --- a/sbysrc/sby.py +++ b/sbysrc/sby.py @@ -20,8 +20,11 @@ import argparse, json, os, sys, shutil, tempfile, re ##yosys-sys-path## from sby_core import SbyConfig, SbyTask, SbyAbort, SbyTaskloop, process_filename +from sby_jobserver import SbyJobClient, process_jobserver_environment import time, platform +process_jobserver_environment() # needs to be called early + class DictAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): assert isinstance(getattr(namespace, self.dest), dict), f"Use ArgumentParser.set_defaults() to initialize {self.dest} to dict()" @@ -46,6 +49,11 @@ parser.add_argument("-T", metavar="", action="append", dest="tasknames help="add taskname (useful when sby file is read from stdin)") parser.add_argument("-E", action="store_true", dest="throw_err", help="throw an exception (incl stack trace) for most errors") +parser.add_argument("-j", metavar="", type=int, dest="jobcount", + help="maximum number of processes to run in parallel") +parser.add_argument("--sequential", action="store_true", dest="sequential", + help="run tasks in sequence, not in parallel") + parser.add_argument("--autotune", action="store_true", dest="autotune", help="automatically find a well performing engine and engine configuration for each task") parser.add_argument("--autotune-config", dest="autotune_config", @@ -114,6 +122,8 @@ reusedir = False setupmode = args.setupmode autotune = args.autotune autotune_config = args.autotune_config +sequential = args.sequential +jobcount = args.jobcount init_config_file = args.init_config_file if sbyfile is not None: @@ -501,12 +511,22 @@ def 
start_task(taskloop, taskname): failed = [] retcode = 0 +if jobcount is not None and jobcount < 1: + print("ERROR: The -j option requires a positive number as argument") + sys.exit(1) + # Autotune is already parallel, parallelizing it across tasks needs some more work -sequential = autotune # TODO selection between parallel/sequential +if autotune: + sequential = True if sequential: + if autotune: + jobclient = None # TODO make autotune use a jobclient + else: + jobclient = SbyJobClient(jobcount) + for taskname in tasknames: - taskloop = SbyTaskloop() + taskloop = SbyTaskloop(jobclient) try: task = start_task(taskloop, taskname) except SbyAbort: @@ -525,7 +545,8 @@ if sequential: if task.retcode: failed.append(taskname) else: - taskloop = SbyTaskloop() + jobclient = SbyJobClient(jobcount) + taskloop = SbyTaskloop(jobclient) tasks = {} for taskname in tasknames: diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index f49aad0..1f72f56 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -82,6 +82,7 @@ class SbyProc: self.logstderr = logstderr self.silent = silent self.wait = False + self.job_lease = None self.task.update_proc_pending(self) @@ -162,6 +163,13 @@ class SbyProc: if not dep.finished: return + if self.task.taskloop.jobclient: + if self.job_lease is None: + self.job_lease = self.task.taskloop.jobclient.request_lease() + + if not self.job_lease.is_ready: + return + if not self.silent: self.task.log(f"{self.info}: starting process \"{self.cmdline}\"") @@ -190,8 +198,12 @@ class SbyProc: # The process might have written something since the last time we checked self.read_output() + if self.job_lease: + self.job_lease.done() + if not self.silent: self.task.log(f"{self.info}: finished (returncode={self.p.returncode})") + self.task.update_proc_stopped(self) self.running = False self.exited = True @@ -378,18 +390,26 @@ class SbyConfig: class SbyTaskloop: - def __init__(self): + def __init__(self, jobclient=None): self.procs_pending = [] self.procs_running 
= [] self.tasks = [] self.poll_now = False + self.jobclient = jobclient def run(self): for proc in self.procs_pending: proc.poll() - while len(self.procs_running) or self.poll_now: + + waiting_for_jobslots = False + if self.jobclient: + waiting_for_jobslots = self.jobclient.has_pending_leases() + + while self.procs_running or waiting_for_jobslots or self.poll_now: fds = [] + if self.jobclient: + fds.extend(self.jobclient.poll_fds()) for proc in self.procs_running: if proc.running: fds.append(proc.p.stdout) @@ -404,12 +424,20 @@ class SbyTaskloop: sleep(0.1) self.poll_now = False + if self.jobclient: + self.jobclient.poll() + + self.procs_waiting = [] + for proc in self.procs_running: proc.poll() for proc in self.procs_pending: proc.poll() + if self.jobclient: + waiting_for_jobslots = self.jobclient.has_pending_leases() + tasks = self.tasks self.tasks = [] for task in tasks: diff --git a/sbysrc/sby_jobserver.py b/sbysrc/sby_jobserver.py new file mode 100644 index 0000000..0f359b8 --- /dev/null +++ b/sbysrc/sby_jobserver.py @@ -0,0 +1,303 @@ +# +# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows +# +# Copyright (C) 2022 Jannis Harder +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+# + +import atexit +import fcntl +import os +import select +import shlex +import subprocess +import sys +import weakref +import signal + + +inherited_jobcount = None +inherited_jobserver_auth = None +inherited_jobserver_auth_present = None + +def process_jobserver_environment(): + """Process the environment looking for a make jobserver. This should be called + early (when only inherited fds are present) to reliably detect whether the jobserver + specified in the environment is accessible.""" + global inherited_jobcount + global inherited_jobserver_auth + global inherited_jobserver_auth_present + + if len(sys.argv) >= 2 and sys.argv[1] == '--jobserver-helper': + jobserver_helper(*map(int, sys.argv[2:])) + exit(0) + + inherited_jobserver_auth_present = False + + for flag in shlex.split(os.environ.get("MAKEFLAGS", "")): + if flag.startswith("-j"): + if flag == "-j": + inherited_jobcount = 0 + else: + try: + inherited_jobcount = int(flag[2:]) + except ValueError: + pass + elif flag.startswith("--jobserver-auth=") or flag.startswith("--jobserver-fds="): + inherited_jobserver_auth_present = True + arg = flag.split("=", 1)[1].split(",") + try: + jobserver_fds = int(arg[0]), int(arg[1]) + for fd in jobserver_fds: + fcntl.fcntl(fd, fcntl.F_GETFD) + except (ValueError, OSError): + pass + else: + inherited_jobserver_auth = jobserver_fds + + +def jobserver_helper(jobserver_read_fd, jobserver_write_fd, request_fd, response_fd): + """Helper process to handle blocking jobserver pipes.""" + signal.signal(signal.SIGINT, signal.SIG_IGN) + pending = 0 + while True: + try: + new_pending = len(os.read(request_fd, 1024)) + if new_pending == 0: + pending = 0 + break + else: + pending += new_pending + continue + except BlockingIOError: + if pending == 0: + select.select([request_fd], [], []) + continue + + if pending > 0: + try: + # Depending on the make version (4.3 vs 4.2) this is blocking or + # non-blocking. 
As this is an attribute of the pipe not the fd, we + # cannot change it without affecting other processes. Older versions of + # gnu make require this to be blocking, and produce errors if it is + # non-blocking. Newer versions of gnu make set this non-blocking, both, + # as client and as server. The documentation still says it is blocking. + # This leaves us no choice but to handle both cases, which is the reason + # we have this helper process in the first place. + token = os.read(jobserver_read_fd, 1) + except BlockingIOError: + select.select([jobserver_read_fd], [], []) + continue + + pending -= 1 + + try: + os.write(response_fd, token) + except: + os.write(jobserver_write_fd, token) + raise + os.close(jobserver_write_fd) + + +class SbyJobLease: + def __init__(self, client): + self.client = client + self.is_ready = False + self.is_done = False + + def done(self): + if self.is_ready and not self.is_done: + self.client.return_lease() + + self.is_done = True + + def __repr__(self): + return f"{self.is_ready=} {self.is_done=}" + + def __del__(self): + self.done() + + +class SbyJobServer: + def __init__(self, jobcount): + assert jobcount >= 1 + # TODO support unlimited parallelism? 
+ self.jobcount = jobcount + if jobcount == 1: + self.read_fd, self.write_fd = None, None + self.makeflags = None + elif jobcount > 1: + self.read_fd, self.write_fd = os.pipe() + if os.getenv('SBY_BLOCKING_JOBSERVER') != '1': + os.set_blocking(self.read_fd, False) + os.write(self.write_fd, b"*" * (jobcount - 1)) + self.makeflags = f"-j{jobcount} --jobserver-auth={self.read_fd},{self.write_fd} --jobserver-fds={self.read_fd},{self.write_fd}" + + +class SbyJobClient: + def __init__(self, fallback_jobcount=None): + self.jobcount = None + self.read_fd = self.write_fd = None + self.helper_process = None + + self.local_slots = 1 + self.acquired_slots = [] + self.pending_leases = [] + + assert inherited_jobserver_auth_present is not None, "process_jobserver_environment was not called" + + have_jobserver = inherited_jobserver_auth_present + + if have_jobserver and inherited_jobserver_auth is None: + print("WARNING: Could not connect to jobserver specified in MAKEFLAGS, disabling parallel execution.") + have_jobserver = False + fallback_jobcount = 1 + + if have_jobserver: + jobcount = inherited_jobcount + elif fallback_jobcount is not None: + jobcount = fallback_jobcount + elif inherited_jobcount is not None and inherited_jobcount > 0: + jobcount = inherited_jobcount + else: + try: + jobcount = len(os.sched_getaffinity(0)) + except AttributeError: + jobcount = os.cpu_count() + + if have_jobserver: + self.read_fd, self.write_fd = inherited_jobserver_auth + else: + self.sby_jobserver = SbyJobServer(jobcount) + self.read_fd = self.sby_jobserver.read_fd + self.write_fd = self.sby_jobserver.write_fd + + self.jobcount = jobcount + + if self.read_fd is not None: + if os.get_blocking(self.read_fd): + request_read_fd, self.request_write_fd = os.pipe() + self.response_read_fd, response_write_fd = os.pipe() + os.set_blocking(self.response_read_fd, False) + os.set_blocking(request_read_fd, False) + + pass_fds = [self.read_fd, self.write_fd, request_read_fd, response_write_fd] + + 
self.helper_process = subprocess.Popen(
+                    [sys.executable, sys.modules['__main__'].__file__, '--jobserver-helper', *map(str, pass_fds)],
+                    stdin=subprocess.DEVNULL,
+                    pass_fds=pass_fds,
+                )
+
+                os.close(request_read_fd)
+                os.close(response_write_fd)
+
+                atexit.register(self.atexit_blocking)
+            else:
+                atexit.register(self.atexit_nonblocking)
+
+    def atexit_nonblocking(self):
+        while self.acquired_slots:
+            os.write(self.write_fd, self.acquired_slots.pop())
+
+    def atexit_blocking(self):
+        # Return all slot tokens we are currently holding
+        while self.acquired_slots:
+            os.write(self.write_fd, self.acquired_slots.pop())
+
+        if self.helper_process:
+            # Closing the request pipe signals the helper that we want to exit
+            os.close(self.request_write_fd)
+
+            # The helper might have been in the process of sending us some tokens, which
+            # we still need to return
+            while True:
+                try:
+                    token = os.read(self.response_read_fd, 1)
+                except BlockingIOError:
+                    select.select([self.response_read_fd], [], [])
+                    continue
+                if not token:
+                    break
+                os.write(self.write_fd, token)
+            os.close(self.response_read_fd)
+
+            # Wait for the helper to exit, should be immediate at this point
+            self.helper_process.wait()
+
+    def request_lease(self):
+        pending = SbyJobLease(self)
+
+        if self.local_slots > 0:
+            self.local_slots -= 1
+            pending.is_ready = True
+        else:
+            self.pending_leases.append(weakref.ref(pending))
+            if self.helper_process:
+                os.write(self.request_write_fd, b"!")
+
+        return pending
+
+    def return_lease(self):
+        if self.acquired_slots:
+            os.write(self.write_fd, self.acquired_slots.pop())
+            return
+
+        if self.activate_pending_lease():
+            return
+
+        self.local_slots += 1
+
+    def activate_pending_lease(self):
+        while self.pending_leases:
+            pending = self.pending_leases.pop(0)()
+            if pending is None:
+                continue
+            pending.is_ready = True
+            return True
+        return False
+
+    def has_pending_leases(self):
+        while self.pending_leases and not self.pending_leases[-1]():
+            self.pending_leases.pop()
+        
return bool(self.pending_leases) + + def poll_fds(self): + if self.helper_process: + return [self.response_read_fd] + elif self.read_fd is not None: + return [self.read_fd] + else: + return [] + + def poll(self): + read_fd = self.response_read_fd if self.helper_process else self.read_fd + if read_fd is None: + return + + while self.helper_process or self.has_pending_leases(): + try: + token = os.read(read_fd, 1) + except BlockingIOError: + break + + self.got_token(token) + + def got_token(self, token): + self.acquired_slots.append(token) + + if self.activate_pending_lease(): + return + + self.return_lease() diff --git a/tests/make/required_tools.py b/tests/make/required_tools.py index ce33356..82b5f49 100644 --- a/tests/make/required_tools.py +++ b/tests/make/required_tools.py @@ -59,7 +59,7 @@ if __name__ == "__main__": exit(noskip) print(command, flush=True) - exit(subprocess.call(command, shell=True)) + exit(subprocess.call(command, shell=True, close_fds=False)) found_tools = [] check_tools = set() diff --git a/tests/make/test_rules.py b/tests/make/test_rules.py index 9607d81..64f80e1 100644 --- a/tests/make/test_rules.py +++ b/tests/make/test_rules.py @@ -82,7 +82,7 @@ with rules_file.open("w") as rules: command = f"cd {sby_dir_unix} && python3 $(SBY_MAIN) -f {sby_file.name} {task}" print( - f"\t@python3 make/required_tools.py run {target} {shlex.quote(command)} {shlex.join(required_tools)}", + f"\t+@python3 make/required_tools.py run {target} {shlex.quote(command)} {shlex.join(required_tools)}", file=rules, ) From e91977e01ef00454438b5b6739c2035f5438f93b Mon Sep 17 00:00:00 2001 From: Jannis Harder Date: Wed, 13 Jul 2022 18:14:19 +0200 Subject: [PATCH 3/3] Use local jobslots as fallback on Windows. As we have no make jobserver support on windows, fallback to using process local slots to limit prallelism. 
--- sbysrc/sby_jobserver.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/sbysrc/sby_jobserver.py b/sbysrc/sby_jobserver.py index 0f359b8..104bcc3 100644 --- a/sbysrc/sby_jobserver.py +++ b/sbysrc/sby_jobserver.py @@ -17,7 +17,6 @@ # import atexit -import fcntl import os import select import shlex @@ -26,6 +25,8 @@ import sys import weakref import signal +if os.name == "posix": + import fcntl inherited_jobcount = None inherited_jobserver_auth = None @@ -56,15 +57,16 @@ def process_jobserver_environment(): pass elif flag.startswith("--jobserver-auth=") or flag.startswith("--jobserver-fds="): inherited_jobserver_auth_present = True - arg = flag.split("=", 1)[1].split(",") - try: - jobserver_fds = int(arg[0]), int(arg[1]) - for fd in jobserver_fds: - fcntl.fcntl(fd, fcntl.F_GETFD) - except (ValueError, OSError): - pass - else: - inherited_jobserver_auth = jobserver_fds + if os.name == "posix": + arg = flag.split("=", 1)[1].split(",") + try: + jobserver_fds = int(arg[0]), int(arg[1]) + for fd in jobserver_fds: + fcntl.fcntl(fd, fcntl.F_GETFD) + except (ValueError, OSError): + pass + else: + inherited_jobserver_auth = jobserver_fds def jobserver_helper(jobserver_read_fd, jobserver_write_fd, request_fd, response_fd): @@ -159,6 +161,12 @@ class SbyJobClient: have_jobserver = inherited_jobserver_auth_present + if os.name == "nt" and inherited_jobserver_auth_present: + # There are even more incompatible variants of the make jobserver on + # windows, none of them are supported for now. 
+ print("WARNING: Found jobserver in MAKEFLAGS, this is not supported on windows.") + have_jobserver = False + if have_jobserver and inherited_jobserver_auth is None: print("WARNING: Could not connect to jobserver specified in MAKEFLAGS, disabling parallel execution.") have_jobserver = False @@ -178,6 +186,9 @@ class SbyJobClient: if have_jobserver: self.read_fd, self.write_fd = inherited_jobserver_auth + elif os.name == "nt": + # On Windows, without a jobserver, use only local slots + self.local_slots = jobcount else: self.sby_jobserver = SbyJobServer(jobcount) self.read_fd = self.sby_jobserver.read_fd