summaryrefslogtreecommitdiff
path: root/meta/lib/oeqa/controllers/masterimage.py
blob: d151e24bd7578744db541fb1eba6897ece907dc4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# Copyright (C) 2014 Intel Corporation
#
# Released under the MIT license (see COPYING.MIT)

# This module adds support to testimage.bbclass to deploy images and run
# tests using a "master image" - this is a "known good" image that is
# installed onto the device as part of initial setup and will be booted into
# with no interaction; we can then use it to deploy the image to be tested
# to a second partition before running the tests.
#
# For an example master image, see core-image-testmaster
# (meta/recipes-extended/images/core-image-testmaster.bb)

import os
import bb
import traceback
import time
import subprocess

import oeqa.targetcontrol
import oeqa.utils.sshcontrol as sshcontrol
import oeqa.utils.commands as commands

from abc import ABCMeta, abstractmethod

class MasterImageHardwareTarget(oeqa.targetcontrol.BaseTarget):
    """Abstract target that deploys the test image from a "master image".

    The target is expected to boot a master image (identified by the
    presence of /etc/masterimage) that is reachable over ssh.  deploy()
    verifies that, then delegates the actual installation of the test
    rootfs/kernel to the subclass hook _deploy(); start() boots the test
    image through the subclass hook _start().
    """

    # NOTE: the __metaclass__ attribute is only honoured by Python 2; on
    # Python 3 it is a plain class attribute and @abstractmethod is not
    # enforced.
    __metaclass__ = ABCMeta

    def __init__(self, d):
        """Read the test configuration from the datastore ``d``.

        Calls bb.fatal() when TEST_TARGET_IP is unset, when the host
        address cannot be determined, or when the rootfs tarball or the
        kernel is missing from DEPLOY_DIR_IMAGE.
        """
        super(MasterImageHardwareTarget, self).__init__(d)

        # target ip, optionally followed by ":port"
        addr = d.getVar("TEST_TARGET_IP", True) or bb.fatal('Please set TEST_TARGET_IP with the IP address of the machine you want to run the tests on.')
        addr_fields = addr.split(":")
        self.ip = addr_fields[0]
        self.port = addr_fields[1] if len(addr_fields) > 1 else None
        bb.note("Target IP: %s" % self.ip)
        self.server_ip = d.getVar("TEST_SERVER_IP", True)
        if not self.server_ip:
            # no explicit setting: ask the kernel which local address it
            # would use to reach the target
            try:
                route = subprocess.check_output(['ip', 'route', 'get', self.ip])
                self.server_ip = self._parse_route_source(route)
            except Exception as e:
                bb.fatal("Failed to determine the host IP address (alternatively you can set TEST_SERVER_IP with the IP address of this machine): %s" % e)
        bb.note("Server IP: %s" % self.server_ip)

        # test rootfs + kernel
        self.rootfs = os.path.join(d.getVar("DEPLOY_DIR_IMAGE", True), d.getVar("IMAGE_LINK_NAME", True) + '.tar.gz')
        self.kernel = os.path.join(d.getVar("DEPLOY_DIR_IMAGE", True), d.getVar("KERNEL_IMAGETYPE"))
        if not os.path.isfile(self.rootfs):
            # we could've checked that IMAGE_FSTYPES contains tar.gz but the config for running testimage might not be
            # the same as the config with which the image was built, ie
            # you bitbake core-image-sato with IMAGE_FSTYPES += "tar.gz"
            # and your autobuilder overwrites the config, adds the test bits and runs bitbake core-image-sato -c testimage
            bb.fatal("No rootfs found. Did you build the image ?\nIf yes, did you build it with IMAGE_FSTYPES += \"tar.gz\" ? \
                      \nExpected path: %s" % self.rootfs)
        if not os.path.isfile(self.kernel):
            bb.fatal("No kernel found. Expected path: %s" % self.kernel)

        # master ssh connection, created by deploy()
        self.master = None
        # if the user knows what they are doing, then by all means...
        self.user_cmds = d.getVar("TEST_DEPLOY_CMDS", True)
        self.deploy_cmds = None

        # this is the name of the command that controls the power for a board
        # e.g: TEST_POWERCONTROL_CMD = "/home/user/myscripts/powercontrol.py ${MACHINE} what-ever-other-args-the-script-wants"
        # the command should take as the last argument "off" and "on" and "cycle" (off, on)
        self.powercontrol_cmd = d.getVar("TEST_POWERCONTROL_CMD", True) or None
        self.powercontrol_args = d.getVar("TEST_POWERCONTROL_EXTRA_ARGS") or ""

        self.serialcontrol_cmd = d.getVar("TEST_SERIALCONTROL_CMD", True) or None
        self.serialcontrol_args = d.getVar("TEST_SERIALCONTROL_EXTRA_ARGS") or ""

        # NOTE(review): this is os.environ itself, not a copy, so the
        # assignments below also change this process's environment.
        # Confirm nothing relies on that before switching to
        # os.environ.copy().
        self.origenv = os.environ
        if self.powercontrol_cmd or self.serialcontrol_cmd:
            # the external script for controlling power might use ssh
            # ssh + keys means we need the original user env
            bborigenv = d.getVar("BB_ORIGENV", False) or {}
            for key in bborigenv:
                val = bborigenv.getVar(key, True)
                if val is not None:
                    self.origenv[key] = str(val)

        # fold the optional extra arguments into the command strings
        if self.powercontrol_cmd:
            if self.powercontrol_args:
                self.powercontrol_cmd = "%s %s" % (self.powercontrol_cmd, self.powercontrol_args)
        if self.serialcontrol_cmd:
            if self.serialcontrol_args:
                self.serialcontrol_cmd = "%s %s" % (self.serialcontrol_cmd, self.serialcontrol_args)

    @staticmethod
    def _parse_route_source(output):
        """Return the local source address from ``ip route get`` output.

        Only the first line is examined.  The address is the token that
        follows ``src``; newer iproute2 releases print further fields
        (e.g. ``uid 1000``) after it, so the last token cannot be relied
        upon.  When there is no ``src`` field the last token is returned,
        as before.  Raises IndexError on empty output.
        """
        # Python 3 hands back bytes from check_output(); on Python 2
        # bytes is str and no decoding is needed.
        if isinstance(output, bytes) and not isinstance(output, str):
            output = output.decode("utf-8", "replace")
        fields = output.split("\n")[0].split()
        if "src" in fields[:-1]:
            return fields[fields.index("src") + 1]
        return fields[-1]

    def power_ctl(self, msg):
        """Run the power control command with ``msg`` as its last argument.

        Does nothing when TEST_POWERCONTROL_CMD is not configured.
        """
        if self.powercontrol_cmd:
            cmd = "%s %s" % (self.powercontrol_cmd, msg)
            commands.runCmd(cmd, preexec_fn=os.setsid, env=self.origenv)

    def power_cycle(self, conn):
        """Restart the target reachable through the ssh connection ``conn``.

        With a power control command the target is shut down first and
        then power cycled; without one a plain "reboot" is issued over
        ssh and a failure is reported with bb.error().
        """
        if self.powercontrol_cmd:
            # be nice, don't just cut power
            conn.run("shutdown -h now")
            time.sleep(10)
            self.power_ctl("cycle")
        else:
            status, output = conn.run("reboot")
            if status != 0:
                bb.error("Failed rebooting target and no power control command defined. You need to manually reset the device.\n%s" % output)

    def _wait_until_booted(self):
        ''' Waits until the target device has booted (if we have just power cycled it) '''
        # Subclasses with better methods of determining boot can override this
        time.sleep(120)

    def deploy(self):
        """Make sure the master image is running, then install the test image.

        Calls bb.fatal() when the master image cannot be reached or when
        the subclass _deploy() raises.
        """
        # base class just sets the ssh log file for us
        super(MasterImageHardwareTarget, self).deploy()
        self.master = sshcontrol.SSHControl(ip=self.ip, logfile=self.sshlog, timeout=600, port=self.port)
        status, output = self.master.run("cat /etc/masterimage")
        if status != 0:
            # We're not booted into the master image, so try rebooting
            bb.plain("%s - booting into the master image" % self.pn)
            self.power_ctl("cycle")
            self._wait_until_booted()

        bb.plain("%s - deploying image on target" % self.pn)
        # check again: the power cycle above may not have helped
        status, output = self.master.run("cat /etc/masterimage")
        if status != 0:
            bb.fatal("No ssh connectivity or target isn't running a master image.\n%s" % output)
        if self.user_cmds:
            # user supplied commands replace whatever the subclass set up
            self.deploy_cmds = self.user_cmds.split("\n")
        try:
            self._deploy()
        except Exception as e:
            bb.fatal("Failed deploying test image: %s" % e)

    @abstractmethod
    def _deploy(self):
        """Subclass hook: install the test image using self.master."""
        pass

    def start(self, params=None):
        """Boot the test image and open the ssh connection used by the tests.

        ``params`` is accepted but not used here.
        """
        bb.plain("%s - boot test image on target" % self.pn)
        self._start()
        # set the ssh object for the target/test image
        self.connection = sshcontrol.SSHControl(self.ip, logfile=self.sshlog, port=self.port)
        bb.plain("%s - start running tests" % self.pn)

    @abstractmethod
    def _start(self):
        """Subclass hook: boot the target into the test image."""
        pass

    def stop(self):
        """Reboot or power cycle the target through the test connection."""
        bb.plain("%s - reboot/powercycle target" % self.pn)
        self.power_cycle(self.connection)


