1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
|
#!/usr/bin/env python3
# Copyright (c) 2013 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# DESCRIPTION
# This script runs tests defined in meta/lib/oeqa/selftest/
# Its purpose is to automate the testing of different bitbake tools.
# To use it you just need to source your build environment setup script and
# add the meta-selftest layer to your BBLAYERS.
# Call the script as: "oe-selftest -a" to run all the tests in meta/lib/oeqa/selftest/
# Call the script as: "oe-selftest -r <module>.<Class>.<method>" to run just a single test
# E.g: "oe-selftest -r bblayers.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/oeqa/selftest/bblayers.py
import os
import sys
import unittest
import logging
import argparse
import subprocess
import time as t
import re
import fnmatch
import collections
import imp
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib')
import scriptpath
scriptpath.add_bitbake_lib_path()
scriptpath.add_oe_lib_path()
import argparse_oe
import oeqa.selftest
import oeqa.utils.ftools as ftools
from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer
from oeqa.utils.metadata import metadata_from_bb, write_metadata_file
from oeqa.selftest.base import oeSelfTest, get_available_machines
try:
import xmlrunner
from xmlrunner.result import _XMLTestResult as TestResult
from xmlrunner import XMLTestRunner as _TestRunner
except ImportError:
# use the base runner instead
from unittest import TextTestResult as TestResult
from unittest import TextTestRunner as _TestRunner
# Timestamped prefix shared by the log file name and the results directory.
log_prefix = "oe-selftest-%s" % t.strftime("%Y%m%d-%H%M%S")


def logger_create():
    """Create and return the "selftest" logger.

    A fresh timestamped log file is opened in the current directory and the
    "oe-selftest.log" symlink is re-pointed at it.  Everything (DEBUG and up)
    goes to the file, while only INFO and up is echoed on stdout.
    """
    latest_link = "oe-selftest.log"
    target = log_prefix + ".log"

    # lexists() is also true for a dangling symlink, so a stale link left by
    # an earlier run is removed before the new one is created.
    if os.path.lexists(latest_link):
        os.remove(latest_link)
    os.symlink(target, latest_link)

    logger = logging.getLogger("selftest")
    logger.setLevel(logging.DEBUG)

    layout = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(filename=target, mode='w')
    console_handler = logging.StreamHandler(sys.stdout)
    for handler, level in ((file_handler, logging.DEBUG),
                           (console_handler, logging.INFO)):
        handler.setLevel(level)
        handler.setFormatter(layout)
        logger.addHandler(handler)

    return logger


