diff options
Diffstat (limited to 'meta/lib/oeqa/core/context.py')
| -rw-r--r-- | meta/lib/oeqa/core/context.py | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py new file mode 100644 index 0000000000..4476750a3c --- /dev/null +++ b/meta/lib/oeqa/core/context.py @@ -0,0 +1,243 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import sys +import json +import time +import logging +import collections +import re + +from oeqa.core.loader import OETestLoader +from oeqa.core.runner import OETestRunner, OEStreamLogger, xmlEnabled + +class OETestContext(object): + loaderClass = OETestLoader + runnerClass = OETestRunner + streamLoggerClass = OEStreamLogger + + files_dir = os.path.abspath(os.path.join(os.path.dirname( + os.path.abspath(__file__)), "../files")) + + def __init__(self, td=None, logger=None): + if not type(td) is dict: + raise TypeError("td isn't dictionary type") + + self.td = td + self.logger = logger + self._registry = {} + self._registry['cases'] = collections.OrderedDict() + self._results = {} + + def _read_modules_from_manifest(self, manifest): + if not os.path.exists(manifest): + raise + + modules = [] + for line in open(manifest).readlines(): + line = line.strip() + if line and not line.startswith("#"): + modules.append(line) + + return modules + + def loadTests(self, module_paths, modules=[], tests=[], + modules_manifest="", modules_required=[], filters={}): + if modules_manifest: + modules = self._read_modules_from_manifest(modules_manifest) + + self.loader = self.loaderClass(self, module_paths, modules, tests, + modules_required, filters) + self.suites = self.loader.discover() + + def runTests(self): + streamLogger = self.streamLoggerClass(self.logger) + self.runner = self.runnerClass(self, stream=streamLogger, verbosity=2) + + self._run_start_time = time.time() + result = self.runner.run(self.suites) + self._run_end_time = time.time() + + return result + + def logSummary(self, result, component, context_msg=''): + self.logger.info("SUMMARY:") + self.logger.info("%s (%s) - Ran %d test%s in %.3fs" % (component, + context_msg, result.testsRun, result.testsRun != 1 and "s" or "", + (self._run_end_time - self._run_start_time))) + + if result.wasSuccessful(): + msg = "%s - OK - All required tests passed" % component + else: + msg = "%s - FAIL - Required tests failed" % component + skipped = len(self._results['skipped']) + if skipped: + msg += " (skipped=%d)" % skipped + self.logger.info(msg) + + def _getDetailsNotPassed(self, case, type, desc): + found = False + + for (scase, msg) in self._results[type]: + # XXX: When XML reporting is enabled scase is + # xmlrunner.result._TestInfo instance instead of + # string. + if xmlEnabled: + if case.id() == scase.test_id: + found = True + break + scase_str = scase.test_id + else: + if case == scase: + found = True + break + scase_str = str(scase) + + # When fails at module or class level the class name is passed as string + # so figure out to see if match + m = re.search("^setUpModule \((?P<module_name>.*)\)$", scase_str) + if m: + if case.__class__.__module__ == m.group('module_name'): + found = True + break + + m = re.search("^setUpClass \((?P<class_name>.*)\)$", scase_str) + if m: + class_name = "%s.%s" % (case.__class__.__module__, + case.__class__.__name__) + + if class_name == m.group('class_name'): + found = True + break + + if found: + return (found, msg) + + return (found, None) + + def logDetails(self): + self.logger.info("RESULTS:") + for case_name in self._registry['cases']: + case = self._registry['cases'][case_name] + + result_types = ['failures', 'errors', 'skipped', 'expectedFailures'] + result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL'] + + fail = False + desc = None + for idx, name in enumerate(result_types): + (fail, msg) = self._getDetailsNotPassed(case, result_types[idx], + result_desc[idx]) + if fail: + desc = result_desc[idx] + break + + oeid = -1 + for d in case.decorators: + if hasattr(d, 'oeid'): + oeid = d.oeid + + if fail: + self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), + oeid, desc)) + if msg: + self.logger.info(msg) + else: + self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), + oeid, 'PASSED')) + +class OETestContextExecutor(object): + _context_class = OETestContext + + name = 'core' + help = 'core test component example' + description = 'executes core test suite example' + + default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)), + 'cases/example')] + default_test_data = os.path.join(default_cases[0], 'data.json') + default_tests = None + + def register_commands(self, logger, subparsers): + self.parser = subparsers.add_parser(self.name, help=self.help, + description=self.description, group='components') + + self.default_output_log = '%s-results-%s.log' % (self.name, + time.strftime("%Y%m%d%H%M%S")) + self.parser.add_argument('--output-log', action='store', + default=self.default_output_log, + help="results output log, default: %s" % self.default_output_log) + self.parser.add_argument('--run-tests', action='store', + default=self.default_tests, + help="tests to run in <module>[.<class>[.<name>]] format. Just works for modules now") + + if self.default_test_data: + self.parser.add_argument('--test-data-file', action='store', + default=self.default_test_data, + help="data file to load, default: %s" % self.default_test_data) + else: + self.parser.add_argument('--test-data-file', action='store', + help="data file to load") + + if self.default_cases: + self.parser.add_argument('CASES_PATHS', action='store', + default=self.default_cases, nargs='*', + help="paths to directories with test cases, default: %s"\ + % self.default_cases) + else: + self.parser.add_argument('CASES_PATHS', action='store', + nargs='+', help="paths to directories with test cases") + + self.parser.set_defaults(func=self.run) + + def _setup_logger(self, logger, args): + formatter = logging.Formatter('%(asctime)s - ' + self.name + \ + ' - %(levelname)s - %(message)s') + sh = logger.handlers[0] + sh.setFormatter(formatter) + fh = logging.FileHandler(args.output_log) + fh.setFormatter(formatter) + logger.addHandler(fh) + + return logger + + def _process_args(self, logger, args): + self.tc_kwargs = {} + self.tc_kwargs['init'] = {} + self.tc_kwargs['load'] = {} + self.tc_kwargs['run'] = {} + + self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args) + if args.test_data_file: + self.tc_kwargs['init']['td'] = json.load( + open(args.test_data_file, "r")) + else: + self.tc_kwargs['init']['td'] = {} + + + if args.run_tests: + self.tc_kwargs['load']['modules'] = args.run_tests.split() + else: + self.tc_kwargs['load']['modules'] = None + + self.module_paths = args.CASES_PATHS + + def run(self, logger, args): + self._process_args(logger, args) + + self.tc = self._context_class(**self.tc_kwargs['init']) + self.tc.loadTests(self.module_paths, **self.tc_kwargs['load']) + rc = self.tc.runTests(**self.tc_kwargs['run']) + self.tc.logSummary(rc, self.name) + self.tc.logDetails() + + output_link = os.path.join(os.path.dirname(args.output_log), + "%s-results.log" % self.name) + if os.path.exists(output_link): + os.remove(output_link) + os.symlink(args.output_log, output_link) + + return rc + +_executor_class = OETestContextExecutor |