class GummibootTarget(MasterImageHardwareTarget):
    """Master-image target that picks the test image through an EFI variable.

    The test kernel is copied to the partition labelled "boot", the test
    rootfs is unpacked onto the partition labelled "testrootfs", and the
    LoaderEntryOneShot EFI variable is written so that the next boot uses
    the 'test' boot loader entry.
    """

    def __init__(self, d):
        """Set up the default deploy commands on top of the base config."""
        super(GummibootTarget, self).__init__(d)
        # this is the value we need to set in the LoaderEntryOneShot EFI variable
        # so the system boots the 'test' bootloader label and not the default
        # The first four bytes are EFI bits, and the rest is an utf-16le string
        # (EFI vars values need to be utf-16)
        # $ echo -en "test\0" | iconv -f ascii -t utf-16le | hexdump -C
        # 00000000  74 00 65 00 73 00 74 00  00 00                    |t.e.s.t...|
        self.efivarvalue = r'\x07\x00\x00\x00\x74\x00\x65\x00\x73\x00\x74\x00\x00\x00'
        # commands run on the master image, in order, by _deploy(); the
        # base class deploy() replaces this list when TEST_DEPLOY_CMDS is set
        self.deploy_cmds = [
                'mount -L boot /boot',
                'mkdir -p /mnt/testrootfs',
                'mount -L testrootfs /mnt/testrootfs',
                'modprobe efivarfs',
                'mount -t efivarfs efivarfs /sys/firmware/efi/efivars',
                'cp ~/test-kernel /boot',
                'rm -rf /mnt/testrootfs/*',
                'tar xzvf ~/test-rootfs.tar.gz -C /mnt/testrootfs',
                'printf "%s" > /sys/firmware/efi/efivars/LoaderEntryOneShot-4a67b082-0a4c-41cf-b6c7-440b29bb8c4f' % self.efivarvalue
                ]

    def _deploy(self):
        """Copy the rootfs and kernel to the master image and run deploy_cmds."""
        # make sure these aren't mounted
        self.master.run("umount /boot; umount /mnt/testrootfs; umount /sys/firmware/efi/efivars;")
        # from now on, every deploy cmd should return 0
        # else an exception will be thrown by sshcontrol
        self.master.ignore_status = False
        self.master.copy_to(self.rootfs, "~/test-rootfs.tar.gz")
        self.master.copy_to(self.kernel, "~/test-kernel")
        for cmd in self.deploy_cmds:
            self.master.run(cmd)

    def _start(self, params=None):
        """Restart the target from the master image and wait for it to boot.

        ``params`` is accepted but not used.
        """
        # NOTE(review): self.master still has ignore_status = False from
        # _deploy() at this point - confirm that is intended for the
        # shutdown/reboot command issued by power_cycle().
        self.power_cycle(self.master)
        # use the shared boot-wait hook rather than a private fixed sleep,
        # so that an override with a better boot check applies here too
        # (the base implementation sleeps 120 seconds)
        self._wait_until_booted()