diff options
Diffstat (limited to 'meta/lib/oeqa/utils')
| -rw-r--r-- | meta/lib/oeqa/utils/__init__.py | 68 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/buildproject.py | 55 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/commands.py | 303 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/decorators.py | 295 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/dump.py | 91 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/ftools.py | 46 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/git.py | 80 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/httpserver.py | 36 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/logparser.py | 125 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/metadata.py | 118 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/network.py | 8 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/package_manager.py | 210 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/qemurunner.py | 604 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/qemutinyrunner.py | 175 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/sshcontrol.py | 245 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/subprocesstweak.py | 19 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/targetbuild.py | 139 | ||||
| -rw-r--r-- | meta/lib/oeqa/utils/testexport.py | 263 |
18 files changed, 2880 insertions, 0 deletions
diff --git a/meta/lib/oeqa/utils/__init__.py b/meta/lib/oeqa/utils/__init__.py new file mode 100644 index 0000000000..485de031a9 --- /dev/null +++ b/meta/lib/oeqa/utils/__init__.py @@ -0,0 +1,68 @@ +# Enable other layers to have modules in the same named directory +from pkgutil import extend_path +__path__ = extend_path(__path__, __name__) + + +# Borrowed from CalledProcessError + +class CommandError(Exception): + def __init__(self, retcode, cmd, output = None): + self.retcode = retcode + self.cmd = cmd + self.output = output + def __str__(self): + return "Command '%s' returned non-zero exit status %d with output: %s" % (self.cmd, self.retcode, self.output) + +def avoid_paths_in_environ(paths): + """ + Searches for every path in os.environ['PATH'] + if found remove it. + + Returns new PATH without avoided PATHs. + """ + import os + + new_path = '' + for p in os.environ['PATH'].split(':'): + avoid = False + for pa in paths: + if pa in p: + avoid = True + break + if avoid: + continue + + new_path = new_path + p + ':' + + new_path = new_path[:-1] + return new_path + +def make_logger_bitbake_compatible(logger): + import logging + + """ + Bitbake logger redifines debug() in order to + set a level within debug, this breaks compatibility + with vainilla logging, so we neeed to redifine debug() + method again also add info() method with INFO + 1 level. + """ + def _bitbake_log_debug(*args, **kwargs): + lvl = logging.DEBUG + + if isinstance(args[0], int): + lvl = args[0] + msg = args[1] + args = args[2:] + else: + msg = args[0] + args = args[1:] + + logger.log(lvl, msg, *args, **kwargs) + + def _bitbake_log_info(msg, *args, **kwargs): + logger.log(logging.INFO + 1, msg, *args, **kwargs) + + logger.debug = _bitbake_log_debug + logger.info = _bitbake_log_info + + return logger diff --git a/meta/lib/oeqa/utils/buildproject.py b/meta/lib/oeqa/utils/buildproject.py new file mode 100644 index 0000000000..487f08be49 --- /dev/null +++ b/meta/lib/oeqa/utils/buildproject.py @@ -0,0 +1,55 @@ +# Copyright (C) 2013-2016 Intel Corporation +# +# Released under the MIT license (see COPYING.MIT) + +# Provides a class for automating build tests for projects + +import os +import re +import subprocess +import shutil +import tempfile + +from abc import ABCMeta, abstractmethod + +class BuildProject(metaclass=ABCMeta): + def __init__(self, uri, foldername=None, tmpdir=None, dl_dir=None): + self.uri = uri + self.archive = os.path.basename(uri) + if not tmpdir: + tmpdir = tempfile.mkdtemp(prefix='buildproject') + self.localarchive = os.path.join(tmpdir, self.archive) + self.dl_dir = dl_dir + if foldername: + self.fname = foldername + else: + self.fname = re.sub(r'\.tar\.bz2$|\.tar\.gz$|\.tar\.xz$', '', self.archive) + + # Download self.archive to self.localarchive + def _download_archive(self): + if self.dl_dir and os.path.exists(os.path.join(self.dl_dir, self.archive)): + shutil.copyfile(os.path.join(self.dl_dir, self.archive), self.localarchive) + return + + cmd = "wget -O %s %s" % (self.localarchive, self.uri) + subprocess.check_output(cmd, shell=True) + + # This method should provide a way to run a command in the desired environment. + @abstractmethod + def _run(self, cmd): + pass + + # The timeout parameter of target.run is set to 0 to make the ssh command + # run with no timeout. + def run_configure(self, configure_args='', extra_cmds=''): + return self._run('cd %s; gnu-configize; %s ./configure %s' % (self.targetdir, extra_cmds, configure_args)) + + def run_make(self, make_args=''): + return self._run('cd %s; make %s' % (self.targetdir, make_args)) + + def run_install(self, install_args=''): + return self._run('cd %s; make install %s' % (self.targetdir, install_args)) + + def clean(self): + self._run('rm -rf %s' % self.targetdir) + subprocess.call('rm -f %s' % self.localarchive, shell=True) diff --git a/meta/lib/oeqa/utils/commands.py b/meta/lib/oeqa/utils/commands.py new file mode 100644 index 0000000000..57286fcb10 --- /dev/null +++ b/meta/lib/oeqa/utils/commands.py @@ -0,0 +1,303 @@ +# Copyright (c) 2013-2014 Intel Corporation +# +# Released under the MIT license (see COPYING.MIT) + +# DESCRIPTION +# This module is mainly used by scripts/oe-selftest and modules under meta/oeqa/selftest +# It provides a class and methods for running commands on the host in a convienent way for tests. + + + +import os +import sys +import signal +import subprocess +import threading +import logging +from oeqa.utils import CommandError +from oeqa.utils import ftools +import re +import contextlib +# Export test doesn't require bb +try: + import bb +except ImportError: + pass + +class Command(object): + def __init__(self, command, bg=False, timeout=None, data=None, **options): + + self.defaultopts = { + "stdout": subprocess.PIPE, + "stderr": subprocess.STDOUT, + "stdin": None, + "shell": False, + "bufsize": -1, + } + + self.cmd = command + self.bg = bg + self.timeout = timeout + self.data = data + + self.options = dict(self.defaultopts) + if isinstance(self.cmd, str): + self.options["shell"] = True + if self.data: + self.options['stdin'] = subprocess.PIPE + self.options.update(options) + + self.status = None + self.output = None + self.error = None + self.thread = None + + self.log = logging.getLogger("utils.commands") + + def run(self): + self.process = subprocess.Popen(self.cmd, **self.options) + + def commThread(): + self.output, self.error = self.process.communicate(self.data) + + self.thread = threading.Thread(target=commThread) + self.thread.start() + + self.log.debug("Running command '%s'" % self.cmd) + + if not self.bg: + self.thread.join(self.timeout) + self.stop() + + def stop(self): + if self.thread.isAlive(): + self.process.terminate() + # let's give it more time to terminate gracefully before killing it + self.thread.join(5) + if self.thread.isAlive(): + self.process.kill() + self.thread.join() + + if not self.output: + self.output = "" + else: + self.output = self.output.decode("utf-8", errors='replace').rstrip() + self.status = self.process.poll() + + self.log.debug("Command '%s' returned %d as exit code." % (self.cmd, self.status)) + # logging the complete output is insane + # bitbake -e output is really big + # and makes the log file useless + if self.status: + lout = "\n".join(self.output.splitlines()[-20:]) + self.log.debug("Last 20 lines:\n%s" % lout) + + +class Result(object): + pass + + +def runCmd(command, ignore_status=False, timeout=None, assert_error=True, + native_sysroot=None, limit_exc_output=0, **options): + result = Result() + + if native_sysroot: + extra_paths = "%s/sbin:%s/usr/sbin:%s/usr/bin" % \ + (native_sysroot, native_sysroot, native_sysroot) + nenv = dict(options.get('env', os.environ)) + nenv['PATH'] = extra_paths + ':' + nenv.get('PATH', '') + options['env'] = nenv + + cmd = Command(command, timeout=timeout, **options) + cmd.run() + + result.command = command + result.status = cmd.status + result.output = cmd.output + result.error = cmd.error + result.pid = cmd.process.pid + + if result.status and not ignore_status: + exc_output = result.output + if limit_exc_output > 0: + split = result.output.splitlines() + if len(split) > limit_exc_output: + exc_output = "\n... (last %d lines of output)\n" % limit_exc_output + \ + '\n'.join(split[-limit_exc_output:]) + if assert_error: + raise AssertionError("Command '%s' returned non-zero exit status %d:\n%s" % (command, result.status, exc_output)) + else: + raise CommandError(result.status, command, exc_output) + + return result + + +def bitbake(command, ignore_status=False, timeout=None, postconfig=None, **options): + + if postconfig: + postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf') + ftools.write_file(postconfig_file, postconfig) + extra_args = "-R %s" % postconfig_file + else: + extra_args = "" + + if isinstance(command, str): + cmd = "bitbake " + extra_args + " " + command + else: + cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]] + + try: + return runCmd(cmd, ignore_status, timeout, **options) + finally: + if postconfig: + os.remove(postconfig_file) + + +def get_bb_env(target=None, postconfig=None): + if target: + return bitbake("-e %s" % target, postconfig=postconfig).output + else: + return bitbake("-e", postconfig=postconfig).output + +def get_bb_vars(variables=None, target=None, postconfig=None): + """Get values of multiple bitbake variables""" + bbenv = get_bb_env(target, postconfig=postconfig) + + if variables is not None: + variables = variables.copy() + var_re = re.compile(r'^(export )?(?P<var>\w+(_.*)?)="(?P<value>.*)"$') + unset_re = re.compile(r'^unset (?P<var>\w+)$') + lastline = None + values = {} + for line in bbenv.splitlines(): + match = var_re.match(line) + val = None + if match: + val = match.group('value') + else: + match = unset_re.match(line) + if match: + # Handle [unexport] variables + if lastline.startswith('# "'): + val = lastline.split('"')[1] + if val: + var = match.group('var') + if variables is None: + values[var] = val + else: + if var in variables: + values[var] = val + variables.remove(var) + # Stop after all required variables have been found + if not variables: + break + lastline = line + if variables: + # Fill in missing values + for var in variables: + values[var] = None + return values + +def get_bb_var(var, target=None, postconfig=None): + return get_bb_vars([var], target, postconfig)[var] + +def get_test_layer(): + layers = get_bb_var("BBLAYERS").split() + testlayer = None + for l in layers: + if '~' in l: + l = os.path.expanduser(l) + if "/meta-selftest" in l and os.path.isdir(l): + testlayer = l + break + return testlayer + +def create_temp_layer(templayerdir, templayername, priority=999, recipepathspec='recipes-*/*'): + os.makedirs(os.path.join(templayerdir, 'conf')) + with open(os.path.join(templayerdir, 'conf', 'layer.conf'), 'w') as f: + f.write('BBPATH .= ":${LAYERDIR}"\n') + f.write('BBFILES += "${LAYERDIR}/%s/*.bb \\' % recipepathspec) + f.write(' ${LAYERDIR}/%s/*.bbappend"\n' % recipepathspec) + f.write('BBFILE_COLLECTIONS += "%s"\n' % templayername) + f.write('BBFILE_PATTERN_%s = "^${LAYERDIR}/"\n' % templayername) + f.write('BBFILE_PRIORITY_%s = "%d"\n' % (templayername, priority)) + f.write('BBFILE_PATTERN_IGNORE_EMPTY_%s = "1"\n' % templayername) + + +@contextlib.contextmanager +def runqemu(pn, ssh=True, runqemuparams='', image_fstype=None, launch_cmd=None, qemuparams=None, overrides={}, discard_writes=True): + """ + launch_cmd means directly run the command, don't need set rootfs or env vars. + """ + + import bb.tinfoil + import bb.build + + tinfoil = bb.tinfoil.Tinfoil() + tinfoil.prepare(config_only=False, quiet=True) + try: + tinfoil.logger.setLevel(logging.WARNING) + import oeqa.targetcontrol + tinfoil.config_data.setVar("TEST_LOG_DIR", "${WORKDIR}/testimage") + tinfoil.config_data.setVar("TEST_QEMUBOOT_TIMEOUT", "1000") + # Tell QemuTarget() whether need find rootfs/kernel or not + if launch_cmd: + tinfoil.config_data.setVar("FIND_ROOTFS", '0') + else: + tinfoil.config_data.setVar("FIND_ROOTFS", '1') + + recipedata = tinfoil.parse_recipe(pn) + for key, value in overrides.items(): + recipedata.setVar(key, value) + + # The QemuRunner log is saved out, but we need to ensure it is at the right + # log level (and then ensure that since it's a child of the BitBake logger, + # we disable propagation so we don't then see the log events on the console) + logger = logging.getLogger('BitBake.QemuRunner') + logger.setLevel(logging.DEBUG) + logger.propagate = False + logdir = recipedata.getVar("TEST_LOG_DIR") + + qemu = oeqa.targetcontrol.QemuTarget(recipedata, image_fstype) + finally: + # We need to shut down tinfoil early here in case we actually want + # to run tinfoil-using utilities with the running QEMU instance. + # Luckily QemuTarget doesn't need it after the constructor. + tinfoil.shutdown() + + # Setup bitbake logger as console handler is removed by tinfoil.shutdown + bblogger = logging.getLogger('BitBake') + bblogger.setLevel(logging.INFO) + console = logging.StreamHandler(sys.stdout) + bbformat = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") + if sys.stdout.isatty(): + bbformat.enable_color() + console.setFormatter(bbformat) + bblogger.addHandler(console) + + try: + qemu.deploy() + try: + qemu.start(params=qemuparams, ssh=ssh, runqemuparams=runqemuparams, launch_cmd=launch_cmd, discard_writes=discard_writes) + except bb.build.FuncFailed: + raise Exception('Failed to start QEMU - see the logs in %s' % logdir) + + yield qemu + + finally: + try: + qemu.stop() + except: + pass + +def updateEnv(env_file): + """ + Source a file and update environment. + """ + + cmd = ". %s; env -0" % env_file + result = runCmd(cmd) + + for line in result.output.split("\0"): + (key, _, value) = line.partition("=") + os.environ[key] = value diff --git a/meta/lib/oeqa/utils/decorators.py b/meta/lib/oeqa/utils/decorators.py new file mode 100644 index 0000000000..d876896921 --- /dev/null +++ b/meta/lib/oeqa/utils/decorators.py @@ -0,0 +1,295 @@ +# Copyright (C) 2013 Intel Corporation +# +# Released under the MIT license (see COPYING.MIT) + +# Some custom decorators that can be used by unittests +# Most useful is skipUnlessPassed which can be used for +# creating dependecies between two test methods. + +import os +import logging +import sys +import unittest +import threading +import signal +from functools import wraps + +#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame +class getResults(object): + def __init__(self): + #dynamically determine the unittest.case frame and use it to get the name of the test method + ident = threading.current_thread().ident + upperf = sys._current_frames()[ident] + while (upperf.f_globals['__name__'] != 'unittest.case'): + upperf = upperf.f_back + + def handleList(items): + ret = [] + # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception()) + for i in items: + s = i[0].id() + #Handle the _ErrorHolder objects from skipModule failures + if "setUpModule (" in s: + ret.append(s.replace("setUpModule (", "").replace(")","")) + else: + ret.append(s) + # Append also the test without the full path + testname = s.split('.')[-1] + if testname: + ret.append(testname) + return ret + self.faillist = handleList(upperf.f_locals['result'].failures) + self.errorlist = handleList(upperf.f_locals['result'].errors) + self.skiplist = handleList(upperf.f_locals['result'].skipped) + + def getFailList(self): + return self.faillist + + def getErrorList(self): + return self.errorlist + + def getSkipList(self): + return self.skiplist + +class skipIfFailure(object): + + def __init__(self,testcase): + self.testcase = testcase + + def __call__(self,f): + @wraps(f) + def wrapped_f(*args, **kwargs): + res = getResults() + if self.testcase in (res.getFailList() or res.getErrorList()): + raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) + return f(*args, **kwargs) + wrapped_f.__name__ = f.__name__ + return wrapped_f + +class skipIfSkipped(object): + + def __init__(self,testcase): + self.testcase = testcase + + def __call__(self,f): + @wraps(f) + def wrapped_f(*args, **kwargs): + res = getResults() + if self.testcase in res.getSkipList(): + raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) + return f(*args, **kwargs) + wrapped_f.__name__ = f.__name__ + return wrapped_f + +class skipUnlessPassed(object): + + def __init__(self,testcase): + self.testcase = testcase + + def __call__(self,f): + @wraps(f) + def wrapped_f(*args, **kwargs): + res = getResults() + if self.testcase in res.getSkipList() or \ + self.testcase in res.getFailList() or \ + self.testcase in res.getErrorList(): + raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) + return f(*args, **kwargs) + wrapped_f.__name__ = f.__name__ + wrapped_f._depends_on = self.testcase + return wrapped_f + +class testcase(object): + def __init__(self, test_case): + self.test_case = test_case + + def __call__(self, func): + @wraps(func) + def wrapped_f(*args, **kwargs): + return func(*args, **kwargs) + wrapped_f.test_case = self.test_case + wrapped_f.__name__ = func.__name__ + return wrapped_f + +class NoParsingFilter(logging.Filter): + def filter(self, record): + return record.levelno == 100 + +import inspect + +def LogResults(original_class): + orig_method = original_class.run + + from time import strftime, gmtime + caller = os.path.basename(sys.argv[0]) + timestamp = strftime('%Y%m%d%H%M%S',gmtime()) + logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log') + linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log') + + def get_class_that_defined_method(meth): + if inspect.ismethod(meth): + for cls in inspect.getmro(meth.__self__.__class__): + if cls.__dict__.get(meth.__name__) is meth: + return cls + meth = meth.__func__ # fallback to __qualname__ parsing + if inspect.isfunction(meth): + cls = getattr(inspect.getmodule(meth), + meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]) + if isinstance(cls, type): + return cls + return None + + #rewrite the run method of unittest.TestCase to add testcase logging + def run(self, result, *args, **kws): + orig_method(self, result, *args, **kws) + passed = True + testMethod = getattr(self, self._testMethodName) + #if test case is decorated then use it's number, else use it's name + try: + test_case = testMethod.test_case + except AttributeError: + test_case = self._testMethodName + + class_name = str(get_class_that_defined_method(testMethod)).split("'")[1] + + #create custom logging level for filtering. + custom_log_level = 100 + logging.addLevelName(custom_log_level, 'RESULTS') + + def results(self, message, *args, **kws): + if self.isEnabledFor(custom_log_level): + self.log(custom_log_level, message, *args, **kws) + logging.Logger.results = results + + logging.basicConfig(filename=logfile, + filemode='w', + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%H:%M:%S', + level=custom_log_level) + for handler in logging.root.handlers: + handler.addFilter(NoParsingFilter()) + local_log = logging.getLogger(caller) + + #check status of tests and record it + + tcid = self.id() + for (name, msg) in result.errors: + if tcid == name.id(): + local_log.results("Testcase "+str(test_case)+": ERROR") + local_log.results("Testcase "+str(test_case)+":\n"+msg) + passed = False + for (name, msg) in result.failures: + if tcid == name.id(): + local_log.results("Testcase "+str(test_case)+": FAILED") + local_log.results("Testcase "+str(test_case)+":\n"+msg) + passed = False + for (name, msg) in result.skipped: + if tcid == name.id(): + local_log.results("Testcase "+str(test_case)+": SKIPPED") + passed = False + if passed: + local_log.results("Testcase "+str(test_case)+": PASSED") + + # XXX: In order to avoid race condition when test if exists the linkfile + # use bb.utils.lock, the best solution is to create a unique name for the + # link file. + try: + import bb + has_bb = True + lockfilename = linkfile + '.lock' + except ImportError: + has_bb = False + + if has_bb: + lf = bb.utils.lockfile(lockfilename, block=True) + # Create symlink to the current log + if os.path.lexists(linkfile): + os.remove(linkfile) + os.symlink(logfile, linkfile) + if has_bb: + bb.utils.unlockfile(lf) + + original_class.run = run + + return original_class + +class TimeOut(BaseException): + pass + +def timeout(seconds): + def decorator(fn): + if hasattr(signal, 'alarm'): + @wraps(fn) + def wrapped_f(*args, **kw): + current_frame = sys._getframe() + def raiseTimeOut(signal, frame): + if frame is not current_frame: + raise TimeOut('%s seconds' % seconds) + prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut) + try: + signal.alarm(seconds) + return fn(*args, **kw) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, prev_handler) + return wrapped_f + else: + return fn + return decorator + +__tag_prefix = "tag__" +def tag(*args, **kwargs): + """Decorator that adds attributes to classes or functions + for use with the Attribute (-a) plugin. + """ + def wrap_ob(ob): + for name in args: + setattr(ob, __tag_prefix + name, True) + for name, value in kwargs.items(): + setattr(ob, __tag_prefix + name, value) + return ob + return wrap_ob + +def gettag(obj, key, default=None): + key = __tag_prefix + key + if not isinstance(obj, unittest.TestCase): + return getattr(obj, key, default) + tc_method = getattr(obj, obj._testMethodName) + ret = getattr(tc_method, key, getattr(obj, key, default)) + return ret + +def getAllTags(obj): + def __gettags(o): + r = {k[len(__tag_prefix):]:getattr(o,k) for k in dir(o) if k.startswith(__tag_prefix)} + return r + if not isinstance(obj, unittest.TestCase): + return __gettags(obj) + tc_method = getattr(obj, obj._testMethodName) + ret = __gettags(obj) + ret.update(__gettags(tc_method)) + return ret + +def timeout_handler(seconds): + def decorator(fn): + if hasattr(signal, 'alarm'): + @wraps(fn) + def wrapped_f(self, *args, **kw): + current_frame = sys._getframe() + def raiseTimeOut(signal, frame): + if frame is not current_frame: + try: + self.target.restart() + raise TimeOut('%s seconds' % seconds) + except: + raise TimeOut('%s seconds' % seconds) + prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut) + try: + signal.alarm(seconds) + return fn(self, *args, **kw) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, prev_handler) + return wrapped_f + else: + return fn + return decorator diff --git a/meta/lib/oeqa/utils/dump.py b/meta/lib/oeqa/utils/dump.py new file mode 100644 index 0000000000..5a7edc1a86 --- /dev/null +++ b/meta/lib/oeqa/utils/dump.py @@ -0,0 +1,91 @@ +import os +import sys +import errno +import datetime +import itertools +from .commands import runCmd + +class BaseDumper(object): + """ Base class to dump commands from host/target """ + + def __init__(self, cmds, parent_dir): + self.cmds = [] + # Some testing doesn't inherit testimage, so it is needed + # to set some defaults. + self.parent_dir = parent_dir or "/tmp/oe-saved-tests" + dft_cmds = """ top -bn1 + iostat -x -z -N -d -p ALL 20 2 + ps -ef + free + df + memstat + dmesg + ip -s link + netstat -an""" + if not cmds: + cmds = dft_cmds + for cmd in cmds.split('\n'): + cmd = cmd.lstrip() + if not cmd or cmd[0] == '#': + continue + self.cmds.append(cmd) + + def create_dir(self, dir_suffix): + dump_subdir = ("%s_%s" % ( + datetime.datetime.now().strftime('%Y%m%d%H%M'), + dir_suffix)) + dump_dir = os.path.join(self.parent_dir, dump_subdir) + try: + os.makedirs(dump_dir) + except OSError as err: + if err.errno != errno.EEXIST: + raise err + self.dump_dir = dump_dir + + def _write_dump(self, command, output): + if isinstance(self, HostDumper): + prefix = "host" + elif isinstance(self, TargetDumper): + prefix = "target" + else: + prefix = "unknown" + for i in itertools.count(): + filename = "%s_%02d_%s" % (prefix, i, command) + fullname = os.path.join(self.dump_dir, filename) + if not os.path.exists(fullname): + break + with open(fullname, 'w') as dump_file: + dump_file.write(output) + + +class HostDumper(BaseDumper): + """ Class to get dumps from the host running the tests """ + + def __init__(self, cmds, parent_dir): + super(HostDumper, self).__init__(cmds, parent_dir) + + def dump_host(self, dump_dir=""): + if dump_dir: + self.dump_dir = dump_dir + for cmd in self.cmds: + result = runCmd(cmd, ignore_status=True) + self._write_dump(cmd.split()[0], result.output) + +class TargetDumper(BaseDumper): + """ Class to get dumps from target, it only works with QemuRunner """ + + def __init__(self, cmds, parent_dir, runner): + super(TargetDumper, self).__init__(cmds, parent_dir) + self.runner = runner + + def dump_target(self, dump_dir=""): + if dump_dir: + self.dump_dir = dump_dir + for cmd in self.cmds: + # We can continue with the testing if serial commands fail + try: + (status, output) = self.runner.run_serial(cmd) + self._write_dump(cmd.split()[0], output) + except: + print("Tried to dump info from target but " + "serial console failed") diff --git a/meta/lib/oeqa/utils/ftools.py b/meta/lib/oeqa/utils/ftools.py new file mode 100644 index 0000000000..a7233d4ca6 --- /dev/null +++ b/meta/lib/oeqa/utils/ftools.py @@ -0,0 +1,46 @@ +import os +import re +import errno + +def write_file(path, data): + # In case data is None, return immediately + if data is None: + return + wdata = data.rstrip() + "\n" + with open(path, "w") as f: + f.write(wdata) + +def append_file(path, data): + # In case data is None, return immediately + if data is None: + return + wdata = data.rstrip() + "\n" + with open(path, "a") as f: + f.write(wdata) + +def read_file(path): + data = None + with open(path) as f: + data = f.read() + return data + +def remove_from_file(path, data): + # In case data is None, return immediately + if data is None: + return + try: + rdata = read_file(path) + except IOError as e: + # if file does not exit, just quit, otherwise raise an exception + if e.errno == errno.ENOENT: + return + else: + raise + + contents = rdata.strip().splitlines() + for r in data.strip().splitlines(): + try: + contents.remove(r) + except ValueError: + pass + write_file(path, "\n".join(contents)) diff --git a/meta/lib/oeqa/utils/git.py b/meta/lib/oeqa/utils/git.py new file mode 100644 index 0000000000..e0cb3f0db2 --- /dev/null +++ b/meta/lib/oeqa/utils/git.py @@ -0,0 +1,80 @@ +# +# Copyright (C) 2016 Intel Corporation +# +# Released under the MIT license (see COPYING.MIT) +# +"""Git repository interactions""" +import os + +from oeqa.utils.commands import runCmd + + +class GitError(Exception): + """Git error handling""" + pass + +class GitRepo(object): + """Class representing a Git repository clone""" + def __init__(self, path, is_topdir=False): + git_dir = self._run_git_cmd_at(['rev-parse', '--git-dir'], path) + git_dir = git_dir if os.path.isabs(git_dir) else os.path.join(path, git_dir) + self.git_dir = os.path.realpath(git_dir) + + if self._run_git_cmd_at(['rev-parse', '--is-bare-repository'], path) == 'true': + self.ba |
