diff options
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
| -rw-r--r-- | bitbake/lib/bb/runqueue.py | 341 | 
1 files changed, 250 insertions, 91 deletions
| diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index cce5da4057..c3ad442e47 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -37,20 +37,38 @@ class RunQueueStats:      """      Holds statistics on the tasks handled by the associated runQueue      """ -    def __init__(self): +    def __init__(self, total):          self.completed = 0          self.skipped = 0          self.failed = 0 +        self.active = 0 +        self.total = total      def taskFailed(self): +        self.active = self.active - 1          self.failed = self.failed + 1      def taskCompleted(self, number = 1): +        self.active = self.active - number          self.completed = self.completed + number      def taskSkipped(self, number = 1): +        self.active = self.active + number          self.skipped = self.skipped + number +    def taskActive(self): +        self.active = self.active + 1 + +# These values indicate the next step due to be run in the  +# runQueue state machine +runQueuePrepare = 2 +runQueueRunInit = 3 +runQueueRunning = 4 +runQueueFailed = 6 +runQueueCleanUp = 7 +runQueueComplete = 8 +runQueueChildProcess = 9 +  class RunQueueScheduler:      """      Control the order tasks are scheduled in. @@ -142,9 +160,9 @@ class RunQueue:          self.cooker = cooker          self.dataCache = dataCache          self.taskData = taskData +        self.cfgData = cfgData          self.targets = targets -        self.cfgdata = cfgData          self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)          self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()          self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" @@ -152,12 +170,13 @@ class RunQueue:          self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""      def reset_runqueue(self): -          self.runq_fnid = []          self.runq_task = []          self.runq_depends = []          self.runq_revdeps = [] +        self.state = runQueuePrepare +      def get_user_idstring(self, task):          fn = self.taskData.fn_index[self.runq_fnid[task]]          taskname = self.runq_task[task] @@ -653,6 +672,8 @@ class RunQueue:          #self.dump_data(taskData) +        self.state = runQueueRunInit +      def check_stamps(self):          unchecked = {}          current = [] @@ -796,39 +817,51 @@ class RunQueue:          (if the abort on failure configuration option isn't set)          """ -        failures = 0 -        while 1: -            failed_fnids = [] -            try: -                self.execute_runqueue_internal() -            finally: -                if self.master_process: -                    failed_fnids = self.finish_runqueue() -            if len(failed_fnids) == 0: -                return failures +        if self.state is runQueuePrepare: +            self.prepare_runqueue() + +        if self.state is runQueueRunInit: +            bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") +            self.execute_runqueue_initVars() + +        if self.state is runQueueRunning: +            self.execute_runqueue_internal() + +        if self.state is runQueueCleanUp: +            self.finish_runqueue() + +        if self.state is runQueueFailed:              if not self.taskData.tryaltconfigs: -                raise bb.runqueue.TaskFailure(failed_fnids) -            for fnid in failed_fnids: -                #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid],  self.runq_task[fnid]) +                raise bb.runqueue.TaskFailure(self.failed_fnids) +            for fnid in self.failed_fnids:                  self.taskData.fail_fnid(fnid) -                failures = failures + 1              self.reset_runqueue() -            self.prepare_runqueue() + +        if self.state is runQueueComplete: +            # All done +            bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) +            return False + +        if self.state is runQueueChildProcess: +            print "Child process" +            return False + +        # Loop +        return True      def execute_runqueue_initVars(self): -        self.stats = RunQueueStats() +        self.stats = RunQueueStats(len(self.runq_fnid)) -        self.active_builds = 0          self.runq_buildable = []          self.runq_running = []          self.runq_complete = []          self.build_pids = {} +        self.build_pipes = {}          self.failed_fnids = [] -        self.master_process = True          # Mark initial buildable tasks -        for task in range(len(self.runq_fnid)): +        for task in range(self.stats.total):              self.runq_running.append(0)              self.runq_complete.append(0)              if len(self.runq_depends[task]) == 0: @@ -836,6 +869,10 @@ class RunQueue:              else:                  self.runq_buildable.append(0) +        self.state = runQueueRunning + +        event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData) +      def task_complete(self, task):          """          Mark a task as completed @@ -858,26 +895,32 @@ class RunQueue:                  taskname = self.runq_task[revdep]                  bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname)) +    def task_fail(self, task, exitcode): +        """ +        Called when a task has failed +        Updates the state engine with the failure +        """ +        bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode)) +        self.stats.taskFailed() +        fnid = self.runq_fnid[task] +        self.failed_fnids.append(fnid) +        bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) +        if self.taskData.abort: +            self.state = runQueueCleanup +      def execute_runqueue_internal(self):          """          Run the tasks in a queue prepared by prepare_runqueue          """ -        bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") - -        self.execute_runqueue_initVars() - -        if len(self.runq_fnid) == 0: +        if self.stats.total == 0:              # nothing to do -            return [] - -        def sigint_handler(signum, frame): -            raise KeyboardInterrupt - -        event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgdata)) +            self.state = runQueueCleanup          while True: -            task = self.sched.next() +            task = None +            if self.stats.active < self.number_tasks: +                task = self.sched.next()              if task is not None:                  fn = self.taskData.fn_index[self.runq_fnid[task]] @@ -885,107 +928,143 @@ class RunQueue:                  if self.check_stamp_task(task):                      bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))                      self.runq_running[task] = 1 +                    self.runq_buildable[task] = 1                      self.task_complete(task)                      self.stats.taskCompleted()                      self.stats.taskSkipped()                      continue -                bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))                  sys.stdout.flush()                  sys.stderr.flush() -                try:  +                try: +                    pipein, pipeout = os.pipe()                      pid = os.fork()                   except OSError, e:                       bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))                  if pid == 0: -                    # Bypass master process' handling -                    self.master_process = False -                    # Stop Ctrl+C being sent to children -                    # signal.signal(signal.SIGINT, signal.SIG_IGN) +                    os.close(pipein) +                    # Save out the PID so that the event can include it the +                    # events +                    bb.event.worker_pid = os.getpid() +                    bb.event.worker_pipe = pipeout + +                    self.state = runQueueChildProcess                      # Make the child the process group leader                      os.setpgid(0, 0) +                    # No stdin                      newsi = os.open('/dev/null', os.O_RDWR)                      os.dup2(newsi, sys.stdin.fileno()) -                    self.cooker.configuration.cmd = taskname[3:] + +                    bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) +                    bb.msg.note(1, bb.msg.domain.RunQueue, +                                "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1, +                                                                        self.stats.total, +                                                                        task, +                                                                        self.get_user_idstring(task))) +                      bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)                      try: -                        self.cooker.tryBuild(fn) +                        self.cooker.tryBuild(fn, taskname[3:])                      except bb.build.EventException:                          bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") -                        sys.exit(1) +                        os._exit(1)                      except:                          bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") -                        raise -                    sys.exit(0) +                        os._exit(1) +                    os._exit(0) +                  self.build_pids[pid] = task +                self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)                  self.runq_running[task] = 1 -                self.active_builds = self.active_builds + 1 -                if self.active_builds < self.number_tasks: +                self.stats.taskActive() +                if self.stats.active < self.number_tasks:                      continue -            if self.active_builds > 0: -                result = os.waitpid(-1, 0) -                self.active_builds = self.active_builds - 1 + +            for pipe in self.build_pipes: +                self.build_pipes[pipe].read() + +            if self.stats.active > 0: +                result = os.waitpid(-1, os.WNOHANG) +                if result[0] is 0 and result[1] is 0: +                    return                  task = self.build_pids[result[0]] +                del self.build_pids[result[0]] +                self.build_pipes[result[0]].close() +                del self.build_pipes[result[0]]                  if result[1] != 0: -                    del self.build_pids[result[0]] -                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task))) -                    self.failed_fnids.append(self.runq_fnid[task]) -                    self.stats.taskFailed() -                    if not self.taskData.abort: -                        continue -                    break +                    self.task_fail(task, result[1]) +                    return                  self.task_complete(task)                  self.stats.taskCompleted() -                del self.build_pids[result[0]] +                bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)                  continue + +            if len(self.failed_fnids) != 0: +                self.state = runQueueFailed +                return + +            # Sanity Checks +            for task in range(self.stats.total): +                if self.runq_buildable[task] == 0: +                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task) +                if self.runq_running[task] == 0: +                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) +                if self.runq_complete[task] == 0: +                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) +            self.state = runQueueComplete              return -    def finish_runqueue(self): +    def finish_runqueue_now(self): +        bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) +        for k, v in self.build_pids.iteritems(): +             try: +                 os.kill(-k, signal.SIGINT) +             except: +                 pass +        for pipe in self.build_pipes: +            self.build_pipes[pipe].read() + +    def finish_runqueue(self, now = False): +        self.state = runQueueCleanUp +        if now: +            self.finish_runqueue_now()          try: -            while self.active_builds > 0: -                bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds) +            while self.stats.active > 0: +                bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) +                bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active)                  tasknum = 1                  for k, v in self.build_pids.iteritems(): -                     bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k)) -                     tasknum = tasknum + 1 -                result = os.waitpid(-1, 0) +                    bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k)) +                    tasknum = tasknum + 1 +                result = os.waitpid(-1, os.WNOHANG) +                if result[0] is 0 and result[1] is 0: +                    return                  task = self.build_pids[result[0]] -                if result[1] != 0: -                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task))) -                     self.failed_fnids.append(self.runq_fnid[task]) -                     self.stats.taskFailed()                  del self.build_pids[result[0]] -                self.active_builds = self.active_builds - 1 -            bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) -            return self.failed_fnids -        except KeyboardInterrupt: -            bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds) -            for k, v in self.build_pids.iteritems(): -                 try: -                     os.kill(-k, signal.SIGINT) -                 except: -                     pass +                self.build_pipes[result[0]].close() +                del self.build_pipes[result[0]] +                if result[1] != 0: +                    self.task_fail(task, result[1]) +                else: +                    self.stats.taskCompleted() +                    bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) +        except: +            self.finish_runqueue_now()              raise -        # Sanity Checks -        for task in range(len(self.runq_fnid)): -            if self.runq_buildable[task] == 0: -                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task) -            if self.runq_running[task] == 0: -                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) -            if self.runq_complete[task] == 0: -                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) - -        bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) +        if len(self.failed_fnids) != 0: +            self.state = runQueueFailed +            return -        return self.failed_fnids +        self.state = runQueueComplete +        return      def dump_data(self, taskQueue):          """          Dump some debug information on the internal data structures          """          bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") -        for task in range(len(self.runq_fnid)): +        for task in range(len(self.runq_task)):                  bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task,                           taskQueue.fn_index[self.runq_fnid[task]],                           self.runq_task[task],  @@ -994,7 +1073,7 @@ class RunQueue:                          self.runq_revdeps[task]))          bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") -        for task1 in range(len(self.runq_fnid)): +        for task1 in range(len(self.runq_task)):              if task1 in self.prio_map:                  task = self.prio_map[task1]                  bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task,  @@ -1005,6 +1084,58 @@ class RunQueue:                          self.runq_revdeps[task])) +class TaskFailure(Exception): +    """ +    Exception raised when a task in a runqueue fails +    """ +    def __init__(self, x):  +        self.args = x + + +class runQueueExitWait(bb.event.Event): +    """ +    Event when waiting for task processes to exit +    """ + +    def __init__(self, remain): +        self.remain = remain +        self.message = "Waiting for %s active tasks to finish" % remain +        bb.event.Event.__init__(self) + +class runQueueEvent(bb.event.Event): +    """ +    Base runQueue event class +    """ +    def __init__(self, task, stats, rq): +        self.taskid = task +        self.taskstring = rq.get_user_idstring(task) +        self.stats = stats +        bb.event.Event.__init__(self) + +class runQueueTaskStarted(runQueueEvent): +    """ +    Event notifing a task was started +    """ +    def __init__(self, task, stats, rq): +        runQueueEvent.__init__(self, task, stats, rq) +        self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring) + +class runQueueTaskFailed(runQueueEvent): +    """ +    Event notifing a task failed +    """ +    def __init__(self, task, stats, rq): +        runQueueEvent.__init__(self, task, stats, rq) +        self.message = "Task %s failed (%s)" % (task, self.taskstring) + +class runQueueTaskCompleted(runQueueEvent): +    """ +    Event notifing a task completed +    """ +    def __init__(self, task, stats, rq): +        runQueueEvent.__init__(self, task, stats, rq) +        self.message = "Task %s completed (%s)" % (task, self.taskstring) +  def check_stamp_fn(fn, taskname, d):      rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)      fnid = rq.taskData.getfn_id(fn) @@ -1013,3 +1144,31 @@ def check_stamp_fn(fn, taskname, d):          return rq.check_stamp_task(taskid)      return None +class runQueuePipe(): +    """ +    Abstraction for a pipe between a worker thread and the server +    """ +    def __init__(self, pipein, pipeout, d): +        self.fd = pipein +        os.close(pipeout) +        self.queue = "" +        self.d = d + +    def read(self): +        start = len(self.queue) +        self.queue = self.queue + os.read(self.fd, 1024) +        end = len(self.queue) +        index = self.queue.find("</event>") +        while index != -1: +            bb.event.fire_from_worker(self.queue[:index+8], self.d) +            self.queue = self.queue[index+8:] +            index = self.queue.find("</event>") +        return (end > start) + +    def close(self): +        while self.read(): +            continue +        if len(self.queue) > 0: +            print "Warning, worker left partial message" +        os.close(self.fd) + | 
