summaryrefslogtreecommitdiff
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorRichard Purdie <rpurdie@linux.intel.com>2010-08-18 11:30:53 +0100
committerRichard Purdie <rpurdie@linux.intel.com>2010-08-18 11:49:58 +0100
commit5d9f37873d88a33cc0f1c326a2cb0c2ff673a3a6 (patch)
treef9a0a5c3c0a0ffd6084824e95f0fc863898098ee /bitbake/lib/bb/runqueue.py
parentd7bc9b8ecec524294cc9143fd0b349249b329891 (diff)
downloadopenembedded-core-5d9f37873d88a33cc0f1c326a2cb0c2ff673a3a6.tar.gz
openembedded-core-5d9f37873d88a33cc0f1c326a2cb0c2ff673a3a6.tar.bz2
openembedded-core-5d9f37873d88a33cc0f1c326a2cb0c2ff673a3a6.zip
bitbake: Split Runqueue into two classes, a data processor and the execution part
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py221
1 files changed, 116 insertions, 105 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index bdd806a2c1..c25adc37fa 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -76,12 +76,13 @@ class RunQueueScheduler(object):
"""
name = "basic"
- def __init__(self, runqueue):
+ def __init__(self, runqueue, rqdata):
"""
The default scheduler just returns the first buildable task (the
priority map is sorted by task numer)
"""
self.rq = runqueue
+ self.rqdata = rqdata
numTasks = len(self.rq.runq_fnid)
self.prio_map = []
@@ -91,7 +92,7 @@ class RunQueueScheduler(object):
"""
Return the id of the first task we find that is buildable
"""
- for task1 in range(len(self.rq.runq_fnid)):
+ for task1 in range(len(self.rqdata.runq_fnid)):
task = self.prio_map[task1]
if self.rq.runq_running[task] == 1:
continue
@@ -105,16 +106,17 @@ class RunQueueSchedulerSpeed(RunQueueScheduler):
"""
name = "speed"
- def __init__(self, runqueue):
+ def __init__(self, runqueue, rqdata):
"""
The priority map is sorted by task weight.
"""
from copy import deepcopy
self.rq = runqueue
+ self.rqdata = rqdata
- sortweight = sorted(deepcopy(self.rq.runq_weight))
- copyweight = deepcopy(self.rq.runq_weight)
+ sortweight = sorted(deepcopy(self.rqdata.runq_weight))
+ copyweight = deepcopy(self.rqdata.runq_weight)
self.prio_map = []
for weight in sortweight:
@@ -134,8 +136,8 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
"""
name = "completion"
- def __init__(self, runqueue):
- RunQueueSchedulerSpeed.__init__(self, runqueue)
+ def __init__(self, runqueue, rqdata):
+ RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
from copy import deepcopy
#FIXME - whilst this groups all fnids together it does not reorder the
@@ -146,10 +148,10 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
while (len(basemap) > 0):
entry = basemap.pop(0)
self.prio_map.append(entry)
- fnid = self.rq.runq_fnid[entry]
+ fnid = self.rqdata.runq_fnid[entry]
todel = []
for entry in basemap:
- entry_fnid = self.rq.runq_fnid[entry]
+ entry_fnid = self.rqdata.runq_fnid[entry]
if entry_fnid == fnid:
todel.append(basemap.index(entry))
self.prio_map.append(entry)
@@ -157,30 +159,27 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
for idx in todel:
del basemap[idx]
-class RunQueue:
+class RunQueueData:
"""
BitBake Run Queue implementation
"""
- def __init__(self, cooker, cfgData, dataCache, taskData, targets):
- self.reset_runqueue()
+ def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
self.cooker = cooker
self.dataCache = dataCache
self.taskData = taskData
- self.cfgData = cfgData
self.targets = targets
+ self.rq = rq
- self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
- self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
- self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
- self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
+ self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
+
+ self.reset()
- def reset_runqueue(self):
+ def reset(self):
self.runq_fnid = []
self.runq_task = []
self.runq_depends = []
self.runq_revdeps = []
- self.state = runQueuePrepare
def runq_depends_names(self, ids):
import re
@@ -348,7 +347,7 @@ class RunQueue:
return weight
- def prepare_runqueue(self):
+ def prepare(self):
"""
Turn a set of taskData into a RunQueue and compute data needed
to optimise the execution order.
@@ -644,17 +643,6 @@ class RunQueue:
# Check of higher length circular dependencies
self.runq_weight = self.calculate_task_weights(endpoints)
- schedulers = [obj for obj in globals().itervalues()
- if type(obj) is type and issubclass(obj, RunQueueScheduler)]
- for scheduler in schedulers:
- if self.scheduler == scheduler.name:
- self.sched = scheduler(self)
- break
- else:
- bb.error("Invalid scheduler '%s', using default 'speed' scheduler" % self.scheduler)
- bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers))
- self.sched = RunQueueSchedulerSpeed(self)
-
# Sanity Check - Check for multiple tasks building the same provider
prov_list = {}
seen_fn = []
@@ -690,7 +678,43 @@ class RunQueue:
#self.dump_data(taskData)
- self.state = runQueueRunInit
+ def dump_data(self, taskQueue):
+ """
+ Dump some debug information on the internal data structures
+ """
+ bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
+ for task in range(len(self.rqdata.runq_task)):
+ bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
+ taskQueue.fn_index[self.rqdata.runq_fnid[task]],
+ self.rqdata.runq_task[task],
+ self.rqdata.runq_weight[task],
+ self.rqdata.runq_depends[task],
+ self.rqdata.runq_revdeps[task]))
+
+ bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
+ for task1 in range(len(self.rqdata.runq_task)):
+ if task1 in self.prio_map:
+ task = self.prio_map[task1]
+ bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
+ taskQueue.fn_index[self.rqdata.runq_fnid[task]],
+ self.rqdata.runq_task[task],
+ self.rqdata.runq_weight[task],
+ self.rqdata.runq_depends[task],
+ self.rqdata.runq_revdeps[task]))
+
+
+class RunQueue:
+ def __init__(self, cooker, cfgData, dataCache, taskData, targets):
+
+ self.cooker = cooker
+ self.cfgData = cfgData
+ self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
+
+ self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
+ self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
+ self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
+
+ self.state = runQueuePrepare
def check_stamps(self):
unchecked = {}
@@ -704,29 +728,29 @@ class RunQueue:
fulldeptree = True
stampwhitelist = []
if self.stamppolicy == "whitelist":
- stampwhitelist = self.self.stampfnwhitelist
+ stampwhitelist = self.rqdata.stampfnwhitelist
- for task in range(len(self.runq_fnid)):
+ for task in range(len(self.rqdata.runq_fnid)):
unchecked[task] = ""
- if len(self.runq_depends[task]) == 0:
+ if len(self.rqdata.runq_depends[task]) == 0:
buildable.append(task)
def check_buildable(self, task, buildable):
- for revdep in self.runq_revdeps[task]:
+ for revdep in self.rqdata.runq_revdeps[task]:
alldeps = 1
- for dep in self.runq_depends[revdep]:
+ for dep in self.rqdata.runq_depends[revdep]:
if dep in unchecked:
alldeps = 0
if alldeps == 1:
if revdep in unchecked:
buildable.append(revdep)
- for task in range(len(self.runq_fnid)):
+ for task in range(len(self.rqdata.runq_fnid)):
if task not in unchecked:
continue
- fn = self.taskData.fn_index[self.runq_fnid[task]]
- taskname = self.runq_task[task]
- stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
+ fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
+ taskname = self.rqdata.runq_task[task]
+ stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname)
# If the stamp is missing its not current
if not os.access(stampfile, os.F_OK):
del unchecked[task]
@@ -734,7 +758,7 @@ class RunQueue:
check_buildable(self, task, buildable)
continue
# If its a 'nostamp' task, it's not current
- taskdep = self.dataCache.task_deps[fn]
+ taskdep = self.rqdata.dataCache.task_deps[fn]
if 'nostamp' in taskdep and task in taskdep['nostamp']:
del unchecked[task]
notcurrent.append(task)
@@ -745,17 +769,17 @@ class RunQueue:
nextbuildable = []
for task in buildable:
if task in unchecked:
- fn = self.taskData.fn_index[self.runq_fnid[task]]
- taskname = self.runq_task[task]
- stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
+ fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
+ taskname = self.rqdata.runq_task[task]
+ stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname)
iscurrent = True
t1 = os.stat(stampfile)[stat.ST_MTIME]
- for dep in self.runq_depends[task]:
+ for dep in self.rqdata.runq_depends[task]:
if iscurrent:
- fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
- taskname2 = self.runq_task[dep]
- stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
+ fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
+ taskname2 = self.rqdata.runq_task[dep]
+ stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2)
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
if dep in notcurrent:
iscurrent = False
@@ -794,29 +818,29 @@ class RunQueue:
fulldeptree = True
stampwhitelist = []
if self.stamppolicy == "whitelist":
- stampwhitelist = self.stampfnwhitelist
+ stampwhitelist = self.rqdata.stampfnwhitelist
- fn = self.taskData.fn_index[self.runq_fnid[task]]
+ fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
if taskname is None:
- taskname = self.runq_task[task]
- stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
+ taskname = self.rqdata.runq_task[task]
+ stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname)
# If the stamp is missing its not current
if not os.access(stampfile, os.F_OK):
bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile)
return False
# If its a 'nostamp' task, it's not current
- taskdep = self.dataCache.task_deps[fn]
+ taskdep = self.rqdata.dataCache.task_deps[fn]
if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
return False
iscurrent = True
t1 = os.stat(stampfile)[stat.ST_MTIME]
- for dep in self.runq_depends[task]:
+ for dep in self.rqdata.runq_depends[task]:
if iscurrent:
- fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
- taskname2 = self.runq_task[dep]
- stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
+ fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
+ taskname2 = self.rqdata.runq_task[dep]
+ stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2)
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
try:
t2 = os.stat(stampfile2)[stat.ST_MTIME]
@@ -831,13 +855,14 @@ class RunQueue:
def execute_runqueue(self):
"""
- Run the tasks in a queue prepared by prepare_runqueue
+ Run the tasks in a queue prepared by rqdata.prepare()
Upon failure, optionally try to recover the build using any alternate providers
(if the abort on failure configuration option isn't set)
"""
if self.state is runQueuePrepare:
- self.prepare_runqueue()
+ self.rqdata.prepare()
+ self.state = runQueueRunInit
if self.state is runQueueRunInit:
bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
@@ -850,11 +875,11 @@ class RunQueue:
self.finish_runqueue()
if self.state is runQueueFailed:
- if not self.taskData.tryaltconfigs:
+ if not self.rqdata.taskData.tryaltconfigs:
raise bb.runqueue.TaskFailure(self.failed_fnids)
for fnid in self.failed_fnids:
- self.taskData.fail_fnid(fnid)
- self.reset_runqueue()
+ self.rqdata.taskData.fail_fnid(fnid)
+ self.rqdata.reset()
if self.state is runQueueComplete:
# All done
@@ -870,7 +895,7 @@ class RunQueue:
def execute_runqueue_initVars(self):
- self.stats = RunQueueStats(len(self.runq_fnid))
+ self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
self.runq_buildable = []
self.runq_running = []
@@ -883,14 +908,25 @@ class RunQueue:
for task in range(self.stats.total):
self.runq_running.append(0)
self.runq_complete.append(0)
- if len(self.runq_depends[task]) == 0:
+ if len(self.rqdata.runq_depends[task]) == 0:
self.runq_buildable.append(1)
else:
self.runq_buildable.append(0)
self.state = runQueueRunning
- event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData)
+ event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
+
+ schedulers = [obj for obj in globals().itervalues()
+ if type(obj) is type and issubclass(obj, RunQueueScheduler)]
+ for scheduler in schedulers:
+ if self.scheduler == scheduler.name:
+ self.sched = scheduler(self, self.rqdata)
+ break
+ else:
+ bb.error("Invalid scheduler '%s', using default 'speed' scheduler" % self.scheduler)
+ bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers))
+ self.sched = RunQueueSchedulerSpeed(self, self.rqdata)
def task_complete(self, task):
"""
@@ -899,19 +935,19 @@ class RunQueue:
completed dependencies as buildable
"""
self.runq_complete[task] = 1
- for revdep in self.runq_revdeps[task]:
+ for revdep in self.rqdata.runq_revdeps[task]:
if self.runq_running[revdep] == 1:
continue
if self.runq_buildable[revdep] == 1:
continue
alldeps = 1
- for dep in self.runq_depends[revdep]:
+ for dep in self.rqdata.runq_depends[revdep]:
if self.runq_complete[dep] != 1:
alldeps = 0
if alldeps == 1:
self.runq_buildable[revdep] = 1
- fn = self.taskData.fn_index[self.runq_fnid[revdep]]
- taskname = self.runq_task[revdep]
+ fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
+ taskname = self.rqdata.runq_task[revdep]
bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
def task_fail(self, task, exitcode):
@@ -919,17 +955,17 @@ class RunQueue:
Called when a task has failed
Updates the state engine with the failure
"""
- bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode))
+ bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.rqdata.get_user_idstring(task), exitcode))
self.stats.taskFailed()
- fnid = self.runq_fnid[task]
+ fnid = self.rqdata.runq_fnid[task]
self.failed_fnids.append(fnid)
bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
- if self.taskData.abort:
+ if self.rqdata.taskData.abort:
self.state = runQueueCleanUp
def execute_runqueue_internal(self):
"""
- Run the tasks in a queue prepared by prepare_runqueue
+ Run the tasks in a queue prepared by rqdata.prepare()
"""
if self.stats.total == 0:
@@ -941,11 +977,11 @@ class RunQueue:
if self.stats.active < self.number_tasks:
task = self.sched.next()
if task is not None:
- fn = self.taskData.fn_index[self.runq_fnid[task]]
+ fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
- taskname = self.runq_task[task]
+ taskname = self.rqdata.runq_task[task]
if self.check_stamp_task(task, taskname):
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
+ bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task)))
self.runq_running[task] = 1
self.runq_buildable[task] = 1
self.task_complete(task)
@@ -1072,7 +1108,7 @@ class RunQueue:
"Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1,
self.stats.total,
task,
- self.get_user_idstring(task)))
+ self.rqdata.get_user_idstring(task)))
bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
@@ -1096,31 +1132,6 @@ class RunQueue:
return pid, pipein, pipeout
- def dump_data(self, taskQueue):
- """
- Dump some debug information on the internal data structures
- """
- bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
- for task in range(len(self.runq_task)):
- bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
- taskQueue.fn_index[self.runq_fnid[task]],
- self.runq_task[task],
- self.runq_weight[task],
- self.runq_depends[task],
- self.runq_revdeps[task]))
-
- bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
- for task1 in range(len(self.runq_task)):
- if task1 in self.prio_map:
- task = self.prio_map[task1]
- bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
- taskQueue.fn_index[self.runq_fnid[task]],
- self.runq_task[task],
- self.runq_weight[task],
- self.runq_depends[task],
- self.runq_revdeps[task]))
-
-
class TaskFailure(Exception):
"""
Exception raised when a task in a runqueue fails
@@ -1145,7 +1156,7 @@ class runQueueEvent(bb.event.Event):
"""
def __init__(self, task, stats, rq):
self.taskid = task
- self.taskstring = rq.get_user_idstring(task)
+ self.taskstring = rq.rqdata.get_user_idstring(task)
self.stats = stats
bb.event.Event.__init__(self)
@@ -1176,7 +1187,7 @@ class runQueueTaskCompleted(runQueueEvent):
def check_stamp_fn(fn, taskname, d):
rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
- fnid = rq.taskData.getfn_id(fn)
+ fnid = rq.rqdata.taskData.getfn_id(fn)
taskid = rq.get_task_id(fnid, taskname)
if taskid is not None:
return rq.check_stamp_task(taskid)