]> asedeno.scripts.mit.edu Git - linux.git/blob - tools/testing/selftests/tc-testing/tdc.py
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[linux.git] / tools / testing / selftests / tc-testing / tdc.py
1 #!/usr/bin/env python3
2
3 """
4 tdc.py - Linux tc (Traffic Control) unit test driver
5
6 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
7 """
8
9 import re
10 import os
11 import sys
12 import argparse
13 import json
14 import subprocess
15 from collections import OrderedDict
16 from string import Template
17
18 from tdc_config import *
19 from tdc_helper import *
20
21
22 USE_NS = True
23
24
25 def replace_keywords(cmd):
26     """
27     For a given executable command, substitute any known
28     variables contained within NAMES with the correct values
29     """
30     tcmd = Template(cmd)
31     subcmd = tcmd.safe_substitute(NAMES)
32     return subcmd
33
34
35 def exec_cmd(command, nsonly=True):
36     """
37     Perform any required modifications on an executable command, then run
38     it in a subprocess and return the results.
39     """
40     if (USE_NS and nsonly):
41         command = 'ip netns exec $NS ' + command
42
43     if '$' in command:
44         command = replace_keywords(command)
45
46     proc = subprocess.Popen(command,
47         shell=True,
48         stdout=subprocess.PIPE,
49         stderr=subprocess.PIPE)
50     (rawout, serr) = proc.communicate()
51
52     if proc.returncode != 0:
53         foutput = serr.decode("utf-8")
54     else:
55         foutput = rawout.decode("utf-8")
56
57     proc.stdout.close()
58     proc.stderr.close()
59     return proc, foutput
60
61
62 def prepare_env(cmdlist):
63     """
64     Execute the setup/teardown commands for a test case. Optionally
65     terminate test execution if the command fails.
66     """
67     for cmdinfo in cmdlist:
68         if (type(cmdinfo) == list):
69             exit_codes = cmdinfo[1:]
70             cmd = cmdinfo[0]
71         else:
72             exit_codes = [0]
73             cmd = cmdinfo
74
75         if (len(cmd) == 0):
76             continue
77
78         (proc, foutput) = exec_cmd(cmd)
79
80         if proc.returncode not in exit_codes:
81             print
82             print("Could not execute:")
83             print(cmd)
84             print("\nError message:")
85             print(foutput)
86             print("\nAborting test run.")
87             ns_destroy()
88             exit(1)
89
90
91 def test_runner(filtered_tests, args):
92     """
93     Driver function for the unit tests.
94
95     Prints information about the tests being run, executes the setup and
96     teardown commands and the command under test itself. Also determines
97     success/failure based on the information in the test case and generates
98     TAP output accordingly.
99     """
100     testlist = filtered_tests
101     tcount = len(testlist)
102     index = 1
103     tap = str(index) + ".." + str(tcount) + "\n"
104
105     for tidx in testlist:
106         result = True
107         tresult = ""
108         if "flower" in tidx["category"] and args.device == None:
109             continue
110         print("Test " + tidx["id"] + ": " + tidx["name"])
111         prepare_env(tidx["setup"])
112         (p, procout) = exec_cmd(tidx["cmdUnderTest"])
113         exit_code = p.returncode
114
115         if (exit_code != int(tidx["expExitCode"])):
116             result = False
117             print("exit:", exit_code, int(tidx["expExitCode"]))
118             print(procout)
119         else:
120             match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
121             (p, procout) = exec_cmd(tidx["verifyCmd"])
122             match_index = re.findall(match_pattern, procout)
123             if len(match_index) != int(tidx["matchCount"]):
124                 result = False
125
126         if result == True:
127             tresult += "ok "
128         else:
129             tresult += "not ok "
130         tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
131
132         if result == False:
133             tap += procout
134
135         prepare_env(tidx["teardown"])
136         index += 1
137
138     return tap
139
140
141 def ns_create():
142     """
143     Create the network namespace in which the tests will be run and set up
144     the required network devices for it.
145     """
146     if (USE_NS):
147         cmd = 'ip netns add $NS'
148         exec_cmd(cmd, False)
149         cmd = 'ip link add $DEV0 type veth peer name $DEV1'
150         exec_cmd(cmd, False)
151         cmd = 'ip link set $DEV1 netns $NS'
152         exec_cmd(cmd, False)
153         cmd = 'ip link set $DEV0 up'
154         exec_cmd(cmd, False)
155         cmd = 'ip -s $NS link set $DEV1 up'
156         exec_cmd(cmd, False)
157         cmd = 'ip link set $DEV2 netns $NS'
158         exec_cmd(cmd, False)
159         cmd = 'ip -s $NS link set $DEV2 up'
160         exec_cmd(cmd, False)
161
162
163 def ns_destroy():
164     """
165     Destroy the network namespace for testing (and any associated network
166     devices as well)
167     """
168     if (USE_NS):
169         cmd = 'ip netns delete $NS'
170         exec_cmd(cmd, False)
171
172
173 def has_blank_ids(idlist):
174     """
175     Search the list for empty ID fields and return true/false accordingly.
176     """
177     return not(all(k for k in idlist))
178
179
180 def load_from_file(filename):
181     """
182     Open the JSON file containing the test cases and return them as an
183     ordered dictionary object.
184     """
185     with open(filename) as test_data:
186         testlist = json.load(test_data, object_pairs_hook=OrderedDict)
187     idlist = get_id_list(testlist)
188     if (has_blank_ids(idlist)):
189         for k in testlist:
190             k['filename'] = filename
191     return testlist
192
193
194 def args_parse():
195     """
196     Create the argument parser.
197     """
198     parser = argparse.ArgumentParser(description='Linux TC unit tests')
199     return parser
200
201
202 def set_args(parser):
203     """
204     Set the command line arguments for tdc.
205     """
206     parser.add_argument('-p', '--path', type=str,
207                         help='The full path to the tc executable to use')
208     parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
209                         help='Run tests only from the specified category, or if no category is specified, list known categories.')
210     parser.add_argument('-f', '--file', type=str,
211                         help='Run tests from the specified file')
212     parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
213                         help='List all test cases, or those only within the specified category')
214     parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
215                         help='Display the test case with specified id')
216     parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
217                         help='Execute the single test case with specified ID')
218     parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
219                         help='Generate ID numbers for new test cases')
220     parser.add_argument('-d', '--device',
221                         help='Execute the test case in flower category')
222     return parser
223
224
225 def check_default_settings(args):
226     """
227     Process any arguments overriding the default settings, and ensure the
228     settings are correct.
229     """
230     # Allow for overriding specific settings
231     global NAMES
232
233     if args.path != None:
234          NAMES['TC'] = args.path
235     if args.device != None:
236          NAMES['DEV2'] = args.device
237     if not os.path.isfile(NAMES['TC']):
238         print("The specified tc path " + NAMES['TC'] + " does not exist.")
239         exit(1)
240
241
242 def get_id_list(alltests):
243     """
244     Generate a list of all IDs in the test cases.
245     """
246     return [x["id"] for x in alltests]
247
248
249 def check_case_id(alltests):
250     """
251     Check for duplicate test case IDs.
252     """
253     idl = get_id_list(alltests)
254     return [x for x in idl if idl.count(x) > 1]
255
256
257 def does_id_exist(alltests, newid):
258     """
259     Check if a given ID already exists in the list of test cases.
260     """
261     idl = get_id_list(alltests)
262     return (any(newid == x for x in idl))
263
264
265 def generate_case_ids(alltests):
266     """
267     If a test case has a blank ID field, generate a random hex ID for it
268     and then write the test cases back to disk.
269     """
270     import random
271     for c in alltests:
272         if (c["id"] == ""):
273             while True:
274                 newid = str('%04x' % random.randrange(16**4))
275                 if (does_id_exist(alltests, newid)):
276                     continue
277                 else:
278                     c['id'] = newid
279                     break
280
281     ufilename = []
282     for c in alltests:
283         if ('filename' in c):
284             ufilename.append(c['filename'])
285     ufilename = get_unique_item(ufilename)
286     for f in ufilename:
287         testlist = []
288         for t in alltests:
289             if 'filename' in t:
290                 if t['filename'] == f:
291                     del t['filename']
292                     testlist.append(t)
293         outfile = open(f, "w")
294         json.dump(testlist, outfile, indent=4)
295         outfile.close()
296
297
298 def get_test_cases(args):
299     """
300     If a test case file is specified, retrieve tests from that file.
301     Otherwise, glob for all json files in subdirectories and load from
302     each one.
303     """
304     import fnmatch
305     if args.file != None:
306         if not os.path.isfile(args.file):
307             print("The specified test case file " + args.file + " does not exist.")
308             exit(1)
309         flist = [args.file]
310     else:
311         flist = []
312         for root, dirnames, filenames in os.walk('tc-tests'):
313             for filename in fnmatch.filter(filenames, '*.json'):
314                 flist.append(os.path.join(root, filename))
315     alltests = list()
316     for casefile in flist:
317         alltests = alltests + (load_from_file(casefile))
318     return alltests
319
320
321 def set_operation_mode(args):
322     """
323     Load the test case data and process remaining arguments to determine
324     what the script should do for this run, and call the appropriate
325     function.
326     """
327     alltests = get_test_cases(args)
328
329     if args.gen_id:
330         idlist = get_id_list(alltests)
331         if (has_blank_ids(idlist)):
332             alltests = generate_case_ids(alltests)
333         else:
334             print("No empty ID fields found in test files.")
335         exit(0)
336
337     duplicate_ids = check_case_id(alltests)
338     if (len(duplicate_ids) > 0):
339         print("The following test case IDs are not unique:")
340         print(str(set(duplicate_ids)))
341         print("Please correct them before continuing.")
342         exit(1)
343
344     ucat = get_test_categories(alltests)
345
346     if args.showID:
347         show_test_case_by_id(alltests, args.showID[0])
348         exit(0)
349
350     if args.execute:
351         target_id = args.execute[0]
352     else:
353         target_id = ""
354
355     if args.category:
356         if (args.category == '+c'):
357             print("Available categories:")
358             print_sll(ucat)
359             exit(0)
360         else:
361             target_category = args.category
362     else:
363         target_category = ""
364
365
366     testcases = get_categorized_testlist(alltests, ucat)
367
368     if args.list:
369         if (len(args.list) == 0):
370             list_test_cases(alltests)
371             exit(0)
372         elif(len(args.list > 0)):
373             if (args.list not in ucat):
374                 print("Unknown category " + args.list)
375                 print("Available categories:")
376                 print_sll(ucat)
377                 exit(1)
378             list_test_cases(testcases[args.list])
379             exit(0)
380
381     if (os.geteuid() != 0):
382         print("This script must be run with root privileges.\n")
383         exit(1)
384
385     ns_create()
386
387     if (len(target_category) == 0):
388         if (len(target_id) > 0):
389             alltests = list(filter(lambda x: target_id in x['id'], alltests))
390             if (len(alltests) == 0):
391                 print("Cannot find a test case with ID matching " + target_id)
392                 exit(1)
393         catresults = test_runner(alltests, args)
394         print("All test results: " + "\n\n" + catresults)
395     elif (len(target_category) > 0):
396         if (target_category == "flower") and args.device == None:
397             print("Please specify a NIC device (-d) to run category flower")
398             exit(1)
399         if (target_category not in ucat):
400             print("Specified category is not present in this file.")
401             exit(1)
402         else:
403             catresults = test_runner(testcases[target_category], args)
404             print("Category " + target_category + "\n\n" + catresults)
405
406     ns_destroy()
407
408
409 def main():
410     """
411     Start of execution; set up argument parser and get the arguments,
412     and start operations.
413     """
414     parser = args_parse()
415     parser = set_args(parser)
416     (args, remaining) = parser.parse_known_args()
417     check_default_settings(args)
418
419     set_operation_mode(args)
420
421     exit(0)
422
423
424 if __name__ == "__main__":
425     main()