diff options
author | Chris Larson <chris_larson@mentor.com> | 2010-11-18 20:21:54 -0700 |
---|---|---|
committer | Richard Purdie <rpurdie@linux.intel.com> | 2011-01-04 14:46:42 +0000 |
commit | 32ea7668712a50d8f8b67d5e4558039e5092a485 (patch) | |
tree | 2473f8b1aade6131c7a37fbad2cc4d23998a3a56 /bitbake/lib/bb/cooker.py | |
parent | 570bec37a898fb502d166a22f20bdb1da8c21c38 (diff) | |
download | openembedded-core-32ea7668712a50d8f8b67d5e4558039e5092a485.tar.gz openembedded-core-32ea7668712a50d8f8b67d5e4558039e5092a485.tar.bz2 openembedded-core-32ea7668712a50d8f8b67d5e4558039e5092a485.zip |
Implement parallel parsing support
This utilizes python's multiprocessing module. The default number of threads
to be used is the same as the number of available processor cores, however,
you can manually set this with the BB_NUMBER_PARSE_THREADS variable.
(Bitbake rev: c7b3ec819549e51e438d293969e205883fee725f)
Signed-off-by: Chris Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Diffstat (limited to 'bitbake/lib/bb/cooker.py')
-rw-r--r-- | bitbake/lib/bb/cooker.py | 135 |
1 files changed, 97 insertions, 38 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 6194919e4c..0143c149b8 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py @@ -25,6 +25,8 @@ from __future__ import print_function import sys, os, glob, os.path, re, time import logging import sre_constants +import multiprocessing +import signal from cStringIO import StringIO from contextlib import closing import bb @@ -976,7 +978,7 @@ class CookerExit(bb.event.Event): def __init__(self): bb.event.Event.__init__(self) -class CookerParser: +class CookerParser(object): def __init__(self, cooker, filelist, masked): # Internal data self.filelist = filelist @@ -987,49 +989,106 @@ class CookerParser: self.cached = 0 self.error = 0 self.masked = masked - self.total = len(filelist) self.skipped = 0 self.virtuals = 0 + self.total = len(filelist) - # Pointer to the next file to parse - self.pointer = 0 - - def parse_next(self): - cooker = self.cooker - if self.pointer < len(self.filelist): - f = self.filelist[self.pointer] - - try: - fromCache, skipped, virtuals = cooker.bb_cache.loadData(f, cooker.get_file_appends(f), cooker.configuration.data, cooker.status) - if fromCache: - self.cached += 1 - else: - self.parsed += 1 - - self.skipped += skipped - self.virtuals += virtuals + # current to the next file to parse + self.current = 0 + self.result_queue = None + self.fromcache = None - except KeyboardInterrupt: - cooker.bb_cache.remove(f) - cooker.bb_cache.sync() - raise - except Exception as e: - self.error += 1 - cooker.bb_cache.remove(f) - parselog.exception("Unable to open %s", f) - except: - cooker.bb_cache.remove(f) - raise - finally: - bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed, self.skipped, self.masked, self.virtuals, self.error, self.total), cooker.configuration.event_data) + self.launch_processes() - self.pointer += 1 + def launch_processes(self): + self.task_queue = multiprocessing.Queue() + self.result_queue = multiprocessing.Queue() + + self.fromcache = [] + cfgdata = self.cooker.configuration.data + for filename in self.filelist: + appends = self.cooker.get_file_appends(filename) + if not self.cooker.bb_cache.cacheValid(filename): + self.task_queue.put((filename, appends)) + else: + self.fromcache.append((filename, appends)) + + def worker(input, output, cfgdata): + signal.signal(signal.SIGINT, signal.SIG_IGN) + for filename, appends in iter(input.get, 'STOP'): + infos = bb.cache.Cache.parse(filename, appends, cfgdata) + output.put(infos) + + self.processes = [] + num_processes = int(cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or + multiprocessing.cpu_count()) + for i in xrange(num_processes): + process = multiprocessing.Process(target=worker, + args=(self.task_queue, + self.result_queue, + cfgdata)) + process.start() + self.processes.append(process) + + def shutdown(self, clean=True): + self.result_queue.close() + for process in self.processes: + if clean: + self.task_queue.put('STOP') + else: + process.terminate() + self.task_queue.close() + for process in self.processes: + process.join() + self.cooker.bb_cache.sync() + bb.codeparser.parser_cache_save(self.cooker.configuration.data) + if self.error > 0: + raise ParsingErrorsFound() + + def progress(self): + bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed, + self.skipped, self.masked, + self.virtuals, self.error, + self.total), + self.cooker.configuration.event_data) - if self.pointer >= self.total: - cooker.bb_cache.sync() - bb.codeparser.parser_cache_save(cooker.configuration.data) - if self.error > 0: - raise ParsingErrorsFound + def parse_next(self): + cooker = self.cooker + if self.current >= self.total: + self.shutdown() return False + + try: + if self.result_queue.empty() and self.fromcache: + filename, appends = self.fromcache.pop() + _, infos = cooker.bb_cache.load(filename, appends, + self.cooker.configuration.data) + parsed = False + else: + infos = self.result_queue.get() + parsed = True + except KeyboardInterrupt: + self.shutdown(clean=False) + raise + except Exception as e: + self.error += 1 + parselog.critical(str(e)) + else: + if parsed: + self.parsed += 1 + else: + self.cached += 1 + self.virtuals += len(infos) + + for virtualfn, info in infos: + cooker.bb_cache.add_info(virtualfn, info, cooker.status, + parsed=parsed) + if info.skipped: + self.skipped += 1 + finally: + self.progress() + + self.current += 1 return True + |