]> asedeno.scripts.mit.edu Git - linux.git/blob - tools/testing/selftests/tc-testing/tdc.py
Merge branch 'acpi-ec' into acpi-pm
[linux.git] / tools / testing / selftests / tc-testing / tdc.py
1 #!/usr/bin/env python3
2
3 """
4 tdc.py - Linux tc (Traffic Control) unit test driver
5
6 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
7 """
8
9 import re
10 import os
11 import sys
12 import argparse
13 import json
14 import subprocess
15 from collections import OrderedDict
16 from string import Template
17
18 from tdc_config import *
19 from tdc_helper import *
20
21
22 USE_NS = True
23
24
25 def replace_keywords(cmd):
26     """
27     For a given executable command, substitute any known
28     variables contained within NAMES with the correct values
29     """
30     tcmd = Template(cmd)
31     subcmd = tcmd.safe_substitute(NAMES)
32     return subcmd
33
34
35 def exec_cmd(command, nsonly=True):
36     """
37     Perform any required modifications on an executable command, then run
38     it in a subprocess and return the results.
39     """
40     if (USE_NS and nsonly):
41         command = 'ip netns exec $NS ' + command
42
43     if '$' in command:
44         command = replace_keywords(command)
45
46     proc = subprocess.Popen(command,
47         shell=True,
48         stdout=subprocess.PIPE,
49         stderr=subprocess.PIPE)
50     (rawout, serr) = proc.communicate()
51
52     if proc.returncode != 0:
53         foutput = serr.decode("utf-8")
54     else:
55         foutput = rawout.decode("utf-8")
56
57     proc.stdout.close()
58     proc.stderr.close()
59     return proc, foutput
60
61
62 def prepare_env(cmdlist):
63     """
64     Execute the setup/teardown commands for a test case. Optionally
65     terminate test execution if the command fails.
66     """
67     for cmdinfo in cmdlist:
68         if (type(cmdinfo) == list):
69             exit_codes = cmdinfo[1:]
70             cmd = cmdinfo[0]
71         else:
72             exit_codes = [0]
73             cmd = cmdinfo
74
75         if (len(cmd) == 0):
76             continue
77
78         (proc, foutput) = exec_cmd(cmd)
79
80         if proc.returncode not in exit_codes:
81             print
82             print("Could not execute:")
83             print(cmd)
84             print("\nError message:")
85             print(foutput)
86             print("\nAborting test run.")
87             ns_destroy()
88             exit(1)
89
90
91 def test_runner(filtered_tests):
92     """
93     Driver function for the unit tests.
94
95     Prints information about the tests being run, executes the setup and
96     teardown commands and the command under test itself. Also determines
97     success/failure based on the information in the test case and generates
98     TAP output accordingly.
99     """
100     testlist = filtered_tests
101     tcount = len(testlist)
102     index = 1
103     tap = str(index) + ".." + str(tcount) + "\n"
104
105     for tidx in testlist:
106         result = True
107         tresult = ""
108         print("Test " + tidx["id"] + ": " + tidx["name"])
109         prepare_env(tidx["setup"])
110         (p, procout) = exec_cmd(tidx["cmdUnderTest"])
111         exit_code = p.returncode
112
113         if (exit_code != int(tidx["expExitCode"])):
114             result = False
115             print("exit:", exit_code, int(tidx["expExitCode"]))
116             print(procout)
117         else:
118             match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
119             (p, procout) = exec_cmd(tidx["verifyCmd"])
120             match_index = re.findall(match_pattern, procout)
121             if len(match_index) != int(tidx["matchCount"]):
122                 result = False
123
124         if result == True:
125             tresult += "ok "
126         else:
127             tresult += "not ok "
128         tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
129
130         if result == False:
131             tap += procout
132
133         prepare_env(tidx["teardown"])
134         index += 1
135
136     return tap
137
138
139 def ns_create():
140     """
141     Create the network namespace in which the tests will be run and set up
142     the required network devices for it.
143     """
144     if (USE_NS):
145         cmd = 'ip netns add $NS'
146         exec_cmd(cmd, False)
147         cmd = 'ip link add $DEV0 type veth peer name $DEV1'
148         exec_cmd(cmd, False)
149         cmd = 'ip link set $DEV1 netns $NS'
150         exec_cmd(cmd, False)
151         cmd = 'ip link set $DEV0 up'
152         exec_cmd(cmd, False)
153         cmd = 'ip -s $NS link set $DEV1 up'
154         exec_cmd(cmd, False)
155
156
157 def ns_destroy():
158     """
159     Destroy the network namespace for testing (and any associated network
160     devices as well)
161     """
162     if (USE_NS):
163         cmd = 'ip netns delete $NS'
164         exec_cmd(cmd, False)
165
166
167 def has_blank_ids(idlist):
168     """
169     Search the list for empty ID fields and return true/false accordingly.
170     """
171     return not(all(k for k in idlist))
172
173
174 def load_from_file(filename):
175     """
176     Open the JSON file containing the test cases and return them as an
177     ordered dictionary object.
178     """
179     with open(filename) as test_data:
180         testlist = json.load(test_data, object_pairs_hook=OrderedDict)
181     idlist = get_id_list(testlist)
182     if (has_blank_ids(idlist)):
183         for k in testlist:
184             k['filename'] = filename
185     return testlist
186
187
188 def args_parse():
189     """
190     Create the argument parser.
191     """
192     parser = argparse.ArgumentParser(description='Linux TC unit tests')
193     return parser
194
195
196 def set_args(parser):
197     """
198     Set the command line arguments for tdc.
199     """
200     parser.add_argument('-p', '--path', type=str,
201                         help='The full path to the tc executable to use')
202     parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
203                         help='Run tests only from the specified category, or if no category is specified, list known categories.')
204     parser.add_argument('-f', '--file', type=str,
205                         help='Run tests from the specified file')
206     parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
207                         help='List all test cases, or those only within the specified category')
208     parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
209                         help='Display the test case with specified id')
210     parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
211                         help='Execute the single test case with specified ID')
212     parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
213                         help='Generate ID numbers for new test cases')
214     return parser
215     return parser
216
217
218 def check_default_settings(args):
219     """
220     Process any arguments overriding the default settings, and ensure the
221     settings are correct.
222     """
223     # Allow for overriding specific settings
224     global NAMES
225
226     if args.path != None:
227          NAMES['TC'] = args.path
228     if not os.path.isfile(NAMES['TC']):
229         print("The specified tc path " + NAMES['TC'] + " does not exist.")
230         exit(1)
231
232
233 def get_id_list(alltests):
234     """
235     Generate a list of all IDs in the test cases.
236     """
237     return [x["id"] for x in alltests]
238
239
240 def check_case_id(alltests):
241     """
242     Check for duplicate test case IDs.
243     """
244     idl = get_id_list(alltests)
245     return [x for x in idl if idl.count(x) > 1]
246
247
248 def does_id_exist(alltests, newid):
249     """
250     Check if a given ID already exists in the list of test cases.
251     """
252     idl = get_id_list(alltests)
253     return (any(newid == x for x in idl))
254
255
256 def generate_case_ids(alltests):
257     """
258     If a test case has a blank ID field, generate a random hex ID for it
259     and then write the test cases back to disk.
260     """
261     import random
262     for c in alltests:
263         if (c["id"] == ""):
264             while True:
265                 newid = str('%04x' % random.randrange(16**4))
266                 if (does_id_exist(alltests, newid)):
267                     continue
268                 else:
269                     c['id'] = newid
270                     break
271
272     ufilename = []
273     for c in alltests:
274         if ('filename' in c):
275             ufilename.append(c['filename'])
276     ufilename = get_unique_item(ufilename)
277     for f in ufilename:
278         testlist = []
279         for t in alltests:
280             if 'filename' in t:
281                 if t['filename'] == f:
282                     del t['filename']
283                     testlist.append(t)
284         outfile = open(f, "w")
285         json.dump(testlist, outfile, indent=4)
286         outfile.close()
287
288
289 def get_test_cases(args):
290     """
291     If a test case file is specified, retrieve tests from that file.
292     Otherwise, glob for all json files in subdirectories and load from
293     each one.
294     """
295     import fnmatch
296     if args.file != None:
297         if not os.path.isfile(args.file):
298             print("The specified test case file " + args.file + " does not exist.")
299             exit(1)
300         flist = [args.file]
301     else:
302         flist = []
303         for root, dirnames, filenames in os.walk('tc-tests'):
304             for filename in fnmatch.filter(filenames, '*.json'):
305                 flist.append(os.path.join(root, filename))
306     alltests = list()
307     for casefile in flist:
308         alltests = alltests + (load_from_file(casefile))
309     return alltests
310
311
312 def set_operation_mode(args):
313     """
314     Load the test case data and process remaining arguments to determine
315     what the script should do for this run, and call the appropriate
316     function.
317     """
318     alltests = get_test_cases(args)
319
320     if args.gen_id:
321         idlist = get_id_list(alltests)
322         if (has_blank_ids(idlist)):
323             alltests = generate_case_ids(alltests)
324         else:
325             print("No empty ID fields found in test files.")
326         exit(0)
327
328     duplicate_ids = check_case_id(alltests)
329     if (len(duplicate_ids) > 0):
330         print("The following test case IDs are not unique:")
331         print(str(set(duplicate_ids)))
332         print("Please correct them before continuing.")
333         exit(1)
334
335     ucat = get_test_categories(alltests)
336
337     if args.showID:
338         show_test_case_by_id(alltests, args.showID[0])
339         exit(0)
340
341     if args.execute:
342         target_id = args.execute[0]
343     else:
344         target_id = ""
345
346     if args.category:
347         if (args.category == '+c'):
348             print("Available categories:")
349             print_sll(ucat)
350             exit(0)
351         else:
352             target_category = args.category
353     else:
354         target_category = ""
355
356
357     testcases = get_categorized_testlist(alltests, ucat)
358
359     if args.list:
360         if (len(args.list) == 0):
361             list_test_cases(alltests)
362             exit(0)
363         elif(len(args.list > 0)):
364             if (args.list not in ucat):
365                 print("Unknown category " + args.list)
366                 print("Available categories:")
367                 print_sll(ucat)
368                 exit(1)
369             list_test_cases(testcases[args.list])
370             exit(0)
371
372     if (os.geteuid() != 0):
373         print("This script must be run with root privileges.\n")
374         exit(1)
375
376     ns_create()
377
378     if (len(target_category) == 0):
379         if (len(target_id) > 0):
380             alltests = list(filter(lambda x: target_id in x['id'], alltests))
381             if (len(alltests) == 0):
382                 print("Cannot find a test case with ID matching " + target_id)
383                 exit(1)
384         catresults = test_runner(alltests)
385         print("All test results: " + "\n\n" + catresults)
386     elif (len(target_category) > 0):
387         if (target_category not in ucat):
388             print("Specified category is not present in this file.")
389             exit(1)
390         else:
391             catresults = test_runner(testcases[target_category])
392             print("Category " + target_category + "\n\n" + catresults)
393
394     ns_destroy()
395
396
397 def main():
398     """
399     Start of execution; set up argument parser and get the arguments,
400     and start operations.
401     """
402     parser = args_parse()
403     parser = set_args(parser)
404     (args, remaining) = parser.parse_known_args()
405     check_default_settings(args)
406
407     set_operation_mode(args)
408
409     exit(0)
410
411
412 if __name__ == "__main__":
413     main()