]> asedeno.scripts.mit.edu Git - linux.git/blob - tools/testing/kunit/kunit_parser.py
s390/dasd: fix data corruption for thin provisioned devices
[linux.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import List
16
17 TestResult = namedtuple('TestResult', ['status','suites','log'])
18
19 class TestSuite(object):
20         def __init__(self):
21                 self.status = None
22                 self.name = None
23                 self.cases = []
24
25         def __str__(self):
26                 return 'TestSuite(' + self.status + ',' + self.name + ',' + str(self.cases) + ')'
27
28         def __repr__(self):
29                 return str(self)
30
31 class TestCase(object):
32         def __init__(self):
33                 self.status = None
34                 self.name = ''
35                 self.log = []
36
37         def __str__(self):
38                 return 'TestCase(' + self.status + ',' + self.name + ',' + str(self.log) + ')'
39
40         def __repr__(self):
41                 return str(self)
42
43 class TestStatus(Enum):
44         SUCCESS = auto()
45         FAILURE = auto()
46         TEST_CRASHED = auto()
47         NO_TESTS = auto()
48
49 kunit_start_re = re.compile(r'^TAP version [0-9]+$')
50 kunit_end_re = re.compile('List of all partitions:')
51
52 def isolate_kunit_output(kernel_output):
53         started = False
54         for line in kernel_output:
55                 if kunit_start_re.match(line):
56                         started = True
57                         yield line
58                 elif kunit_end_re.match(line):
59                         break
60                 elif started:
61                         yield line
62
63 def raw_output(kernel_output):
64         for line in kernel_output:
65                 print(line)
66
67 DIVIDER = '=' * 60
68
69 RESET = '\033[0;0m'
70
71 def red(text):
72         return '\033[1;31m' + text + RESET
73
74 def yellow(text):
75         return '\033[1;33m' + text + RESET
76
77 def green(text):
78         return '\033[1;32m' + text + RESET
79
80 def print_with_timestamp(message):
81         print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
82
83 def format_suite_divider(message):
84         return '======== ' + message + ' ========'
85
86 def print_suite_divider(message):
87         print_with_timestamp(DIVIDER)
88         print_with_timestamp(format_suite_divider(message))
89
90 def print_log(log):
91         for m in log:
92                 print_with_timestamp(m)
93
94 TAP_ENTRIES = re.compile(r'^(TAP|\t?ok|\t?not ok|\t?[0-9]+\.\.[0-9]+|\t?#).*$')
95
96 def consume_non_diagnositic(lines: List[str]) -> None:
97         while lines and not TAP_ENTRIES.match(lines[0]):
98                 lines.pop(0)
99
100 def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
101         while lines and not TAP_ENTRIES.match(lines[0]):
102                 test_case.log.append(lines[0])
103                 lines.pop(0)
104
105 OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
106
107 OK_NOT_OK_SUBTEST = re.compile(r'^\t(ok|not ok) [0-9]+ - (.*)$')
108
109 OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) [0-9]+ - (.*)$')
110
111 def parse_ok_not_ok_test_case(lines: List[str],
112                               test_case: TestCase,
113                               expecting_test_case: bool) -> bool:
114         save_non_diagnositic(lines, test_case)
115         if not lines:
116                 if expecting_test_case:
117                         test_case.status = TestStatus.TEST_CRASHED
118                         return True
119                 else:
120                         return False
121         line = lines[0]
122         match = OK_NOT_OK_SUBTEST.match(line)
123         if match:
124                 test_case.log.append(lines.pop(0))
125                 test_case.name = match.group(2)
126                 if test_case.status == TestStatus.TEST_CRASHED:
127                         return True
128                 if match.group(1) == 'ok':
129                         test_case.status = TestStatus.SUCCESS
130                 else:
131                         test_case.status = TestStatus.FAILURE
132                 return True
133         else:
134                 return False
135
136 SUBTEST_DIAGNOSTIC = re.compile(r'^\t# .*?: (.*)$')
137 DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'
138
139 def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
140         save_non_diagnositic(lines, test_case)
141         if not lines:
142                 return False
143         line = lines[0]
144         match = SUBTEST_DIAGNOSTIC.match(line)
145         if match:
146                 test_case.log.append(lines.pop(0))
147                 if match.group(1) == DIAGNOSTIC_CRASH_MESSAGE:
148                         test_case.status = TestStatus.TEST_CRASHED
149                 return True
150         else:
151                 return False
152
153 def parse_test_case(lines: List[str], expecting_test_case: bool) -> TestCase:
154         test_case = TestCase()
155         save_non_diagnositic(lines, test_case)
156         while parse_diagnostic(lines, test_case):
157                 pass
158         if parse_ok_not_ok_test_case(lines, test_case, expecting_test_case):
159                 return test_case
160         else:
161                 return None
162
163 SUBTEST_HEADER = re.compile(r'^\t# Subtest: (.*)$')
164
165 def parse_subtest_header(lines: List[str]) -> str:
166         consume_non_diagnositic(lines)
167         if not lines:
168                 return None
169         match = SUBTEST_HEADER.match(lines[0])
170         if match:
171                 lines.pop(0)
172                 return match.group(1)
173         else:
174                 return None
175
176 SUBTEST_PLAN = re.compile(r'\t[0-9]+\.\.([0-9]+)')
177
178 def parse_subtest_plan(lines: List[str]) -> int:
179         consume_non_diagnositic(lines)
180         match = SUBTEST_PLAN.match(lines[0])
181         if match:
182                 lines.pop(0)
183                 return int(match.group(1))
184         else:
185                 return None
186
187 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
188         if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
189                 return TestStatus.TEST_CRASHED
190         elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
191                 return TestStatus.FAILURE
192         elif left != TestStatus.SUCCESS:
193                 return left
194         elif right != TestStatus.SUCCESS:
195                 return right
196         else:
197                 return TestStatus.SUCCESS
198
199 def parse_ok_not_ok_test_suite(lines: List[str], test_suite: TestSuite) -> bool:
200         consume_non_diagnositic(lines)
201         if not lines:
202                 test_suite.status = TestStatus.TEST_CRASHED
203                 return False
204         line = lines[0]
205         match = OK_NOT_OK_MODULE.match(line)
206         if match:
207                 lines.pop(0)
208                 if match.group(1) == 'ok':
209                         test_suite.status = TestStatus.SUCCESS
210                 else:
211                         test_suite.status = TestStatus.FAILURE
212                 return True
213         else:
214                 return False
215
216 def bubble_up_errors(to_status, status_container_list) -> TestStatus:
217         status_list = map(to_status, status_container_list)
218         return reduce(max_status, status_list, TestStatus.SUCCESS)
219
220 def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
221         max_test_case_status = bubble_up_errors(lambda x: x.status, test_suite.cases)
222         return max_status(max_test_case_status, test_suite.status)
223
224 def parse_test_suite(lines: List[str]) -> TestSuite:
225         if not lines:
226                 return None
227         consume_non_diagnositic(lines)
228         test_suite = TestSuite()
229         test_suite.status = TestStatus.SUCCESS
230         name = parse_subtest_header(lines)
231         if not name:
232                 return None
233         test_suite.name = name
234         expected_test_case_num = parse_subtest_plan(lines)
235         if not expected_test_case_num:
236                 return None
237         test_case = parse_test_case(lines, expected_test_case_num > 0)
238         expected_test_case_num -= 1
239         while test_case:
240                 test_suite.cases.append(test_case)
241                 test_case = parse_test_case(lines, expected_test_case_num > 0)
242                 expected_test_case_num -= 1
243         if parse_ok_not_ok_test_suite(lines, test_suite):
244                 test_suite.status = bubble_up_test_case_errors(test_suite)
245                 return test_suite
246         elif not lines:
247                 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
248                 return test_suite
249         else:
250                 print('failed to parse end of suite' + lines[0])
251                 return None
252
253 TAP_HEADER = re.compile(r'^TAP version 14$')
254
255 def parse_tap_header(lines: List[str]) -> bool:
256         consume_non_diagnositic(lines)
257         if TAP_HEADER.match(lines[0]):
258                 lines.pop(0)
259                 return True
260         else:
261                 return False
262
263 def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
264         return bubble_up_errors(lambda x: x.status, test_suite_list)
265
266 def parse_test_result(lines: List[str]) -> TestResult:
267         if not lines:
268                 return TestResult(TestStatus.NO_TESTS, [], lines)
269         consume_non_diagnositic(lines)
270         if not parse_tap_header(lines):
271                 return None
272         test_suites = []
273         test_suite = parse_test_suite(lines)
274         while test_suite:
275                 test_suites.append(test_suite)
276                 test_suite = parse_test_suite(lines)
277         return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
278
279 def parse_run_tests(kernel_output) -> TestResult:
280         total_tests = 0
281         failed_tests = 0
282         crashed_tests = 0
283         test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
284         for test_suite in test_result.suites:
285                 if test_suite.status == TestStatus.SUCCESS:
286                         print_suite_divider(green('[PASSED] ') + test_suite.name)
287                 elif test_suite.status == TestStatus.TEST_CRASHED:
288                         print_suite_divider(red('[CRASHED] ' + test_suite.name))
289                 else:
290                         print_suite_divider(red('[FAILED] ') + test_suite.name)
291                 for test_case in test_suite.cases:
292                         total_tests += 1
293                         if test_case.status == TestStatus.SUCCESS:
294                                 print_with_timestamp(green('[PASSED] ') + test_case.name)
295                         elif test_case.status == TestStatus.TEST_CRASHED:
296                                 crashed_tests += 1
297                                 print_with_timestamp(red('[CRASHED] ' + test_case.name))
298                                 print_log(map(yellow, test_case.log))
299                                 print_with_timestamp('')
300                         else:
301                                 failed_tests += 1
302                                 print_with_timestamp(red('[FAILED] ') + test_case.name)
303                                 print_log(map(yellow, test_case.log))
304                                 print_with_timestamp('')
305         print_with_timestamp(DIVIDER)
306         fmt = green if test_result.status == TestStatus.SUCCESS else red
307         print_with_timestamp(
308                 fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
309                     (total_tests, failed_tests, crashed_tests)))
310         return test_result