diff options
authorRichard Purdie <>2018-07-09 15:20:34 +0000
committerRichard Purdie <>2018-07-16 16:44:26 +0100
commit326ababfd620ae5ea29bf486b9d68ba3d60cad30 (patch)
parent26e04b23ba1b6942aa7c7df478d41dfe7b73e6e0 (diff)
oeqa: Add selftest parallelisation support
This allows oe-selftest to take a -j option which specifies how much test parallelisation to use. Currently this is "module" based with each module being split and run in a separate build directory. Further splitting could be done but this seems a good compromise between test setup and parallelism. You need python-testtools and python-subunit installed to use this but only when the -j option is specified. See notes posted to the openedmbedded-architecture list for more details about the design choices here. Some of this functionality may make more sense in the oeqa core ultimately. Signed-off-by: Richard Purdie <>
4 files changed, 288 insertions, 8 deletions
diff --git a/meta/lib/oeqa/core/ b/meta/lib/oeqa/core/
index 10481b44b6..8cdfbf834f 100644
--- a/meta/lib/oeqa/core/
+++ b/meta/lib/oeqa/core/
@@ -58,14 +58,20 @@ class OETestContext(object):
modules_required, filters)
self.suites =
- def runTests(self, skips=[]):
+ def runTests(self, processes=None, skips=[]):
self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True)
# Dinamically skip those tests specified though arguments
self._run_start_time = time.time()
- result =
+ if processes:
+ from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
+ concurrent_suite = ConcurrentTestSuite(self.suites, processes)
+ result =
+ else:
+ result =
self._run_end_time = time.time()
return result
diff --git a/meta/lib/oeqa/core/ b/meta/lib/oeqa/core/
index 219102c6b0..6adbe3827b 100644
--- a/meta/lib/oeqa/core/
+++ b/meta/lib/oeqa/core/
@@ -43,11 +43,17 @@ class OETestResult(_TestResult):
super(OETestResult, self).__init__(*args, **kwargs)
self.successes = []
+ self.starttime = {}
+ self.endtime = {}
+ self.progressinfo = {} = tc
def startTest(self, test):
+ # May have been set by concurrencytest
+ if not in self.starttime:
+ self.starttime[] = time.time()
super(OETestResult, self).startTest(test)
def _tc_map_results(self):
@@ -57,6 +63,12 @@ class OETestResult(_TestResult):['expectedFailures'] = self.expectedFailures['successes'] = self.successes
+ def stopTest(self, test):
+ self.endtime[] = time.time()
+ super(OETestResult, self).stopTest(test)
+ if in self.progressinfo:
+ print(self.progressinfo[])
def logSummary(self, component, context_msg=''):
elapsed_time = -"SUMMARY:")
@@ -141,12 +153,16 @@ class OETestResult(_TestResult):
if hasattr(d, 'oeid'):
oeid = d.oeid
+ t = ""
+ if in self.starttime and in self.endtime:
+ t = " (" + "{0:.2f}".format(self.endtime[] - self.starttime[]) + "s)"
if fail:
-"RESULTS - %s - Testcase %s: %s" % (,
- oeid, desc))
+"RESULTS - %s - Testcase %s: %s%s" % (,
+ oeid, desc, t))
-"RESULTS - %s - Testcase %s: %s" % (,
- oeid, 'UNKNOWN'))
+"RESULTS - %s - Testcase %s: %s%s" % (,
+ oeid, 'UNKNOWN', t))
class OEListTestsResult(object):
def wasSuccessful(self):
diff --git a/meta/lib/oeqa/core/utils/ b/meta/lib/oeqa/core/utils/
new file mode 100644
index 0000000000..850586516a
--- /dev/null
+++ b/meta/lib/oeqa/core/utils/
@@ -0,0 +1,254 @@
+#!/usr/bin/env python3
+# Modified for use in OE by Richard Purdie, 2018
+# Modified by: Corey Goldberg, 2013
+# License: GPLv2+
+# Original code from:
+# Bazaar (, v2.6, copied Jun 01 2013)
+# Copyright (C) 2005-2011 Canonical Ltd
+# License: GPLv2+
+import os
+import sys
+import traceback
+import unittest
+import subprocess
+import testtools
+import threading
+import time
+import io
+from queue import Queue
+from itertools import cycle
+from subunit import ProtocolTestCase, TestProtocolClient
+from subunit.test_results import AutoTimingTestResultDecorator
+from testtools import ThreadsafeForwardingResult, iterate_tests
+import bb.utils
+import oe.path
+_all__ = [
+ 'ConcurrentTestSuite',
+ 'fork_for_tests',
+ 'partition_tests',
+# Patch the version from testtools to allow access to _test_start and allow
+# computation of timing information and threading progress
+class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
+ def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
+ super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
+ self.threadnum = threadnum
+ self.totalinprocess = totalinprocess
+ self.totaltests = totaltests
+ def _add_result_with_semaphore(self, method, test, *args, **kwargs):
+ self.semaphore.acquire()
+ try:
+ self.result.starttime[] = self._test_start.timestamp()
+ self.result.threadprogress[self.threadnum].append(
+ totalprogress = sum(len(x) for x in self.result.threadprogress.values())
+ self.result.progressinfo[] = "%s: %s/%s %s/%s (%ss) (%s)" % (
+ self.threadnum,
+ len(self.result.threadprogress[self.threadnum]),
+ self.totalinprocess,
+ totalprogress,
+ self.totaltests,
+ "{0:.2f}".format(time.time()-self._test_start.timestamp()),
+ finally:
+ self.semaphore.release()
+ super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
+# A dummy structure to add to io.StringIO so that the .buffer object
+# is available and accepts writes. This allows unittest with buffer=True
+# to interact ok with subunit which wants to access sys.stdout.buffer.
+class dummybuf(object):
+ def __init__(self, parent):
+ self.p = parent
+ def write(self, data):
+ self.p.write(data.decode("utf-8"))
+# Taken from testtools.ConncurrencyTestSuite but modified for OE use
+class ConcurrentTestSuite(unittest.TestSuite):
+ def __init__(self, suite, processes):
+ super(ConcurrentTestSuite, self).__init__([suite])
+ self.processes = processes
+ def run(self, result):
+ tests, totaltests = fork_for_tests(self.processes, self)
+ try:
+ threads = {}
+ queue = Queue()
+ semaphore = threading.Semaphore(1)
+ result.threadprogress = {}
+ for i, (test, testnum) in enumerate(tests):
+ result.threadprogress[i] = []
+ process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests)
+ # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
+ # as per default in parent code
+ process_result.buffer = True
+ # We have to add a buffer object to stdout to keep subunit happy
+ process_result._stderr_buffer = io.StringIO()
+ process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
+ process_result._stdout_buffer = io.StringIO()
+ process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
+ reader_thread = threading.Thread(
+ target=self._run_test, args=(test, process_result, queue))
+ threads[test] = reader_thread, process_result
+ reader_thread.start()
+ while threads:
+ finished_test = queue.get()
+ threads[finished_test][0].join()
+ del threads[finished_test]
+ except:
+ for thread, process_result in threads.values():
+ process_result.stop()
+ raise
+ def _run_test(self, test, process_result, queue):
+ try:
+ try:
+ except Exception:
+ # The run logic itself failed
+ case = testtools.ErrorHolder(
+ "broken-runner",
+ error=sys.exc_info())
+ finally:
+ queue.put(test)
+def removebuilddir(d):
+ delay = 5
+ while delay and os.path.exists(d + "/bitbake.lock"):
+ time.sleep(1)
+ delay = delay - 1
+ bb.utils.prunedir(d)
+def fork_for_tests(concurrency_num, suite):
+ result = []
+ test_blocks = partition_tests(suite, concurrency_num)
+ # Clear the tests from the original suite so it doesn't keep them alive
+ suite._tests[:] = []
+ totaltests = sum(len(x) for x in test_blocks)
+ for process_tests in test_blocks:
+ numtests = len(process_tests)
+ process_suite = unittest.TestSuite(process_tests)
+ # Also clear each split list so new suite has only reference
+ process_tests[:] = []
+ c2pread, c2pwrite = os.pipe()
+ # Clear buffers before fork to avoid duplicate output
+ sys.stdout.flush()
+ sys.stderr.flush()
+ pid = os.fork()
+ if pid == 0:
+ ourpid = os.getpid()
+ try:
+ newbuilddir = None
+ stream = os.fdopen(c2pwrite, 'wb', 1)
+ os.close(c2pread)
+ # Create a new separate BUILDDIR for each group of tests
+ if 'BUILDDIR' in os.environ:
+ builddir = os.environ['BUILDDIR']
+ newbuilddir = builddir + "-st-" + str(ourpid)
+ selftestdir = os.path.abspath(builddir + "/../meta-selftest")
+ newselftestdir = newbuilddir + "/meta-selftest"
+ bb.utils.mkdirhier(newbuilddir)
+ oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
+ oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
+ oe.path.copytree(selftestdir, newselftestdir)
+ for e in os.environ:
+ if builddir in os.environ[e]:
+ os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
+ subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
+ # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
+ subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)
+ os.chdir(newbuilddir)
+ for t in process_suite:
+ if not hasattr(t, "tc"):
+ continue
+ cp =
+ for p in cp:
+ if selftestdir in cp[p] and newselftestdir not in cp[p]:
+ cp[p] = cp[p].replace(selftestdir, newselftestdir)
+ if builddir in cp[p] and newbuilddir not in cp[p]:
+ cp[p] = cp[p].replace(builddir, newbuilddir)
+ # Leave stderr and stdout open so we can see test noise
+ # Close stdin so that the child goes away if it decides to
+ # read from stdin (otherwise its a roulette to see what
+ # child actually gets keystrokes for pdb etc).
+ newsi =, os.O_RDWR)
+ os.dup2(newsi, sys.stdin.fileno())
+ subunit_client = TestProtocolClient(stream)
+ # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
+ # as per default in parent code
+ subunit_client.buffer = True
+ subunit_result = AutoTimingTestResultDecorator(subunit_client)
+ if ourpid != os.getpid():
+ os._exit(0)
+ if newbuilddir:
+ removebuilddir(newbuilddir)
+ except:
+ # Don't do anything with process children
+ if ourpid != os.getpid():
+ os._exit(1)
+ # Try and report traceback on stream, but exit with error
+ # even if stream couldn't be created or something else
+ # goes wrong. The traceback is formatted to a string and
+ # written in one go to avoid interleaving lines from
+ # multiple failing children.
+ try:
+ stream.write(traceback.format_exc().encode('utf-8'))
+ except:
+ sys.stderr.write(traceback.format_exc())
+ finally:
+ if newbuilddir:
+ removebuilddir(newbuilddir)
+ os._exit(1)
+ os._exit(0)
+ else:
+ os.close(c2pwrite)
+ stream = os.fdopen(c2pread, 'rb', 1)
+ test = ProtocolTestCase(stream)
+ result.append((test, numtests))
+ return result, totaltests
+def partition_tests(suite, count):
+ # Keep tests from the same class together but allow tests from modules
+ # to go to different processes to aid parallelisation.
+ modules = {}
+ for test in iterate_tests(suite):
+ m = test.__module__ + "." + test.__class__.__name__
+ if m not in modules:
+ modules[m] = []
+ modules[m].append(test)
+ # Simply divide the test blocks between the available processes
+ partitions = [list() for _ in range(count)]
+ for partition, m in zip(cycle(partitions), modules):
+ partition.extend(modules[m])
+ # No point in empty threads so drop them
+ return [p for p in partitions if p]
diff --git a/meta/lib/oeqa/selftest/ b/meta/lib/oeqa/selftest/
index 9e90d3c256..c937b8171c 100644
--- a/meta/lib/oeqa/selftest/
+++ b/meta/lib/oeqa/selftest/
@@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
self.custommachine = None
self.config_paths = config_paths
- def runTests(self, machine=None, skips=[]):
+ def runTests(self, processes=None, machine=None, skips=[]):
if machine:
self.custommachine = machine
if machine == 'random':
self.custommachine = choice(self.machines)'Run tests with custom MACHINE set to: %s' % \
- return super(OESelftestTestContext, self).runTests(skips)
+ return super(OESelftestTestContext, self).runTests(processes, skips)
def listTests(self, display_type, machine=None):
return super(OESelftestTestContext, self).listTests(display_type)
@@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
action="store_true", default=False,
help='List all available tests.')
+ parser.add_argument('-j', '--num-processes', dest='processes', action='store',
+ type=int, help="number of processes to execute in parallel with")
parser.add_argument('--machine', required=False, choices=['random', 'all'],
help='Run tests on different machines (random/all).')
@@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
self.tc_kwargs['run']['skips'] = args.skips
+ self.tc_kwargs['run']['processes'] = args.processes
def _pre_run(self):
def _check_required_env_variables(vars):