diff options
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 443 |
1 files changed, 227 insertions, 216 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index b4134f8266..a3f444c2ab 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -22,19 +22,18 @@ Handles preparation and execution of a queue of tasks # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import bb, os, sys -import subprocess -from bb import msg, data, event +import copy +import os +import sys import signal import stat import fcntl -import copy +import logging +import bb +from bb import msg, data, event -try: - import cPickle as pickle -except ImportError: - import pickle - bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.") +bblogger = logging.getLogger("BitBake") +logger = logging.getLogger("BitBake.RunQueue") class RunQueueStats: """ @@ -87,21 +86,28 @@ class RunQueueScheduler(object): """ self.rq = runqueue self.rqdata = rqdata - numTasks = len(self.rq.runq_fnid) + numTasks = len(self.rqdata.runq_fnid) self.prio_map = [] self.prio_map.extend(range(numTasks)) - def next(self): + def next_buildable_task(self): """ Return the id of the first task we find that is buildable """ - for task1 in range(len(self.rqdata.runq_fnid)): - task = self.prio_map[task1] - if self.rq.runq_running[task] == 1: + for tasknum in xrange(len(self.rqdata.runq_fnid)): + taskid = self.prio_map[tasknum] + if self.rq.runq_running[taskid] == 1: continue - if self.rq.runq_buildable[task] == 1: - return task + if self.rq.runq_buildable[taskid] == 1: + return taskid + + def next(self): + """ + Return the id of the task we should build next + """ + if self.rq.stats.active < self.rq.number_tasks: + return self.next_buildable_task() class RunQueueSchedulerSpeed(RunQueueScheduler): """ @@ -114,13 +120,12 @@ class RunQueueSchedulerSpeed(RunQueueScheduler): """ The priority map is sorted by task weight. """ - from copy import deepcopy self.rq = runqueue self.rqdata = rqdata - sortweight = sorted(deepcopy(self.rqdata.runq_weight)) - copyweight = deepcopy(self.rqdata.runq_weight) + sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight)) + copyweight = copy.deepcopy(self.rqdata.runq_weight) self.prio_map = [] for weight in sortweight: @@ -142,12 +147,11 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): def __init__(self, runqueue, rqdata): RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata) - from copy import deepcopy #FIXME - whilst this groups all fnids together it does not reorder the #fnid groups optimally. - basemap = deepcopy(self.prio_map) + basemap = copy.deepcopy(self.prio_map) self.prio_map = [] while (len(basemap) > 0): entry = basemap.pop(0) @@ -201,7 +205,7 @@ class RunQueueData: return "%s, %s" % (fn, taskname) def get_task_id(self, fnid, taskname): - for listid in range(len(self.runq_fnid)): + for listid in xrange(len(self.runq_fnid)): if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname: return listid return None @@ -223,7 +227,7 @@ class RunQueueData: """ lowest = 0 new_chain = [] - for entry in range(len(chain)): + for entry in xrange(len(chain)): if chain[entry] < chain[lowest]: lowest = entry new_chain.extend(chain[lowest:]) @@ -236,7 +240,7 @@ class RunQueueData: """ if len(chain1) != len(chain2): return False - for index in range(len(chain1)): + for index in xrange(len(chain1)): if chain1[index] != chain2[index]: return False return True @@ -281,7 +285,7 @@ class RunQueueData: if dep in explored_deps[revdep]: scan = True if scan: - find_chains(revdep, deepcopy(prev_chain)) + find_chains(revdep, copy.deepcopy(prev_chain)) for dep in explored_deps[revdep]: if dep not in total_deps: total_deps.append(dep) @@ -298,7 +302,7 @@ class RunQueueData: Calculate a number representing the "weight" of each task. Heavier weighted tasks have more dependencies and hence should be executed sooner for maximum speed. - This function also sanity checks the task list finding tasks that its not + This function also sanity checks the task list finding tasks that are not possible to execute due to circular dependencies. """ @@ -307,7 +311,7 @@ class RunQueueData: deps_left = [] task_done = [] - for listid in range(numTasks): + for listid in xrange(numTasks): task_done.append(False) weight.append(0) deps_left.append(len(self.runq_revdeps[listid])) @@ -331,17 +335,17 @@ class RunQueueData: # Circular dependency sanity check problem_tasks = [] - for task in range(numTasks): + for task in xrange(numTasks): if task_done[task] is False or deps_left[task] != 0: problem_tasks.append(task) - bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task))) - bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task])) + logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task)) + logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task]) if problem_tasks: message = "Unbuildable tasks were found.\n" message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" message = message + "Identifying dependency loops (this may take a short while)...\n" - bb.msg.error(bb.msg.domain.RunQueue, message) + logger.error(message) msgs = self.circular_depchains_handler(problem_tasks) @@ -369,7 +373,7 @@ class RunQueueData: # Nothing to do return 0 - bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue") + logger.info("Preparing runqueue") # Step A - Work out a list of tasks to run # @@ -409,14 +413,14 @@ class RunQueueData: if taskid is not None: depends.append(taskid) - for task in range(len(taskData.tasks_name)): + for task in xrange(len(taskData.tasks_name)): depends = [] recrdepends = [] fnid = taskData.tasks_fnid[task] fn = taskData.fn_index[fnid] task_deps = self.dataCache.task_deps[fn] - bb.msg.debug(2, bb.msg.domain.RunQueue, "Processing %s:%s" %(fn, taskData.tasks_name[task])) + logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task]) if fnid not in taskData.failed_fnids: @@ -454,7 +458,9 @@ class RunQueueData: depdata = taskData.build_targets[depid][0] if depdata is not None: dep = taskData.fn_index[depdata] - taskid = taskData.gettask_id(dep, idependtask) + taskid = taskData.gettask_id(dep, idependtask, False) + if taskid is None: + bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s in %s depends upon nonexistant task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep)) depends.append(taskid) if depdata != fnid: tdepends_fnid[fnid].add(taskid) @@ -474,7 +480,7 @@ class RunQueueData: # Rmove all self references if task in depends: newdep = [] - bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)) + logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends) for dep in depends: if task != dep: newdep.append(dep) @@ -498,7 +504,7 @@ class RunQueueData: # Algorithm is O(tasks) + O(tasks)*O(fnids) # reccumdepends = {} - for task in range(len(self.runq_fnid)): + for task in xrange(len(self.runq_fnid)): fnid = self.runq_fnid[task] if fnid not in reccumdepends: if fnid in tdepends_fnid: @@ -506,7 +512,7 @@ class RunQueueData: else: reccumdepends[fnid] = set() reccumdepends[fnid].update(self.runq_depends[task]) - for task in range(len(self.runq_fnid)): + for task in xrange(len(self.runq_fnid)): taskfnid = self.runq_fnid[task] for fnid in reccumdepends: if task in reccumdepends[fnid]: @@ -519,7 +525,7 @@ class RunQueueData: # # e.g. do_sometask[recrdeptask] = "do_someothertask" # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) - for task in range(len(self.runq_fnid)): + for task in xrange(len(self.runq_fnid)): if len(runq_recrdepends[task]) > 0: taskfnid = self.runq_fnid[task] for dep in reccumdepends[taskfnid]: @@ -536,7 +542,7 @@ class RunQueueData: # as active too. If the task is to be 'forced', clear its stamp. Once # all active tasks are marked, prune the ones we don't need. - bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks") + logger.verbose("Marking Active Tasks") def mark_active(listid, depth): """ @@ -567,11 +573,6 @@ class RunQueueData: fn = taskData.fn_index[fnid] self.target_pairs.append((fn, target[1])) - # Remove stamps for targets if force mode active - if self.cooker.configuration.force: - bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn)) - bb.build.del_stamp(target[1], self.dataCache, fn) - if fnid in taskData.failed_fnids: continue @@ -588,7 +589,7 @@ class RunQueueData: maps = [] delcount = 0 - for listid in range(len(self.runq_fnid)): + for listid in xrange(len(self.runq_fnid)): if runq_build[listid-delcount] == 1: maps.append(listid-delcount) else: @@ -612,11 +613,11 @@ class RunQueueData: else: bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.") - bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid))) + logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid)) # Remap the dependencies to account for the deleted tasks # Check we didn't delete a task we depend on - for listid in range(len(self.runq_fnid)): + for listid in xrange(len(self.runq_fnid)): newdeps = [] origdeps = self.runq_depends[listid] for origdep in origdeps: @@ -625,17 +626,17 @@ class RunQueueData: newdeps.append(maps[origdep]) self.runq_depends[listid] = set(newdeps) - bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings") + logger.verbose("Assign Weightings") # Generate a list of reverse dependencies to ease future calculations - for listid in range(len(self.runq_fnid)): + for listid in xrange(len(self.runq_fnid)): for dep in self.runq_depends[listid]: self.runq_revdeps[dep].add(listid) # Identify tasks at the end of dependency chains # Error on circular dependency loops (length two) endpoints = [] - for listid in range(len(self.runq_fnid)): + for listid in xrange(len(self.runq_fnid)): revdeps = self.runq_revdeps[listid] if len(revdeps) == 0: endpoints.append(listid) @@ -644,7 +645,7 @@ class RunQueueData: #self.dump_data(taskData) bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid])) - bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints)) + logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) # Calculate task weights # Check of higher length circular dependencies @@ -653,7 +654,7 @@ class RunQueueData: # Sanity Check - Check for multiple tasks building the same provider prov_list = {} seen_fn = [] - for task in range(len(self.runq_fnid)): + for task in xrange(len(self.runq_fnid)): fn = taskData.fn_index[self.runq_fnid[task]] if fn in seen_fn: continue @@ -667,9 +668,7 @@ class RunQueueData: for prov in prov_list: if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist: error = True - bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov]))) - #if error: - # bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...") + logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov])) # Create a whitelist usable by the stamp checks @@ -683,20 +682,15 @@ class RunQueueData: stampfnwhitelist.append(fn) self.stampfnwhitelist = stampfnwhitelist - #self.dump_data(taskData) - # Interate over the task list looking for tasks with a 'setscene' function - self.runq_setscene = [] for task in range(len(self.runq_fnid)): setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False) if not setscene: continue - #bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task])) self.runq_setscene.append(task) # Interate over the task list and call into the siggen code - dealtwith = set() todeal = set(range(len(self.runq_fnid))) while len(todeal) > 0: @@ -709,21 +703,24 @@ class RunQueueData: procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep]) self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache) - hashdata = {} - hashdata["hashes"] = {} - hashdata["deps"] = {} - for task in range(len(self.runq_fnid)): - hashdata["hashes"][self.taskData.fn_index[self.runq_fnid[task]] + "." + self.runq_task[task]] = self.runq_hash[task] + self.hashes = {} + self.hash_deps = {} + for task in xrange(len(self.runq_fnid)): + identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]], + self.runq_task[task]) + self.hashes[identifier] = self.runq_hash[task] deps = [] for dep in self.runq_depends[task]: - deps.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep]) - hashdata["deps"][self.taskData.fn_index[self.runq_fnid[task]] + "." + self.runq_task[task]] = deps + depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]], + self.runq_task[dep]) + deps.append(depidentifier) + self.hash_deps[identifier] = deps - hashdata["msg-debug"] = self.cooker.configuration.debug - hashdata["msg-debug-domains"] = self.cooker.configuration.debug_domains - hashdata["verbose"] = self.cooker.configuration.verbose - - self.hashdata = hashdata + # Remove stamps for targets if force mode active + if self.cooker.configuration.force: + for (fn, target) in self.target_pairs: + logger.verbose("Remove stamp %s, %s", target, fn) + bb.build.del_stamp(target, self.dataCache, fn) return len(self.runq_fnid) @@ -731,25 +728,25 @@ class RunQueueData: """ Dump some debug information on the internal data structures """ - bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") - for task in range(len(self.rqdata.runq_task)): - bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, - taskQueue.fn_index[self.rqdata.runq_fnid[task]], - self.rqdata.runq_task[task], - self.rqdata.runq_weight[task], - self.rqdata.runq_depends[task], - self.rqdata.runq_revdeps[task])) - - bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") - for task1 in range(len(self.rqdata.runq_task)): + logger.debug(3, "run_tasks:") + for task in xrange(len(self.rqdata.runq_task)): + logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task, + taskQueue.fn_index[self.rqdata.runq_fnid[task]], + self.rqdata.runq_task[task], + self.rqdata.runq_weight[task], + self.rqdata.runq_depends[task], + self.rqdata.runq_revdeps[task]) + + logger.debug(3, "sorted_tasks:") + for task1 in xrange(len(self.rqdata.runq_task)): if task1 in self.prio_map: task = self.prio_map[task1] - bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, - taskQueue.fn_index[self.rqdata.runq_fnid[task]], - self.rqdata.runq_task[task], - self.rqdata.runq_weight[task], - self.rqdata.runq_depends[task], - self.rqdata.runq_revdeps[task])) + logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task, + taskQueue.fn_index[self.rqdata.runq_fnid[task]], + self.rqdata.runq_task[task], + self.rqdata.runq_weight[task], + self.rqdata.runq_depends[task], + self.rqdata.runq_revdeps[task]) class RunQueue: def __init__(self, cooker, cfgData, dataCache, taskData, targets): @@ -777,7 +774,7 @@ class RunQueue: if self.stamppolicy == "whitelist": stampwhitelist = self.rqdata.stampfnwhitelist - for task in range(len(self.rqdata.runq_fnid)): + for task in xrange(len(self.rqdata.runq_fnid)): unchecked[task] = "" if len(self.rqdata.runq_depends[task]) == 0: buildable.append(task) @@ -792,12 +789,12 @@ class RunQueue: if revdep in unchecked: buildable.append(revdep) - for task in range(len(self.rqdata.runq_fnid)): + for task in xrange(len(self.rqdata.runq_fnid)): if task not in unchecked: continue fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] taskname = self.rqdata.runq_task[task] - stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname) + stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) # If the stamp is missing its not current if not os.access(stampfile, os.F_OK): del unchecked[task] @@ -818,7 +815,7 @@ class RunQueue: if task in unchecked: fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]] taskname = self.rqdata.runq_task[task] - stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname) + stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) iscurrent = True t1 = os.stat(stampfile)[stat.ST_MTIME] @@ -826,7 +823,7 @@ class RunQueue: if iscurrent: fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]] taskname2 = self.rqdata.runq_task[dep] - stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2) + stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2) if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): if dep in notcurrent: iscurrent = False @@ -877,20 +874,20 @@ class RunQueue: fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] if taskname is None: taskname = self.rqdata.runq_task[task] - - stampfile = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn], taskname, self.rqdata.runq_hash[task]) + + stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) # If the stamp is missing its not current if not os.access(stampfile, os.F_OK): - bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile) + logger.debug(2, "Stampfile %s not available", stampfile) return False # If its a 'nostamp' task, it's not current taskdep = self.rqdata.dataCache.task_deps[fn] if 'nostamp' in taskdep and taskname in taskdep['nostamp']: - bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname)) + logger.debug(2, "%s.%s is nostamp\n", fn, taskname) return False - if taskname.endswith("_setscene"): + if taskname != "do_setscene" and taskname.endswith("_setscene"): return True iscurrent = True @@ -899,18 +896,18 @@ class RunQueue: if iscurrent: fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]] taskname2 = self.rqdata.runq_task[dep] - stampfile2 = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn2], taskname2, self.rqdata.runq_hash[dep]) - stampfile3 = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn2], taskname2 + "_setscene", self.rqdata.runq_hash[dep]) + stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2) + stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2) t2 = get_timestamp(stampfile2) t3 = get_timestamp(stampfile3) if t3 and t3 > t2: continue if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): if not t2: - bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s does not exist" % (stampfile2)) + logger.debug(2, 'Stampfile %s does not exist', stampfile2) iscurrent = False if t1 < t2: - bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile, stampfile2)) + logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2) iscurrent = False return iscurrent @@ -941,7 +938,7 @@ class RunQueue: retval = self.rqexe.execute() if self.state is runQueueRunInit: - bb.msg.note(1, bb.msg.domain.RunQueue, "Executing RunQueue Tasks") + logger.info("Executing RunQueue Tasks") self.rqexe = RunQueueExecuteTasks(self) self.state = runQueueRunning @@ -960,7 +957,7 @@ class RunQueue: if self.state is runQueueComplete: # All done - bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)) + logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed) return False if self.state is runQueueChildProcess: @@ -982,8 +979,8 @@ class RunQueue: bb.note("Reparsing files to collect dependency data") for task in range(len(self.rqdata.runq_fnid)): if self.rqdata.runq_fnid[task] not in done: - fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] - the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] + the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) done.add(self.rqdata.runq_fnid[task]) bb.parse.siggen.dump_sigs(self.rqdata.dataCache) @@ -1022,16 +1019,16 @@ class RunQueueExecute: self.build_pipes[result[0]].close() del self.build_pipes[result[0]] if result[1] != 0: - self.task_fail(task, result[1]) + self.task_fail(task, result[1]>>8) else: self.task_complete(task) def finish_now(self): if self.stats.active: - bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) + logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active) for k, v in self.build_pids.iteritems(): try: - os.kill(-k, signal.SIGINT) + os.kill(-k, signal.SIGTERM) except: pass for pipe in self.build_pipes: @@ -1055,8 +1052,8 @@ class RunQueueExecute: self.rq.state = runQueueComplete return - def fork_off_task(self, fn, task, taskname): - the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) + def fork_off_task(self, fn, task, taskname, quieterrors=False): + the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) env = bb.data.export_vars(the_data) env = bb.data.export_envvars(env, the_data) @@ -1070,55 +1067,59 @@ class RunQueueExecute: fakedirs = (the_data.getVar("FAKEROOTDIRS", True) or "").split() for p in fakedirs: bb.mkdirhier(p) - bb.msg.debug(2, bb.msg.domain.RunQueue, "Running %s:%s under fakeroot, state dir is %s" % (fn, taskname, fakedirs)) + logger.debug(2, "Running %s:%s under fakeroot, state dir is %s" % (fn, taskname, fakedirs)) - env['BB_TASKHASH'] = self.rqdata.runq_hash[task] env['PATH'] = self.cooker.configuration.initial_path envbackup = os.environ.copy() - os.environ = env + for e in envbackup: + os.unsetenv(e) + for e in env: + os.putenv(e, env[e]) sys.stdout.flush() sys.stderr.flush() - try: - pipeinfd, pipeoutfd = os.pipe() - pipein = os.fdopen(pipeinfd, 'rb', 4096) - pipeout = os.fdopen(pipeoutfd, 'wb', 4096) - + pipein, pipeout = os.pipe() + pipein = os.fdopen(pipein, 'rb', 4096) + pipeout = os.fdopen(pipeout, 'wb', 0) pid = os.fork() except OSError as e: bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) if pid == 0: pipein.close() + # Save out the PID so that the event can include it the # events bb.event.worker_pid = os.getpid() bb.event.worker_pipe = pipeout bb.event.useStdout = False + # Child processes should send their messages to the UI + # process via the server process, not print them + # themselves + bblogger.handlers = [bb.event.LogHandler()] + self.rq.state = runQueueChildProcess # Make the child the process group leader os.setpgid(0, 0) # No stdin - newsi = os.open('/dev/null', os.O_RDWR) + newsi = os.open(os.devnull, os.O_RDWR) os.dup2(newsi, sys.stdin.fileno()) - # Stdout to a logfile - #logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True) - #mkdirhier(os.path.dirname(logout)) - #newso = open(logout, 'w') - #os.dup2(newso.fileno(), sys.stdout.fileno()) - #os.dup2(newso.fileno(), sys.stderr.fileno()) - if taskname.endswith("_setscene"): + if quieterrors: the_data.setVarFlag(taskname, "quieterrors", "1") + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) bb.data.setVar("BB_WORKERCONTEXT", "1", the_data) - bb.parse.siggen.set_taskdata(self.rqdata.hashdata["hashes"], self.rqdata.hashdata["deps"]) + bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps) - for h in self.rqdata.hashdata["hashes"]: - bb.data.setVar("BBHASH_%s" % h, self.rqdata.hashdata["hashes"][h], the_data) - for h in self.rqdata.hashdata["deps"]: - bb.data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hashdata["deps"][h], the_data) + for h in self.rqdata.hashes: + bb.data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h], the_data) + for h in self.rqdata.hash_deps: + bb.data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h], the_data) + + bb.data.setVar("BB_TASKHASH", self.rqdata.runq_hash[task], the_data) ret = 0 try: @@ -1128,7 +1129,10 @@ class RunQueueExecute: except: os._exit(1) - os.environ = envbackup + for e in env: + os.unsetenv(e) + for e in envbackup: + os.putenv(e, envbackup[e]) return pid, pipein, pipeout @@ -1136,9 +1140,10 @@ class RunQueueExecuteDummy(RunQueueExecute): def __init__(self, rq): self.rq = rq self.stats = RunQueueStats(0) + def finish(self): self.rq.state = runQueueComplete - return + return class RunQueueExecuteTasks(RunQueueExecute): def __init__(self, rq): @@ -1147,7 +1152,7 @@ class RunQueueExecuteTasks(RunQueueExecute): self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) # Mark initial buildable tasks - for task in range(self.stats.total): + for task in xrange(self.stats.total): self.runq_running.append(0) self.runq_complete.append(0) if len(self.rqdata.runq_depends[task]) == 0: @@ -1160,31 +1165,52 @@ class RunQueueExecuteTasks(RunQueueExecute): found = True while found: found = False - for task in range(self.stats.total): + for task in xrange(self.stats.total): if task in self.rq.scenequeue_covered: continue if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered): self.rq.scenequeue_covered.add(task) found = True - bb.debug("Full skip list %s" % self.rq.scenequeue_covered) + logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered) for task in self.rq.scenequeue_covered: self.task_skip(task) event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) - schedulers = [obj for obj in globals().itervalues() - if type(obj) is type and issubclass(obj, RunQueueScheduler)] + schedulers = self.get_schedulers() for scheduler in schedulers: if self.scheduler == scheduler.name: self.sched = scheduler(self, self.rqdata) + logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) break else: - bb.error("Invalid scheduler '%s', using default 'speed' scheduler" % self.scheduler) - bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers)) - self.sched = RunQueueSchedulerSpeed(self, self.rqdata) + bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % + (self.scheduler, ", ".join(obj.name for obj in schedulers))) + + + def get_schedulers(self): + schedulers = set(obj for obj in globals().values() + if type(obj) is type and + issubclass(obj, RunQueueScheduler)) + user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True) + if user_schedulers: + for sched in user_schedulers.split(): + if not "." in sched: + bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) + continue + + modname, name = sched.rsplit(".", 1) + try: + module = __import__(modname, fromlist=(name,)) + except ImportError, exc: + logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) + raise SystemExit(1) + else: + schedulers.add(getattr(module, name)) + return schedulers def task_completeoutright(self, task): """ @@ -1206,7 +1232,7 @@ class RunQueueExecuteTasks(RunQueueExecute): self.runq_buildable[revdep] = 1 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]] taskname = self.rqdata.runq_task[revdep] - bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname)) + logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname) def task_complete(self, task): self.stats.taskCompleted() @@ -1218,11 +1244,10 @@ class RunQueueExecuteTasks(RunQueueExecute): Called when a task has failed Updates the state engine with the failure """ - bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.rqdata.get_user_idstring(task), exitcode)) self.stats.taskFailed() fnid = self.rqdata.runq_fnid[task] self.failed_fnids.append(fnid) - bb.event.fire(runQueueTaskFailed(task, self.stats, self.rq), self.cfgData) + bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData) if self.rqdata.taskData.abort: self.rq.state = runQueueCleanUp @@ -1242,38 +1267,30 @@ class RunQueueExecuteTasks(RunQueueExecute): # nothing to do self.rq.state = runQueueCleanUp - task = None - if self.stats.active < self.number_tasks: - task = self.sched.next() + task = self.sched.next() if task is not None: fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] taskname = self.rqdata.runq_task[task] if self.rq.check_stamp_task(task, taskname): - bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task))) + logger.debug(2, "Stamp current task %s (%s)", task, + self.rqdata.get_user_idstring(task)) self.task_skip(task) return True - bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData) - taskdep = self.rqdata.dataCache.task_deps[fn] if 'noexec' in taskdep and taskname in taskdep['noexec']: - bb.msg.note(1, bb.msg.domain.RunQueue, - "Noexec task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, - self.stats.total, - task, - self.rqdata.get_user_idstring(task))) + startevent = runQueueTaskStarted(task, self.stats, self.rq, + noexec=True) + bb.event.fire(startevent, self.cfgData) self.runq_running[task] = 1 self.stats.taskActive() bb.build.make_stamp(taskname, self.rqdata.dataCache, fn) self.task_complete(task) return True - - bb.msg.note(1, bb.msg.domain.RunQueue, - "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, - self.stats.total, - task, - self.rqdata.get_user_idstring(task))) + else: + startevent = runQueueTaskStarted(task, self.stats, self.rq) + bb.event.fire(startevent, self.cfgData) pid, pipein, pipeout = self.fork_off_task(fn, task, taskname) @@ -1281,8 +1298,6 @@ class RunQueueExecuteTasks(RunQueueExecute): self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) self.runq_running[task] = 1 self.stats.taskActive() - if self.stats.active < self.number_tasks: - return True for pipe in self.build_pipes: self.build_pipes[pipe].read() @@ -1297,13 +1312,13 @@ class RunQueueExecuteTasks(RunQueueExecute): return True # Sanity Checks - for task in range(self.stats.total): + for task in xrange(self.stats.total): if self.runq_buildable[task] == 0: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task) + logger.error("Task %s never buildable!", task) if self.runq_running[task] == 0: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) + logger.error("Task %s never ran!", task) if self.runq_complete[task] == 0: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) + logger.error("Task %s never completed!", task) self.rq.state = runQueueComplete return True @@ -1332,12 +1347,12 @@ class RunQueueExecuteScenequeue(RunQueueExecute): # therefore aims to collapse the huge runqueue dependency tree into a smaller one # only containing the setscene functions. - for task in range(self.stats.total): + for task in xrange(self.stats.total): self.runq_running.append(0) self.runq_complete.append(0) self.runq_buildable.append(0) - for task in range(len(self.rqdata.runq_fnid)): + for task in xrange(len(self.rqdata.runq_fnid)): sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task])) sq_revdeps_new.append(set()) if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene: @@ -1368,7 +1383,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): process_endpoints(endpoints) - for task in range(len(self.rqdata.runq_fnid)): + for task in xrange(len(self.rqdata.runq_fnid)): if task in self.rqdata.runq_setscene: deps = set() for dep in sq_revdeps_new[task]: @@ -1377,20 +1392,20 @@ class RunQueueExecuteScenequeue(RunQueueExecute): elif len(sq_revdeps_new[task]) != 0: bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.") - #for task in range(len(sq_revdeps_squash)): + #for task in xrange(len(sq_revdeps_squash)): # print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task]) self.sq_deps = [] self.sq_revdeps = sq_revdeps_squash self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps) - for task in range(len(self.sq_revdeps)): + for task in xrange(len(self.sq_revdeps)): self.sq_deps.append(set()) - for task in range(len(self.sq_revdeps)): + for task in xrange(len(self.sq_revdeps)): for dep in self.sq_revdeps[task]: self.sq_deps[dep].add(task) - for task in range(len(self.sq_revdeps)): + for task in xrange(len(self.sq_revdeps)): if len(self.sq_revdeps[task]) == 0: self.runq_buildable[task] = 1 @@ -1401,7 +1416,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): sq_taskname = [] sq_task = [] noexec = [] - for task in range(len(self.sq_revdeps)): + for task in xrange(len(self.sq_revdeps)): realtask = self.rqdata.runq_setscene[task] fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]] taskname = self.rqdata.runq_task[realtask] @@ -1424,14 +1439,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute): for v in valid: valid_new.append(sq_task[v]) - for task in range(len(self.sq_revdeps)): + for task in xrange(len(self.sq_revdeps)): if task not in valid_new and task not in noexec: - bb.msg.debug(2, bb.msg.domain.RunQueue, "No package found so skipping setscene task %s" % (self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task]))) + logger.debug(2, 'No package found, so skipping setscene task %s', + self.rqdata.get_user_idstring(task)) self.task_failoutright(task) - #print(str(valid)) - - bb.msg.note(1, bb.msg.domain.RunQueue, "Executing SetScene Tasks") + logger.info('Executing SetScene Tasks') self.rq.state = runQueueSceneRun @@ -1449,7 +1463,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): """ index = self.rqdata.runq_setscene[task] - bb.msg.debug(1, bb.msg.domain.RunQueue, "Found task %s could be accelerated" % self.rqdata.get_user_idstring(index)) + logger.debug(1, 'Found task %s which could be accelerated', + self.rqdata.get_user_idstring(index)) self.scenequeue_covered.add(task) self.scenequeue_updatecounters(task) @@ -1461,7 +1476,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): def task_fail(self, task, result): self.stats.taskFailed() index = self.rqdata.runq_setscene[task] - bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) + bb.event.fire(runQueueTaskFailed(task, self.stats, result, self), self.cfgData) self.scenequeue_notcovered.add(task) self.scenequeue_updatecounters(task) @@ -1489,13 +1504,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): task = None if self.stats.active < self.number_tasks: # Find the next setscene to run - for nexttask in range(self.stats.total): + for nexttask in xrange(self.stats.total): if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1: - #bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered)) - #if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered): - # bb.note("Skipping task %s" % nexttask) - # self.scenequeue_skip(nexttask) - # return True task = nexttask break if task is not None: @@ -1504,7 +1514,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): taskname = self.rqdata.runq_task[realtask] + "_setscene" if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]): - bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task))) + logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant', + task, self.rqdata.get_user_idstring(task)) self.task_failoutright(task) return True @@ -1515,12 +1526,12 @@ class RunQueueExecuteScenequeue(RunQueueExecute): return True if self.rq.check_stamp_task(realtask, taskname): - bb.msg.debug(2, bb.msg.domain.RunQueue, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask))) + logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies', + task, self.rqdata.get_user_idstring(realtask)) self.task_skip(task) return True - bb.msg.note(1, bb.msg.domain.RunQueue, - "Running setscene task %d of %d (%s:%s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, + logger.info("Running setscene task %d of %d (%s:%s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, self.stats.total, fn, taskname)) pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname) @@ -1546,11 +1557,14 @@ class RunQueueExecuteScenequeue(RunQueueExecute): for task in oldcovered: self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task]) - bb.debug("We can skip tasks %s" % self.rq.scenequeue_covered) + logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered) self.rq.state = runQueueRunInit return True + def fork_off_task(self, fn, task, taskname): + return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True) + class TaskFailure(Exception): """ Exception raised when a task in a runqueue fails @@ -1583,51 +1597,48 @@ class runQueueTaskStarted(runQueueEvent): """ Event notifing a task was started """ - def __init__(self, task, stats, rq): + def __init__(self, task, stats, rq, noexec=False): runQueueEvent.__init__(self, task, stats, rq) - self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring) + self.noexec = noexec class runQueueTaskFailed(runQueueEvent): """ Event notifing a task failed """ - def __init__(self, task, stats, rq): + def __init__(self, task, stats, exitcode, rq): runQueueEvent.__init__(self, task, stats, rq) - self.message = "Task %s failed (%s)" % (task, self.taskstring) + self.exitcode = exitcode class runQueueTaskCompleted(runQueueEvent): """ Event notifing a task completed """ - def __init__(self, task, stats, rq): - runQueueEvent.__init__(self, task, stats, rq) - self.message = "Task %s completed (%s)" % (task, self.taskstring) -#def check_stamp_fn(fn, taskname, d): -# rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) -# fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d) -# fnid = rq.rqdata.taskData.getfn_id(fn) -# taskid = rq.get_task_id(fnid, taskname) -# if taskid is not None: -# return rq.check_stamp_task(taskid) -# return None +def check_stamp_fn(fn, taskname, d): + rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) + fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d) + fnid = rqexe.rqdata.taskData.getfn_id(fn) + taskid = rqexe.rqdata.get_task_id(fnid, taskname) + if taskid is not None: + return rqexe.rq.check_stamp_task(taskid) + return None class runQueuePipe(): """ Abstraction for a pipe between a worker thread and the server """ def __init__(self, pipein, pipeout, d): - self.fd = pipein + self.input = pipein pipeout.close() - fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK) + fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK) self.queue = "" self.d = d def read(self): start = len(self.queue) try: - self.queue = self.queue + self.fd.read(1024) - except IOError: + self.queue = self.queue + self.input.read(102400) + except (OSError, IOError): pass end = len(self.queue) index = self.queue.find("</event>") @@ -1642,4 +1653,4 @@ class runQueuePipe(): continue if len(self.queue) > 0: print("Warning, worker left partial message: %s" % self.queue) - self.fd.close() + self.input.close() |