# Module-wide logger used by every function in this script.
log = logger_create()
def get_args_parser():
    """Build the command line parser for oe-selftest.

    Exactly one of the "mode" options (run/list variants) must be given; they
    live in a required mutually exclusive group.  Coverage, machine and
    repository options are independent modifiers on the parser itself.
    """
    parser = argparse_oe.ArgumentParser(
        description="Script that runs unit tests against bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information.")
    modes = parser.add_mutually_exclusive_group(required=True)

    # Short aliases; the call order below is kept because it defines the
    # order in which the options are registered with the parser.
    mode = modes.add_argument
    option = parser.add_argument

    mode('-r', '--run-tests', required=False, action='store', nargs='*',
         dest="run_tests", default=None,
         help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
    mode('-a', '--run-all-tests', required=False, action="store_true",
         dest="run_all_tests", default=False,
         help='Run all (unhidden) tests')
    mode('-m', '--list-modules', required=False, action="store_true",
         dest="list_modules", default=False,
         help='List all available test modules.')
    mode('--list-classes', required=False, action="store_true",
         dest="list_allclasses", default=False,
         help='List all available test classes.')

    option('--coverage', action="store_true",
           help="Run code coverage when testing")
    option('--coverage-source', dest="coverage_source", nargs="+",
           help="Specifiy the directories to take coverage from")
    option('--coverage-include', dest="coverage_include", nargs="+",
           help="Specify extra patterns to include into the coverage measurement")
    option('--coverage-omit', dest="coverage_omit", nargs="+",
           help="Specify with extra patterns to exclude from the coverage measurement")

    mode('--run-tests-by', required=False, dest='run_tests_by',
         default=False, nargs='*',
         help='run-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    mode('--list-tests-by', required=False, dest='list_tests_by',
         default=False, nargs='*',
         help='list-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    mode('-l', '--list-tests', required=False, action="store_true",
         dest="list_tests", default=False,
         help='List all available tests.')
    mode('--list-tags', required=False, dest='list_tags', default=False,
         action="store_true",
         help='List all tags that have been set to test cases.')

    option('--machine', required=False, dest='machine',
           choices=['random', 'all'], default=None,
           help='Run tests on different machines (random/all).')
    option('--repository', required=False, dest='repository', default='',
           action='store',
           help='Submit test results to a repository')
    return parser
# Build directory taken from $BUILDDIR; stays None until preflight_check()
# has run, which the cleanup helpers use to know there is nothing to undo.
builddir = None


def preflight_check():
    """Verify the environment is usable before any test is run.

    Records $BUILDDIR in the module-level ``builddir``, changes into it,
    checks that meta-selftest is part of BBLAYERS and runs "bitbake -p".

    Returns:
        True when the checks passed, False (after logging the reason)
        otherwise.
    """
    global builddir

    log.info("Checking that everything is in order before running the tests")

    env_builddir = os.environ.get("BUILDDIR")
    if not env_builddir:
        log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
        return False

    builddir = env_builddir
    if builddir != os.getcwd():
        log.info("Changing cwd to %s" % builddir)
        os.chdir(builddir)

    layers = get_bb_var("BBLAYERS")
    if "meta-selftest" not in layers:
        log.error("You don't seem to have the meta-selftest layer in BBLAYERS")
        return False

    log.info("Running bitbake -p")
    runCmd("bitbake -p")
    return True
def add_include():
    """Append the selftest include lines to local.conf and bblayers.conf.

    Each file is only touched when the "#include added by oe-selftest.py"
    marker is not already present in it.
    """
    global builddir

    marker = "#include added by oe-selftest.py"
    # (file relative to builddir, log message, text appended to the file)
    targets = (
        ("conf/local.conf",
         "Adding: \"include selftest.inc\" in local.conf",
         "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"),
        ("conf/bblayers.conf",
         "Adding: \"include bblayers.inc\" in bblayers.conf",
         "\n#include added by oe-selftest.py\ninclude bblayers.inc"),
    )

    for relative_path, message, snippet in targets:
        conf_file = os.path.join(builddir, relative_path)
        if marker in ftools.read_file(conf_file):
            continue
        log.info(message)
        ftools.append_file(conf_file, snippet)
def remove_include():
    """Strip the include lines written by add_include() from the conf files.

    Does nothing when ``builddir`` was never set.  A file is only rewritten
    when it contains the "#include added by oe-selftest.py" marker.
    """
    global builddir

    if builddir is None:
        return

    marker = "#include added by oe-selftest.py"
    # (file relative to builddir, log message, text removed from the file)
    targets = (
        ("conf/local.conf",
         "Removing the include from local.conf",
         "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"),
        ("conf/bblayers.conf",
         "Removing the include from bblayers.conf",
         "\n#include added by oe-selftest.py\ninclude bblayers.inc"),
    )

    for relative_path, message, snippet in targets:
        conf_file = os.path.join(builddir, relative_path)
        if marker not in ftools.read_file(conf_file):
            continue
        log.info(message)
        ftools.remove_from_file(conf_file, snippet)
def remove_inc_files():
    """Best-effort removal of the .inc files generated during a test run.

    Does nothing when ``builddir`` was never set.  Removes conf/selftest.inc,
    every test_recipe.inc found under the test layer, and finally
    conf/bblayers.inc and conf/machine.inc.  Files that cannot be removed
    (typically because they do not exist) are skipped silently; only OSError
    is ignored so that KeyboardInterrupt/SystemExit still propagate.
    """
    global builddir
    if builddir is None:
        return

    try:
        os.remove(os.path.join(builddir, "conf/selftest.inc"))
        # NOTE(review): the walk shares the try block with the removal above,
        # so it is skipped entirely when selftest.inc could not be removed.
        # That ordering is kept as found in the original code.
        for root, _, files in os.walk(get_test_layer()):
            if 'test_recipe.inc' in files:
                os.remove(os.path.join(root, 'test_recipe.inc'))
    except OSError:
        pass

    for incl_file in ['conf/bblayers.inc', 'conf/machine.inc']:
        try:
            os.remove(os.path.join(builddir, incl_file))
        except OSError:
            pass
def get_tests_modules(include_hidden=False):
    """Return the dotted names of all test modules under oeqa.selftest.

    Every directory below each entry of ``oeqa.selftest.__path__`` is
    scanned for ``*.py`` files.  ``base.py`` and dunder files are always
    skipped; files starting with a single underscore are only reported when
    ``include_hidden`` is true.  Each module name appears once, in discovery
    order (file names are sorted within a directory).
    """
    def is_test_file(entry):
        if not entry.endswith('.py'):
            return False
        if entry.startswith('__') or entry == 'base.py':
            return False
        # A leading underscore marks a hidden module.
        return include_hidden or not entry.startswith('_')

    discovered = []
    for package_dir in oeqa.selftest.__path__:
        for dirpath, _subdirs, _names in os.walk(package_dir):
            # Whatever follows the last "selftest" in the path is turned into
            # the dotted sub-package part of the module name.
            subpackage = dirpath.split("selftest")[-1]
            dotted_prefix = 'oeqa.selftest' + subpackage.replace("/", ".")
            for entry in sorted(filter(is_test_file, os.listdir(dirpath))):
                module_name = dotted_prefix + "." + entry.split('.py')[0]
                if module_name not in discovered:
                    discovered.append(module_name)
    return discovered
def get_tests(exclusive_modules=None, include_hidden=False):
    """Return the dotted names of the tests/modules to load.

    Args:
        exclusive_modules: optional iterable of names relative to
            ``oeqa.selftest`` (module, module.Class or module.Class.method).
            When empty or None every discovered module is returned instead.
        include_hidden: forwarded to get_tests_modules() when the full
            module list is needed.

    Returns:
        A new list of names, each prefixed with ``oeqa.selftest.``.
    """
    # None replaces the former shared mutable default ([]).
    test_modules = ['oeqa.selftest.' + name for name in (exclusive_modules or ())]
    if not test_modules:
        test_modules = get_tests_modules(include_hidden)
    return test_modules
class Tc:
    """Plain record describing one discovered selftest test case."""

    def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
        # Method, class and module names locating the test.
        self.tcname, self.tcclass, self.tcmodule = tcname, tcclass, tcmodule
        self.tcid = tcid
        # Either a single tag (str) or several tags packed in a tuple.
        self.tctag = tctag
        # Dotted name rooted at the oeqa.selftest package.
        self.fullpath = '.'.join(('oeqa', 'selftest', tcmodule, tcclass, tcname))
def get_tests_from_module(tmod):
    """Return a Tc record for every test method defined in module ``tmod``.

    Only classes derived from oeSelfTest (excluding oeSelfTest itself) are
    inspected, and only ``test_*`` callables defined directly on the class
    are reported.  The test id comes from the method's ``test_case``
    attribute and the tag from ``tag__feature``; either is None when unset.

    Args:
        tmod: dotted module name to import.

    Returns:
        A list of Tc objects; empty when the module cannot be imported (the
        failure is logged rather than raised, discovery is best effort).
    """
    import importlib

    prefix = 'oeqa.selftest.'
    tlist = []

    try:
        modlib = importlib.import_module(tmod)
    except Exception as err:
        # A broken test module must not abort the listing of the others.
        log.warning("Could not load tests from %s: %s", tmod, err)
        return tlist

    for cls in list(vars(modlib).values()):
        if not (isinstance(cls, type(oeSelfTest)) and issubclass(cls, oeSelfTest)):
            continue
        if cls is oeSelfTest:
            continue

        own_members = vars(cls)
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            # dir() also reports inherited names; those are not in vars(cls)
            # and are skipped instead of raising KeyError.
            member = own_members.get(name)
            if not callable(member):
                continue

            # Test case id: set by the testcase decorator, may be missing.
            try:
                tid = member.test_case
            except AttributeError:
                print('DEBUG: tc id missing for ' + str(name))
                tid = None
            # Feature tag: optional.
            ttag = getattr(member, 'tag__feature', None)

            # replace() rather than lstrip(): lstrip() strips a set of
            # characters, not a prefix.
            tlist.append(Tc(name, cls.__name__,
                            cls.__module__.replace(prefix, ''), tid, ttag))

    return tlist
def get_all_tests():
    """Return the Tc records of every test in the non-hidden test modules.

    Modules are processed in sorted name order.
    """
    collected = []
    for module_name in sorted(get_tests_modules()):
        collected.extend(get_tests_from_module(module_name))
    return collected
def get_testsuite_by(criteria, keyword):
    """Select the tests whose ``criteria`` field matches one of ``keyword``.

    Args:
        criteria: one of 'name', 'class', 'module', 'id' or 'tag'.
        keyword: list of strings; each is first tried as an exact value and
            otherwise as an fnmatch-style wildcard pattern.

    Returns:
        A list of Tc objects in discovery order, without duplicates.  Empty
        for an unknown criteria.  Keywords matching nothing are reported
        through ``log.error``.
    """
    all_tests = get_all_tests()

    def get_matches(values):
        # Return the items of `values` selected by the keyword(s).
        result = []
        remaining = values[:]
        for key in keyword:
            if key in remaining:
                # Regular matching of exact item
                result.append(key)
                remaining.remove(key)
                continue
            # Wildcard matching
            pattern = re.compile(fnmatch.translate(str(key)))
            added = [x for x in remaining if pattern.match(x)]
            if added:
                result.extend(added)
                remaining = [x for x in remaining if x not in added]
            else:
                log.error("Failed to find test: %s" % key)
        return result

    def tags_of(tc):
        # A test case carries one tag or a tuple/list of tags.  Everything is
        # compared as str, the same way the 'id' criteria handles ids, so
        # that the candidates offered to get_matches() can also be found
        # again on the test cases (including untagged ones, i.e. 'None').
        tags = tc.tctag if isinstance(tc.tctag, (tuple, list)) else (tc.tctag,)
        return [str(tag) for tag in tags]

    if criteria == 'tag':
        known_tags = set()
        for tc in all_tests:
            known_tags.update(tags_of(tc))
        wanted = set(get_matches(sorted(known_tags)))
        # One pass over all_tests: no duplicates and a stable order.
        return [tc for tc in all_tests if wanted.intersection(tags_of(tc))]

    extractors = {
        'name': lambda tc: tc.tcname,
        'class': lambda tc: tc.tcclass,
        'module': lambda tc: tc.tcmodule,
        'id': lambda tc: str(tc.tcid),
    }
    extract = extractors.get(criteria)
    if extract is None:
        return []

    # A set keeps the final membership test O(1) per test case.
    wanted = set(get_matches([extract(tc) for tc in all_tests]))
    return [tc for tc in all_tests if extract(tc) in wanted]
def list_testsuite_by(criteria, keyword):
    """Print a table of the tests selected by get_testsuite_by().

    Args:
        criteria: name, class, module, id or tag.
        keyword: a list of tests, classes, modules, ids or tags.

    Rows are (id, tag, name, class, module), sorted by those fields, and are
    followed by a short summary of the query.
    """
    def tag_key(tag):
        # Tags may be None, a str or a tuple/list; map them all to a tuple
        # of str so that rows with equal ids can still be ordered.
        if tag is None:
            return ()
        if isinstance(tag, (tuple, list)):
            return tuple(str(item) for item in tag)
        return (str(tag),)

    def tc_key(row):
        # tcid may be None if no ID was assigned; Python 3 does not allow
        # ordering comparisons between heterogeneous types, so None sorts
        # as 0 and the tag column is normalised by tag_key().
        tcid = 0 if row[0] is None else row[0]
        return (tcid, tag_key(row[1])) + row[2:]

    rows = sorted([(tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule)
                   for tc in get_testsuite_by(criteria, keyword)], key=tc_key)

    row_format = '%-4s\t%-20s\t%-60s\t%-25s\t%-20s'
    print('_' * 150)
    for row in rows:
        if isinstance(row[1], (tuple, list)):
            tags = ', '.join(str(item) for item in row[1])
            print(row_format % (row[0], tags, row[2], row[3], row[4]))
        else:
            print(row_format % row)
    print('_' * 150)
    print('Filtering by:\t %s' % criteria)
    print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
    print('Total found:\t %s' % len(rows))
def list_tests():
    """Print a table (id, tag, dotted test name) of all available tests."""
    row_format = '%-4s\t%-10s\t%-50s'
    separator = '_' * 80
    testcases = get_all_tests()

    print(row_format % ('id', 'tag', 'test'))
    print(separator)
    for tc in testcases:
        tag_column = tc.tctag
        # Several tags are shown as a comma separated list.
        if isinstance(tag_column, (tuple, list)):
            tag_column = ', '.join(tag_column)
        dotted_name = '.'.join([tc.tcmodule, tc.tcclass, tc.tcname])
        print(row_format % (tc.tcid, tag_column, dotted_name))
    print(separator)
    print('Total found:\t %s' % len(testcases))
def list_tags():
    """Print the distinct tags found on the available test cases.

    Handy when choosing a tag for a new test case, as the set of tags in
    use should be kept as small as possible.
    """
    seen = set()
    for tc in get_all_tests():
        if not isinstance(tc.tctag, (tuple, list)):
            # Single tag (or None when the test is untagged).
            seen.add(tc.tctag)
            continue
        seen |= set(tc.tctag)

    print('Tags:\t%s' % ', '.join(str(x) for x in seen))
def coverage_setup(coverage_source, coverage_include, coverage_omit):
    """ Set up the coverage measurement for the testcases to be run

    Writes ``<builddir>/.coveragerc`` describing a branch-coverage run whose
    data file is ``<builddir>/.coverage.<timestamp>``.

    Args:
        coverage_source: directories to measure; when empty, the BBLAYERS
            entries, this script's directory and the sibling bitbake
            directory are used.
        coverage_include: optional extra patterns for the "include" setting.
        coverage_omit: optional patterns for the "omit" setting.

    Returns:
        The path of the generated configuration file.
    """
    import datetime
    import subprocess
    global builddir

    scriptsdir = os.path.dirname(os.path.realpath(__file__))
    pokydir = os.path.dirname(scriptsdir)

    # The commit only ends up in a comment of the generated file, so a
    # missing git binary or repository must not abort the coverage setup.
    try:
        curcommit = subprocess.check_output(
            ["git", "--git-dir", os.path.join(pokydir, ".git"), "rev-parse", "HEAD"]
        ).decode('utf-8').strip()
    except (OSError, subprocess.CalledProcessError):
        curcommit = "unknown"

    coveragerc = "%s/.coveragerc" % builddir
    data_file = "%s/.coverage." % builddir
    data_file += datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
    if os.path.isfile(data_file):
        os.remove(data_file)

    with open(coveragerc, 'w') as cps:
        cps.write("# Generated with command '%s'\n" % " ".join(sys.argv))
        cps.write("# HEAD commit %s\n" % curcommit)
        cps.write("[run]\n")
        cps.write("data_file = %s\n" % data_file)
        cps.write("branch = True\n")
        # Measure just BBLAYERS, scripts and bitbake folders
        cps.write("source = \n")
        if coverage_source:
            for directory in coverage_source:
                # An invalid directory is reported but still written out.
                if not os.path.isdir(directory):
                    log.warning("Directory %s is not valid.", directory)
                cps.write(" %s\n" % directory)
        else:
            for layer in get_bb_var('BBLAYERS').split():
                cps.write(" %s\n" % layer)
            cps.write(" %s\n" % scriptsdir)
            cps.write(" %s\n" % os.path.join(pokydir, 'bitbake'))

        if coverage_include:
            cps.write("include = \n")
            for pattern in coverage_include:
                cps.write(" %s\n" % pattern)
        if coverage_omit:
            cps.write("omit = \n")
            for pattern in coverage_omit:
                cps.write(" %s\n" % pattern)

    return coveragerc
def coverage_report():
    """ Loads the coverage data gathered and reports it back

    The configuration file named by $COVERAGE_PROCESS_START selects the data
    to load.  The textual report is sent to the log at INFO level; a
    CoverageException raised while loading or reporting is logged as a
    warning instead of being propagated.
    """
    import io

    try:
        # Coverage4 uses coverage.Coverage
        from coverage import Coverage
    except ImportError:
        # Coverage under version 4 uses coverage.coverage
        from coverage import coverage as Coverage
    try:
        # Newer releases define the exception in coverage.exceptions
        from coverage.exceptions import CoverageException
    except ImportError:
        from coverage.misc import CoverageException

    # Creating the coverage data with the setting from the configuration file
    cov = Coverage(config_file=os.environ.get('COVERAGE_PROCESS_START'))
    with io.StringIO() as cov_output:
        try:
            # Load data from the data file specified in the configuration
            cov.load()
            # Store report data in a StringIO variable
            cov.report(file=cov_output, show_missing=False)
            log.info("\n%s" % cov_output.getvalue())
        except CoverageException as e:
            # Show problems with the reporting. Since Coverage4 not finding
            # any data to report raises an exception
            log.warning("%s" % str(e))
def main():
    """Parse the command line, then list or run the requested selftests.

    Returns:
        1 on invalid usage or when anything failed, 0 when every executed
        test run was successful, and None (implicitly) after the
        listing-only modes.
    """
    import importlib

    parser = get_args_parser()
    args = parser.parse_args()

    # Add <layer>/lib to sys.path, so layers can add selftests
    log.info("Running bitbake -e to get BBPATH")
    bbpath = get_bb_var('BBPATH').split(':')
    layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
    sys.path.extend(layer_libdirs)
    # importlib.reload() is the supported replacement for imp.reload().
    # NOTE(review): presumably the reload lets oeqa.selftest pick up the
    # directories just added to sys.path - confirm.
    importlib.reload(oeqa.selftest)

    # act like bitbake and enforce en_US.UTF-8 locale
    os.environ["LC_ALL"] = "en_US.UTF-8"

    valid_options = ['name', 'class', 'module', 'id', 'tag']

    def split_by_option(option, values):
        # Split "<criteria> <keywords...>" into (criteria, keywords); print
        # the reason and return None when the values are unusable.
        if len(values) < 2:
            print('%s needs a criteria and at least one keyword: '
                  '<name|class|module|id|tag> <list of tests|classes|modules|ids|tags>' % option)
            return None
        if values[0] not in valid_options:
            print('%s %s not a valid option. Choose one of <name|class|module|id|tag>.' % (option, values[0]))
            return None
        return values[0], values[1:]

    ts = []
    if args.run_tests_by:
        selection = split_by_option('--run-tests-by', args.run_tests_by)
        if selection is None:
            return 1
        criteria, keyword = selection
        ts = sorted([tc.fullpath for tc in get_testsuite_by(criteria, keyword)])
        if not ts:
            return 1

    if args.list_tests_by:
        selection = split_by_option('--list-tests-by', args.list_tests_by)
        if selection is None:
            return 1
        criteria, keyword = selection
        list_testsuite_by(criteria, keyword)

    if args.list_tests:
        list_tests()

    if args.list_tags:
        list_tags()

    # Listing classes implies listing the modules that contain them.
    if args.list_allclasses:
        args.list_modules = True

    if args.list_modules:
        log.info('Listing all available test modules:')
        testslist = get_tests(include_hidden=True)
        for test in testslist:
            module = test.split('oeqa.selftest.')[-1]
            info = ''
            if module.startswith('_'):
                info = ' (hidden)'
            print(module + info)
            if args.list_allclasses:
                try:
                    modlib = importlib.import_module(test)
                    for name, obj in list(vars(modlib).items()):
                        if isinstance(obj, type(oeSelfTest)) and issubclass(obj, oeSelfTest) and obj is not oeSelfTest:
                            print(" --", name)
                            for method in dir(obj):
                                # getattr() also resolves inherited test
                                # methods, which vars(obj)[method] cannot.
                                if method.startswith("test_") and callable(getattr(obj, method)):
                                    print(" -- --", method)
                except (AttributeError, ImportError) as e:
                    print(e)

    if args.run_tests or args.run_all_tests or args.run_tests_by:
        if not preflight_check():
            return 1

        if args.run_tests_by:
            testslist = ts
        else:
            testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)

        loader = unittest.TestLoader()
        loader.sortTestMethodsUsing = None
        runner = TestRunner(verbosity=2,
                            resultclass=buildResultClass(args))
        # we need to do this here, otherwise just loading the tests
        # will take 2 minutes (bitbake -e calls)
        oeSelfTest.testlayer_path = get_test_layer()

        def load_suite():
            # Build a fresh suite from testslist; None when loading failed.
            fresh = unittest.TestSuite()
            for test in testslist:
                log.info("Loading tests from: %s" % test)
                try:
                    fresh.addTests(loader.loadTestsFromName(test))
                except AttributeError as e:
                    log.error("Failed to import %s" % test)
                    log.error(e)
                    return None
            return fresh

        suite = load_suite()
        if suite is None:
            return 1
        add_include()

        results = []
        if args.machine:
            # Custom machine sets only weak default values (??=) for MACHINE in machine.inc
            # This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
            log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
            if args.machine == 'random':
                os.environ['CUSTOMMACHINE'] = 'random'
                results.append(runner.run(suite))
            else:  # all
                for m in get_available_machines():
                    log.info('Run tests with custom MACHINE set to: %s' % m)
                    os.environ['CUSTOMMACHINE'] = m
                    # A TestSuite drops its tests once they have run, so
                    # every machine after the first needs a new suite.
                    if suite is None:
                        suite = load_suite()
                        if suite is None:
                            return 1
                    results.append(runner.run(suite))
                    suite = None
        else:
            results.append(runner.run(suite))

        if not results:
            log.error("No machines available, no tests were run")
            return 1

        log.info("Finished")

        if args.repository:
            import git
            # Commit tests results to repository
            metadata = metadata_from_bb()
            git_dir = os.path.join(os.getcwd(), 'selftest')
            if not os.path.isdir(git_dir):
                os.mkdir(git_dir)

            log.debug('Checking for git repository in %s' % git_dir)
            try:
                repo = git.Repo(git_dir)
            except git.exc.InvalidGitRepositoryError:
                log.debug("Couldn't find git repository %s; "
                          "cloning from %s" % (git_dir, args.repository))
                repo = git.Repo.clone_from(args.repository, git_dir)

            r_branches = repo.git.branch(r=True)
            r_branches = set(r_branches.replace('origin/', '').split())
            l_branches = {str(head) for head in repo.branches}
            branch = '%s/%s/%s' % (metadata['hostname'],
                                   metadata['layers']['meta']['branch'],
                                   metadata['machine'])

            if branch in l_branches:
                log.debug('Found branch in local repository, checking out')
                repo.git.checkout(branch)
            elif branch in r_branches:
                log.debug('Found branch in remote repository, checking'
                          ' out and pulling')
                repo.git.checkout(branch)
                repo.git.pull()
            else:
                log.debug('New branch %s' % branch)
                repo.git.checkout('master')
                repo.git.checkout(b=branch)

            cleanResultsDir(repo)
            xml_dir = os.path.join(os.getcwd(), log_prefix)
            copyResultFiles(xml_dir, git_dir, repo)
            metadata_file = os.path.join(git_dir, 'metadata.xml')
            write_metadata_file(metadata_file, metadata)
            repo.index.add([metadata_file])
            repo.index.write()

            # Get information for commit message
            layer_info = ''
            for layer, values in metadata['layers'].items():
                layer_info = '%s%-17s = %s:%s\n' % (layer_info, layer,
                                                    values['branch'], values['revision'])
            # log_prefix[12:] drops the leading "oe-selftest-" (12 chars),
            # leaving only the timestamp of this run.
            msg = 'Selftest for build %s of %s for machine %s on %s\n\n%s' % (
                log_prefix[12:], metadata['distro']['pretty_name'],
                metadata['machine'], metadata['hostname'], layer_info)

            log.debug('Commiting results to local repository')
            repo.index.commit(msg)
            if not repo.is_dirty():
                try:
                    if branch in r_branches:
                        log.debug('Pushing changes to remote repository')
                        repo.git.push()
                    else:
                        log.debug('Pushing changes to remote repository '
                                  'creating new branch')
                        repo.git.push('-u', 'origin', branch)
                except git.exc.GitCommandError:
                    log.error('Falied to push to remote repository')
                    return 1
            else:
                log.error('Local repository is dirty, not pushing commits')

        # Every run (one per machine in "all" mode) has to be successful.
        if all(result.wasSuccessful() for result in results):
            return 0
        return 1
