4 tdc.py - Linux tc (Traffic Control) unit test driver
6 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
15 from collections import OrderedDict
16 from string import Template
18 from tdc_config import *
19 from tdc_helper import *
def replace_keywords(cmd):
    """
    Expand the $VARIABLE placeholders of an executable command using the
    values held in NAMES and return the resulting string.

    string.Template.safe_substitute() is used, so a placeholder with no
    matching key in NAMES is left in the command untouched instead of
    raising.
    """
    return Template(cmd).safe_substitute(NAMES)
35 def exec_cmd(command, nsonly=True):
# Visible behaviour: when both USE_NS and nsonly are truthy the command is
# prefixed with 'ip netns exec $NS '; it is passed through
# replace_keywords(), run with subprocess.Popen with stdout and stderr
# piped, and one of the two streams is decoded as UTF-8 into foutput.
# NOTE(review): the embedded numbering jumps (41 -> 44, 46 -> 48, 53 -> 55,
# 55 -> 62), so statements of this function are missing from the listing,
# including its return. Callers unpack the result as (proc, foutput);
# confirm against the complete file.
37 Perform any required modifications on an executable command, then run
38 it in a subprocess and return the results.
40 if (USE_NS and nsonly):
41 command = 'ip netns exec $NS ' + command
# NOTE(review): line 43 is not in view, so it cannot be told from here
# whether this substitution is guarded by a condition.
44 command = replace_keywords(command)
46 proc = subprocess.Popen(command,
48 stdout=subprocess.PIPE,
49 stderr=subprocess.PIPE)
# communicate() waits for the process to finish and returns both streams
# as bytes.
50 (rawout, serr) = proc.communicate()
# A non-zero exit status selects the stderr text as the reported output.
52 if proc.returncode != 0:
53 foutput = serr.decode("utf-8")
# NOTE(review): line 54 is not in view; presumably an `else:` precedes this
# assignment, otherwise it would overwrite the stderr text. Confirm.
55 foutput = rawout.decode("utf-8")
62 def prepare_env(cmdlist):
# Visible behaviour: walks cmdlist, runs a command through exec_cmd() and
# prints a diagnostic when the process exit status is not among the
# accepted exit codes.
# NOTE(review): the embedded numbering skips 70-77, 79, 81, 83, 85 and
# 87-90, so several statements are missing from this listing.
64 Execute the setup/teardown commands for a test case. Optionally
65 terminate test execution if the command fails.
67 for cmdinfo in cmdlist:
# A list entry carries its accepted exit codes in every element after the
# first one.
68 if (type(cmdinfo) == list):
69 exit_codes = cmdinfo[1:]
# NOTE(review): where `cmd` is assigned, and which exit codes apply to an
# entry that is not a list, is not in view (lines 70-77).
78 (proc, foutput) = exec_cmd(cmd)
80 if proc.returncode not in exit_codes:
82 print("Could not execute:")
84 print("\nError message:")
# NOTE(review): the statement that actually stops the run after this
# message is not in view.
86 print("\nAborting test run.")
91 def test_runner(filtered_tests, args):
# Visible behaviour, per test case in filtered_tests: print its id and
# name, run its "setup" commands, run "cmdUnderTest" and compare the exit
# status with "expExitCode", run "verifyCmd" and compare the number of
# "matchPattern" regex matches with "matchCount", append one line to the
# TAP text in `tap`, then run the "teardown" commands.
# NOTE(review): the embedded numbering has many gaps (102, 104, 106-107,
# 109, 114, 116, 118-119, 124-129, 131-134, 136+), so the pass/fail
# bookkeeping and the return statement are not in view.
93 Driver function for the unit tests.
95 Prints information about the tests being run, executes the setup and
96 teardown commands and the command under test itself. Also determines
97 success/failure based on the information in the test case and generates
98 TAP output accordingly.
100 testlist = filtered_tests
101 tcount = len(testlist)
# First line of the TAP text: "<index>..<number of tests>".
# NOTE(review): the initialisation of `index` (line 102) is not in view.
103 tap = str(index) + ".." + str(tcount) + "\n"
105 for tidx in testlist:
# Special case for tests in the "flower" category when no device (-d) was
# supplied. The action taken (line 109) is not in view.
108 if "flower" in tidx["category"] and args.device == None:
110 print("Test " + tidx["id"] + ": " + tidx["name"])
111 prepare_env(tidx["setup"])
112 (p, procout) = exec_cmd(tidx["cmdUnderTest"])
113 exit_code = p.returncode
# The command under test must finish with exactly the expected status.
115 if (exit_code != int(tidx["expExitCode"])):
117 print("exit:", exit_code, int(tidx["expExitCode"]))
# re.DOTALL lets '.' in the pattern match newlines in the verification
# output.
120 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
121 (p, procout) = exec_cmd(tidx["verifyCmd"])
122 match_index = re.findall(match_pattern, procout)
# The number of matches must equal the expected count.
123 if len(match_index) != int(tidx["matchCount"]):
# One TAP result line: "<tresult><index> <id> <name>".
# NOTE(review): the assignments to `tresult` are not in view.
130 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
135 prepare_env(tidx["teardown"])
143 Create the network namespace in which the tests will be run and set up
144 the required network devices for it.
147 cmd = 'ip netns add $NS'
149 cmd = 'ip link add $DEV0 type veth peer name $DEV1'
151 cmd = 'ip link set $DEV1 netns $NS'
153 cmd = 'ip link set $DEV0 up'
155 cmd = 'ip -n $NS link set $DEV1 up'
157 cmd = 'ip link set $DEV2 netns $NS'
159 cmd = 'ip -n $NS link set $DEV2 up'
165 Destroy the network namespace for testing (and any associated network
169 cmd = 'ip netns delete $NS'
def has_blank_ids(idlist):
    """
    Tell whether the ID list holds at least one empty entry.

    Returns True as soon as a falsy ID (for example an empty string) is
    found, and False when every entry is non-empty or the list itself is
    empty.
    """
    for entry in idlist:
        if not entry:
            return True
    return False
180 def load_from_file(filename):
# Visible behaviour: parse the JSON file `filename` into `testlist`,
# collect the test IDs and, when at least one ID is blank, record the
# originating file name under the 'filename' key of an entry `k`.
# NOTE(review): the embedded numbering skips 189 and 191-194, so the loop
# that binds `k` and the return statement are not in view.
182 Open the JSON file containing the test cases and return them as an
183 ordered dictionary object.
185 with open(filename) as test_data:
# object_pairs_hook=OrderedDict makes every decoded JSON object an
# OrderedDict, keeping the key order found in the file.
186 testlist = json.load(test_data, object_pairs_hook=OrderedDict)
187 idlist = get_id_list(testlist)
188 if (has_blank_ids(idlist)):
# generate_case_ids() reads this 'filename' key to decide which files to
# rewrite.
190 k['filename'] = filename
196 Create the argument parser.
198 parser = argparse.ArgumentParser(description='Linux TC unit tests')
def set_args(parser):
    """
    Register the tdc command line options on the given argparse parser.

    The options are kept in a table of (flags, keyword settings) pairs and
    added in table order, so the generated help lists them in that order.
    The same parser object is handed back to the caller.
    """
    option_table = (
        (('-p', '--path'),
         {'type': str,
          'help': 'The full path to the tc executable to use'}),
        # A bare -c stores the const '+c', which the caller treats as a
        # request to list the known categories.
        (('-c', '--category'),
         {'type': str, 'nargs': '?', 'const': '+c',
          'help': 'Run tests only from the specified category, '
                  'or if no category is specified, list known categories.'}),
        (('-f', '--file'),
         {'type': str,
          'help': 'Run tests from the specified file'}),
        # A bare -l stores the empty string.
        (('-l', '--list'),
         {'type': str, 'nargs': '?', 'const': "", 'metavar': 'CATEGORY',
          'help': 'List all test cases, '
                  'or those only within the specified category'}),
        # nargs=1 makes the stored value a one-element list.
        (('-s', '--show'),
         {'type': str, 'nargs': 1, 'metavar': 'ID', 'dest': 'showID',
          'help': 'Display the test case with specified id'}),
        (('-e', '--execute'),
         {'type': str, 'nargs': 1, 'metavar': 'ID',
          'help': 'Execute the single test case with specified ID'}),
        (('-i', '--id'),
         {'action': 'store_true', 'dest': 'gen_id',
          'help': 'Generate ID numbers for new test cases'}),
        (('-d', '--device'),
         {'help': 'Execute the test case in flower category'}),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser
225 def check_default_settings(args):
# Visible behaviour: copy the -p/--path and -d/--device values, when
# given, into NAMES['TC'] and NAMES['DEV2'], then check that NAMES['TC']
# names an existing file and print a message when it does not.
# NOTE(review): the embedded numbering skips 231-232 and 239-241, so any
# further statements (for example what follows the message) are not in
# view.
227 Process any arguments overriding the default settings, and ensure the
228 settings are correct.
230 # Allow for overriding specific settings
233 if args.path != None:
234 NAMES['TC'] = args.path
235 if args.device != None:
236 NAMES['DEV2'] = args.device
# The check runs on the final value of NAMES['TC'], whether or not it was
# overridden above.
237 if not os.path.isfile(NAMES['TC']):
238 print("The specified tc path " + NAMES['TC'] + " does not exist.")
def get_id_list(alltests):
    """
    Collect the "id" value of every test case, in input order.

    Raises KeyError when a test case has no "id" key.
    """
    collected = []
    for case in alltests:
        collected.append(case["id"])
    return collected
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns a list with every ID that occurs more than once in alltests,
    one entry per occurrence and in input order. The list is empty when
    all IDs are unique.

    The IDs are tallied once up front rather than calling list.count()
    for every element, which made the check quadratic in the number of
    test cases. IDs are expected to be hashable (strings in the JSON
    test files).
    """
    # Local import: the file's visible imports only bring OrderedDict in
    # from collections.
    from collections import Counter

    occurrences = Counter(case["id"] for case in alltests)
    return [case["id"] for case in alltests if occurrences[case["id"]] > 1]
def does_id_exist(alltests, newid):
    """
    Report whether newid is already used as the "id" of a test case.

    All IDs are gathered first, so a test case without an "id" key raises
    KeyError even when a match occurs earlier in the list.
    """
    known_ids = [case["id"] for case in alltests]
    return newid in known_ids
265 def generate_case_ids(alltests):
# Visible behaviour: draw a random 4-digit hex ID, test it against the
# existing IDs, gather the 'filename' values recorded on the test cases,
# and dump test lists back to those files as JSON.
# NOTE(review): the embedded numbering skips 269-273, 276-282, 286-289,
# 291-292 and 295+, so the loops, the handling of an ID collision, how
# `testlist` is built and the return statement are not in view.
267 If a test case has a blank ID field, generate a random hex ID for it
268 and then write the test cases back to disk.
# random.randrange(16**4) yields 0..65535; '%04x' renders it as exactly
# four lower-case hex digits.
274 newid = str('%04x' % random.randrange(16**4))
275 if (does_id_exist(alltests, newid)):
283 if ('filename' in c):
284 ufilename.append(c['filename'])
# NOTE(review): get_unique_item is not defined in this listing; presumably
# it removes duplicate file names. Confirm.
285 ufilename = get_unique_item(ufilename)
290 if t['filename'] == f:
# NOTE(review): the file is opened without a `with` block and no close()
# is in view; confirm the handle is closed so the JSON is flushed.
293 outfile = open(f, "w")
294 json.dump(testlist, outfile, indent=4)
298 def get_test_cases(args):
# Visible behaviour: when -f/--file was given, check that the file exists
# and print a message when it does not; walk the 'tc-tests' directory
# tree collecting every '*.json' path into flist; load each file of flist
# with load_from_file() and concatenate the results into alltests.
# NOTE(review): the embedded numbering skips 302-304, 308-311, 315 and
# 318-320, so the initialisation of flist/alltests, the branch that joins
# the two sources, and the return statement are not in view.
300 If a test case file is specified, retrieve tests from that file.
301 Otherwise, glob for all json files in subdirectories and load from
305 if args.file != None:
306 if not os.path.isfile(args.file):
307 print("The specified test case file " + args.file + " does not exist.")
# 'tc-tests' is a relative path, so it is resolved against the current
# working directory.
312 for root, dirnames, filenames in os.walk('tc-tests'):
313 for filename in fnmatch.filter(filenames, '*.json'):
314 flist.append(os.path.join(root, filename))
316 for casefile in flist:
317 alltests = alltests + (load_from_file(casefile))
321 def set_operation_mode(args):
# Visible behaviour: load the test cases, optionally generate missing IDs,
# report duplicate IDs, then act on the show / execute / category / list
# options and finally run either all tests, the tests whose ID contains
# the requested one, or the tests of one category.
# NOTE(review): the embedded numbering has many gaps (328-329, 333,
# 335-336, 342-343, 345-346, 348-350, 352-355, 358-360, 362-365, 367-368,
# 371, 376-377, 379-380, 383-386, 392, 398, 401-402, 405+), so the
# conditions guarding several branches and whatever follows each error
# message are not in view.
323 Load the test case data and process remaining arguments to determine
324 what the script should do for this run, and call the appropriate
327 alltests = get_test_cases(args)
330 idlist = get_id_list(alltests)
331 if (has_blank_ids(idlist)):
332 alltests = generate_case_ids(alltests)
334 print("No empty ID fields found in test files.")
# Duplicate IDs are reported once each: check_case_id() returns one entry
# per occurrence and set() collapses them.
337 duplicate_ids = check_case_id(alltests)
338 if (len(duplicate_ids) > 0):
339 print("The following test case IDs are not unique:")
340 print(str(set(duplicate_ids)))
341 print("Please correct them before continuing.")
344 ucat = get_test_categories(alltests)
# --show and --execute are declared with nargs=1, hence the [0].
347 show_test_case_by_id(alltests, args.showID[0])
351 target_id = args.execute[0]
# '+c' is the const stored when -c is given without a category name.
356 if (args.category == '+c'):
357 print("Available categories:")
361 target_category = args.category
366 testcases = get_categorized_testlist(alltests, ucat)
369 if (len(args.list) == 0):
370 list_test_cases(alltests)
# NOTE(review): the closing parenthesis is misplaced. `len(args.list > 0)`
# evaluates `args.list > 0` first; --list is declared with type=str, so
# under Python 3 that str/int comparison raises TypeError whenever this
# branch is reached. `len(args.list) > 0` looks intended.
372 elif(len(args.list > 0)):
373 if (args.list not in ucat):
374 print("Unknown category " + args.list)
375 print("Available categories:")
378 list_test_cases(testcases[args.list])
# The effective user ID must be 0 (root) to continue.
381 if (os.geteuid() != 0):
382 print("This script must be run with root privileges.\n")
387 if (len(target_category) == 0):
388 if (len(target_id) > 0):
# Substring match: every test whose ID contains target_id is kept.
389 alltests = list(filter(lambda x: target_id in x['id'], alltests))
390 if (len(alltests) == 0):
391 print("Cannot find a test case with ID matching " + target_id)
393 catresults = test_runner(alltests, args)
394 print("All test results: " + "\n\n" + catresults)
395 elif (len(target_category) > 0):
# The "flower" category cannot run without a device given via -d.
396 if (target_category == "flower") and args.device == None:
397 print("Please specify a NIC device (-d) to run category flower")
399 if (target_category not in ucat):
400 print("Specified category is not present in this file.")
403 catresults = test_runner(testcases[target_category], args)
404 print("Category " + target_category + "\n\n" + catresults)
411 Start of execution; set up argument parser and get the arguments,
412 and start operations.
414 parser = args_parse()
415 parser = set_args(parser)
416 (args, remaining) = parser.parse_known_args()
417 check_default_settings(args)
419 set_operation_mode(args)
424 if __name__ == "__main__":