# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""
16 from collections import OrderedDict
17 from string import Template
19 from tdc_config import *
20 from tdc_helper import *
def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values.

    Placeholders use string.Template syntax ($NAME or ${NAME}).
    safe_substitute() is used, so a placeholder with no matching key in
    NAMES is left in the command as-is rather than raising KeyError.

    cmd: command string that may contain $-placeholders.
    Returns the command string with the known placeholders replaced.
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd
def exec_cmd(command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    command: the command line to run, as a single string.
    nsonly:  when true (and USE_NS is set) the command is prefixed with
             'ip netns exec $NS ' before keyword substitution.

    Returns a (proc, foutput) tuple: the finished Popen object and the
    decoded text of stderr if the exit status is non-zero, otherwise the
    decoded text of stdout.
    """
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command

    # Expand $NS and any other known placeholders after the prefix is added.
    command = replace_keywords(command)

    # The command is one string rather than an argv list, so it is handed
    # to the shell for word splitting.
    proc = subprocess.Popen(command,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    (rawout, serr) = proc.communicate()

    if proc.returncode != 0:
        foutput = serr.decode("utf-8")
    else:
        foutput = rawout.decode("utf-8")

    return proc, foutput
63 def prepare_env(cmdlist):
65 Execute the setup/teardown commands for a test case. Optionally
66 terminate test execution if the command fails.
68 for cmdinfo in cmdlist:
69 if (type(cmdinfo) == list):
70 exit_codes = cmdinfo[1:]
79 (proc, foutput) = exec_cmd(cmd)
81 if proc.returncode not in exit_codes:
83 print("Could not execute:")
85 print("\nError message:")
87 print("\nAborting test run.")
92 def test_runner(filtered_tests, args):
94 Driver function for the unit tests.
96 Prints information about the tests being run, executes the setup and
97 teardown commands and the command under test itself. Also determines
98 success/failure based on the information in the test case and generates
99 TAP output accordingly.
101 testlist = filtered_tests
102 tcount = len(testlist)
104 tap = str(index) + ".." + str(tcount) + "\n"
106 for tidx in testlist:
109 if "flower" in tidx["category"] and args.device == None:
111 print("Test " + tidx["id"] + ": " + tidx["name"])
112 prepare_env(tidx["setup"])
113 (p, procout) = exec_cmd(tidx["cmdUnderTest"])
114 exit_code = p.returncode
116 if (exit_code != int(tidx["expExitCode"])):
118 print("exit:", exit_code, int(tidx["expExitCode"]))
121 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
122 (p, procout) = exec_cmd(tidx["verifyCmd"])
123 match_index = re.findall(match_pattern, procout)
124 if len(match_index) != int(tidx["matchCount"]):
131 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
136 prepare_env(tidx["teardown"])
144 Create the network namespace in which the tests will be run and set up
145 the required network devices for it.
148 cmd = 'ip netns add $NS'
150 cmd = 'ip link add $DEV0 type veth peer name $DEV1'
152 cmd = 'ip link set $DEV1 netns $NS'
154 cmd = 'ip link set $DEV0 up'
156 cmd = 'ip -n $NS link set $DEV1 up'
158 cmd = 'ip link set $DEV2 netns $NS'
160 cmd = 'ip -n $NS link set $DEV2 up'
166 Destroy the network namespace for testing (and any associated network
170 cmd = 'ip netns delete $NS'
def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.

    idlist: iterable of test case IDs.
    Returns True if at least one entry is falsy (for example an empty
    string), False otherwise. An empty list yields False.
    """
    return not all(idlist)
181 def load_from_file(filename):
183 Open the JSON file containing the test cases and return them as an
184 ordered dictionary object.
186 with open(filename) as test_data:
187 testlist = json.load(test_data, object_pairs_hook=OrderedDict)
188 idlist = get_id_list(testlist)
189 if (has_blank_ids(idlist)):
191 k['filename'] = filename
197 Create the argument parser.
199 parser = argparse.ArgumentParser(description='Linux TC unit tests')
def set_args(parser):
    """
    Set the command line arguments for tdc.

    parser: the argparse.ArgumentParser to add the options to.
    Returns the same parser object, with the tdc options registered.
    """
    parser.add_argument('-p', '--path', type=str,
                        help='The full path to the tc executable to use')
    # A bare -c (no value) stores the sentinel '+c'.
    parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
                        help='Run tests only from the specified category, or if no category is specified, list known categories.')
    parser.add_argument('-f', '--file', type=str,
                        help='Run tests from the specified file')
    # A bare -l (no value) stores the empty string.
    parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
                        help='List all test cases, or those only within the specified category')
    # nargs=1 makes showID and execute one-element lists.
    parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
                        help='Display the test case with specified id')
    parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
                        help='Execute the single test case with specified ID')
    parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
                        help='Generate ID numbers for new test cases')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    return parser