def buildResultClass(args):
    """Build a Result Class to use in the testcase execution

    Args:
        args: parsed command line; its coverage, coverage_source,
            coverage_include and coverage_omit attributes decide whether
            coverage measurement is set up around the test run.

    Returns:
        The StampedResult class (not an instance).
    """
    import site

    def coverage_requested():
        # Any of the coverage options switches the measurement on.
        return bool(args.coverage or args.coverage_source or
                    args.coverage_include or args.coverage_omit)

    class StampedResult(TestResult):
        """
        Custom TestResult that prints the time when a test starts. As oe-selftest
        can take a long time (ie a few hours) to run, timestamps help us understand
        what tests are taking a long time to execute.
        If coverage is required, this class executes the coverage setup and reporting.
        """
        def startTest(self, test):
            import time
            self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
            super(StampedResult, self).startTest(test)

        def startTestRun(self):
            """ Setup coverage before running any testcase """
            super(StampedResult, self).startTestRun()

            # variable holding the coverage configuration file allowing subprocess to be measured
            self.coveragepth = None

            # indicates the system if coverage is currently installed
            self.coverage_installed = True

            if not coverage_requested():
                return

            try:
                # check if user can do coverage
                import coverage
            except ImportError:
                log.warning("python coverage is not installed. More info on https://pypi.python.org/pypi/coverage")
                self.coverage_installed = False
                return

            log.info("Coverage is enabled")

            # Parse the whole leading component: taking only the first
            # character would read version "10.x" as major version 1.
            version = coverage.version.__version__
            major_version = int(version.split('.')[0])
            if major_version < 4:
                log.error("python coverage %s installed. Require version 4 or greater." % version)
                self.stop()

            # In case the user has not set the variable COVERAGE_PROCESS_START,
            # create a default one and export it. The COVERAGE_PROCESS_START
            # value indicates where the coverage configuration file resides
            # More info on https://pypi.python.org/pypi/coverage
            if not os.environ.get('COVERAGE_PROCESS_START'):
                os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.coverage_source, args.coverage_include, args.coverage_omit)

            # Use default site.USER_SITE and write corresponding config file
            site.ENABLE_USER_SITE = True
            if not os.path.exists(site.USER_SITE):
                os.makedirs(site.USER_SITE)
            self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth")
            with open(self.coveragepth, 'w') as cps:
                cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();')

        def stopTestRun(self):
            """ Report coverage data after the testcases are run """
            super(StampedResult, self).stopTestRun()

            if not (coverage_requested() and self.coverage_installed):
                return

            with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
                log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
                log.info("===========================")
                log.info("\n%s" % "".join(ccf.readlines()))

            log.info("Coverage Report")
            log.info("===============")
            try:
                coverage_report()
            finally:
                # remove the pth file
                try:
                    os.remove(self.coveragepth)
                except OSError:
                    log.warning("Expected temporal file from coverage is missing, ignoring removal.")

    return StampedResult
