| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
 | import unittest, os, shutil
from oeqa.oetest import oeRuntimeTest, skipModule
from oeqa.utils.decorators import *
from oeqa.utils.logparser import *
from oeqa.utils.httpserver import HTTPService
import bb
import glob
from oe.package_manager import RpmPkgsList
import subprocess
def setUpModule():
    if not oeRuntimeTest.hasFeature("package-management"):
        skipModule("Image doesn't have package management feature")
    if not oeRuntimeTest.hasPackage("smart"):
        skipModule("Image doesn't have smart installed")
    if "package_rpm" != oeRuntimeTest.tc.d.getVar("PACKAGE_CLASSES", True).split()[0]:
        skipModule("Rpm is not the primary package manager")
class PtestRunnerTest(oeRuntimeTest):
    # a ptest log parser
    def parse_ptest(self, logfile):
        parser = Lparser(test_0_pass_regex="^PASS:(.+)", test_0_fail_regex="^FAIL:(.+)", section_0_begin_regex="^BEGIN: .*/(.+)/ptest", section_0_end_regex="^END: .*/(.+)/ptest")
        parser.init()
        result = Result()
        with open(logfile) as f:
            for line in f:
                result_tuple = parser.parse_line(line)
                if not result_tuple:
                    continue
                result_tuple = line_type, category, status, name = parser.parse_line(line)
                if line_type == 'section' and status == 'begin':
                    current_section = name
                    continue
                if line_type == 'section' and status == 'end':
                    current_section = None
                    continue
                if line_type == 'test' and status == 'pass':
                    result.store(current_section, name, status)
                    continue
                if line_type == 'test' and status == 'fail':
                    result.store(current_section, name, status)
                    continue
        result.sort_tests()
        return result
    @classmethod
    def setUpClass(self):
        #note the existing channels that are on the board before creating new ones
#        self.existingchannels = set()
#        (status, result) = oeRuntimeTest.tc.target.run('smart channel --show | grep "\["', 0)
#        for x in result.split("\n"):
#            self.existingchannels.add(x)
        self.repo_server = HTTPService(oeRuntimeTest.tc.d.getVar('DEPLOY_DIR', True), oeRuntimeTest.tc.target.server_ip)
        self.repo_server.start()
    @classmethod
    def tearDownClass(self):
        self.repo_server.stop()
        #remove created channels to be able to repeat the tests on same image
#        (status, result) = oeRuntimeTest.tc.target.run('smart channel --show | grep "\["', 0)
#        for x in result.split("\n"):
#            if x not in self.existingchannels:
#                oeRuntimeTest.tc.target.run('smart channel --remove '+x[1:-1]+' -y', 0)
    def add_smart_channel(self):
        image_pkgtype = self.tc.d.getVar('IMAGE_PKGTYPE', True)
        deploy_url = 'http://%s:%s/%s' %(self.target.server_ip, self.repo_server.port, image_pkgtype)
        pkgarchs = self.tc.d.getVar('PACKAGE_ARCHS', True).replace("-","_").split()
        for arch in os.listdir('%s/%s' % (self.repo_server.root_dir, image_pkgtype)):
            if arch in pkgarchs:
                self.target.run('smart channel -y --add {a} type=rpm-md baseurl={u}/{a}'.format(a=arch, u=deploy_url), 0)
        self.target.run('smart update', 0)
    def install_complementary(self, globs=None):
        installed_pkgs_file = os.path.join(oeRuntimeTest.tc.d.getVar('WORKDIR', True),
                                           "installed_pkgs.txt")
        self.pkgs_list = RpmPkgsList(oeRuntimeTest.tc.d, oeRuntimeTest.tc.d.getVar('IMAGE_ROOTFS', True), oeRuntimeTest.tc.d.getVar('arch_var', True), oeRuntimeTest.tc.d.getVar('os_var', True))
        with open(installed_pkgs_file, "w+") as installed_pkgs:
            installed_pkgs.write(self.pkgs_list.list("arch"))
        cmd = [bb.utils.which(os.getenv('PATH'), "oe-pkgdata-util"),
               "-p", oeRuntimeTest.tc.d.getVar('PKGDATA_DIR', True), "glob", installed_pkgs_file,
               globs]
        try:
            bb.note("Installing complementary packages ...")
            complementary_pkgs = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            bb.fatal("Could not compute complementary packages list. Command "
                     "'%s' returned %d:\n%s" %
                     (' '.join(cmd), e.returncode, e.output))
        return complementary_pkgs.split()
    def setUp(self):
        self.ptest_log = os.path.join(oeRuntimeTest.tc.d.getVar("TEST_LOG_DIR",True), "ptest-%s.log" % oeRuntimeTest.tc.d.getVar('DATETIME', True))
    @skipUnlessPassed('test_ssh')
    def test_ptestrunner(self):
        self.add_smart_channel()
        (runnerstatus, result) = self.target.run('which ptest-runner', 0)
        cond = oeRuntimeTest.hasPackage("ptest-runner") and oeRuntimeTest.hasFeature("ptest") and oeRuntimeTest.hasPackage("-ptest") and (runnerstatus != 0)
        if cond:
            self.install_packages(self.install_complementary("*-ptest"))
            self.install_packages(['ptest-runner'])
        (runnerstatus, result) = self.target.run('/usr/bin/ptest-runner > /tmp/ptest.log 2>&1', 0)
        #exit code is !=0 even if ptest-runner executes because some ptest tests fail.
        self.assertTrue(runnerstatus != 127, msg="Cannot execute ptest-runner!")
        self.target.copy_from('/tmp/ptest.log', self.ptest_log)
        shutil.copyfile(self.ptest_log, "ptest.log")
        result = self.parse_ptest("ptest.log")
        log_results_to_location = "./results"
        if os.path.exists(log_results_to_location):
            shutil.rmtree(log_results_to_location)
        os.makedirs(log_results_to_location)
        result.log_as_files(log_results_to_location, test_status = ['pass','fail'])
 |