From 9336ba1fd2ae750d3d399cc046896ef50f4cc0ed Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Mon, 10 Jan 2011 12:48:49 +0000 Subject: bitbake/runqueue.py: Sync with changes in upstream bitbake Signed-off-by: Richard Purdie --- bitbake/lib/bb/runqueue.py | 170 +++++++++++++++++++++------------------------ 1 file changed, 80 insertions(+), 90 deletions(-) diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 4e37aaf723..8580f51693 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -22,13 +22,13 @@ Handles preparation and execution of a queue of tasks # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import copy import os import sys import subprocess import signal import stat import fcntl -import copy import logging import bb from bb import msg, data, event @@ -36,12 +36,6 @@ from bb import msg, data, event bblogger = logging.getLogger("BitBake") logger = logging.getLogger("BitBake.RunQueue") -try: - import cPickle as pickle -except ImportError: - import pickle - logger.info("Importing cPickle failed. Falling back to a very slow implementation.") - class RunQueueStats: """ Holds statistics on the tasks handled by the associated runQueue @@ -93,28 +87,28 @@ class RunQueueScheduler(object): """ self.rq = runqueue self.rqdata = rqdata - numTasks = len(self.rq.runq_fnid) + numTasks = len(self.rqdata.runq_fnid) self.prio_map = [] self.prio_map.extend(range(numTasks)) - def next_buildable_tasks(self): + def next_buildable_task(self): """ Return the id of the first task we find that is buildable """ - for tasknum in range(len(self.rqdata.runq_fnid)): + for tasknum in xrange(len(self.rqdata.runq_fnid)): taskid = self.prio_map[tasknum] if self.rq.runq_running[taskid] == 1: continue if self.rq.runq_buildable[taskid] == 1: - yield taskid + return taskid def next(self): """ Return the id of the task we should build next """ if self.rq.stats.active < self.rq.number_tasks: - return next(self.next_buildable_tasks(), None) + return self.next_buildable_task() class RunQueueSchedulerSpeed(RunQueueScheduler): """ @@ -127,13 +121,12 @@ class RunQueueSchedulerSpeed(RunQueueScheduler): """ The priority map is sorted by task weight. """ - from copy import deepcopy self.rq = runqueue self.rqdata = rqdata - sortweight = sorted(deepcopy(self.rqdata.runq_weight)) - copyweight = deepcopy(self.rqdata.runq_weight) + sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight)) + copyweight = copy.deepcopy(self.rqdata.runq_weight) self.prio_map = [] for weight in sortweight: @@ -155,12 +148,11 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): def __init__(self, runqueue, rqdata): RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata) - from copy import deepcopy #FIXME - whilst this groups all fnids together it does not reorder the #fnid groups optimally. - basemap = deepcopy(self.prio_map) + basemap = copy.deepcopy(self.prio_map) self.prio_map = [] while (len(basemap) > 0): entry = basemap.pop(0) @@ -190,25 +182,6 @@ class RunQueueData: self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or "" self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split() - self.schedulers = set(obj for obj in globals().itervalues() - if type(obj) is type and issubclass(obj, RunQueueScheduler)) - - user_schedulers = bb.data.getVar("BB_SCHEDULERS", cfgData, True) - if user_schedulers: - for sched in user_schedulers.split(): - if not "." in sched: - bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) - continue - - modname, name = sched.rsplit(".", 1) - try: - module = __import__(modname, fromlist=(name,)) - except ImportError, exc: - logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) - raise SystemExit(1) - else: - self.schedulers.add(getattr(module, name)) - self.reset() def reset(self): @@ -313,7 +286,7 @@ class RunQueueData: if dep in explored_deps[revdep]: scan = True if scan: - find_chains(revdep, deepcopy(prev_chain)) + find_chains(revdep, copy.deepcopy(prev_chain)) for dep in explored_deps[revdep]: if dep not in total_deps: total_deps.append(dep) @@ -715,20 +688,15 @@ class RunQueueData: stampfnwhitelist.append(fn) self.stampfnwhitelist = stampfnwhitelist - #self.dump_data(taskData) - # Interate over the task list looking for tasks with a 'setscene' function - self.runq_setscene = [] for task in range(len(self.runq_fnid)): setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False) if not setscene: continue - #bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task])) self.runq_setscene.append(task) # Interate over the task list and call into the siggen code - dealtwith = set() todeal = set(range(len(self.runq_fnid))) while len(todeal) > 0: @@ -744,7 +712,7 @@ class RunQueueData: hashdata = {} hashdata["hashes"] = {} hashdata["deps"] = {} - for task in range(len(self.runq_fnid)): + for task in xrange(len(self.runq_fnid)): hashdata["hashes"][self.taskData.fn_index[self.runq_fnid[task]] + "." + self.runq_task[task]] = self.runq_hash[task] deps = [] for dep in self.runq_depends[task]: @@ -764,24 +732,24 @@ class RunQueueData: Dump some debug information on the internal data structures """ logger.debug(3, "run_tasks:") - for task in range(len(self.rqdata.runq_task)): - logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, - taskQueue.fn_index[self.rqdata.runq_fnid[task]], - self.rqdata.runq_task[task], - self.rqdata.runq_weight[task], - self.rqdata.runq_depends[task], - self.rqdata.runq_revdeps[task])) + for task in xrange(len(self.rqdata.runq_task)): + logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task, + taskQueue.fn_index[self.rqdata.runq_fnid[task]], + self.rqdata.runq_task[task], + self.rqdata.runq_weight[task], + self.rqdata.runq_depends[task], + self.rqdata.runq_revdeps[task]) logger.debug(3, "sorted_tasks:") - for task1 in range(len(self.rqdata.runq_task)): + for task1 in xrange(len(self.rqdata.runq_task)): if task1 in self.prio_map: task = self.prio_map[task1] - logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, - taskQueue.fn_index[self.rqdata.runq_fnid[task]], - self.rqdata.runq_task[task], - self.rqdata.runq_weight[task], - self.rqdata.runq_depends[task], - self.rqdata.runq_revdeps[task])) + logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task, + taskQueue.fn_index[self.rqdata.runq_fnid[task]], + self.rqdata.runq_task[task], + self.rqdata.runq_weight[task], + self.rqdata.runq_depends[task], + self.rqdata.runq_revdeps[task]) class RunQueue: def __init__(self, cooker, cfgData, dataCache, taskData, targets): @@ -809,7 +777,7 @@ class RunQueue: if self.stamppolicy == "whitelist": stampwhitelist = self.rqdata.stampfnwhitelist - for task in range(len(self.rqdata.runq_fnid)): + for task in xrange(len(self.rqdata.runq_fnid)): unchecked[task] = "" if len(self.rqdata.runq_depends[task]) == 0: buildable.append(task) @@ -824,7 +792,7 @@ class RunQueue: if revdep in unchecked: buildable.append(revdep) - for task in range(len(self.rqdata.runq_fnid)): + for task in xrange(len(self.rqdata.runq_fnid)): if task not in unchecked: continue fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] @@ -909,7 +877,7 @@ class RunQueue: fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] if taskname is None: taskname = self.rqdata.runq_task[task] - + stampfile = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn], fn, taskname) # If the stamp is missing its not current @@ -919,7 +887,7 @@ class RunQueue: # If its a 'nostamp' task, it's not current taskdep = self.rqdata.dataCache.task_deps[fn] if 'nostamp' in taskdep and taskname in taskdep['nostamp']: - logger.debug(2, "%s.%s is nostamp\n" % (fn, taskname)) + logger.debug(2, "%s.%s is nostamp\n", fn, taskname) return False if taskname != "do_setscene" and taskname.endswith("_setscene"): @@ -939,10 +907,10 @@ class RunQueue: continue if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): if not t2: - logger.debug(2, "Stampfile %s does not exist" % (stampfile2)) + logger.debug(2, 'Stampfile %s does not exist', stampfile2) iscurrent = False if t1 < t2: - logger.debug(2, "Stampfile %s < %s" % (stampfile, stampfile2)) + logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2) iscurrent = False return iscurrent @@ -1014,7 +982,7 @@ class RunQueue: bb.note("Reparsing files to collect dependency data") for task in range(len(self.rqdata.runq_fnid)): if self.rqdata.runq_fnid[task] not in done: - fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) done.add(self.rqdata.runq_fnid[task]) @@ -1219,14 +1187,38 @@ class RunQueueExecuteTasks(RunQueueExecute): event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) - for scheduler in self.rqdata.schedulers: + schedulers = self.get_schedulers() + for scheduler in schedulers: if self.scheduler == scheduler.name: self.sched = scheduler(self, self.rqdata) logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) break else: - bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % - (self.scheduler, ", ".join(obj.name for obj in self.rqdata.schedulers))) + bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % + (self.scheduler, ", ".join(obj.name for obj in schedulers))) + + + def get_schedulers(self): + schedulers = set(obj for obj in globals().values() + if type(obj) is type and + issubclass(obj, RunQueueScheduler)) + + user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True) + if user_schedulers: + for sched in user_schedulers.split(): + if not "." in sched: + bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) + continue + + modname, name = sched.rsplit(".", 1) + try: + module = __import__(modname, fromlist=(name,)) + except ImportError, exc: + logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) + raise SystemExit(1) + else: + schedulers.add(getattr(module, name)) + return schedulers def task_completeoutright(self, task): """ @@ -1283,12 +1275,14 @@ class RunQueueExecuteTasks(RunQueueExecute): # nothing to do self.rq.state = runQueueCleanUp - for task in iter(self.sched.next, None): + task = self.sched.next() + if task is not None: fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] taskname = self.rqdata.runq_task[task] if self.rq.check_stamp_task(task, taskname): - logger.debug(2, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task))) + logger.debug(2, "Stamp current task %s (%s)", task, + self.rqdata.get_user_idstring(task)) self.task_skip(task) return True @@ -1455,12 +1449,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute): for task in xrange(len(self.sq_revdeps)): if task not in valid_new and task not in noexec: - logger.debug(2, "No package found so skipping setscene task %s" % (self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task]))) + logger.debug(2, 'No package found, so skipping setscene task %s', + self.rqdata.get_user_idstring(task)) self.task_failoutright(task) - #print(str(valid)) - - logger.info("Executing SetScene Tasks") + logger.info('Executing SetScene Tasks') self.rq.state = runQueueSceneRun @@ -1521,11 +1514,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute): # Find the next setscene to run for nexttask in xrange(self.stats.total): if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1: - #bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered)) - #if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered): - # bb.note("Skipping task %s" % nexttask) - # self.scenequeue_skip(nexttask) - # return True task = nexttask break if task is not None: @@ -1534,7 +1522,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): taskname = self.rqdata.runq_task[realtask] + "_setscene" if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]): - logger.debug(2, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task))) + logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant', + task, self.rqdata.get_user_idstring(task)) self.task_failoutright(task) return True @@ -1545,7 +1534,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): return True if self.rq.check_stamp_task(realtask, taskname): - logger.debug(2, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask))) + logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies', + task, self.rqdata.get_user_idstring(realtask)) self.task_skip(task) return True @@ -1575,7 +1565,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): for task in oldcovered: self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task]) - bb.debug(1, "We can skip tasks %s" % self.rq.scenequeue_covered) + logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered) self.rq.state = runQueueRunInit return True @@ -1630,12 +1620,12 @@ class runQueueTaskCompleted(runQueueEvent): """ #def check_stamp_fn(fn, taskname, d): -# rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) +# rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) # fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d) -# fnid = rq.rqdata.taskData.getfn_id(fn) -# taskid = rq.get_task_id(fnid, taskname) +# fnid = rqexe.rqdata.taskData.getfn_id(fn) +# taskid = rqexe.rqdata.get_task_id(fnid, taskname) # if taskid is not None: -# return rq.check_stamp_task(taskid) +# return rqexe.rq.check_stamp_task(taskid) # return None class runQueuePipe(): @@ -1643,17 +1633,17 @@ class runQueuePipe(): Abstraction for a pipe between a worker thread and the server """ def __init__(self, pipein, pipeout, d): - self.fd = pipein + self.input = pipein pipeout.close() - fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK) + fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK) self.queue = "" self.d = d def read(self): start = len(self.queue) try: - self.queue = self.queue + self.fd.read(1024) - except IOError: + self.queue = self.queue + self.input.read(1024) + except (OSError, IOError): pass end = len(self.queue) index = self.queue.find("") @@ -1668,4 +1658,4 @@ class runQueuePipe(): continue if len(self.queue) > 0: print("Warning, worker left partial message: %s" % self.queue) - self.fd.close() + self.input.close() -- cgit v1.2.3