def cleanResultsDir(repo):
    """ Remove result files from directory

    Every regular ``*.xml`` file found directly in the repository's working
    tree directory is removed from both the index and the working tree.

    Args:
        repo: repository object exposing ``working_tree_dir`` and
            ``index.remove(paths, working_tree=True)``.
    """
    directory = repo.working_tree_dir
    xml_files = sorted(
        f for f in os.listdir(directory)
        if f.endswith('.xml') and os.path.isfile(os.path.join(directory, f))
    )
    # Nothing to remove: do not ask the index to remove an empty path list.
    if xml_files:
        repo.index.remove(xml_files, working_tree=True)
def copyResultFiles(src, dst, repo):
    """ Copy result files from src to dst removing the time stamp.

    The directory tree below ``src`` is mirrored under ``dst``.  Every
    "-<digits>" run is dropped from the copied file names, and the copied
    files are then added to the repository index in a single call.

    Args:
        src: directory holding the result files of this run.
        dst: destination directory (the results repository working tree).
        repo: repository object exposing ``index.add(paths)``.
    """
    import shutil

    re_time = re.compile("-[0-9]+")
    file_list = []
    for root, subdirs, files in os.walk(src):
        # Path of the current directory relative to src ('' for src itself).
        rel_dir = os.path.relpath(root, src)
        if rel_dir == os.curdir:
            rel_dir = ''
        for s in subdirs:
            # The destination is reused between runs, so the directory may
            # already exist.
            os.makedirs(os.path.join(dst, rel_dir, s), exist_ok=True)
        for f in files:
            file_name = os.path.join(dst, rel_dir, re_time.sub("", f))
            shutil.copy2(os.path.join(root, f), file_name)
            file_list.append(file_name)
    repo.index.add(file_list)
class TestRunner(_TestRunner):
    """Test runner class aware of exporting tests."""

    def __init__(self, *args, **kwargs):
        try:
            # First attempt: also pass the XMLTestRunner-specific arguments
            # ('output' is where results are exported).  When adding another
            # runner, add the logic for that runner's arguments here as well.
            export_kwargs = dict(
                kwargs,
                output=os.path.join(os.getcwd(), log_prefix),
                descriptions=False,
            )
            super(TestRunner, self).__init__(*args, **export_kwargs)
        except TypeError:
            # The base runner rejected the arguments; retry with the
            # caller's arguments only.
            log.info("test runner init'ed like unittest")
            super(TestRunner, self).__init__(*args, **kwargs)
if __name__ == "__main__":
    # Script entry point: run main(), report any unexpected exception as a
    # failure, and always undo the changes made to the build configuration.
    import traceback

    try:
        exit_status = main()
    except Exception:
        exit_status = 1
        traceback.print_exc()
    finally:
        remove_include()
        remove_inc_files()
    sys.exit(exit_status)
|