diff options
author | Aníbal Limón <anibal.limon@linux.intel.com> | 2017-05-26 15:37:37 -0500 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2017-05-30 10:15:22 +0100 |
commit | 48b7a407d692e6c49c41b16f2bd11e8c3f47a421 (patch) | |
tree | 6ca7b601652a9fda51693f8ceddbec10a03bbc1d /meta/lib/oeqa | |
parent | 8e71844fc4dd3fcc8a19f9d4c25aafb09c5525fe (diff) | |
download | openembedded-core-48b7a407d692e6c49c41b16f2bd11e8c3f47a421.tar.gz openembedded-core-48b7a407d692e6c49c41b16f2bd11e8c3f47a421.tar.bz2 openembedded-core-48b7a407d692e6c49c41b16f2bd11e8c3f47a421.zip |
oeqa/core/threaded: Add support of OETestRunnerThreaded
The OETestRunnerThreaded overrides the run method of OETestRunner
it recieves a list of suites to be executed by a ThreadPool.
The new run method handles the ThreadPool creation and the
OETestResultThreaded fill.
[YOCTO #11450]
Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta/lib/oeqa')
-rw-r--r-- | meta/lib/oeqa/core/threaded.py | 75 |
1 files changed, 74 insertions, 1 deletions
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py index f216685f46..81df340366 100644 --- a/meta/lib/oeqa/core/threaded.py +++ b/meta/lib/oeqa/core/threaded.py @@ -3,11 +3,13 @@ import threading import multiprocessing +import queue +import time from unittest.suite import TestSuite from oeqa.core.loader import OETestLoader -from oeqa.core.runner import OEStreamLogger, OETestResult +from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner class OETestLoaderThreaded(OETestLoader): def __init__(self, tc, module_paths, modules, tests, modules_required, @@ -185,3 +187,74 @@ class OETestResultThreaded(object): tid = list(self._results)[0] result = self._results[tid]['result'] result.logDetails() + +class _Worker(threading.Thread): + """Thread executing tasks from a given tasks queue""" + def __init__(self, tasks, result, stream): + threading.Thread.__init__(self) + self.tasks = tasks + + self.result = result + self.stream = stream + + def run(self): + while True: + try: + func, args, kargs = self.tasks.get(block=False) + except queue.Empty: + break + + try: + run_start_time = time.time() + rc = func(*args, **kargs) + run_end_time = time.time() + self.result.addResult(rc, run_start_time, run_end_time) + self.stream.finish() + except Exception as e: + print(e) + finally: + self.tasks.task_done() + +class _ThreadedPool: + """Pool of threads consuming tasks from a queue""" + def __init__(self, num_workers, num_tasks, stream=None, result=None): + self.tasks = queue.Queue(num_tasks) + self.workers = [] + + for _ in range(num_workers): + worker = _Worker(self.tasks, result, stream) + self.workers.append(worker) + + def start(self): + for worker in self.workers: + worker.start() + + def add_task(self, func, *args, **kargs): + """Add a task to the queue""" + self.tasks.put((func, args, kargs)) + + def wait_completion(self): + """Wait for completion of all the tasks in the queue""" + self.tasks.join() + for worker in self.workers: + worker.join() + +class OETestRunnerThreaded(OETestRunner): + streamLoggerClass = OEStreamLoggerThreaded + + def __init__(self, tc, *args, **kwargs): + super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs) + self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__ + + def run(self, suites): + result = OETestResultThreaded(self.tc) + + pool = _ThreadedPool(len(suites), len(suites), stream=self.stream, + result=result) + for s in suites: + pool.add_task(super(OETestRunnerThreaded, self).run, s) + pool.start() + pool.wait_completion() + result._fill_tc_results() + + return result |