diff options
author | Richard Purdie <rpurdie@linux.intel.com> | 2010-01-20 18:46:02 +0000 |
---|---|---|
committer | Richard Purdie <rpurdie@linux.intel.com> | 2010-01-20 18:46:02 +0000 |
commit | 22c29d8651668195f72e2f6a8e059d625eb511c3 (patch) | |
tree | dd1dd43f0ec47a9964c8a766eb8b3ad75cf51a64 /bitbake/lib/bb/runqueue.py | |
parent | 1bfd6edef9db9c9175058ae801d1b601e4f15263 (diff) | |
download | openembedded-core-22c29d8651668195f72e2f6a8e059d625eb511c3.tar.gz openembedded-core-22c29d8651668195f72e2f6a8e059d625eb511c3.tar.bz2 openembedded-core-22c29d8651668195f72e2f6a8e059d625eb511c3.zip |
bitbake: Switch to bitbake-dev version (bitbake master upstream)
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 341 |
1 files changed, 250 insertions, 91 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index cce5da4057..c3ad442e47 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -37,20 +37,38 @@ class RunQueueStats: """ Holds statistics on the tasks handled by the associated runQueue """ - def __init__(self): + def __init__(self, total): self.completed = 0 self.skipped = 0 self.failed = 0 + self.active = 0 + self.total = total def taskFailed(self): + self.active = self.active - 1 self.failed = self.failed + 1 def taskCompleted(self, number = 1): + self.active = self.active - number self.completed = self.completed + number def taskSkipped(self, number = 1): + self.active = self.active + number self.skipped = self.skipped + number + def taskActive(self): + self.active = self.active + 1 + +# These values indicate the next step due to be run in the +# runQueue state machine +runQueuePrepare = 2 +runQueueRunInit = 3 +runQueueRunning = 4 +runQueueFailed = 6 +runQueueCleanUp = 7 +runQueueComplete = 8 +runQueueChildProcess = 9 + class RunQueueScheduler: """ Control the order tasks are scheduled in. @@ -142,9 +160,9 @@ class RunQueue: self.cooker = cooker self.dataCache = dataCache self.taskData = taskData + self.cfgData = cfgData self.targets = targets - self.cfgdata = cfgData self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split() self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" @@ -152,12 +170,13 @@ class RunQueue: self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or "" def reset_runqueue(self): - self.runq_fnid = [] self.runq_task = [] self.runq_depends = [] self.runq_revdeps = [] + self.state = runQueuePrepare + def get_user_idstring(self, task): fn = self.taskData.fn_index[self.runq_fnid[task]] taskname = self.runq_task[task] @@ -653,6 +672,8 @@ class RunQueue: #self.dump_data(taskData) + self.state = runQueueRunInit + def check_stamps(self): unchecked = {} current = [] @@ -796,39 +817,51 @@ class RunQueue: (if the abort on failure configuration option isn't set) """ - failures = 0 - while 1: - failed_fnids = [] - try: - self.execute_runqueue_internal() - finally: - if self.master_process: - failed_fnids = self.finish_runqueue() - if len(failed_fnids) == 0: - return failures + if self.state is runQueuePrepare: + self.prepare_runqueue() + + if self.state is runQueueRunInit: + bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") + self.execute_runqueue_initVars() + + if self.state is runQueueRunning: + self.execute_runqueue_internal() + + if self.state is runQueueCleanUp: + self.finish_runqueue() + + if self.state is runQueueFailed: if not self.taskData.tryaltconfigs: - raise bb.runqueue.TaskFailure(failed_fnids) - for fnid in failed_fnids: - #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid], self.runq_task[fnid]) + raise bb.runqueue.TaskFailure(self.failed_fnids) + for fnid in self.failed_fnids: self.taskData.fail_fnid(fnid) - failures = failures + 1 self.reset_runqueue() - self.prepare_runqueue() + + if self.state is runQueueComplete: + # All done + bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) + return False + + if self.state is runQueueChildProcess: + print "Child process" + return False + + # Loop + return True def execute_runqueue_initVars(self): - self.stats = RunQueueStats() + self.stats = RunQueueStats(len(self.runq_fnid)) - self.active_builds = 0 self.runq_buildable = [] self.runq_running = [] self.runq_complete = [] self.build_pids = {} + self.build_pipes = {} self.failed_fnids = [] - self.master_process = True # Mark initial buildable tasks - for task in range(len(self.runq_fnid)): + for task in range(self.stats.total): self.runq_running.append(0) self.runq_complete.append(0) if len(self.runq_depends[task]) == 0: @@ -836,6 +869,10 @@ class RunQueue: else: self.runq_buildable.append(0) + self.state = runQueueRunning + + event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData) + def task_complete(self, task): """ Mark a task as completed @@ -858,26 +895,32 @@ class RunQueue: taskname = self.runq_task[revdep] bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname)) + def task_fail(self, task, exitcode): + """ + Called when a task has failed + Updates the state engine with the failure + """ + bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode)) + self.stats.taskFailed() + fnid = self.runq_fnid[task] + self.failed_fnids.append(fnid) + bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) + if self.taskData.abort: + self.state = runQueueCleanup + def execute_runqueue_internal(self): """ Run the tasks in a queue prepared by prepare_runqueue """ - bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") - - self.execute_runqueue_initVars() - - if len(self.runq_fnid) == 0: + if self.stats.total == 0: # nothing to do - return [] - - def sigint_handler(signum, frame): - raise KeyboardInterrupt - - event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgdata)) + self.state = runQueueCleanup while True: - task = self.sched.next() + task = None + if self.stats.active < self.number_tasks: + task = self.sched.next() if task is not None: fn = self.taskData.fn_index[self.runq_fnid[task]] @@ -885,107 +928,143 @@ class RunQueue: if self.check_stamp_task(task): bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task))) self.runq_running[task] = 1 + self.runq_buildable[task] = 1 self.task_complete(task) self.stats.taskCompleted() self.stats.taskSkipped() continue - bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task))) sys.stdout.flush() sys.stderr.flush() - try: + try: + pipein, pipeout = os.pipe() pid = os.fork() except OSError, e: bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) if pid == 0: - # Bypass master process' handling - self.master_process = False - # Stop Ctrl+C being sent to children - # signal.signal(signal.SIGINT, signal.SIG_IGN) + os.close(pipein) + # Save out the PID so that the event can include it the + # events + bb.event.worker_pid = os.getpid() + bb.event.worker_pipe = pipeout + + self.state = runQueueChildProcess # Make the child the process group leader os.setpgid(0, 0) + # No stdin newsi = os.open('/dev/null', os.O_RDWR) os.dup2(newsi, sys.stdin.fileno()) - self.cooker.configuration.cmd = taskname[3:] + + bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) + bb.msg.note(1, bb.msg.domain.RunQueue, + "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1, + self.stats.total, + task, + self.get_user_idstring(task))) + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) try: - self.cooker.tryBuild(fn) + self.cooker.tryBuild(fn, taskname[3:]) except bb.build.EventException: bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") - sys.exit(1) + os._exit(1) except: bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") - raise - sys.exit(0) + os._exit(1) + os._exit(0) + self.build_pids[pid] = task + self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) self.runq_running[task] = 1 - self.active_builds = self.active_builds + 1 - if self.active_builds < self.number_tasks: + self.stats.taskActive() + if self.stats.active < self.number_tasks: continue - if self.active_builds > 0: - result = os.waitpid(-1, 0) - self.active_builds = self.active_builds - 1 + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + if self.stats.active > 0: + result = os.waitpid(-1, os.WNOHANG) + if result[0] is 0 and result[1] is 0: + return task = self.build_pids[result[0]] + del self.build_pids[result[0]] + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] if result[1] != 0: - del self.build_pids[result[0]] - bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task))) - self.failed_fnids.append(self.runq_fnid[task]) - self.stats.taskFailed() - if not self.taskData.abort: - continue - break + self.task_fail(task, result[1]) + return self.task_complete(task) self.stats.taskCompleted() - del self.build_pids[result[0]] + bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) continue + + if len(self.failed_fnids) != 0: + self.state = runQueueFailed + return + + # Sanity Checks + for task in range(self.stats.total): + if self.runq_buildable[task] == 0: + bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task) + if self.runq_running[task] == 0: + bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) + if self.runq_complete[task] == 0: + bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) + self.state = runQueueComplete return - def finish_runqueue(self): + def finish_runqueue_now(self): + bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) + for k, v in self.build_pids.iteritems(): + try: + os.kill(-k, signal.SIGINT) + except: + pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + def finish_runqueue(self, now = False): + self.state = runQueueCleanUp + if now: + self.finish_runqueue_now() try: - while self.active_builds > 0: - bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds) + while self.stats.active > 0: + bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) + bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active) tasknum = 1 for k, v in self.build_pids.iteritems(): - bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k)) - tasknum = tasknum + 1 - result = os.waitpid(-1, 0) + bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k)) + tasknum = tasknum + 1 + result = os.waitpid(-1, os.WNOHANG) + if result[0] is 0 and result[1] is 0: + return task = self.build_pids[result[0]] - if result[1] != 0: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task))) - self.failed_fnids.append(self.runq_fnid[task]) - self.stats.taskFailed() del self.build_pids[result[0]] - self.active_builds = self.active_builds - 1 - bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) - return self.failed_fnids - except KeyboardInterrupt: - bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds) - for k, v in self.build_pids.iteritems(): - try: - os.kill(-k, signal.SIGINT) - except: - pass + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] + if result[1] != 0: + self.task_fail(task, result[1]) + else: + self.stats.taskCompleted() + bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) + except: + self.finish_runqueue_now() raise - # Sanity Checks - for task in range(len(self.runq_fnid)): - if self.runq_buildable[task] == 0: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task) - if self.runq_running[task] == 0: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) - if self.runq_complete[task] == 0: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) - - bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) + if len(self.failed_fnids) != 0: + self.state = runQueueFailed + return - return self.failed_fnids + self.state = runQueueComplete + return def dump_data(self, taskQueue): """ Dump some debug information on the internal data structures """ bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") - for task in range(len(self.runq_fnid)): + for task in range(len(self.runq_task)): bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, taskQueue.fn_index[self.runq_fnid[task]], self.runq_task[task], @@ -994,7 +1073,7 @@ class RunQueue: self.runq_revdeps[task])) bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") - for task1 in range(len(self.runq_fnid)): + for task1 in range(len(self.runq_task)): if task1 in self.prio_map: task = self.prio_map[task1] bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, @@ -1005,6 +1084,58 @@ class RunQueue: self.runq_revdeps[task])) +class TaskFailure(Exception): + """ + Exception raised when a task in a runqueue fails + """ + def __init__(self, x): + self.args = x + + +class runQueueExitWait(bb.event.Event): + """ + Event when waiting for task processes to exit + """ + + def __init__(self, remain): + self.remain = remain + self.message = "Waiting for %s active tasks to finish" % remain + bb.event.Event.__init__(self) + +class runQueueEvent(bb.event.Event): + """ + Base runQueue event class + """ + def __init__(self, task, stats, rq): + self.taskid = task + self.taskstring = rq.get_user_idstring(task) + self.stats = stats + bb.event.Event.__init__(self) + +class runQueueTaskStarted(runQueueEvent): + """ + Event notifing a task was started + """ + def __init__(self, task, stats, rq): + runQueueEvent.__init__(self, task, stats, rq) + self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring) + +class runQueueTaskFailed(runQueueEvent): + """ + Event notifing a task failed + """ + def __init__(self, task, stats, rq): + runQueueEvent.__init__(self, task, stats, rq) + self.message = "Task %s failed (%s)" % (task, self.taskstring) + +class runQueueTaskCompleted(runQueueEvent): + """ + Event notifing a task completed + """ + def __init__(self, task, stats, rq): + runQueueEvent.__init__(self, task, stats, rq) + self.message = "Task %s completed (%s)" % (task, self.taskstring) + def check_stamp_fn(fn, taskname, d): rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) fnid = rq.taskData.getfn_id(fn) @@ -1013,3 +1144,31 @@ def check_stamp_fn(fn, taskname, d): return rq.check_stamp_task(taskid) return None +class runQueuePipe(): + """ + Abstraction for a pipe between a worker thread and the server + """ + def __init__(self, pipein, pipeout, d): + self.fd = pipein + os.close(pipeout) + self.queue = "" + self.d = d + + def read(self): + start = len(self.queue) + self.queue = self.queue + os.read(self.fd, 1024) + end = len(self.queue) + index = self.queue.find("</event>") + while index != -1: + bb.event.fire_from_worker(self.queue[:index+8], self.d) + self.queue = self.queue[index+8:] + index = self.queue.find("</event>") + return (end > start) + + def close(self): + while self.read(): + continue + if len(self.queue) > 0: + print "Warning, worker left partial message" + os.close(self.fd) + |