From 7971cb0aa3e517a53f0ce6d3ee9bc3179041ccb8 Mon Sep 17 00:00:00 2001 From: John Klug Date: Wed, 25 May 2022 17:12:18 -0500 Subject: mLinux 6 --- .../pybootchartgui/tests/parser_test.py | 105 +++++++++++++++++++++ .../pybootchartgui/tests/process_tree_test.py | 92 ++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 scripts/pybootchartgui/pybootchartgui/tests/parser_test.py create mode 100644 scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py (limited to 'scripts/pybootchartgui/pybootchartgui/tests') diff --git a/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py new file mode 100644 index 0000000..00fb3bf --- /dev/null +++ b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py @@ -0,0 +1,105 @@ +import sys, os, re, struct, operator, math +from collections import defaultdict +import unittest + +sys.path.insert(0, os.getcwd()) + +import pybootchartgui.parsing as parsing +import pybootchartgui.main as main + +debug = False + +def floatEq(f1, f2): + return math.fabs(f1-f2) < 0.00001 + +bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/') +parser = main._mk_options_parser() +options, args = parser.parse_args(['--q', bootchart_dir]) +writer = main._mk_writer(options) + +class TestBCParser(unittest.TestCase): + + def setUp(self): + self.name = "My first unittest" + self.rootdir = bootchart_dir + + def mk_fname(self,f): + return os.path.join(self.rootdir, f) + + def testParseHeader(self): + trace = parsing.Trace(writer, args, options) + state = parsing.parse_file(writer, trace, self.mk_fname('header')) + self.assertEqual(6, len(state.headers)) + self.assertEqual(2, parsing.get_num_cpus(state.headers)) + + def test_parseTimedBlocks(self): + trace = parsing.Trace(writer, args, options) + state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log')) + self.assertEqual(141, len(state.disk_stats)) + + def testParseProcPsLog(self): + trace = parsing.Trace(writer, args, options) + state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log')) + samples = state.ps_stats + processes = samples.process_map + sorted_processes = [processes[k] for k in sorted(processes.keys())] + + ps_data = open(self.mk_fname('extract2.proc_ps.log')) + for index, line in enumerate(ps_data): + tokens = line.split(); + process = sorted_processes[index] + if debug: + print(tokens[0:4]) + print(process.pid / 1000, process.cmd, process.ppid, len(process.samples)) + print('-------------------') + + self.assertEqual(tokens[0], str(process.pid // 1000)) + self.assertEqual(tokens[1], str(process.cmd)) + self.assertEqual(tokens[2], str(process.ppid // 1000)) + self.assertEqual(tokens[3], str(len(process.samples))) + ps_data.close() + + def testparseProcDiskStatLog(self): + trace = parsing.Trace(writer, args, options) + state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header')) + state_with_headers.headers['system.cpu'] = 'xxx (2)' + samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats + self.assertEqual(141, len(samples)) + + diskstats_data = open(self.mk_fname('extract.proc_diskstats.log')) + for index, line in enumerate(diskstats_data): + tokens = line.split('\t') + sample = samples[index] + if debug: + print(line.rstrip()) + print(sample) + print('-------------------') + + self.assertEqual(tokens[0], str(sample.time)) + self.assert_(floatEq(float(tokens[1]), sample.read)) + self.assert_(floatEq(float(tokens[2]), sample.write)) + self.assert_(floatEq(float(tokens[3]), sample.util)) + diskstats_data.close() + + def testparseProcStatLog(self): + trace = parsing.Trace(writer, args, options) + samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats + self.assertEqual(141, len(samples)) + + stat_data = open(self.mk_fname('extract.proc_stat.log')) + for index, line in enumerate(stat_data): + tokens = line.split('\t') + sample = samples[index] + if debug: + print(line.rstrip()) + print(sample) + print('-------------------') + self.assert_(floatEq(float(tokens[0]), sample.time)) + self.assert_(floatEq(float(tokens[1]), sample.user)) + self.assert_(floatEq(float(tokens[2]), sample.sys)) + self.assert_(floatEq(float(tokens[3]), sample.io)) + stat_data.close() + +if __name__ == '__main__': + unittest.main() + diff --git a/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py b/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py new file mode 100644 index 0000000..6f46a1c --- /dev/null +++ b/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py @@ -0,0 +1,92 @@ +import sys +import os +import unittest + +sys.path.insert(0, os.getcwd()) + +import pybootchartgui.parsing as parsing +import pybootchartgui.process_tree as process_tree +import pybootchartgui.main as main + +if sys.version_info >= (3, 0): + long = int + +class TestProcessTree(unittest.TestCase): + + def setUp(self): + self.name = "Process tree unittest" + self.rootdir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/') + + parser = main._mk_options_parser() + options, args = parser.parse_args(['--q', self.rootdir]) + writer = main._mk_writer(options) + trace = parsing.Trace(writer, args, options) + + parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log')) + trace.compile(writer) + self.processtree = process_tree.ProcessTree(writer, None, trace.ps_stats, \ + trace.ps_stats.sample_period, None, options.prune, None, None, False, for_testing = True) + + def mk_fname(self,f): + return os.path.join(self.rootdir, f) + + def flatten(self, process_tree): + flattened = [] + for p in process_tree: + flattened.append(p) + flattened.extend(self.flatten(p.child_list)) + return flattened + + def checkAgainstJavaExtract(self, filename, process_tree): + test_data = open(filename) + for expected, actual in zip(test_data, self.flatten(process_tree)): + tokens = expected.split('\t') + self.assertEqual(int(tokens[0]), actual.pid // 1000) + self.assertEqual(tokens[1], actual.cmd) + self.assertEqual(long(tokens[2]), 10 * actual.start_time) + self.assert_(long(tokens[3]) - 10 * actual.duration < 5, "duration") + self.assertEqual(int(tokens[4]), len(actual.child_list)) + self.assertEqual(int(tokens[5]), len(actual.samples)) + test_data.close() + + def testBuild(self): + process_tree = self.processtree.process_tree + self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.1.log'), process_tree) + + def testMergeLogger(self): + self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False) + process_tree = self.processtree.process_tree + self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.2.log'), process_tree) + + def testPrune(self): + self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False) + self.processtree.prune(self.processtree.process_tree, None) + process_tree = self.processtree.process_tree + self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3b.log'), process_tree) + + def testMergeExploders(self): + self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False) + self.processtree.prune(self.processtree.process_tree, None) + self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup'])) + process_tree = self.processtree.process_tree + self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3c.log'), process_tree) + + def testMergeSiblings(self): + self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False) + self.processtree.prune(self.processtree.process_tree, None) + self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup'])) + self.processtree.merge_siblings(self.processtree.process_tree) + process_tree = self.processtree.process_tree + self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3d.log'), process_tree) + + def testMergeRuns(self): + self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False) + self.processtree.prune(self.processtree.process_tree, None) + self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup'])) + self.processtree.merge_siblings(self.processtree.process_tree) + self.processtree.merge_runs(self.processtree.process_tree) + process_tree = self.processtree.process_tree + self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3e.log'), process_tree) + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.3