summaryrefslogtreecommitdiff
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py443
1 files changed, 227 insertions, 216 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index b4134f8266..a3f444c2ab 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -22,19 +22,18 @@ Handles preparation and execution of a queue of tasks
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import bb, os, sys
-import subprocess
-from bb import msg, data, event
+import copy
+import os
+import sys
import signal
import stat
import fcntl
-import copy
+import logging
+import bb
+from bb import msg, data, event
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
- bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
+bblogger = logging.getLogger("BitBake")
+logger = logging.getLogger("BitBake.RunQueue")
class RunQueueStats:
"""
@@ -87,21 +86,28 @@ class RunQueueScheduler(object):
"""
self.rq = runqueue
self.rqdata = rqdata
- numTasks = len(self.rq.runq_fnid)
+ numTasks = len(self.rqdata.runq_fnid)
self.prio_map = []
self.prio_map.extend(range(numTasks))
- def next(self):
+ def next_buildable_task(self):
"""
Return the id of the first task we find that is buildable
"""
- for task1 in range(len(self.rqdata.runq_fnid)):
- task = self.prio_map[task1]
- if self.rq.runq_running[task] == 1:
+ for tasknum in xrange(len(self.rqdata.runq_fnid)):
+ taskid = self.prio_map[tasknum]
+ if self.rq.runq_running[taskid] == 1:
continue
- if self.rq.runq_buildable[task] == 1:
- return task
+ if self.rq.runq_buildable[taskid] == 1:
+ return taskid
+
+ def next(self):
+ """
+ Return the id of the task we should build next
+ """
+ if self.rq.stats.active < self.rq.number_tasks:
+ return self.next_buildable_task()
class RunQueueSchedulerSpeed(RunQueueScheduler):
"""
@@ -114,13 +120,12 @@ class RunQueueSchedulerSpeed(RunQueueScheduler):
"""
The priority map is sorted by task weight.
"""
- from copy import deepcopy
self.rq = runqueue
self.rqdata = rqdata
- sortweight = sorted(deepcopy(self.rqdata.runq_weight))
- copyweight = deepcopy(self.rqdata.runq_weight)
+ sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
+ copyweight = copy.deepcopy(self.rqdata.runq_weight)
self.prio_map = []
for weight in sortweight:
@@ -142,12 +147,11 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
def __init__(self, runqueue, rqdata):
RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
- from copy import deepcopy
#FIXME - whilst this groups all fnids together it does not reorder the
#fnid groups optimally.
- basemap = deepcopy(self.prio_map)
+ basemap = copy.deepcopy(self.prio_map)
self.prio_map = []
while (len(basemap) > 0):
entry = basemap.pop(0)
@@ -201,7 +205,7 @@ class RunQueueData:
return "%s, %s" % (fn, taskname)
def get_task_id(self, fnid, taskname):
- for listid in range(len(self.runq_fnid)):
+ for listid in xrange(len(self.runq_fnid)):
if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
return listid
return None
@@ -223,7 +227,7 @@ class RunQueueData:
"""
lowest = 0
new_chain = []
- for entry in range(len(chain)):
+ for entry in xrange(len(chain)):
if chain[entry] < chain[lowest]:
lowest = entry
new_chain.extend(chain[lowest:])
@@ -236,7 +240,7 @@ class RunQueueData:
"""
if len(chain1) != len(chain2):
return False
- for index in range(len(chain1)):
+ for index in xrange(len(chain1)):
if chain1[index] != chain2[index]:
return False
return True
@@ -281,7 +285,7 @@ class RunQueueData:
if dep in explored_deps[revdep]:
scan = True
if scan:
- find_chains(revdep, deepcopy(prev_chain))
+ find_chains(revdep, copy.deepcopy(prev_chain))
for dep in explored_deps[revdep]:
if dep not in total_deps:
total_deps.append(dep)
@@ -298,7 +302,7 @@ class RunQueueData:
Calculate a number representing the "weight" of each task. Heavier weighted tasks
have more dependencies and hence should be executed sooner for maximum speed.
- This function also sanity checks the task list finding tasks that its not
+ This function also sanity checks the task list finding tasks that are not
possible to execute due to circular dependencies.
"""
@@ -307,7 +311,7 @@ class RunQueueData:
deps_left = []
task_done = []
- for listid in range(numTasks):
+ for listid in xrange(numTasks):
task_done.append(False)
weight.append(0)
deps_left.append(len(self.runq_revdeps[listid]))
@@ -331,17 +335,17 @@ class RunQueueData:
# Circular dependency sanity check
problem_tasks = []
- for task in range(numTasks):
+ for task in xrange(numTasks):
if task_done[task] is False or deps_left[task] != 0:
problem_tasks.append(task)
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
- bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
+ logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
+ logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
if problem_tasks:
message = "Unbuildable tasks were found.\n"
message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
message = message + "Identifying dependency loops (this may take a short while)...\n"
- bb.msg.error(bb.msg.domain.RunQueue, message)
+ logger.error(message)
msgs = self.circular_depchains_handler(problem_tasks)
@@ -369,7 +373,7 @@ class RunQueueData:
# Nothing to do
return 0
- bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
+ logger.info("Preparing runqueue")
# Step A - Work out a list of tasks to run
#
@@ -409,14 +413,14 @@ class RunQueueData:
if taskid is not None:
depends.append(taskid)
- for task in range(len(taskData.tasks_name)):
+ for task in xrange(len(taskData.tasks_name)):
depends = []
recrdepends = []
fnid = taskData.tasks_fnid[task]
fn = taskData.fn_index[fnid]
task_deps = self.dataCache.task_deps[fn]
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Processing %s:%s" %(fn, taskData.tasks_name[task]))
+ logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
if fnid not in taskData.failed_fnids:
@@ -454,7 +458,9 @@ class RunQueueData:
depdata = taskData.build_targets[depid][0]
if depdata is not None:
dep = taskData.fn_index[depdata]
- taskid = taskData.gettask_id(dep, idependtask)
+ taskid = taskData.gettask_id(dep, idependtask, False)
+ if taskid is None:
+ bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s in %s depends upon nonexistant task %s in %s" % (taskData.tasks_name[task], fn, idependtask, dep))
depends.append(taskid)
if depdata != fnid:
tdepends_fnid[fnid].add(taskid)
@@ -474,7 +480,7 @@ class RunQueueData:
# Rmove all self references
if task in depends:
newdep = []
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
+ logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends)
for dep in depends:
if task != dep:
newdep.append(dep)
@@ -498,7 +504,7 @@ class RunQueueData:
# Algorithm is O(tasks) + O(tasks)*O(fnids)
#
reccumdepends = {}
- for task in range(len(self.runq_fnid)):
+ for task in xrange(len(self.runq_fnid)):
fnid = self.runq_fnid[task]
if fnid not in reccumdepends:
if fnid in tdepends_fnid:
@@ -506,7 +512,7 @@ class RunQueueData:
else:
reccumdepends[fnid] = set()
reccumdepends[fnid].update(self.runq_depends[task])
- for task in range(len(self.runq_fnid)):
+ for task in xrange(len(self.runq_fnid)):
taskfnid = self.runq_fnid[task]
for fnid in reccumdepends:
if task in reccumdepends[fnid]:
@@ -519,7 +525,7 @@ class RunQueueData:
#
# e.g. do_sometask[recrdeptask] = "do_someothertask"
# (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
- for task in range(len(self.runq_fnid)):
+ for task in xrange(len(self.runq_fnid)):
if len(runq_recrdepends[task]) > 0:
taskfnid = self.runq_fnid[task]
for dep in reccumdepends[taskfnid]:
@@ -536,7 +542,7 @@ class RunQueueData:
# as active too. If the task is to be 'forced', clear its stamp. Once
# all active tasks are marked, prune the ones we don't need.
- bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
+ logger.verbose("Marking Active Tasks")
def mark_active(listid, depth):
"""
@@ -567,11 +573,6 @@ class RunQueueData:
fn = taskData.fn_index[fnid]
self.target_pairs.append((fn, target[1]))
- # Remove stamps for targets if force mode active
- if self.cooker.configuration.force:
- bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
- bb.build.del_stamp(target[1], self.dataCache, fn)
-
if fnid in taskData.failed_fnids:
continue
@@ -588,7 +589,7 @@ class RunQueueData:
maps = []
delcount = 0
- for listid in range(len(self.runq_fnid)):
+ for listid in xrange(len(self.runq_fnid)):
if runq_build[listid-delcount] == 1:
maps.append(listid-delcount)
else:
@@ -612,11 +613,11 @@ class RunQueueData:
else:
bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
- bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
+ logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
# Remap the dependencies to account for the deleted tasks
# Check we didn't delete a task we depend on
- for listid in range(len(self.runq_fnid)):
+ for listid in xrange(len(self.runq_fnid)):
newdeps = []
origdeps = self.runq_depends[listid]
for origdep in origdeps:
@@ -625,17 +626,17 @@ class RunQueueData:
newdeps.append(maps[origdep])
self.runq_depends[listid] = set(newdeps)
- bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
+ logger.verbose("Assign Weightings")
# Generate a list of reverse dependencies to ease future calculations
- for listid in range(len(self.runq_fnid)):
+ for listid in xrange(len(self.runq_fnid)):
for dep in self.runq_depends[listid]:
self.runq_revdeps[dep].add(listid)
# Identify tasks at the end of dependency chains
# Error on circular dependency loops (length two)
endpoints = []
- for listid in range(len(self.runq_fnid)):
+ for listid in xrange(len(self.runq_fnid)):
revdeps = self.runq_revdeps[listid]
if len(revdeps) == 0:
endpoints.append(listid)
@@ -644,7 +645,7 @@ class RunQueueData:
#self.dump_data(taskData)
bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
- bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
+ logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
# Calculate task weights
# Check of higher length circular dependencies
@@ -653,7 +654,7 @@ class RunQueueData:
# Sanity Check - Check for multiple tasks building the same provider
prov_list = {}
seen_fn = []
- for task in range(len(self.runq_fnid)):
+ for task in xrange(len(self.runq_fnid)):
fn = taskData.fn_index[self.runq_fnid[task]]
if fn in seen_fn:
continue
@@ -667,9 +668,7 @@ class RunQueueData:
for prov in prov_list:
if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
error = True
- bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
- #if error:
- # bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
+ logger.error("Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should.", prov, " ".join(prov_list[prov]))
# Create a whitelist usable by the stamp checks
@@ -683,20 +682,15 @@ class RunQueueData:
stampfnwhitelist.append(fn)
self.stampfnwhitelist = stampfnwhitelist
- #self.dump_data(taskData)
-
# Interate over the task list looking for tasks with a 'setscene' function
-
self.runq_setscene = []
for task in range(len(self.runq_fnid)):
setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
if not setscene:
continue
- #bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task]))
self.runq_setscene.append(task)
# Interate over the task list and call into the siggen code
-
dealtwith = set()
todeal = set(range(len(self.runq_fnid)))
while len(todeal) > 0:
@@ -709,21 +703,24 @@ class RunQueueData:
procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
- hashdata = {}
- hashdata["hashes"] = {}
- hashdata["deps"] = {}
- for task in range(len(self.runq_fnid)):
- hashdata["hashes"][self.taskData.fn_index[self.runq_fnid[task]] + "." + self.runq_task[task]] = self.runq_hash[task]
+ self.hashes = {}
+ self.hash_deps = {}
+ for task in xrange(len(self.runq_fnid)):
+ identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
+ self.runq_task[task])
+ self.hashes[identifier] = self.runq_hash[task]
deps = []
for dep in self.runq_depends[task]:
- deps.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
- hashdata["deps"][self.taskData.fn_index[self.runq_fnid[task]] + "." + self.runq_task[task]] = deps
+ depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
+ self.runq_task[dep])
+ deps.append(depidentifier)
+ self.hash_deps[identifier] = deps
- hashdata["msg-debug"] = self.cooker.configuration.debug
- hashdata["msg-debug-domains"] = self.cooker.configuration.debug_domains
- hashdata["verbose"] = self.cooker.configuration.verbose
-
- self.hashdata = hashdata
+ # Remove stamps for targets if force mode active
+ if self.cooker.configuration.force:
+ for (fn, target) in self.target_pairs:
+ logger.verbose("Remove stamp %s, %s", target, fn)
+ bb.build.del_stamp(target, self.dataCache, fn)
return len(self.runq_fnid)
@@ -731,25 +728,25 @@ class RunQueueData:
"""
Dump some debug information on the internal data structures
"""
- bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
- for task in range(len(self.rqdata.runq_task)):
- bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
- taskQueue.fn_index[self.rqdata.runq_fnid[task]],
- self.rqdata.runq_task[task],
- self.rqdata.runq_weight[task],
- self.rqdata.runq_depends[task],
- self.rqdata.runq_revdeps[task]))
-
- bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
- for task1 in range(len(self.rqdata.runq_task)):
+ logger.debug(3, "run_tasks:")
+ for task in xrange(len(self.rqdata.runq_task)):
+ logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
+ taskQueue.fn_index[self.rqdata.runq_fnid[task]],
+ self.rqdata.runq_task[task],
+ self.rqdata.runq_weight[task],
+ self.rqdata.runq_depends[task],
+ self.rqdata.runq_revdeps[task])
+
+ logger.debug(3, "sorted_tasks:")
+ for task1 in xrange(len(self.rqdata.runq_task)):
if task1 in self.prio_map:
task = self.prio_map[task1]
- bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
- taskQueue.fn_index[self.rqdata.runq_fnid[task]],
- self.rqdata.runq_task[task],
- self.rqdata.runq_weight[task],
- self.rqdata.runq_depends[task],
- self.rqdata.runq_revdeps[task]))
+ logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
+ taskQueue.fn_index[self.rqdata.runq_fnid[task]],
+ self.rqdata.runq_task[task],
+ self.rqdata.runq_weight[task],
+ self.rqdata.runq_depends[task],
+ self.rqdata.runq_revdeps[task])
class RunQueue:
def __init__(self, cooker, cfgData, dataCache, taskData, targets):
@@ -777,7 +774,7 @@ class RunQueue:
if self.stamppolicy == "whitelist":
stampwhitelist = self.rqdata.stampfnwhitelist
- for task in range(len(self.rqdata.runq_fnid)):
+ for task in xrange(len(self.rqdata.runq_fnid)):
unchecked[task] = ""
if len(self.rqdata.runq_depends[task]) == 0:
buildable.append(task)
@@ -792,12 +789,12 @@ class RunQueue:
if revdep in unchecked:
buildable.append(revdep)
- for task in range(len(self.rqdata.runq_fnid)):
+ for task in xrange(len(self.rqdata.runq_fnid)):
if task not in unchecked:
continue
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
taskname = self.rqdata.runq_task[task]
- stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname)
+ stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
# If the stamp is missing its not current
if not os.access(stampfile, os.F_OK):
del unchecked[task]
@@ -818,7 +815,7 @@ class RunQueue:
if task in unchecked:
fn = self.taskData.fn_index[self.rqdata.runq_fnid[task]]
taskname = self.rqdata.runq_task[task]
- stampfile = "%s.%s" % (self.rqdata.dataCache.stamp[fn], taskname)
+ stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
iscurrent = True
t1 = os.stat(stampfile)[stat.ST_MTIME]
@@ -826,7 +823,7 @@ class RunQueue:
if iscurrent:
fn2 = self.taskData.fn_index[self.rqdata.runq_fnid[dep]]
taskname2 = self.rqdata.runq_task[dep]
- stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2)
+ stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
if dep in notcurrent:
iscurrent = False
@@ -877,20 +874,20 @@ class RunQueue:
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
if taskname is None:
taskname = self.rqdata.runq_task[task]
-
- stampfile = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn], taskname, self.rqdata.runq_hash[task])
+
+ stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
# If the stamp is missing its not current
if not os.access(stampfile, os.F_OK):
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile)
+ logger.debug(2, "Stampfile %s not available", stampfile)
return False
# If its a 'nostamp' task, it's not current
taskdep = self.rqdata.dataCache.task_deps[fn]
if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
- bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
+ logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
return False
- if taskname.endswith("_setscene"):
+ if taskname != "do_setscene" and taskname.endswith("_setscene"):
return True
iscurrent = True
@@ -899,18 +896,18 @@ class RunQueue:
if iscurrent:
fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
taskname2 = self.rqdata.runq_task[dep]
- stampfile2 = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn2], taskname2, self.rqdata.runq_hash[dep])
- stampfile3 = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn2], taskname2 + "_setscene", self.rqdata.runq_hash[dep])
+ stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
+ stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
t2 = get_timestamp(stampfile2)
t3 = get_timestamp(stampfile3)
if t3 and t3 > t2:
continue
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
if not t2:
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s does not exist" % (stampfile2))
+ logger.debug(2, 'Stampfile %s does not exist', stampfile2)
iscurrent = False
if t1 < t2:
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile, stampfile2))
+ logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
iscurrent = False
return iscurrent
@@ -941,7 +938,7 @@ class RunQueue:
retval = self.rqexe.execute()
if self.state is runQueueRunInit:
- bb.msg.note(1, bb.msg.domain.RunQueue, "Executing RunQueue Tasks")
+ logger.info("Executing RunQueue Tasks")
self.rqexe = RunQueueExecuteTasks(self)
self.state = runQueueRunning
@@ -960,7 +957,7 @@ class RunQueue:
if self.state is runQueueComplete:
# All done
- bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed))
+ logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
return False
if self.state is runQueueChildProcess:
@@ -982,8 +979,8 @@ class RunQueue:
bb.note("Reparsing files to collect dependency data")
for task in range(len(self.rqdata.runq_fnid)):
if self.rqdata.runq_fnid[task] not in done:
- fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
- the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
+ fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
+ the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
done.add(self.rqdata.runq_fnid[task])
bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
@@ -1022,16 +1019,16 @@ class RunQueueExecute:
self.build_pipes[result[0]].close()
del self.build_pipes[result[0]]
if result[1] != 0:
- self.task_fail(task, result[1])
+ self.task_fail(task, result[1]>>8)
else:
self.task_complete(task)
def finish_now(self):
if self.stats.active:
- bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active)
+ logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
for k, v in self.build_pids.iteritems():
try:
- os.kill(-k, signal.SIGINT)
+ os.kill(-k, signal.SIGTERM)
except:
pass
for pipe in self.build_pipes:
@@ -1055,8 +1052,8 @@ class RunQueueExecute:
self.rq.state = runQueueComplete
return
- def fork_off_task(self, fn, task, taskname):
- the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
+ def fork_off_task(self, fn, task, taskname, quieterrors=False):
+ the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
env = bb.data.export_vars(the_data)
env = bb.data.export_envvars(env, the_data)
@@ -1070,55 +1067,59 @@ class RunQueueExecute:
fakedirs = (the_data.getVar("FAKEROOTDIRS", True) or "").split()
for p in fakedirs:
bb.mkdirhier(p)
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Running %s:%s under fakeroot, state dir is %s" % (fn, taskname, fakedirs))
+ logger.debug(2, "Running %s:%s under fakeroot, state dir is %s" % (fn, taskname, fakedirs))
- env['BB_TASKHASH'] = self.rqdata.runq_hash[task]
env['PATH'] = self.cooker.configuration.initial_path
envbackup = os.environ.copy()
- os.environ = env
+ for e in envbackup:
+ os.unsetenv(e)
+ for e in env:
+ os.putenv(e, env[e])
sys.stdout.flush()
sys.stderr.flush()
-
try:
- pipeinfd, pipeoutfd = os.pipe()
- pipein = os.fdopen(pipeinfd, 'rb', 4096)
- pipeout = os.fdopen(pipeoutfd, 'wb', 4096)
-
+ pipein, pipeout = os.pipe()
+ pipein = os.fdopen(pipein, 'rb', 4096)
+ pipeout = os.fdopen(pipeout, 'wb', 0)
pid = os.fork()
except OSError as e:
bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
if pid == 0:
pipein.close()
+
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
bb.event.worker_pipe = pipeout
bb.event.useStdout = False
+ # Child processes should send their messages to the UI
+ # process via the server process, not print them
+ # themselves
+ bblogger.handlers = [bb.event.LogHandler()]
+
self.rq.state = runQueueChildProcess
# Make the child the process group leader
os.setpgid(0, 0)
# No stdin
- newsi = os.open('/dev/null', os.O_RDWR)
+ newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
- # Stdout to a logfile
- #logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True)
- #mkdirhier(os.path.dirname(logout))
- #newso = open(logout, 'w')
- #os.dup2(newso.fileno(), sys.stdout.fileno())
- #os.dup2(newso.fileno(), sys.stderr.fileno())
- if taskname.endswith("_setscene"):
+ if quieterrors:
the_data.setVarFlag(taskname, "quieterrors", "1")
+ bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
+ bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
bb.data.setVar("BB_WORKERCONTEXT", "1", the_data)
- bb.parse.siggen.set_taskdata(self.rqdata.hashdata["hashes"], self.rqdata.hashdata["deps"])
+ bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
- for h in self.rqdata.hashdata["hashes"]:
- bb.data.setVar("BBHASH_%s" % h, self.rqdata.hashdata["hashes"][h], the_data)
- for h in self.rqdata.hashdata["deps"]:
- bb.data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hashdata["deps"][h], the_data)
+ for h in self.rqdata.hashes:
+ bb.data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h], the_data)
+ for h in self.rqdata.hash_deps:
+ bb.data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h], the_data)
+
+ bb.data.setVar("BB_TASKHASH", self.rqdata.runq_hash[task], the_data)
ret = 0
try:
@@ -1128,7 +1129,10 @@ class RunQueueExecute:
except:
os._exit(1)
- os.environ = envbackup
+ for e in env:
+ os.unsetenv(e)
+ for e in envbackup:
+ os.putenv(e, envbackup[e])
return pid, pipein, pipeout
@@ -1136,9 +1140,10 @@ class RunQueueExecuteDummy(RunQueueExecute):
def __init__(self, rq):
self.rq = rq
self.stats = RunQueueStats(0)
+
def finish(self):
self.rq.state = runQueueComplete
- return
+ return
class RunQueueExecuteTasks(RunQueueExecute):
def __init__(self, rq):
@@ -1147,7 +1152,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
# Mark initial buildable tasks
- for task in range(self.stats.total):
+ for task in xrange(self.stats.total):
self.runq_running.append(0)
self.runq_complete.append(0)
if len(self.rqdata.runq_depends[task]) == 0:
@@ -1160,31 +1165,52 @@ class RunQueueExecuteTasks(RunQueueExecute):
found = True
while found:
found = False
- for task in range(self.stats.total):
+ for task in xrange(self.stats.total):
if task in self.rq.scenequeue_covered:
continue
if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
self.rq.scenequeue_covered.add(task)
found = True
- bb.debug("Full skip list %s" % self.rq.scenequeue_covered)
+ logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
for task in self.rq.scenequeue_covered:
self.task_skip(task)
event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
- schedulers = [obj for obj in globals().itervalues()
- if type(obj) is type and issubclass(obj, RunQueueScheduler)]
+ schedulers = self.get_schedulers()
for scheduler in schedulers:
if self.scheduler == scheduler.name:
self.sched = scheduler(self, self.rqdata)
+ logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
break
else:
- bb.error("Invalid scheduler '%s', using default 'speed' scheduler" % self.scheduler)
- bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers))
- self.sched = RunQueueSchedulerSpeed(self, self.rqdata)
+ bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
+ (self.scheduler, ", ".join(obj.name for obj in schedulers)))
+
+
+ def get_schedulers(self):
+ schedulers = set(obj for obj in globals().values()
+ if type(obj) is type and
+ issubclass(obj, RunQueueScheduler))
+ user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
+ if user_schedulers:
+ for sched in user_schedulers.split():
+ if not "." in sched:
+ bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
+ continue
+
+ modname, name = sched.rsplit(".", 1)
+ try:
+ module = __import__(modname, fromlist=(name,))
+ except ImportError, exc:
+ logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
+ raise SystemExit(1)
+ else:
+ schedulers.add(getattr(module, name))
+ return schedulers
def task_completeoutright(self, task):
"""
@@ -1206,7 +1232,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
self.runq_buildable[revdep] = 1
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
taskname = self.rqdata.runq_task[revdep]
- bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
+ logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
def task_complete(self, task):
self.stats.taskCompleted()
@@ -1218,11 +1244,10 @@ class RunQueueExecuteTasks(RunQueueExecute):
Called when a task has failed
Updates the state engine with the failure
"""
- bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.rqdata.get_user_idstring(task), exitcode))
self.stats.taskFailed()
fnid = self.rqdata.runq_fnid[task]
self.failed_fnids.append(fnid)
- bb.event.fire(runQueueTaskFailed(task, self.stats, self.rq), self.cfgData)
+ bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
if self.rqdata.taskData.abort:
self.rq.state = runQueueCleanUp
@@ -1242,38 +1267,30 @@ class RunQueueExecuteTasks(RunQueueExecute):
# nothing to do
self.rq.state = runQueueCleanUp
- task = None
- if self.stats.active < self.number_tasks:
- task = self.sched.next()
+ task = self.sched.next()
if task is not None:
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
taskname = self.rqdata.runq_task[task]
if self.rq.check_stamp_task(task, taskname):
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task)))
+ logger.debug(2, "Stamp current task %s (%s)", task,
+ self.rqdata.get_user_idstring(task))
self.task_skip(task)
return True
- bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData)
-
taskdep = self.rqdata.dataCache.task_deps[fn]
if 'noexec' in taskdep and taskname in taskdep['noexec']:
- bb.msg.note(1, bb.msg.domain.RunQueue,
- "Noexec task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1,
- self.stats.total,
- task,
- self.rqdata.get_user_idstring(task)))
+ startevent = runQueueTaskStarted(task, self.stats, self.rq,
+ noexec=True)
+ bb.event.fire(startevent, self.cfgData)
self.runq_running[task] = 1
self.stats.taskActive()
bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
self.task_complete(task)
return True
-
- bb.msg.note(1, bb.msg.domain.RunQueue,
- "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1,
- self.stats.total,
- task,
- self.rqdata.get_user_idstring(task)))
+ else:
+ startevent = runQueueTaskStarted(task, self.stats, self.rq)
+ bb.event.fire(startevent, self.cfgData)
pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
@@ -1281,8 +1298,6 @@ class RunQueueExecuteTasks(RunQueueExecute):
self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
self.runq_running[task] = 1
self.stats.taskActive()
- if self.stats.active < self.number_tasks:
- return True
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
@@ -1297,13 +1312,13 @@ class RunQueueExecuteTasks(RunQueueExecute):
return True
# Sanity Checks
- for task in range(self.stats.total):
+ for task in xrange(self.stats.total):
if self.runq_buildable[task] == 0:
- bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
+ logger.error("Task %s never buildable!", task)
if self.runq_running[task] == 0:
- bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
+ logger.error("Task %s never ran!", task)
if self.runq_complete[task] == 0:
- bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
+ logger.error("Task %s never completed!", task)
self.rq.state = runQueueComplete
return True
@@ -1332,12 +1347,12 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
# therefore aims to collapse the huge runqueue dependency tree into a smaller one
# only containing the setscene functions.
- for task in range(self.stats.total):
+ for task in xrange(self.stats.total):
self.runq_running.append(0)
self.runq_complete.append(0)
self.runq_buildable.append(0)
- for task in range(len(self.rqdata.runq_fnid)):
+ for task in xrange(len(self.rqdata.runq_fnid)):
sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
sq_revdeps_new.append(set())
if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
@@ -1368,7 +1383,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
process_endpoints(endpoints)
- for task in range(len(self.rqdata.runq_fnid)):
+ for task in xrange(len(self.rqdata.runq_fnid)):
if task in self.rqdata.runq_setscene:
deps = set()
for dep in sq_revdeps_new[task]:
@@ -1377,20 +1392,20 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
elif len(sq_revdeps_new[task]) != 0:
bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
- #for task in range(len(sq_revdeps_squash)):
+ #for task in xrange(len(sq_revdeps_squash)):
# print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
self.sq_deps = []
self.sq_revdeps = sq_revdeps_squash
self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
- for task in range(len(self.sq_revdeps)):
+ for task in xrange(len(self.sq_revdeps)):
self.sq_deps.append(set())
- for task in range(len(self.sq_revdeps)):
+ for task in xrange(len(self.sq_revdeps)):
for dep in self.sq_revdeps[task]:
self.sq_deps[dep].add(task)
- for task in range(len(self.sq_revdeps)):
+ for task in xrange(len(self.sq_revdeps)):
if len(self.sq_revdeps[task]) == 0:
self.runq_buildable[task] = 1
@@ -1401,7 +1416,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
sq_taskname = []
sq_task = []
noexec = []
- for task in range(len(self.sq_revdeps)):
+ for task in xrange(len(self.sq_revdeps)):
realtask = self.rqdata.runq_setscene[task]
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
taskname = self.rqdata.runq_task[realtask]
@@ -1424,14 +1439,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
for v in valid:
valid_new.append(sq_task[v])
- for task in range(len(self.sq_revdeps)):
+ for task in xrange(len(self.sq_revdeps)):
if task not in valid_new and task not in noexec:
- bb.msg.debug(2, bb.msg.domain.RunQueue, "No package found so skipping setscene task %s" % (self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))
+ logger.debug(2, 'No package found, so skipping setscene task %s',
+ self.rqdata.get_user_idstring(task))
self.task_failoutright(task)
- #print(str(valid))
-
- bb.msg.note(1, bb.msg.domain.RunQueue, "Executing SetScene Tasks")
+ logger.info('Executing SetScene Tasks')
self.rq.state = runQueueSceneRun
@@ -1449,7 +1463,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
"""
index = self.rqdata.runq_setscene[task]
- bb.msg.debug(1, bb.msg.domain.RunQueue, "Found task %s could be accelerated" % self.rqdata.get_user_idstring(index))
+ logger.debug(1, 'Found task %s which could be accelerated',
+ self.rqdata.get_user_idstring(index))
self.scenequeue_covered.add(task)
self.scenequeue_updatecounters(task)
@@ -1461,7 +1476,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
def task_fail(self, task, result):
self.stats.taskFailed()
index = self.rqdata.runq_setscene[task]
- bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
+ bb.event.fire(runQueueTaskFailed(task, self.stats, result, self), self.cfgData)
self.scenequeue_notcovered.add(task)
self.scenequeue_updatecounters(task)
@@ -1489,13 +1504,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
task = None
if self.stats.active < self.number_tasks:
# Find the next setscene to run
- for nexttask in range(self.stats.total):
+ for nexttask in xrange(self.stats.total):
if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
- #bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered))
- #if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered):
- # bb.note("Skipping task %s" % nexttask)
- # self.scenequeue_skip(nexttask)
- # return True
task = nexttask
break
if task is not None:
@@ -1504,7 +1514,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
taskname = self.rqdata.runq_task[realtask] + "_setscene"
if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task)))
+ logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
+ task, self.rqdata.get_user_idstring(task))
self.task_failoutright(task)
return True
@@ -1515,12 +1526,12 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
return True
if self.rq.check_stamp_task(realtask, taskname):
- bb.msg.debug(2, bb.msg.domain.RunQueue, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask)))
+ logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
+ task, self.rqdata.get_user_idstring(realtask))
self.task_skip(task)
return True
- bb.msg.note(1, bb.msg.domain.RunQueue,
- "Running setscene task %d of %d (%s:%s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1,
+ logger.info("Running setscene task %d of %d (%s:%s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1,
self.stats.total, fn, taskname))
pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
@@ -1546,11 +1557,14 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
for task in oldcovered:
self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
- bb.debug("We can skip tasks %s" % self.rq.scenequeue_covered)
+ logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered)
self.rq.state = runQueueRunInit
return True
+ def fork_off_task(self, fn, task, taskname):
+ return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
+
class TaskFailure(Exception):
"""
Exception raised when a task in a runqueue fails
@@ -1583,51 +1597,48 @@ class runQueueTaskStarted(runQueueEvent):
"""
Event notifing a task was started
"""
- def __init__(self, task, stats, rq):
+ def __init__(self, task, stats, rq, noexec=False):
runQueueEvent.__init__(self, task, stats, rq)
- self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring)
+ self.noexec = noexec
class runQueueTaskFailed(runQueueEvent):
"""
Event notifing a task failed
"""
- def __init__(self, task, stats, rq):
+ def __init__(self, task, stats, exitcode, rq):
runQueueEvent.__init__(self, task, stats, rq)
- self.message = "Task %s failed (%s)" % (task, self.taskstring)
+ self.exitcode = exitcode
class runQueueTaskCompleted(runQueueEvent):
"""
Event notifing a task completed
"""
- def __init__(self, task, stats, rq):
- runQueueEvent.__init__(self, task, stats, rq)
- self.message = "Task %s completed (%s)" % (task, self.taskstring)
-#def check_stamp_fn(fn, taskname, d):
-# rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
-# fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
-# fnid = rq.rqdata.taskData.getfn_id(fn)
-# taskid = rq.get_task_id(fnid, taskname)
-# if taskid is not None:
-# return rq.check_stamp_task(taskid)
-# return None
+def check_stamp_fn(fn, taskname, d):
+ rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
+ fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
+ fnid = rqexe.rqdata.taskData.getfn_id(fn)
+ taskid = rqexe.rqdata.get_task_id(fnid, taskname)
+ if taskid is not None:
+ return rqexe.rq.check_stamp_task(taskid)
+ return None
class runQueuePipe():
"""
Abstraction for a pipe between a worker thread and the server
"""
def __init__(self, pipein, pipeout, d):
- self.fd = pipein
+ self.input = pipein
pipeout.close()
- fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+ fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)
self.queue = ""
self.d = d
def read(self):
start = len(self.queue)
try:
- self.queue = self.queue + self.fd.read(1024)
- except IOError:
+ self.queue = self.queue + self.input.read(102400)
+ except (OSError, IOError):
pass
end = len(self.queue)
index = self.queue.find("</event>")
@@ -1642,4 +1653,4 @@ class runQueuePipe():
continue
if len(self.queue) > 0:
print("Warning, worker left partial message: %s" % self.queue)
- self.fd.close()
+ self.input.close()