From 5d9f37873d88a33cc0f1c326a2cb0c2ff673a3a6 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Wed, 18 Aug 2010 11:30:53 +0100 Subject: bitbake: Split Runqueue into two classes, a data processor and the execution part Signed-off-by: Richard Purdie --- bitbake/lib/bb/cooker.py | 16 ++-- bitbake/lib/bb/runqueue.py | 221 ++++++++++++++++++++++++--------------------- 2 files changed, 124 insertions(+), 113 deletions(-) (limited to 'bitbake/lib/bb') diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 3f6f6ef0b6..8d65ba809a 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py @@ -275,7 +275,7 @@ class BBCooker: taskdata.add_unresolved(localdata, self.status) rq = bb.runqueue.RunQueue(self, self.configuration.data, self.status, taskdata, runlist) - rq.prepare_runqueue() + rq.rqdata.prepare() seen_fnids = [] depend_tree = {} @@ -287,9 +287,9 @@ class BBCooker: depend_tree["rdepends-pkg"] = {} depend_tree["rrecs-pkg"] = {} - for task in range(len(rq.runq_fnid)): - taskname = rq.runq_task[task] - fnid = rq.runq_fnid[task] + for task in range(len(rq.rqdata.runq_fnid)): + taskname = rq.rqdata.runq_task[task] + fnid = rq.rqdata.runq_fnid[task] fn = taskdata.fn_index[fnid] pn = self.status.pkg_fn[fn] version = "%s:%s-%s" % self.status.pkg_pepvpr[fn] @@ -297,13 +297,13 @@ class BBCooker: depend_tree["pn"][pn] = {} depend_tree["pn"][pn]["filename"] = fn depend_tree["pn"][pn]["version"] = version - for dep in rq.runq_depends[task]: - depfn = taskdata.fn_index[rq.runq_fnid[dep]] + for dep in rq.rqdata.runq_depends[task]: + depfn = taskdata.fn_index[rq.rqdata.runq_fnid[dep]] deppn = self.status.pkg_fn[depfn] - dotname = "%s.%s" % (pn, rq.runq_task[task]) + dotname = "%s.%s" % (pn, rq.rqdata.runq_task[task]) if not dotname in depend_tree["tdepends"]: depend_tree["tdepends"][dotname] = [] - depend_tree["tdepends"][dotname].append("%s.%s" % (deppn, rq.runq_task[dep])) + depend_tree["tdepends"][dotname].append("%s.%s" % (deppn, rq.rqdata.runq_task[dep])) if fnid not in seen_fnids: seen_fnids.append(fnid) packages = [] diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index bdd806a2c1..c25adc37fa 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -76,12 +76,13 @@ class RunQueueScheduler(object): """ name = "basic" - def __init__(self, runqueue): + def __init__(self, runqueue, rqdata): """ The default scheduler just returns the first buildable task (the priority map is sorted by task numer) """ self.rq = runqueue + self.rqdata = rqdata numTasks = len(self.rq.runq_fnid) self.prio_map = [] @@ -91,7 +92,7 @@ class RunQueueScheduler(object): """ Return the id of the first task we find that is buildable """ - for task1 in range(len(self.rq.runq_fnid)): + for task1 in range(len(self.rqdata.runq_fnid)): task = self.prio_map[task1] if self.rq.runq_running[task] == 1: continue @@ -105,16 +106,17 @@ class RunQueueSchedulerSpeed(RunQueueScheduler): """ name = "speed" - def __init__(self, runqueue): + def __init__(self, runqueue, rqdata): """ The priority map is sorted by task weight. """ from copy import deepcopy self.rq = runqueue + self.rqdata = rqdata - sortweight = sorted(deepcopy(self.rq.runq_weight)) - copyweight = deepcopy(self.rq.runq_weight) + sortweight = sorted(deepcopy(self.rqdata.runq_weight)) + copyweight = deepcopy(self.rqdata.runq_weight) self.prio_map = [] for weight in sortweight: @@ -134,8 +136,8 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): """ name = "completion" - def __init__(self, runqueue): - RunQueueSchedulerSpeed.__init__(self, runqueue) + def __init__(self, runqueue, rqdata): + RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata) from copy import deepcopy #FIXME - whilst this groups all fnids together it does not reorder the @@ -146,10 +148,10 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): while (len(basemap) > 0): entry = basemap.pop(0) self.prio_map.append(entry) - fnid = self.rq.runq_fnid[entry] + fnid = self.rqdata.runq_fnid[entry] todel = [] for entry in basemap: - entry_fnid = self.rq.runq_fnid[entry] + entry_fnid = self.rqdata.runq_fnid[entry] if entry_fnid == fnid: todel.append(basemap.index(entry)) self.prio_map.append(entry) @@ -157,30 +159,27 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): for idx in todel: del basemap[idx] -class RunQueue: +class RunQueueData: """ BitBake Run Queue implementation """ - def __init__(self, cooker, cfgData, dataCache, taskData, targets): - self.reset_runqueue() + def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets): self.cooker = cooker self.dataCache = dataCache self.taskData = taskData - self.cfgData = cfgData self.targets = targets + self.rq = rq - self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) - self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split() - self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" - self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile" self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or "" + self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split() + + self.reset() - def reset_runqueue(self): + def reset(self): self.runq_fnid = [] self.runq_task = [] self.runq_depends = [] self.runq_revdeps = [] - self.state = runQueuePrepare def runq_depends_names(self, ids): import re @@ -348,7 +347,7 @@ class RunQueue: return weight - def prepare_runqueue(self): + def prepare(self): """ Turn a set of taskData into a RunQueue and compute data needed to optimise the execution order. @@ -644,17 +643,6 @@ class RunQueue: # Check of higher length circular dependencies self.runq_weight = self.calculate_task_weights(endpoints) - schedulers = [obj for obj in globals().itervalues() - if type(obj) is type and issubclass(obj, RunQueueScheduler)] - for scheduler in schedulers: - if self.scheduler == scheduler.name: - self.sched = scheduler(self) - break - else: - bb.error("Invalid scheduler '%s', using default 'speed' scheduler" % self.scheduler) - bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers)) - self.sched = RunQueueSchedulerSpeed(self) - # Sanity Check - Check for multiple tasks building the same provider prov_list = {} seen_fn = [] @@ -690,7 +678,43 @@ class RunQueue: #self.dump_data(taskData) - self.state = runQueueRunInit + def dump_data(self, taskQueue): + """ + Dump some debug information on the internal data structures + """ + bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") + for task in range(len(self.rqdata.runq_task)): + bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, + taskQueue.fn_index[self.rqdata.runq_fnid[task]], + self.rqdata.runq_task[task], + self.rqdata.runq_weight[task], + self.rqdata.runq_depends[task], + self.rqdata.runq_revdeps[task])) + + bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") + for task1 in range(len(self.rqdata.runq_task)): + if task1 in self.prio_map: + task = self.prio_map[task1] + bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, + taskQueue.fn_index[self.rqdata.runq_fnid[task]], + self.rqdata.runq_task[task], + self.rqdata.runq_weight[task], + self.rqdata.runq_depends[task], + self.rqdata.runq_revdeps[task])) + + +class RunQueue: + def __init__(self, cooker, cfgData, dataCache, taskData, targets): + + self.cooker = cooker + self.cfgData = cfgData + self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets) + + self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) + self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" + self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile" + + self.state = runQueuePrepare def check_stamps(self): unchecked = {} @@ -704,29 +728,29 @@ class RunQueue: fulldeptree = True stampwhitelist = [] if self.stamppolicy == "whitelist": - stampwhitelist = self.self.stampfnwhitelist + stampwhitelist = self.rqdata.stampfnwhitelist - for task in range(len(self.runq_fnid)): + for task in range(len(self.rqdata.runq_fnid)): unchecked[task] = "" - if len(self.runq_depends[task]) == 0: + if len(self.rqdata.runq_depends[task]) == 0: buildable.append(task) def check_buildable(self, task, buildable): - for revdep in self.runq_revdeps[task]: + for revdep in self.rqdata.runq_revdeps[task]: alldeps = 1 - for dep in self.runq_depends[revdep]: + for dep in self.rqdata.runq_depends[revdep]: if dep in unchecked: alldeps = 0 if alldeps == 1: if revdep in unchecked: buildable.append(revdep) - for task in range(len(self.runq_fnid)): + for task in range(len(self.rqdata.runq_fnid)): if task not in unchecked: continue - fn = self.taskData.fn_index[self.runq_fnid[task]] - taskname = self.runq_task[task] - stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname) + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] + taskname = self.rqdata.runq_task[task] + stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname) # If the stamp is missing its not current if not os.access(stampfile, os.F_OK): del unchecked[task] @@ -734,7 +758,7 @@ class RunQueue: check_buildable(self, task, buildable) continue # If its a 'nostamp' task, it's not current - taskdep = self.dataCache.task_deps[fn] + taskdep = self.rqdata.dataCache.task_deps[fn] if 'nostamp' in taskdep and task in taskdep['nostamp']: del unchecked[task] notcurrent.append(task) @@ -745,17 +769,17 @@ class RunQueue: nextbuildable = [] for task in buildable: if task in unchecked: - fn = self.taskData.fn_index[self.runq_fnid[task]] - taskname = self.runq_task[task] - stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname) + fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]] + taskname = self.rqdata.runq_task[task] + stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname) iscurrent = True t1 = os.stat(stampfile)[stat.ST_MTIME] - for dep in self.runq_depends[task]: + for dep in self.rqdata.runq_depends[task]: if iscurrent: - fn2 = self.taskData.fn_index[self.runq_fnid[dep]] - taskname2 = self.runq_task[dep] - stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2) + fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]] + taskname2 = self.rqdata.runq_task[dep] + stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2) if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): if dep in notcurrent: iscurrent = False @@ -794,29 +818,29 @@ class RunQueue: fulldeptree = True stampwhitelist = [] if self.stamppolicy == "whitelist": - stampwhitelist = self.stampfnwhitelist + stampwhitelist = self.rqdata.stampfnwhitelist - fn = self.taskData.fn_index[self.runq_fnid[task]] + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] if taskname is None: - taskname = self.runq_task[task] - stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname) + taskname = self.rqdata.runq_task[task] + stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname) # If the stamp is missing its not current if not os.access(stampfile, os.F_OK): bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile) return False # If its a 'nostamp' task, it's not current - taskdep = self.dataCache.task_deps[fn] + taskdep = self.rqdata.dataCache.task_deps[fn] if 'nostamp' in taskdep and taskname in taskdep['nostamp']: bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname)) return False iscurrent = True t1 = os.stat(stampfile)[stat.ST_MTIME] - for dep in self.runq_depends[task]: + for dep in self.rqdata.runq_depends[task]: if iscurrent: - fn2 = self.taskData.fn_index[self.runq_fnid[dep]] - taskname2 = self.runq_task[dep] - stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2) + fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]] + taskname2 = self.rqdata.runq_task[dep] + stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2) if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): try: t2 = os.stat(stampfile2)[stat.ST_MTIME] @@ -831,13 +855,14 @@ class RunQueue: def execute_runqueue(self): """ - Run the tasks in a queue prepared by prepare_runqueue + Run the tasks in a queue prepared by rqdata.prepare() Upon failure, optionally try to recover the build using any alternate providers (if the abort on failure configuration option isn't set) """ if self.state is runQueuePrepare: - self.prepare_runqueue() + self.rqdata.prepare() + self.state = runQueueRunInit if self.state is runQueueRunInit: bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") @@ -850,11 +875,11 @@ class RunQueue: self.finish_runqueue() if self.state is runQueueFailed: - if not self.taskData.tryaltconfigs: + if not self.rqdata.taskData.tryaltconfigs: raise bb.runqueue.TaskFailure(self.failed_fnids) for fnid in self.failed_fnids: - self.taskData.fail_fnid(fnid) - self.reset_runqueue() + self.rqdata.taskData.fail_fnid(fnid) + self.rqdata.reset() if self.state is runQueueComplete: # All done @@ -870,7 +895,7 @@ class RunQueue: def execute_runqueue_initVars(self): - self.stats = RunQueueStats(len(self.runq_fnid)) + self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) self.runq_buildable = [] self.runq_running = [] @@ -883,14 +908,25 @@ class RunQueue: for task in range(self.stats.total): self.runq_running.append(0) self.runq_complete.append(0) - if len(self.runq_depends[task]) == 0: + if len(self.rqdata.runq_depends[task]) == 0: self.runq_buildable.append(1) else: self.runq_buildable.append(0) self.state = runQueueRunning - event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData) + event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) + + schedulers = [obj for obj in globals().itervalues() + if type(obj) is type and issubclass(obj, RunQueueScheduler)] + for scheduler in schedulers: + if self.scheduler == scheduler.name: + self.sched = scheduler(self, self.rqdata) + break + else: + bb.error("Invalid scheduler '%s', using default 'speed' scheduler" % self.scheduler) + bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers)) + self.sched = RunQueueSchedulerSpeed(self, self.rqdata) def task_complete(self, task): """ @@ -899,19 +935,19 @@ class RunQueue: completed dependencies as buildable """ self.runq_complete[task] = 1 - for revdep in self.runq_revdeps[task]: + for revdep in self.rqdata.runq_revdeps[task]: if self.runq_running[revdep] == 1: continue if self.runq_buildable[revdep] == 1: continue alldeps = 1 - for dep in self.runq_depends[revdep]: + for dep in self.rqdata.runq_depends[revdep]: if self.runq_complete[dep] != 1: alldeps = 0 if alldeps == 1: self.runq_buildable[revdep] = 1 - fn = self.taskData.fn_index[self.runq_fnid[revdep]] - taskname = self.runq_task[revdep] + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]] + taskname = self.rqdata.runq_task[revdep] bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname)) def task_fail(self, task, exitcode): @@ -919,17 +955,17 @@ class RunQueue: Called when a task has failed Updates the state engine with the failure """ - bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode)) + bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.rqdata.get_user_idstring(task), exitcode)) self.stats.taskFailed() - fnid = self.runq_fnid[task] + fnid = self.rqdata.runq_fnid[task] self.failed_fnids.append(fnid) bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) - if self.taskData.abort: + if self.rqdata.taskData.abort: self.state = runQueueCleanUp def execute_runqueue_internal(self): """ - Run the tasks in a queue prepared by prepare_runqueue + Run the tasks in a queue prepared by rqdata.prepare() """ if self.stats.total == 0: @@ -941,11 +977,11 @@ class RunQueue: if self.stats.active < self.number_tasks: task = self.sched.next() if task is not None: - fn = self.taskData.fn_index[self.runq_fnid[task]] + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] - taskname = self.runq_task[task] + taskname = self.rqdata.runq_task[task] if self.check_stamp_task(task, taskname): - bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task))) + bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task))) self.runq_running[task] = 1 self.runq_buildable[task] = 1 self.task_complete(task) @@ -1072,7 +1108,7 @@ class RunQueue: "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, self.stats.total, task, - self.get_user_idstring(task))) + self.rqdata.get_user_idstring(task))) bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) @@ -1096,31 +1132,6 @@ class RunQueue: return pid, pipein, pipeout - def dump_data(self, taskQueue): - """ - Dump some debug information on the internal data structures - """ - bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") - for task in range(len(self.runq_task)): - bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, - taskQueue.fn_index[self.runq_fnid[task]], - self.runq_task[task], - self.runq_weight[task], - self.runq_depends[task], - self.runq_revdeps[task])) - - bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") - for task1 in range(len(self.runq_task)): - if task1 in self.prio_map: - task = self.prio_map[task1] - bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, - taskQueue.fn_index[self.runq_fnid[task]], - self.runq_task[task], - self.runq_weight[task], - self.runq_depends[task], - self.runq_revdeps[task])) - - class TaskFailure(Exception): """ Exception raised when a task in a runqueue fails @@ -1145,7 +1156,7 @@ class runQueueEvent(bb.event.Event): """ def __init__(self, task, stats, rq): self.taskid = task - self.taskstring = rq.get_user_idstring(task) + self.taskstring = rq.rqdata.get_user_idstring(task) self.stats = stats bb.event.Event.__init__(self) @@ -1176,7 +1187,7 @@ class runQueueTaskCompleted(runQueueEvent): def check_stamp_fn(fn, taskname, d): rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d) - fnid = rq.taskData.getfn_id(fn) + fnid = rq.rqdata.taskData.getfn_id(fn) taskid = rq.get_task_id(fnid, taskname) if taskid is not None: return rq.check_stamp_task(taskid) -- cgit v1.2.3