3
0
Fork 0
mirror of https://github.com/YosysHQ/sby.git synced 2025-09-01 09:10:42 +00:00

Begin converting taskloop to use asyncio module.

Signed-off-by: William D. Jones <thor0505@comcast.net>
This commit is contained in:
William D. Jones 2019-03-27 21:32:29 -04:00 committed by Ed Bordin
parent 7bae1b8bba
commit 118280191a

View file

@ -20,6 +20,7 @@ import os, re, sys, signal
if os.name == "posix": if os.name == "posix":
import resource, fcntl import resource, fcntl
import subprocess import subprocess
import asyncio
from shutil import copyfile, rmtree from shutil import copyfile, rmtree
from select import select from select import select
from time import time, localtime, sleep from time import time, localtime, sleep
@ -200,6 +201,56 @@ class SbyTask:
next_task.poll() next_task.poll()
return return
async def output_async(self):
while True:
outs = await self.p.stdout.readline()
await asyncio.sleep(0) # https://bugs.python.org/issue24532
outs = outs.decode("utf-8")
if len(outs) == 0: break
if outs[-1] != '\n':
self.linebuffer += outs
break
outs = (self.linebuffer + outs).strip()
self.linebuffer = ""
self.handle_output(outs)
async def maybe_spawn_async(self):
if self.finished or self.terminated:
return
if not self.running:
for dep in self.deps:
if not dep.finished:
return
self.job.log("%s: starting process \"%s\"" % (self.info, self.cmdline))
self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=(asyncio.subprocess.STDOUT if self.logstderr else None))
self.job.tasks_pending.remove(self)
self.job.tasks_running.append(self)
self.running = True
asyncio.ensure_future(self.output_async())
self.fut = asyncio.ensure_future(self.p.wait())
async def shutdown_and_notify_async(self):
self.job.log("%s: finished (returncode=%d)" % (self.info, self.p.returncode))
self.job.tasks_running.remove(self)
self.running = False
self.handle_exit(self.p.returncode)
if self.checkretcode and self.p.returncode != 0:
self.job.status = "ERROR"
self.job.log("%s: job failed. ERROR." % self.info)
self.terminated = True
self.job.terminate()
return
self.finished = True
for next_task in self.notify:
await next_task.maybe_spawn_async()
return
class SbyAbort(BaseException): class SbyAbort(BaseException):
pass pass
@ -283,6 +334,41 @@ class SbyJob:
self.status = "TIMEOUT" self.status = "TIMEOUT"
self.terminate(timeout=True) self.terminate(timeout=True)
def taskloop_async(self):
if os.name != "posix":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
loop.set_debug(enabled=True)
poll_fut = asyncio.ensure_future(self.task_poller())
loop.run_until_complete(poll_fut)
async def task_poller(self):
for task in self.tasks_pending:
await task.maybe_spawn_async()
while len(self.tasks_running):
task_futs = []
for task in self.tasks_running:
if task.running:
task_futs.append(task.fut)
(done, pending) = await asyncio.wait(task_futs, return_when=asyncio.FIRST_COMPLETED)
for task in self.tasks_running:
if task.fut in done:
await task.shutdown_and_notify_async()
# for task in self.tasks_pending:
# await task.poll_async()
#if self.opt_timeout is not None:
#total_clock_time = int(time() - self.start_clock_time)
#if total_clock_time > self.opt_timeout:
#self.log("Reached TIMEOUT (%d seconds). Terminating all tasks." % self.opt_timeout)
#self.status = "TIMEOUT"
#self.terminate(timeout=True)
def log(self, logmessage): def log(self, logmessage):
tm = localtime() tm = localtime()
print("SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), flush=True) print("SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), flush=True)
@ -655,7 +741,9 @@ class SbyJob:
if opt not in self.used_options: if opt not in self.used_options:
self.error("Unused option: {}".format(opt)) self.error("Unused option: {}".format(opt))
self.taskloop() # self.taskloop()
self.taskloop_async()
total_clock_time = int(time() - self.start_clock_time) total_clock_time = int(time() - self.start_clock_time)