3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-04-04 13:54:07 +00:00
sby/sbysrc/sby_jobserver.py
Jannis Harder edbc0548af Fix deadlock with parallel SBY procs each with parallel tasks
When multiple SBY processes run in parallel (from a Makefile or other
job-server aware tool) and each SBY process runs tasks in parallel, each
with enough tasks to be limited by the total job count, it is possible
for the processes to race in such a way that every SBY process's helper
process is in a blocking read from the job-server but a job-token would
only become available as soon as any SBY process exits.

In that situation SBY doesn't actually need the job-token anymore and
only previously requested it as there was opportunity for parallelism.
It would immediatly return the token as soon as it is acquired. That's
usually sufficient to deal with no-longer-needed-but-requested tokens,
but when SBY is done, it needs to return the job-token held by the
parent process ASAP which it can only do by actually exiting, so we need
to interrupt the blocking read of SBY's helper process.

This could be done by sending a signal to the helper process, except
that Python made the decision in 3.5 to have automatic EINTR retry loops
around most system calls with no opt-out. That was part of the reason to
go with this specifc helper process design that avoids interrupting a
blocking read in the first place.

Using an exception raised from the signal handler instead might lose a
token when the signal arrives after the read returns, but before the
token is stored in a variable. You cannot recover from a lost token in
the context of the job-server protocol, so that's not an option. (This
can't happen with recent Python versions but that would depend on
undocumented behavior that could plausibly change again.)

Thankfully the only case where we need to interrupt the read is when SBY
is about to exit and will not request any further tokens. This allows us
to use a signal handler that uses dup2 to close and replace the
read-from fd with one that already is at EOF, making the next retry
return immediatly. (If we'd need to interrupt a read and continue
running we could also do this but the fd shuffling would be more
involved.)
2023-07-17 15:29:17 +02:00

339 lines
12 KiB
Python