226 def check_default_settings(args):
228 Process any arguments overriding the default settings, and ensure the
229 settings are correct.
231 # Allow for overriding specific settings
234 if args.path != None:
235 NAMES['TC'] = args.path
236 if args.device != None:
237 NAMES['DEV2'] = args.device
238 if not os.path.isfile(NAMES['TC']):
239 print("The specified tc path " + NAMES['TC'] + " does not exist.")
def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.

    alltests: iterable of test case mappings, each with an "id" key.
    Returns the "id" values in the same order as the test cases.
    """
    return [x["id"] for x in alltests]
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    alltests: iterable of test case mappings, each with an "id" key.
    Returns a list holding every ID that occurs more than once, in the
    original order and repeated once per occurrence; empty if all IDs
    are unique.
    """
    # Imported locally so this function does not depend on the file-level
    # import block.
    from collections import Counter

    idl = get_id_list(alltests)
    # Count each ID once up front instead of rescanning the list per entry.
    counts = Counter(idl)
    return [x for x in idl if counts[x] > 1]
def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.

    alltests: iterable of test case mappings, each with an "id" key.
    newid:    the candidate ID to look for.
    Returns True if some test case already uses newid, False otherwise.
    """
    return newid in get_id_list(alltests)
266 def generate_case_ids(alltests):
268 If a test case has a blank ID field, generate a random hex ID for it
269 and then write the test cases back to disk.
275 newid = str('%04x' % random.randrange(16**4))
276 if (does_id_exist(alltests, newid)):
284 if ('filename' in c):
285 ufilename.append(c['filename'])
286 ufilename = get_unique_item(ufilename)
291 if t['filename'] == f:
294 outfile = open(f, "w")
295 json.dump(testlist, outfile, indent=4)
299 def get_test_cases(args):
301 If a test case file is specified, retrieve tests from that file.
302 Otherwise, glob for all json files in subdirectories and load from
306 if args.file != None:
307 if not os.path.isfile(args.file):
308 print("The specified test case file " + args.file + " does not exist.")
313 for root, dirnames, filenames in os.walk('tc-tests'):
314 for filename in fnmatch.filter(filenames, '*.json'):
315 flist.append(os.path.join(root, filename))
317 for casefile in flist:
318 alltests = alltests + (load_from_file(casefile))
322 def set_operation_mode(args):
324 Load the test case data and process remaining arguments to determine
325 what the script should do for this run, and call the appropriate
328 alltests = get_test_cases(args)
331 idlist = get_id_list(alltests)
332 if (has_blank_ids(idlist)):
333 alltests = generate_case_ids(alltests)
335 print("No empty ID fields found in test files.")
338 duplicate_ids = check_case_id(alltests)
339 if (len(duplicate_ids) > 0):
340 print("The following test case IDs are not unique:")
341 print(str(set(duplicate_ids)))
342 print("Please correct them before continuing.")
345 ucat = get_test_categories(alltests)
348 show_test_case_by_id(alltests, args.showID[0])
352 target_id = args.execute[0]
357 if (args.category == '+c'):
358 print("Available categories:")
362 target_category = args.category
367 testcases = get_categorized_testlist(alltests, ucat)
370 if (len(args.list) == 0):
371 list_test_cases(alltests)
373 elif(len(args.list > 0)):
374 if (args.list not in ucat):
375 print("Unknown category " + args.list)
376 print("Available categories:")
379 list_test_cases(testcases[args.list])
382 if (os.geteuid() != 0):
383 print("This script must be run with root privileges.\n")
388 if (len(target_category) == 0):
389 if (len(target_id) > 0):
390 alltests = list(filter(lambda x: target_id in x['id'], alltests))
391 if (len(alltests) == 0):
392 print("Cannot find a test case with ID matching " + target_id)
394 catresults = test_runner(alltests, args)
395 print("All test results: " + "\n\n" + catresults)
396 elif (len(target_category) > 0):
397 if (target_category == "flower") and args.device == None:
398 print("Please specify a NIC device (-d) to run category flower")
400 if (target_category not in ucat):
401 print("Specified category is not present in this file.")
404 catresults = test_runner(testcases[target_category], args)
405 print("Category " + target_category + "\n\n" + catresults)
412 Start of execution; set up argument parser and get the arguments,
413 and start operations.
415 parser = args_parse()
416 parser = set_args(parser)
417 (args, remaining) = parser.parse_known_args()
418 check_default_settings(args)
420 set_operation_mode(args)
425 if __name__ == "__main__":