diff options
author | Aníbal Limón <anibal.limon@linux.intel.com> | 2015-06-23 11:49:53 -0500 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2015-06-26 09:25:50 +0100 |
commit | 524d92ed7b53bef933527095e82f378b934f25ef (patch) | |
tree | b6fb65d2634631b09b420e71ad6c41f8b4b95684 /meta | |
parent | 2587b83faabdc8858e8746201805369ed8d53ba8 (diff) | |
download | openembedded-core-524d92ed7b53bef933527095e82f378b934f25ef.tar.gz openembedded-core-524d92ed7b53bef933527095e82f378b934f25ef.tar.bz2 openembedded-core-524d92ed7b53bef933527095e82f378b934f25ef.zip |
oe/utils.py: Fix thread leakage in ThreadPool
In order to fix Thread leakage caused by not call join() in Threads,
Pass num_tasks in ThreadPool for add all the tasks into a Queue this
enable catch of Queue.Empty exception and exit the threads.
classes/sstate.bbclass: Change checkstatus function to match new
ThreadPool operation.
Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta')
-rw-r--r-- | meta/classes/sstate.bbclass | 3 | ||||
-rw-r--r-- | meta/lib/oe/utils.py | 26 |
2 files changed, 22 insertions, 7 deletions
diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass index 1e5e98a1da..a80d1ced72 100644 --- a/meta/classes/sstate.bbclass +++ b/meta/classes/sstate.bbclass @@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False): bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist)) import multiprocessing nproc = min(multiprocessing.cpu_count(), len(tasklist)) - pool = oe.utils.ThreadedPool(nproc) + pool = oe.utils.ThreadedPool(nproc, len(tasklist)) for t in tasklist: pool.add_task(checkstatus, t) + pool.start() pool.wait_completion() inheritlist = d.getVar("INHERIT", True) diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py index 0de880013a..f0d3c14137 100644 --- a/meta/lib/oe/utils.py +++ b/meta/lib/oe/utils.py @@ -222,11 +222,16 @@ class ThreadedWorker(Thread): Thread.__init__(self) self.tasks = tasks self.daemon = True - self.start() def run(self): + from Queue import Empty + while True: - func, args, kargs = self.tasks.get() + try: + func, args, kargs = self.tasks.get(block=False) + except Empty: + break + try: func(*args, **kargs) except Exception, e: @@ -236,9 +241,17 @@ class ThreadedWorker(Thread): class ThreadedPool: """Pool of threads consuming tasks from a queue""" - def __init__(self, num_threads): - self.tasks = Queue(num_threads) - for _ in range(num_threads): ThreadedWorker(self.tasks) + def __init__(self, num_workers, num_tasks): + self.tasks = Queue(num_tasks) + self.workers = [] + + for _ in range(num_workers): + worker = ThreadedWorker(self.tasks) + self.workers.append(worker) + + def start(self): + for worker in self.workers: + worker.start() def add_task(self, func, *args, **kargs): """Add a task to the queue""" @@ -247,4 +260,5 @@ class ThreadedPool: def wait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join() - + for worker in self.workers: + worker.join() |