4 tdc.py - Linux tc (Traffic Control) unit test driver
6 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
15 from collections import OrderedDict
16 from string import Template
18 from tdc_config import *
19 from tdc_helper import *
25 def replace_keywords(cmd):
27 For a given executable command, substitute any known
28 variables contained within NAMES with the correct values
31 subcmd = tcmd.safe_substitute(NAMES)
35 def exec_cmd(command, nsonly=True):
37 Perform any required modifications on an executable command, then run
38 it in a subprocess and return the results.
40 if (USE_NS and nsonly):
41 command = 'ip netns exec $NS ' + command
44 command = replace_keywords(command)
46 proc = subprocess.Popen(command,
48 stdout=subprocess.PIPE,
49 stderr=subprocess.PIPE)
50 (rawout, serr) = proc.communicate()
52 if proc.returncode != 0:
53 foutput = serr.decode("utf-8")
55 foutput = rawout.decode("utf-8")
62 def prepare_env(cmdlist):
64 Execute the setup/teardown commands for a test case. Optionally
65 terminate test execution if the command fails.
67 for cmdinfo in cmdlist:
68 if (type(cmdinfo) == list):
69 exit_codes = cmdinfo[1:]
78 (proc, foutput) = exec_cmd(cmd)
80 if proc.returncode not in exit_codes:
82 print("Could not execute:")
84 print("\nError message:")
86 print("\nAborting test run.")
91 def test_runner(filtered_tests):
93 Driver function for the unit tests.
95 Prints information about the tests being run, executes the setup and
96 teardown commands and the command under test itself. Also determines
97 success/failure based on the information in the test case and generates
98 TAP output accordingly.
100 testlist = filtered_tests
101 tcount = len(testlist)
103 tap = str(index) + ".." + str(tcount) + "\n"
105 for tidx in testlist:
108 print("Test " + tidx["id"] + ": " + tidx["name"])
109 prepare_env(tidx["setup"])
110 (p, procout) = exec_cmd(tidx["cmdUnderTest"])
111 exit_code = p.returncode
113 if (exit_code != int(tidx["expExitCode"])):
115 print("exit:", exit_code, int(tidx["expExitCode"]))
118 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
119 (p, procout) = exec_cmd(tidx["verifyCmd"])
120 match_index = re.findall(match_pattern, procout)
121 if len(match_index) != int(tidx["matchCount"]):
128 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
133 prepare_env(tidx["teardown"])
141 Create the network namespace in which the tests will be run and set up
142 the required network devices for it.
145 cmd = 'ip netns add $NS'
147 cmd = 'ip link add $DEV0 type veth peer name $DEV1'
149 cmd = 'ip link set $DEV1 netns $NS'
151 cmd = 'ip link set $DEV0 up'
153 cmd = 'ip -s $NS link set $DEV1 up'
159 Destroy the network namespace for testing (and any associated network
163 cmd = 'ip netns delete $NS'
167 def has_blank_ids(idlist):
169 Search the list for empty ID fields and return true/false accordingly.
171 return not(all(k for k in idlist))
174 def load_from_file(filename):
176 Open the JSON file containing the test cases and return them as an
177 ordered dictionary object.
179 with open(filename) as test_data:
180 testlist = json.load(test_data, object_pairs_hook=OrderedDict)
181 idlist = get_id_list(testlist)
182 if (has_blank_ids(idlist)):
184 k['filename'] = filename
190 Create the argument parser.
192 parser = argparse.ArgumentParser(description='Linux TC unit tests')
196 def set_args(parser):
198 Set the command line arguments for tdc.
200 parser.add_argument('-p', '--path', type=str,
201 help='The full path to the tc executable to use')
202 parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
203 help='Run tests only from the specified category, or if no category is specified, list known categories.')
204 parser.add_argument('-f', '--file', type=str,
205 help='Run tests from the specified file')
206 parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
207 help='List all test cases, or those only within the specified category')
208 parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
209 help='Display the test case with specified id')
210 parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
211 help='Execute the single test case with specified ID')
212 parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
213 help='Generate ID numbers for new test cases')
218 def check_default_settings(args):
220 Process any arguments overriding the default settings, and ensure the
221 settings are correct.
223 # Allow for overriding specific settings
226 if args.path != None:
227 NAMES['TC'] = args.path
228 if not os.path.isfile(NAMES['TC']):
229 print("The specified tc path " + NAMES['TC'] + " does not exist.")
233 def get_id_list(alltests):
235 Generate a list of all IDs in the test cases.
237 return [x["id"] for x in alltests]
240 def check_case_id(alltests):
242 Check for duplicate test case IDs.
244 idl = get_id_list(alltests)
245 return [x for x in idl if idl.count(x) > 1]
248 def does_id_exist(alltests, newid):
250 Check if a given ID already exists in the list of test cases.
252 idl = get_id_list(alltests)
253 return (any(newid == x for x in idl))
256 def generate_case_ids(alltests):
258 If a test case has a blank ID field, generate a random hex ID for it
259 and then write the test cases back to disk.
265 newid = str('%04x' % random.randrange(16**4))
266 if (does_id_exist(alltests, newid)):
274 if ('filename' in c):
275 ufilename.append(c['filename'])
276 ufilename = get_unique_item(ufilename)
281 if t['filename'] == f:
284 outfile = open(f, "w")
285 json.dump(testlist, outfile, indent=4)
289 def get_test_cases(args):
291 If a test case file is specified, retrieve tests from that file.
292 Otherwise, glob for all json files in subdirectories and load from
296 if args.file != None:
297 if not os.path.isfile(args.file):
298 print("The specified test case file " + args.file + " does not exist.")
303 for root, dirnames, filenames in os.walk('tc-tests'):
304 for filename in fnmatch.filter(filenames, '*.json'):
305 flist.append(os.path.join(root, filename))
307 for casefile in flist:
308 alltests = alltests + (load_from_file(casefile))
312 def set_operation_mode(args):
314 Load the test case data and process remaining arguments to determine
315 what the script should do for this run, and call the appropriate
318 alltests = get_test_cases(args)
321 idlist = get_id_list(alltests)
322 if (has_blank_ids(idlist)):
323 alltests = generate_case_ids(alltests)
325 print("No empty ID fields found in test files.")
328 duplicate_ids = check_case_id(alltests)
329 if (len(duplicate_ids) > 0):
330 print("The following test case IDs are not unique:")
331 print(str(set(duplicate_ids)))
332 print("Please correct them before continuing.")
335 ucat = get_test_categories(alltests)
338 show_test_case_by_id(alltests, args.showID[0])
342 target_id = args.execute[0]
347 if (args.category == '+c'):
348 print("Available categories:")
352 target_category = args.category
357 testcases = get_categorized_testlist(alltests, ucat)
360 if (len(args.list) == 0):
361 list_test_cases(alltests)
363 elif(len(args.list > 0)):
364 if (args.list not in ucat):
365 print("Unknown category " + args.list)
366 print("Available categories:")
369 list_test_cases(testcases[args.list])
372 if (os.geteuid() != 0):
373 print("This script must be run with root privileges.\n")
378 if (len(target_category) == 0):
379 if (len(target_id) > 0):
380 alltests = list(filter(lambda x: target_id in x['id'], alltests))
381 if (len(alltests) == 0):
382 print("Cannot find a test case with ID matching " + target_id)
384 catresults = test_runner(alltests)
385 print("All test results: " + "\n\n" + catresults)
386 elif (len(target_category) > 0):
387 if (target_category not in ucat):
388 print("Specified category is not present in this file.")
391 catresults = test_runner(testcases[target_category])
392 print("Category " + target_category + "\n\n" + catresults)
399 Start of execution; set up argument parser and get the arguments,
400 and start operations.
402 parser = args_parse()
403 parser = set_args(parser)
404 (args, remaining) = parser.parse_known_args()
405 check_default_settings(args)
407 set_operation_mode(args)
412 if __name__ == "__main__":