parse_iso8601,
read_batch_urls,
sanitize_filename,
+ sanitize_path,
shell_quote,
smuggle_url,
str_to_int,
xpath_with_ns,
render_table,
match_str,
+ url_sanitize_consecutive_slashes,
)
self.assertEqual(sanitize_filename('_BD_eEpuzXw', is_id=True), '_BD_eEpuzXw')
self.assertEqual(sanitize_filename('N0Y__7-UOdI', is_id=True), 'N0Y__7-UOdI')
def test_sanitize_path(self):
    """Check sanitize_path against a table of (input, expected) pairs.

    The expectations describe Windows path handling, so they are only
    evaluated when running on win32; elsewhere the test returns early.
    """
    if sys.platform != 'win32':
        return

    expectations = (
        # Plain names, separator normalization and forbidden characters
        ('abc', 'abc'),
        ('abc/def', 'abc\\def'),
        ('abc\\def', 'abc\\def'),
        ('abc|def', 'abc#def'),
        ('<>:"|?*', '#######'),
        ('C:/abc/def', 'C:\\abc\\def'),
        ('C?:/abc/def', 'C##\\abc\\def'),

        # Extended-length UNC prefixes
        ('\\\\?\\UNC\\ComputerName\\abc', '\\\\?\\UNC\\ComputerName\\abc'),
        ('\\\\?\\UNC/ComputerName/abc', '\\\\?\\UNC\\ComputerName\\abc'),

        # Extended-length drive prefixes
        ('\\\\?\\C:\\abc', '\\\\?\\C:\\abc'),
        ('\\\\?\\C:/abc', '\\\\?\\C:\\abc'),
        ('\\\\?\\C:\\ab?c\\de:f', '\\\\?\\C:\\ab#c\\de#f'),
        ('\\\\?\\C:\\abc', '\\\\?\\C:\\abc'),

        # Output templates keep their %(...)s placeholders
        (
            'youtube/%(uploader)s/%(autonumber)s-%(title)s-%(upload_date)s.%(ext)s',
            'youtube\\%(uploader)s\\%(autonumber)s-%(title)s-%(upload_date)s.%(ext)s',
        ),

        # A trailing dot in a component is replaced
        (
            'youtube/TheWreckingYard ./00001-Not bad, Especially for Free! (1987 Yamaha 700)-20141116.mp4.part',
            'youtube\\TheWreckingYard #\\00001-Not bad, Especially for Free! (1987 Yamaha 700)-20141116.mp4.part',
        ),
        ('abc/def...', 'abc\\def..#'),
        ('abc.../def', 'abc..#\\def'),
        ('abc.../def...', 'abc..#\\def..#'),

        # Relative '.' and '..' components
        ('../abc', '..\\abc'),
        ('../../abc', '..\\..\\abc'),
        ('./abc', 'abc'),
        ('./../abc', '..\\abc'),
    )
    for raw_path, expected in expectations:
        self.assertEqual(sanitize_path(raw_path), expected)
+
def test_ordered_set(self):
self.assertEqual(orderedSet([1, 1, 2, 3, 4, 4, 5, 6, 7, 3, 5]), [1, 2, 3, 4, 5, 6, 7])
self.assertEqual(orderedSet([]), [])
'like_count > 100 & dislike_count <? 50 & description',
{'like_count': 190, 'dislike_count': 10}))
def test_url_sanitize_consecutive_slashes(self):
    """Check that repeated slashes in the URL path are collapsed to one."""
    expectations = (
        # Doubled slash inside the path
        ('http://hostname/foo//bar/filename.html',
         'http://hostname/foo/bar/filename.html'),
        # Doubled slash right after the host
        ('http://hostname//foo/bar/filename.html',
         'http://hostname/foo/bar/filename.html'),
        ('http://hostname//', 'http://hostname/'),
        # Already clean URLs come back unchanged
        ('http://hostname/foo/bar/filename.html',
         'http://hostname/foo/bar/filename.html'),
        ('http://hostname/', 'http://hostname/'),
    )
    for url, expected in expectations:
        self.assertEqual(url_sanitize_consecutive_slashes(url), expected)
+
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
unittest.main()
raise
# In case of error, try to remove win32 forbidden chars
- alt_filename = os.path.join(
- re.sub('[/<>:"\\|\\\\?\\*]', '#', path_part)
- for path_part in os.path.split(filename)
- )
+ alt_filename = sanitize_path(filename)
if alt_filename == filename:
raise
else:
# An exception here should be caught in the caller
- stream = open(encodeFilename(filename), open_mode)
+ stream = open(encodeFilename(alt_filename), open_mode)
return (stream, alt_filename)
return result
def sanitize_path(s):
    """Sanitize and normalize a path on Windows.

    On any platform other than win32 the path is returned unchanged.

    On Windows the part after the drive or UNC prefix is normalized with
    os.path.normpath and split into components.  Components equal to '.'
    or '..' are kept as they are; in every other component each of the
    characters / < > : " | ? * and the backslash, as well as a trailing
    dot, is replaced with '#'.  The drive or UNC prefix itself is kept
    verbatim and re-attached in front of the sanitized components.
    """
    if sys.platform != 'win32':
        return s

    drive_or_unc, tail = os.path.splitdrive(s)
    if not drive_or_unc:
        # os.path.splitunc was deprecated in Python 3.1 and removed in 3.7,
        # so it may only be used as an optional fallback for interpreters
        # that still provide it.
        splitunc = getattr(os.path, 'splitunc', None)
        if splitunc is not None:
            unc, unc_tail = splitunc(s)
            if unc:
                drive_or_unc, tail = unc, unc_tail

    components = os.path.normpath(tail).split(os.path.sep)
    if drive_or_unc:
        # The tail normally starts with a separator, which yields an empty
        # first component; drop it because the prefix is re-added below.
        # NOTE(review): for a drive-relative path such as 'C:abc' the tail
        # has no leading separator, so this drops the first real component
        # - confirm whether such paths need to be supported.
        components.pop(0)

    sanitized = [
        component if component in ('.', '..')
        else re.sub(r'(?:[/<>:"\|\\?\*]|\.$)', '#', component)
        for component in components]
    if drive_or_unc:
        sanitized.insert(0, drive_or_unc + os.path.sep)
    return os.path.join(*sanitized)
+
+
def orderedSet(iterable):
""" Remove all duplicates from the input iterable """
res = []
return None # No Proxy
return compat_urllib_request.ProxyHandler.proxy_open(
self, req, proxy, type)
+
+
def url_sanitize_consecutive_slashes(url):
    """Collapse runs of slashes in the path component of a URL.

    Only the path is rewritten; the other components of the parsed URL
    are passed through unchanged.  For example, both
        http://hostname/foo//bar/filename.html
    and
        http://hostname//foo/bar/filename.html
    become
        http://hostname/foo/bar/filename.html
    """
    components = compat_urlparse.urlparse(url)
    single_slash_path = re.sub(r'/{2,}', '/', components.path)
    return compat_urlparse.urlunparse(
        components._replace(path=single_slash_path))