#
# SymbiYosys (sby) -- Front-end for Yosys-based formal verification flows
#
# Copyright (C) 2022 Jannis Harder <jix@yosyshq.com> <me@jix.one>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import atexit
import os
import select
import shlex
import subprocess
import sys
import weakref
import signal
if os.name == "posix":
import fcntl
inherited_jobcount = None
inherited_jobserver_auth = None
inherited_jobserver_auth_present = None
def process_jobserver_environment():
"""Process the environment looking for a make jobserver. This should be called
early (when only inherited fds are present) to reliably detect whether the jobserver
specified in the environment is accessible."""
global inherited_jobcount
global inherited_jobserver_auth
global inherited_jobserver_auth_present
if len(sys.argv) >= 2 and sys.argv[1] == '--jobserver-helper':
jobserver_helper(*map(int, sys.argv[2:]))
exit(0)
inherited_jobserver_auth_present = False
for flag in shlex.split(os.environ.get("MAKEFLAGS", "")):
if flag.startswith("-j"):
if flag == "-j":
inherited_jobcount = 0
else:
try:
inherited_jobcount = int(flag[2:])
except ValueError:
pass
elif flag.startswith("--jobserver-auth=") or flag.startswith("--jobserver-fds="):
inherited_jobserver_auth_present = True
if os.name == "posix":
arg = flag.split("=", 1)[1]
if arg.startswith("fifo:"):
try:
fd = os.open(arg[5:], os.O_RDWR)
except FileNotFoundError:
pass
else:
inherited_jobserver_auth = fd, fd
else:
arg = arg.split(",")
try:
jobserver_fds = int(arg[0]), int(arg[1])
for fd in jobserver_fds:
fcntl.fcntl(fd, fcntl.F_GETFD)
except (ValueError, OSError):
pass
else:
inherited_jobserver_auth = jobserver_fds
def jobserver_helper(jobserver_read_fd, jobserver_write_fd, request_fd, response_fd):
"""Helper process to handle blocking jobserver pipes."""
def handle_sigusr1(*args):
# Since Python doesn't allow user code to handle EINTR anymore, we replace the
# jobserver fd with an fd at EOF to interrupt a blocking read in a way that
# cannot lose any read data
r, w = os.pipe()
os.close(w)
os.dup2(r, jobserver_read_fd)
os.close(r)
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGUSR1, handle_sigusr1)
pending = 0
while True:
try:
new_pending = len(os.read(request_fd, 1024))
if new_pending == 0:
pending = 0
break
else:
pending += new_pending
continue
except BlockingIOError:
if pending == 0:
select.select([request_fd], [], [])
continue
if pending > 0:
try:
# Depending on the make version (4.3 vs 4.2) this is blocking or
# non-blocking. As this is an attribute of the pipe not the fd, we
# cannot change it without affecting other processes. Older versions of
# gnu make require this to be blocking, and produce errors if it is
# non-blocking. Newer versions of gnu make set this non-blocking, both,
# as client and as server. The documentation still says it is blocking.
# This leaves us no choice but to handle both cases, which is the reason
# we have this helper process in the first place.
token = os.read(jobserver_read_fd, 1)
except BlockingIOError:
select.select([jobserver_read_fd], [], [])
continue
if not token:
break
pending -= 1
try:
os.write(response_fd, token)
except:
os.write(jobserver_write_fd, token)
raise
os.close(jobserver_write_fd)
class SbyJobLease:
def __init__(self, client):
self.client = client
self.is_ready = False
self.is_done = False
def done(self):
if self.is_ready and not self.is_done:
self.client.return_lease()
self.is_done = True
def __repr__(self):
return f"is_ready={self.is_ready} is_done={self.is_done}"
def __del__(self):
self.done()
class SbyJobServer:
def __init__(self, jobcount):
assert jobcount >= 1
# TODO support unlimited parallelism?
self.jobcount = jobcount
if jobcount == 1:
self.read_fd, self.write_fd = None, None
self.makeflags = None
elif jobcount > 1:
self.read_fd, self.write_fd = os.pipe()
if os.getenv('SBY_BLOCKING_JOBSERVER') != '1':
os.set_blocking(self.read_fd, False)
os.write(self.write_fd, b"*" * (jobcount - 1))
self.makeflags = f"-j{jobcount} --jobserver-auth={self.read_fd},{self.write_fd} --jobserver-fds={self.read_fd},{self.write_fd}"
class SbyJobClient:
def __init__(self, fallback_jobcount=None):
self.jobcount = None
self.read_fd = self.write_fd = None
self.helper_process = None
self.local_slots = 1
self.acquired_slots = []
self.pending_leases = []
assert inherited_jobserver_auth_present is not None, "process_jobserver_environment was not called"
have_jobserver = inherited_jobserver_auth_present
if os.name == "nt" and inherited_jobserver_auth_present:
# There are even more incompatible variants of the make jobserver on
# windows, none of them are supported for now.
print("WARNING: Found jobserver in MAKEFLAGS, this is not supported on windows.")
have_jobserver = False
if have_jobserver and inherited_jobserver_auth is None:
print("WARNING: Could not connect to jobserver specified in MAKEFLAGS, disabling parallel execution.")
have_jobserver = False
fallback_jobcount = 1
if have_jobserver:
jobcount = inherited_jobcount
elif fallback_jobcount is not None:
jobcount = fallback_jobcount
elif inherited_jobcount is not None and inherited_jobcount > 0:
jobcount = inherited_jobcount
else:
try:
jobcount = len(os.sched_getaffinity(0))
except AttributeError:
jobcount = os.cpu_count()
if have_jobserver:
self.read_fd, self.write_fd = inherited_jobserver_auth
elif os.name == "nt":
# On Windows, without a jobserver, use only local slots
self.local_slots = jobcount
else:
self.sby_jobserver = SbyJobServer(jobcount)
self.read_fd = self.sby_jobserver.read_fd
self.write_fd = self.sby_jobserver.write_fd
self.jobcount = jobcount
if self.read_fd is not None:
if os.get_blocking(self.read_fd):
request_read_fd, self.request_write_fd = os.pipe()
self.response_read_fd, response_write_fd = os.pipe()
os.set_blocking(self.response_read_fd, False)
os.set_blocking(request_read_fd, False)
pass_fds = [self.read_fd, self.write_fd, request_read_fd, response_write_fd]
self.helper_process = subprocess.Popen(
[sys.executable, sys.modules['__main__'].__file__, '--jobserver-helper', *map(str, pass_fds)],
stdin=subprocess.DEVNULL,
pass_fds=pass_fds,
)
os.close(request_read_fd)
os.close(response_write_fd)
atexit.register(self.atexit_blocking)
else:
atexit.register(self.atexit_nonblocking)
def atexit_nonblocking(self):
while self.acquired_slots:
os.write(self.write_fd, self.acquired_slots.pop())
def atexit_blocking(self):
# Return all slot tokens we are currently holding
while self.acquired_slots:
os.write(self.write_fd, self.acquired_slots.pop())
if self.helper_process:
# Closing the request pipe singals the helper that we want to exit
os.close(self.request_write_fd)
# Additionally we send a signal to interrupt a blocking read within the
# helper
self.helper_process.send_signal(signal.SIGUSR1)
# The helper might have been in the process of sending us some tokens, which
# we still need to return
while True:
try:
token = os.read(self.response_read_fd, 1)
except BlockingIOError:
select.select([self.response_read_fd], [], [])
continue
if not token:
break
os.write(self.write_fd, token)
os.close(self.response_read_fd)
# Wait for the helper to exit, should be immediate at this point
self.helper_process.wait()
def request_lease(self):
pending = SbyJobLease(self)
if self.local_slots > 0:
self.local_slots -= 1
pending.is_ready = True
else:
self.pending_leases.append(weakref.ref(pending))
if self.helper_process:
os.write(self.request_write_fd, b"!")
return pending
def return_lease(self):
if self.acquired_slots:
os.write(self.write_fd, self.acquired_slots.pop())
return
if self.activate_pending_lease():
return
self.local_slots += 1
def activate_pending_lease(self):
while self.pending_leases:
pending = self.pending_leases.pop(0)()
if pending is None:
continue
pending.is_ready = True
return True
return False
def has_pending_leases(self):
while self.pending_leases and not self.pending_leases[-1]():
self.pending_leases.pop()
return bool(self.pending_leases)
def poll_fds(self):
if self.helper_process:
return [self.response_read_fd]
elif self.read_fd is not None and self.has_pending_leases():
return [self.read_fd]
else:
return []
def poll(self):
read_fd = self.response_read_fd if self.helper_process else self.read_fd
if read_fd is None:
return
while self.helper_process or self.has_pending_leases():
try:
token = os.read(read_fd, 1)
except BlockingIOError:
break
self.got_token(token)
def got_token(self, token):
self.acquired_slots.append(token)
if self.activate_pending_lease():
return
self.return_lease()