diff options
Diffstat (limited to 'meta/lib')
-rw-r--r-- | meta/lib/oe/utils.py | 70 | ||||
-rw-r--r-- | meta/lib/oeqa/selftest/cases/oelib/utils.py | 46 |
2 files changed, 115 insertions, 1 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py index 6aed6dc993..753b577555 100644 --- a/meta/lib/oe/utils.py +++ b/meta/lib/oe/utils.py @@ -1,4 +1,6 @@ import subprocess +import multiprocessing +import traceback def read_file(filename): try: @@ -280,6 +282,74 @@ def multiprocess_exec(commands, function): return results +# For each item in items, call the function 'target' with item as the first +# argument, extraargs as the other arguments and handle any exceptions in the +# parent thread +def multiprocess_launch(target, items, d, extraargs=None): + + class ProcessLaunch(multiprocessing.Process): + def __init__(self, *args, **kwargs): + multiprocessing.Process.__init__(self, *args, **kwargs) + self._pconn, self._cconn = multiprocessing.Pipe() + self._exception = None + self._result = None + + def run(self): + try: + ret = self._target(*self._args, **self._kwargs) + self._cconn.send((None, ret)) + except Exception as e: + tb = traceback.format_exc() + self._cconn.send((e, tb)) + + def update(self): + if self._pconn.poll(): + (e, tb) = self._pconn.recv() + if e is not None: + self._exception = (e, tb) + else: + self._result = tb + + @property + def exception(self): + self.update() + return self._exception + + @property + def result(self): + self.update() + return self._result + + max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1) + launched = [] + errors = [] + results = [] + items = list(items) + while (items and not errors) or launched: + if not errors and items and len(launched) < max_process: + args = (items.pop(),) + if extraargs is not None: + args = args + extraargs + p = ProcessLaunch(target=target, args=args) + p.start() + launched.append(p) + for q in launched: + # The finished processes are joined when calling is_alive() + if not q.is_alive(): + if q.exception: + errors.append(q.exception) + if q.result: + results.append(q.result) + launched.remove(q) + # Paranoia doesn't hurt + for p in launched: + p.join() + if errors: + for (e, tb) in errors: + bb.error(str(tb)) + bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above") + return results + def squashspaces(string): import re return re.sub("\s+", " ", string).strip() diff --git a/meta/lib/oeqa/selftest/cases/oelib/utils.py b/meta/lib/oeqa/selftest/cases/oelib/utils.py index 9fb6c1576e..275aeda74e 100644 --- a/meta/lib/oeqa/selftest/cases/oelib/utils.py +++ b/meta/lib/oeqa/selftest/cases/oelib/utils.py @@ -1,5 +1,8 @@ +import sys from unittest.case import TestCase -from oe.utils import packages_filter_out_system, trim_version +from contextlib import contextmanager +from io import StringIO +from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch class TestPackagesFilterOutSystem(TestCase): def test_filter(self): @@ -49,3 +52,44 @@ class TestTrimVersion(TestCase): self.assertEqual(trim_version("1.2.3", 2), "1.2") self.assertEqual(trim_version("1.2.3", 3), "1.2.3") self.assertEqual(trim_version("1.2.3", 4), "1.2.3") + + +class TestMultiprocessLaunch(TestCase): + + def test_multiprocesslaunch(self): + import bb + + def testfunction(item, d): + if item == "2" or item == "1": + raise KeyError("Invalid number %s" % item) + return "Found %s" % item + + def dummyerror(msg): + print("ERROR: %s" % msg) + + @contextmanager + def captured_output(): + new_out, new_err = StringIO(), StringIO() + old_out, old_err = sys.stdout, sys.stderr + try: + sys.stdout, sys.stderr = new_out, new_err + yield sys.stdout, sys.stderr + finally: + sys.stdout, sys.stderr = old_out, old_err + + d = bb.data_smart.DataSmart() + bb.error = dummyerror + + # Assert the function returns the right results + result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,)) + self.assertIn("Found 3", result) + self.assertIn("Found 4", result) + self.assertIn("Found 5", result) + self.assertIn("Found 6", result) + self.assertEqual(len(result), 4) + + # Assert the function prints exceptions + with captured_output() as (out, err): + self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,)) + self.assertIn("KeyError: 'Invalid number 1'", out.getvalue()) + self.assertIn("KeyError: 'Invalid number 2'", out.getvalue()) |