diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 4ec2ab3..3c9c1f9 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -20,6 +20,7 @@ import os, re, sys, signal if os.name == "posix": import resource, fcntl import subprocess +import asyncio from shutil import copyfile, rmtree from select import select from time import time, localtime, sleep @@ -200,6 +201,56 @@ class SbyTask: next_task.poll() return + async def output_async(self): + while True: + outs = await self.p.stdout.readline() + await asyncio.sleep(0) # https://bugs.python.org/issue24532 + outs = outs.decode("utf-8") + if len(outs) == 0: break + if outs[-1] != '\n': + self.linebuffer += outs + break + outs = (self.linebuffer + outs).strip() + self.linebuffer = "" + self.handle_output(outs) + + async def maybe_spawn_async(self): + if self.finished or self.terminated: + return + + if not self.running: + for dep in self.deps: + if not dep.finished: + return + + self.job.log("%s: starting process \"%s\"" % (self.info, self.cmdline)) + self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=(asyncio.subprocess.STDOUT if self.logstderr else None)) + self.job.tasks_pending.remove(self) + self.job.tasks_running.append(self) + self.running = True + asyncio.ensure_future(self.output_async()) + self.fut = asyncio.ensure_future(self.p.wait()) + + async def shutdown_and_notify_async(self): + self.job.log("%s: finished (returncode=%d)" % (self.info, self.p.returncode)) + self.job.tasks_running.remove(self) + self.running = False + + self.handle_exit(self.p.returncode) + + if self.checkretcode and self.p.returncode != 0: + self.job.status = "ERROR" + self.job.log("%s: job failed. ERROR." % self.info) + self.terminated = True + self.job.terminate() + return + + self.finished = True + for next_task in self.notify: + await next_task.maybe_spawn_async() + return class SbyAbort(BaseException): pass @@ -283,6 +334,41 @@ class SbyJob: self.status = "TIMEOUT" self.terminate(timeout=True) + def taskloop_async(self): + if os.name != "posix": + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + loop = asyncio.get_event_loop() + loop.set_debug(enabled=True) + poll_fut = asyncio.ensure_future(self.task_poller()) + loop.run_until_complete(poll_fut) + + async def task_poller(self): + for task in self.tasks_pending: + await task.maybe_spawn_async() + + while len(self.tasks_running): + task_futs = [] + for task in self.tasks_running: + if task.running: + task_futs.append(task.fut) + (done, pending) = await asyncio.wait(task_futs, return_when=asyncio.FIRST_COMPLETED) + + for task in self.tasks_running: + if task.fut in done: + await task.shutdown_and_notify_async() + + # for task in self.tasks_pending: + # await task.poll_async() + + #if self.opt_timeout is not None: + #total_clock_time = int(time() - self.start_clock_time) + #if total_clock_time > self.opt_timeout: + #self.log("Reached TIMEOUT (%d seconds). Terminating all tasks." % self.opt_timeout) + #self.status = "TIMEOUT" + #self.terminate(timeout=True) + + def log(self, logmessage): tm = localtime() print("SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), flush=True) @@ -655,7 +741,9 @@ class SbyJob: if opt not in self.used_options: self.error("Unused option: {}".format(opt)) - self.taskloop() + # self.taskloop() + self.taskloop_async() + total_clock_time = int(time() - self.start_clock_time)