summaryrefslogtreecommitdiff
path: root/scripts/pybootchartgui/pybootchartgui/tests
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/pybootchartgui/pybootchartgui/tests')
-rw-r--r--scripts/pybootchartgui/pybootchartgui/tests/parser_test.py105
-rw-r--r--scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py92
2 files changed, 197 insertions, 0 deletions
diff --git a/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
new file mode 100644
index 0000000..00fb3bf
--- /dev/null
+++ b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
@@ -0,0 +1,105 @@
+import sys, os, re, struct, operator, math
+from collections import defaultdict
+import unittest
+
+sys.path.insert(0, os.getcwd())
+
+import pybootchartgui.parsing as parsing
+import pybootchartgui.main as main
+
+debug = False
+
+def floatEq(f1, f2):
+ return math.fabs(f1-f2) < 0.00001
+
+bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')
+parser = main._mk_options_parser()
+options, args = parser.parse_args(['--q', bootchart_dir])
+writer = main._mk_writer(options)
+
+class TestBCParser(unittest.TestCase):
+
+ def setUp(self):
+ self.name = "My first unittest"
+ self.rootdir = bootchart_dir
+
+ def mk_fname(self,f):
+ return os.path.join(self.rootdir, f)
+
+ def testParseHeader(self):
+ trace = parsing.Trace(writer, args, options)
+ state = parsing.parse_file(writer, trace, self.mk_fname('header'))
+ self.assertEqual(6, len(state.headers))
+ self.assertEqual(2, parsing.get_num_cpus(state.headers))
+
+ def test_parseTimedBlocks(self):
+ trace = parsing.Trace(writer, args, options)
+ state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log'))
+ self.assertEqual(141, len(state.disk_stats))
+
+ def testParseProcPsLog(self):
+ trace = parsing.Trace(writer, args, options)
+ state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
+ samples = state.ps_stats
+ processes = samples.process_map
+ sorted_processes = [processes[k] for k in sorted(processes.keys())]
+
+ ps_data = open(self.mk_fname('extract2.proc_ps.log'))
+ for index, line in enumerate(ps_data):
+ tokens = line.split();
+ process = sorted_processes[index]
+ if debug:
+ print(tokens[0:4])
+ print(process.pid / 1000, process.cmd, process.ppid, len(process.samples))
+ print('-------------------')
+
+ self.assertEqual(tokens[0], str(process.pid // 1000))
+ self.assertEqual(tokens[1], str(process.cmd))
+ self.assertEqual(tokens[2], str(process.ppid // 1000))
+ self.assertEqual(tokens[3], str(len(process.samples)))
+ ps_data.close()
+
+ def testparseProcDiskStatLog(self):
+ trace = parsing.Trace(writer, args, options)
+ state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header'))
+ state_with_headers.headers['system.cpu'] = 'xxx (2)'
+ samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats
+ self.assertEqual(141, len(samples))
+
+ diskstats_data = open(self.mk_fname('extract.proc_diskstats.log'))
+ for index, line in enumerate(diskstats_data):
+ tokens = line.split('\t')
+ sample = samples[index]
+ if debug:
+ print(line.rstrip())
+ print(sample)
+ print('-------------------')
+
+ self.assertEqual(tokens[0], str(sample.time))
+ self.assert_(floatEq(float(tokens[1]), sample.read))
+ self.assert_(floatEq(float(tokens[2]), sample.write))
+ self.assert_(floatEq(float(tokens[3]), sample.util))
+ diskstats_data.close()
+
+ def testparseProcStatLog(self):
+ trace = parsing.Trace(writer, args, options)
+ samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats
+ self.assertEqual(141, len(samples))
+
+ stat_data = open(self.mk_fname('extract.proc_stat.log'))
+ for index, line in enumerate(stat_data):
+ tokens = line.split('\t')
+ sample = samples[index]
+ if debug:
+ print(line.rstrip())
+ print(sample)
+ print('-------------------')
+ self.assert_(floatEq(float(tokens[0]), sample.time))
+ self.assert_(floatEq(float(tokens[1]), sample.user))
+ self.assert_(floatEq(float(tokens[2]), sample.sys))
+ self.assert_(floatEq(float(tokens[3]), sample.io))
+ stat_data.close()
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py b/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py
new file mode 100644
index 0000000..6f46a1c
--- /dev/null
+++ b/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py
@@ -0,0 +1,92 @@
+import sys
+import os
+import unittest
+
+sys.path.insert(0, os.getcwd())
+
+import pybootchartgui.parsing as parsing
+import pybootchartgui.process_tree as process_tree
+import pybootchartgui.main as main
+
+if sys.version_info >= (3, 0):
+ long = int
+
+class TestProcessTree(unittest.TestCase):
+
+ def setUp(self):
+ self.name = "Process tree unittest"
+ self.rootdir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')
+
+ parser = main._mk_options_parser()
+ options, args = parser.parse_args(['--q', self.rootdir])
+ writer = main._mk_writer(options)
+ trace = parsing.Trace(writer, args, options)
+
+ parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
+ trace.compile(writer)
+ self.processtree = process_tree.ProcessTree(writer, None, trace.ps_stats, \
+ trace.ps_stats.sample_period, None, options.prune, None, None, False, for_testing = True)
+
+ def mk_fname(self,f):
+ return os.path.join(self.rootdir, f)
+
+ def flatten(self, process_tree):
+ flattened = []
+ for p in process_tree:
+ flattened.append(p)
+ flattened.extend(self.flatten(p.child_list))
+ return flattened
+
+ def checkAgainstJavaExtract(self, filename, process_tree):
+ test_data = open(filename)
+ for expected, actual in zip(test_data, self.flatten(process_tree)):
+ tokens = expected.split('\t')
+ self.assertEqual(int(tokens[0]), actual.pid // 1000)
+ self.assertEqual(tokens[1], actual.cmd)
+ self.assertEqual(long(tokens[2]), 10 * actual.start_time)
+ self.assert_(long(tokens[3]) - 10 * actual.duration < 5, "duration")
+ self.assertEqual(int(tokens[4]), len(actual.child_list))
+ self.assertEqual(int(tokens[5]), len(actual.samples))
+ test_data.close()
+
+ def testBuild(self):
+ process_tree = self.processtree.process_tree
+ self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.1.log'), process_tree)
+
+ def testMergeLogger(self):
+ self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
+ process_tree = self.processtree.process_tree
+ self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.2.log'), process_tree)
+
+ def testPrune(self):
+ self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
+ self.processtree.prune(self.processtree.process_tree, None)
+ process_tree = self.processtree.process_tree
+ self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3b.log'), process_tree)
+
+ def testMergeExploders(self):
+ self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
+ self.processtree.prune(self.processtree.process_tree, None)
+ self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
+ process_tree = self.processtree.process_tree
+ self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3c.log'), process_tree)
+
+ def testMergeSiblings(self):
+ self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
+ self.processtree.prune(self.processtree.process_tree, None)
+ self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
+ self.processtree.merge_siblings(self.processtree.process_tree)
+ process_tree = self.processtree.process_tree
+ self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3d.log'), process_tree)
+
+ def testMergeRuns(self):
+ self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
+ self.processtree.prune(self.processtree.process_tree, None)
+ self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
+ self.processtree.merge_siblings(self.processtree.process_tree)
+ self.processtree.merge_runs(self.processtree.process_tree)
+ process_tree = self.processtree.process_tree
+ self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3e.log'), process_tree)
+
+if __name__ == '__main__':
+ unittest.main()