summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bitbake/lib/bb/runqueue.py170
1 files changed, 80 insertions, 90 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 4e37aaf723..8580f51693 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -22,13 +22,13 @@ Handles preparation and execution of a queue of tasks
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import copy
import os
import sys
import subprocess
import signal
import stat
import fcntl
-import copy
import logging
import bb
from bb import msg, data, event
@@ -36,12 +36,6 @@ from bb import msg, data, event
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
- logger.info("Importing cPickle failed. Falling back to a very slow implementation.")
-
class RunQueueStats:
"""
Holds statistics on the tasks handled by the associated runQueue
@@ -93,28 +87,28 @@ class RunQueueScheduler(object):
"""
self.rq = runqueue
self.rqdata = rqdata
- numTasks = len(self.rq.runq_fnid)
+ numTasks = len(self.rqdata.runq_fnid)
self.prio_map = []
self.prio_map.extend(range(numTasks))
- def next_buildable_tasks(self):
+ def next_buildable_task(self):
"""
Return the id of the first task we find that is buildable
"""
- for tasknum in range(len(self.rqdata.runq_fnid)):
+ for tasknum in xrange(len(self.rqdata.runq_fnid)):
taskid = self.prio_map[tasknum]
if self.rq.runq_running[taskid] == 1:
continue
if self.rq.runq_buildable[taskid] == 1:
- yield taskid
+ return taskid
def next(self):
"""
Return the id of the task we should build next
"""
if self.rq.stats.active < self.rq.number_tasks:
- return next(self.next_buildable_tasks(), None)
+ return self.next_buildable_task()
class RunQueueSchedulerSpeed(RunQueueScheduler):
"""
@@ -127,13 +121,12 @@ class RunQueueSchedulerSpeed(RunQueueScheduler):
"""
The priority map is sorted by task weight.
"""
- from copy import deepcopy
self.rq = runqueue
self.rqdata = rqdata
- sortweight = sorted(deepcopy(self.rqdata.runq_weight))
- copyweight = deepcopy(self.rqdata.runq_weight)
+ sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
+ copyweight = copy.deepcopy(self.rqdata.runq_weight)
self.prio_map = []
for weight in sortweight:
@@ -155,12 +148,11 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
def __init__(self, runqueue, rqdata):
RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
- from copy import deepcopy
#FIXME - whilst this groups all fnids together it does not reorder the
#fnid groups optimally.
- basemap = deepcopy(self.prio_map)
+ basemap = copy.deepcopy(self.prio_map)
self.prio_map = []
while (len(basemap) > 0):
entry = basemap.pop(0)
@@ -190,25 +182,6 @@ class RunQueueData:
self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
- self.schedulers = set(obj for obj in globals().itervalues()
- if type(obj) is type and issubclass(obj, RunQueueScheduler))
-
- user_schedulers = bb.data.getVar("BB_SCHEDULERS", cfgData, True)
- if user_schedulers:
- for sched in user_schedulers.split():
- if not "." in sched:
- bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
- continue
-
- modname, name = sched.rsplit(".", 1)
- try:
- module = __import__(modname, fromlist=(name,))
- except ImportError, exc:
- logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
- raise SystemExit(1)
- else:
- self.schedulers.add(getattr(module, name))
-
self.reset()
def reset(self):
@@ -313,7 +286,7 @@ class RunQueueData:
if dep in explored_deps[revdep]:
scan = True
if scan:
- find_chains(revdep, deepcopy(prev_chain))
+ find_chains(revdep, copy.deepcopy(prev_chain))
for dep in explored_deps[revdep]:
if dep not in total_deps:
total_deps.append(dep)
@@ -715,20 +688,15 @@ class RunQueueData:
stampfnwhitelist.append(fn)
self.stampfnwhitelist = stampfnwhitelist
- #self.dump_data(taskData)
-
# Interate over the task list looking for tasks with a 'setscene' function
-
self.runq_setscene = []
for task in range(len(self.runq_fnid)):
setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
if not setscene:
continue
- #bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task]))
self.runq_setscene.append(task)
# Interate over the task list and call into the siggen code
-
dealtwith = set()
todeal = set(range(len(self.runq_fnid)))
while len(todeal) > 0:
@@ -744,7 +712,7 @@ class RunQueueData:
hashdata = {}
hashdata["hashes"] = {}
hashdata["deps"] = {}
- for task in range(len(self.runq_fnid)):
+ for task in xrange(len(self.runq_fnid)):
hashdata["hashes"][self.taskData.fn_index[self.runq_fnid[task]] + "." + self.runq_task[task]] = self.runq_hash[task]
deps = []
for dep in self.runq_depends[task]:
@@ -764,24 +732,24 @@ class RunQueueData:
Dump some debug information on the internal data structures
"""
logger.debug(3, "run_tasks:")
- for task in range(len(self.rqdata.runq_task)):
- logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
- taskQueue.fn_index[self.rqdata.runq_fnid[task]],
- self.rqdata.runq_task[task],
- self.rqdata.runq_weight[task],
- self.rqdata.runq_depends[task],
- self.rqdata.runq_revdeps[task]))
+ for task in xrange(len(self.rqdata.runq_task)):
+ logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
+ taskQueue.fn_index[self.rqdata.runq_fnid[task]],
+ self.rqdata.runq_task[task],
+ self.rqdata.runq_weight[task],
+ self.rqdata.runq_depends[task],
+ self.rqdata.runq_revdeps[task])
logger.debug(3, "sorted_tasks:")
- for task1 in range(len(self.rqdata.runq_task)):
+ for task1 in xrange(len(self.rqdata.runq_task)):
if task1 in self.prio_map:
task = self.prio_map[task1]
- logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
- taskQueue.fn_index[self.rqdata.runq_fnid[task]],
- self.rqdata.runq_task[task],
- self.rqdata.runq_weight[task],
- self.rqdata.runq_depends[task],
- self.rqdata.runq_revdeps[task]))
+ logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
+ taskQueue.fn_index[self.rqdata.runq_fnid[task]],
+ self.rqdata.runq_task[task],
+ self.rqdata.runq_weight[task],
+ self.rqdata.runq_depends[task],
+ self.rqdata.runq_revdeps[task])
class RunQueue:
def __init__(self, cooker, cfgData, dataCache, taskData, targets):
@@ -809,7 +777,7 @@ class RunQueue:
if self.stamppolicy == "whitelist":
stampwhitelist = self.rqdata.stampfnwhitelist
- for task in range(len(self.rqdata.runq_fnid)):
+ for task in xrange(len(self.rqdata.runq_fnid)):
unchecked[task] = ""
if len(self.rqdata.runq_depends[task]) == 0:
buildable.append(task)
@@ -824,7 +792,7 @@ class RunQueue:
if revdep in unchecked:
buildable.append(revdep)
- for task in range(len(self.rqdata.runq_fnid)):
+ for task in xrange(len(self.rqdata.runq_fnid)):
if task not in unchecked:
continue
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
@@ -909,7 +877,7 @@ class RunQueue:
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
if taskname is None:
taskname = self.rqdata.runq_task[task]
-
+
stampfile = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn], fn, taskname)
# If the stamp is missing its not current
@@ -919,7 +887,7 @@ class RunQueue:
# If its a 'nostamp' task, it's not current
taskdep = self.rqdata.dataCache.task_deps[fn]
if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
- logger.debug(2, "%s.%s is nostamp\n" % (fn, taskname))
+ logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
return False
if taskname != "do_setscene" and taskname.endswith("_setscene"):
@@ -939,10 +907,10 @@ class RunQueue:
continue
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
if not t2:
- logger.debug(2, "Stampfile %s does not exist" % (stampfile2))
+ logger.debug(2, 'Stampfile %s does not exist', stampfile2)
iscurrent = False
if t1 < t2:
- logger.debug(2, "Stampfile %s < %s" % (stampfile, stampfile2))
+ logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
iscurrent = False
return iscurrent
@@ -1014,7 +982,7 @@ class RunQueue:
bb.note("Reparsing files to collect dependency data")
for task in range(len(self.rqdata.runq_fnid)):
if self.rqdata.runq_fnid[task] not in done:
- fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
+ fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
done.add(self.rqdata.runq_fnid[task])
@@ -1219,14 +1187,38 @@ class RunQueueExecuteTasks(RunQueueExecute):
event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
- for scheduler in self.rqdata.schedulers:
+ schedulers = self.get_schedulers()
+ for scheduler in schedulers:
if self.scheduler == scheduler.name:
self.sched = scheduler(self, self.rqdata)
logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
break
else:
- bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
- (self.scheduler, ", ".join(obj.name for obj in self.rqdata.schedulers)))
+ bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
+ (self.scheduler, ", ".join(obj.name for obj in schedulers)))
+
+
+ def get_schedulers(self):
+ schedulers = set(obj for obj in globals().values()
+ if type(obj) is type and
+ issubclass(obj, RunQueueScheduler))
+
+ user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
+ if user_schedulers:
+ for sched in user_schedulers.split():
+ if not "." in sched:
+ bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
+ continue
+
+ modname, name = sched.rsplit(".", 1)
+ try:
+ module = __import__(modname, fromlist=(name,))
+ except ImportError, exc:
+ logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
+ raise SystemExit(1)
+ else:
+ schedulers.add(getattr(module, name))
+ return schedulers
def task_completeoutright(self, task):
"""
@@ -1283,12 +1275,14 @@ class RunQueueExecuteTasks(RunQueueExecute):
# nothing to do
self.rq.state = runQueueCleanUp
- for task in iter(self.sched.next, None):
+ task = self.sched.next()
+ if task is not None:
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
taskname = self.rqdata.runq_task[task]
if self.rq.check_stamp_task(task, taskname):
- logger.debug(2, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task)))
+ logger.debug(2, "Stamp current task %s (%s)", task,
+ self.rqdata.get_user_idstring(task))
self.task_skip(task)
return True
@@ -1455,12 +1449,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
for task in xrange(len(self.sq_revdeps)):
if task not in valid_new and task not in noexec:
- logger.debug(2, "No package found so skipping setscene task %s" % (self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))
+ logger.debug(2, 'No package found, so skipping setscene task %s',
+ self.rqdata.get_user_idstring(task))
self.task_failoutright(task)
- #print(str(valid))
-
- logger.info("Executing SetScene Tasks")
+ logger.info('Executing SetScene Tasks')
self.rq.state = runQueueSceneRun
@@ -1521,11 +1514,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
# Find the next setscene to run
for nexttask in xrange(self.stats.total):
if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
- #bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered))
- #if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered):
- # bb.note("Skipping task %s" % nexttask)
- # self.scenequeue_skip(nexttask)
- # return True
task = nexttask
break
if task is not None:
@@ -1534,7 +1522,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
taskname = self.rqdata.runq_task[realtask] + "_setscene"
if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
- logger.debug(2, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task)))
+ logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
+ task, self.rqdata.get_user_idstring(task))
self.task_failoutright(task)
return True
@@ -1545,7 +1534,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
return True
if self.rq.check_stamp_task(realtask, taskname):
- logger.debug(2, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask)))
+ logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
+ task, self.rqdata.get_user_idstring(realtask))
self.task_skip(task)
return True
@@ -1575,7 +1565,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
for task in oldcovered:
self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
- bb.debug(1, "We can skip tasks %s" % self.rq.scenequeue_covered)
+ logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered)
self.rq.state = runQueueRunInit
return True
@@ -1630,12 +1620,12 @@ class runQueueTaskCompleted(runQueueEvent):
"""
#def check_stamp_fn(fn, taskname, d):
-# rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
+# rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
# fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
-# fnid = rq.rqdata.taskData.getfn_id(fn)
-# taskid = rq.get_task_id(fnid, taskname)
+# fnid = rqexe.rqdata.taskData.getfn_id(fn)
+# taskid = rqexe.rqdata.get_task_id(fnid, taskname)
# if taskid is not None:
-# return rq.check_stamp_task(taskid)
+# return rqexe.rq.check_stamp_task(taskid)
# return None
class runQueuePipe():
@@ -1643,17 +1633,17 @@ class runQueuePipe():
Abstraction for a pipe between a worker thread and the server
"""
def __init__(self, pipein, pipeout, d):
- self.fd = pipein
+ self.input = pipein
pipeout.close()
- fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+ fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)
self.queue = ""
self.d = d
def read(self):
start = len(self.queue)
try:
- self.queue = self.queue + self.fd.read(1024)
- except IOError:
+ self.queue = self.queue + self.input.read(1024)
+ except (OSError, IOError):
pass
end = len(self.queue)
index = self.queue.find("</event>")
@@ -1668,4 +1658,4 @@ class runQueuePipe():
continue
if len(self.queue) > 0:
print("Warning, worker left partial message: %s" % self.queue)
- self.fd.close()
+ self.input.close()