diff options
| -rw-r--r-- | meta/lib/oeqa/utils/commands.py | 107 | 
1 files changed, 85 insertions, 22 deletions
| diff --git a/meta/lib/oeqa/utils/commands.py b/meta/lib/oeqa/utils/commands.py index 57286fcb10..5e5345434d 100644 --- a/meta/lib/oeqa/utils/commands.py +++ b/meta/lib/oeqa/utils/commands.py @@ -13,6 +13,7 @@ import sys  import signal  import subprocess  import threading +import time  import logging  from oeqa.utils import CommandError  from oeqa.utils import ftools @@ -25,7 +26,7 @@ except ImportError:      pass  class Command(object): -    def __init__(self, command, bg=False, timeout=None, data=None, **options): +    def __init__(self, command, bg=False, timeout=None, data=None, output_log=None, **options):          self.defaultopts = {              "stdout": subprocess.PIPE, @@ -48,41 +49,103 @@ class Command(object):          self.options.update(options)          self.status = None +        # We collect chunks of output before joining them at the end. +        self._output_chunks = [] +        self._error_chunks = []          self.output = None          self.error = None -        self.thread = None +        self.threads = [] +        self.output_log = output_log          self.log = logging.getLogger("utils.commands")      def run(self):          self.process = subprocess.Popen(self.cmd, **self.options) -        def commThread(): -            self.output, self.error = self.process.communicate(self.data) - -        self.thread = threading.Thread(target=commThread) -        self.thread.start() +        def readThread(output, stream, logfunc): +            if logfunc: +                for line in stream: +                    output.append(line) +                    logfunc(line.decode("utf-8", errors='replace').rstrip()) +            else: +                output.append(stream.read()) + +        def readStderrThread(): +            readThread(self._error_chunks, self.process.stderr, self.output_log.error if self.output_log else None) + +        def readStdoutThread(): +            readThread(self._output_chunks, self.process.stdout, self.output_log.info if self.output_log else None) + +        def writeThread(): +            try: +                self.process.stdin.write(self.data) +                self.process.stdin.close() +            except OSError as ex: +                # It's not an error when the command does not consume all +                # of our data. subprocess.communicate() also ignores that. +                if ex.errno != EPIPE: +                    raise + +        # We write in a separate thread because then we can read +        # without worrying about deadlocks. The additional thread is +        # expected to terminate by itself and we mark it as a daemon, +        # so even it should happen to not terminate for whatever +        # reason, the main process will still exit, which will then +        # kill the write thread. +        if self.data: +            threading.Thread(target=writeThread, daemon=True).start() +        if self.process.stderr: +            thread = threading.Thread(target=readStderrThread) +            thread.start() +            self.threads.append(thread) +        if self.output_log: +            self.output_log.info('Running: %s' % self.cmd) +        thread = threading.Thread(target=readStdoutThread) +        thread.start() +        self.threads.append(thread)          self.log.debug("Running command '%s'" % self.cmd)          if not self.bg: -            self.thread.join(self.timeout) +            if self.timeout is None: +                for thread in self.threads: +                    thread.join() +            else: +                deadline = time.time() + self.timeout +                for thread in self.threads: +                    timeout = deadline - time.time()  +                    if timeout < 0: +                        timeout = 0 +                    thread.join(timeout)              self.stop()      def stop(self): -        if self.thread.isAlive(): -            self.process.terminate() +        for thread in self.threads: +            if thread.isAlive(): +                self.process.terminate()              # let's give it more time to terminate gracefully before killing it -            self.thread.join(5) -            if self.thread.isAlive(): +            thread.join(5) +            if thread.isAlive():                  self.process.kill() -                self.thread.join() +                thread.join() -        if not self.output: -            self.output = "" -        else: -            self.output = self.output.decode("utf-8", errors='replace').rstrip() -        self.status = self.process.poll() +        def finalize_output(data): +            if not data: +                data = "" +            else: +                data = b"".join(data) +                data = data.decode("utf-8", errors='replace').rstrip() +            return data + +        self.output = finalize_output(self._output_chunks) +        self._output_chunks = None +        # self.error used to be a byte string earlier, probably unintentionally. +        # Now it is a normal string, just like self.output. +        self.error = finalize_output(self._error_chunks) +        self._error_chunks = None +        # At this point we know that the process has closed stdout/stderr, so +        # it is safe and necessary to wait for the actual process completion. +        self.status = self.process.wait()          self.log.debug("Command '%s' returned %d as exit code." % (self.cmd, self.status))          # logging the complete output is insane @@ -98,7 +161,7 @@ class Result(object):  def runCmd(command, ignore_status=False, timeout=None, assert_error=True, -          native_sysroot=None, limit_exc_output=0, **options): +          native_sysroot=None, limit_exc_output=0, output_log=None, **options):      result = Result()      if native_sysroot: @@ -108,7 +171,7 @@ def runCmd(command, ignore_status=False, timeout=None, assert_error=True,          nenv['PATH'] = extra_paths + ':' + nenv.get('PATH', '')          options['env'] = nenv -    cmd = Command(command, timeout=timeout, **options) +    cmd = Command(command, timeout=timeout, output_log=output_log, **options)      cmd.run()      result.command = command @@ -132,7 +195,7 @@ def runCmd(command, ignore_status=False, timeout=None, assert_error=True,      return result -def bitbake(command, ignore_status=False, timeout=None, postconfig=None, **options): +def bitbake(command, ignore_status=False, timeout=None, postconfig=None, output_log=None, **options):      if postconfig:          postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf') @@ -147,7 +210,7 @@ def bitbake(command, ignore_status=False, timeout=None, postconfig=None, **optio          cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]]      try: -        return runCmd(cmd, ignore_status, timeout, **options) +        return runCmd(cmd, ignore_status, timeout, output_log=output_log, **options)      finally:          if postconfig:              os.remove(postconfig_file) | 
