Convert all tabs to 4 spaces (PEP8)

Philipp Hagemeister 2012-11-28 02:04:46 +01:00
parent 40b35b4aa6
commit 59ae15a507
9 changed files with 4841 additions and 4841 deletions


@@ -13,186 +13,186 @@ from youtube_dl.InfoExtractors import CollegeHumorIE, XNXXIE
class DownloadTest(unittest.TestCase):
    PARAMETERS_FILE = "test/parameters.json"
    #calculated with md5sum:
    #md5sum (GNU coreutils) 8.19
    YOUTUBE_SIZE = 1993883
    YOUTUBE_URL = "http://www.youtube.com/watch?v=BaW_jenozKc"
    YOUTUBE_FILE = "BaW_jenozKc.mp4"

    DAILYMOTION_MD5 = "d363a50e9eb4f22ce90d08d15695bb47"
    DAILYMOTION_URL = "http://www.dailymotion.com/video/x33vw9_tutoriel-de-youtubeur-dl-des-video_tech"
    DAILYMOTION_FILE = "x33vw9.mp4"

    METACAFE_SIZE = 5754305
    METACAFE_URL = "http://www.metacafe.com/watch/yt-_aUehQsCQtM/the_electric_company_short_i_pbs_kids_go/"
    METACAFE_FILE = "_aUehQsCQtM.flv"

    BLIP_MD5 = "93c24d2f4e0782af13b8a7606ea97ba7"
    BLIP_URL = "http://blip.tv/cbr/cbr-exclusive-gotham-city-imposters-bats-vs-jokerz-short-3-5796352"
    BLIP_FILE = "5779306.m4v"

    XVIDEO_MD5 = "1ab4dedc01f771cb2a65e91caa801aaf"
    XVIDEO_URL = "http://www.xvideos.com/video939581/funny_porns_by_s_-1"
    XVIDEO_FILE = "939581.flv"

    VIMEO_MD5 = "1ab4dedc01f771cb2a65e91caa801aaf"
    VIMEO_URL = "http://vimeo.com/14160053"
    VIMEO_FILE = ""

    VIMEO2_MD5 = ""
    VIMEO2_URL = "http://player.vimeo.com/video/47019590"
    VIMEO2_FILE = ""

    SOUNDCLOUD_MD5 = "ce3775768ebb6432fa8495d446a078ed"
    SOUNDCLOUD_URL = "http://soundcloud.com/ethmusic/lostin-powers-she-so-heavy"
    SOUNDCLOUD_FILE = "n6FLbx6ZzMiu.mp3"

    STANDFORD_MD5 = "22c8206291368c4e2c9c1a307f0ea0f4"
    STANDFORD_URL = "http://openclassroom.stanford.edu/MainFolder/VideoPage.php?course=PracticalUnix&video=intro-environment&speed=100"
    STANDFORD_FILE = "PracticalUnix_intro-environment.mp4"

    COLLEGEHUMOR_MD5 = ""
    COLLEGEHUMOR_URL = "http://www.collegehumor.com/video/6830834/mitt-romney-style-gangnam-style-parody"
    COLLEGEHUMOR_FILE = ""

    XNXX_MD5 = "5f0469c8d1dfd1bc38c8e6deb5e0a21d"
    XNXX_URL = "http://video.xnxx.com/video1135332/lida_naked_funny_actress_5_"
    XNXX_FILE = "1135332.flv"

    def test_youtube(self):
        #let's download a file from youtube
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(YoutubeIE())
        fd.download([DownloadTest.YOUTUBE_URL])
        self.assertTrue(os.path.exists(DownloadTest.YOUTUBE_FILE))
        self.assertEqual(os.path.getsize(DownloadTest.YOUTUBE_FILE), DownloadTest.YOUTUBE_SIZE)

    def test_dailymotion(self):
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(DailymotionIE())
        fd.download([DownloadTest.DAILYMOTION_URL])
        self.assertTrue(os.path.exists(DownloadTest.DAILYMOTION_FILE))
        md5_down_file = md5_for_file(DownloadTest.DAILYMOTION_FILE)
        self.assertEqual(md5_down_file, DownloadTest.DAILYMOTION_MD5)

    def test_metacafe(self):
        #this emulate a skip,to be 2.6 compatible
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(MetacafeIE())
        fd.add_info_extractor(YoutubeIE())
        fd.download([DownloadTest.METACAFE_URL])
        self.assertTrue(os.path.exists(DownloadTest.METACAFE_FILE))
        self.assertEqual(os.path.getsize(DownloadTest.METACAFE_FILE), DownloadTest.METACAFE_SIZE)

    def test_blip(self):
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(BlipTVIE())
        fd.download([DownloadTest.BLIP_URL])
        self.assertTrue(os.path.exists(DownloadTest.BLIP_FILE))
        md5_down_file = md5_for_file(DownloadTest.BLIP_FILE)
        self.assertEqual(md5_down_file, DownloadTest.BLIP_MD5)

    def test_xvideo(self):
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(XVideosIE())
        fd.download([DownloadTest.XVIDEO_URL])
        self.assertTrue(os.path.exists(DownloadTest.XVIDEO_FILE))
        md5_down_file = md5_for_file(DownloadTest.XVIDEO_FILE)
        self.assertEqual(md5_down_file, DownloadTest.XVIDEO_MD5)

    def test_vimeo(self):
        #skipped for the moment produce an error
        return
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(VimeoIE())
        fd.download([DownloadTest.VIMEO_URL])
        self.assertTrue(os.path.exists(DownloadTest.VIMEO_FILE))
        md5_down_file = md5_for_file(DownloadTest.VIMEO_FILE)
        self.assertEqual(md5_down_file, DownloadTest.VIMEO_MD5)

    def test_vimeo2(self):
        #skipped for the moment produce an error
        return
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(VimeoIE())
        fd.download([DownloadTest.VIMEO2_URL])
        self.assertTrue(os.path.exists(DownloadTest.VIMEO2_FILE))
        md5_down_file = md5_for_file(DownloadTest.VIMEO2_FILE)
        self.assertEqual(md5_down_file, DownloadTest.VIMEO2_MD5)

    def test_soundcloud(self):
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(SoundcloudIE())
        fd.download([DownloadTest.SOUNDCLOUD_URL])
        self.assertTrue(os.path.exists(DownloadTest.SOUNDCLOUD_FILE))
        md5_down_file = md5_for_file(DownloadTest.SOUNDCLOUD_FILE)
        self.assertEqual(md5_down_file, DownloadTest.SOUNDCLOUD_MD5)

    def test_standford(self):
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(StanfordOpenClassroomIE())
        fd.download([DownloadTest.STANDFORD_URL])
        self.assertTrue(os.path.exists(DownloadTest.STANDFORD_FILE))
        md5_down_file = md5_for_file(DownloadTest.STANDFORD_FILE)
        self.assertEqual(md5_down_file, DownloadTest.STANDFORD_MD5)

    def test_collegehumor(self):
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(CollegeHumorIE())
        fd.download([DownloadTest.COLLEGEHUMOR_URL])
        self.assertTrue(os.path.exists(DownloadTest.COLLEGEHUMOR_FILE))
        md5_down_file = md5_for_file(DownloadTest.COLLEGEHUMOR_FILE)
        self.assertEqual(md5_down_file, DownloadTest.COLLEGEHUMOR_MD5)

    def test_xnxx(self):
        with open(DownloadTest.PARAMETERS_FILE) as f:
            fd = FileDownloader(json.load(f))
        fd.add_info_extractor(XNXXIE())
        fd.download([DownloadTest.XNXX_URL])
        self.assertTrue(os.path.exists(DownloadTest.XNXX_FILE))
        md5_down_file = md5_for_file(DownloadTest.XNXX_FILE)
        self.assertEqual(md5_down_file, DownloadTest.XNXX_MD5)

    def tearDown(self):
        if os.path.exists(DownloadTest.YOUTUBE_FILE):
            os.remove(DownloadTest.YOUTUBE_FILE)
        if os.path.exists(DownloadTest.DAILYMOTION_FILE):
            os.remove(DownloadTest.DAILYMOTION_FILE)
        if os.path.exists(DownloadTest.METACAFE_FILE):
            os.remove(DownloadTest.METACAFE_FILE)
        if os.path.exists(DownloadTest.BLIP_FILE):
            os.remove(DownloadTest.BLIP_FILE)
        if os.path.exists(DownloadTest.XVIDEO_FILE):
            os.remove(DownloadTest.XVIDEO_FILE)
        if os.path.exists(DownloadTest.VIMEO_FILE):
            os.remove(DownloadTest.VIMEO_FILE)
        if os.path.exists(DownloadTest.SOUNDCLOUD_FILE):
            os.remove(DownloadTest.SOUNDCLOUD_FILE)
        if os.path.exists(DownloadTest.STANDFORD_FILE):
            os.remove(DownloadTest.STANDFORD_FILE)
        if os.path.exists(DownloadTest.COLLEGEHUMOR_FILE):
            os.remove(DownloadTest.COLLEGEHUMOR_FILE)
        if os.path.exists(DownloadTest.XNXX_FILE):
            os.remove(DownloadTest.XNXX_FILE)

def md5_for_file(filename, block_size=2**20):
    with open(filename) as f:
        md5 = hashlib.md5()
        while True:
            data = f.read(block_size)
            if not data:
                break
            md5.update(data)
        return md5.hexdigest()
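
Side note on the md5_for_file helper in this test file: it opens the downloaded file in text mode. Outside of this commit (which only changes whitespace), the usual pattern for hashing media files is a binary read. A minimal standalone sketch of the same chunked-hashing idea, with an illustrative name so it is not confused with the code above:

import hashlib

def md5_for_file_binary(filename, block_size=2**20):
    """Hash a file in fixed-size chunks so large downloads never sit in memory whole."""
    md5 = hashlib.md5()
    with open(filename, 'rb') as f:  # binary mode: hash the raw bytes, not decoded text
        while True:
            data = f.read(block_size)
            if not data:
                break
            md5.update(data)
    return md5.hexdigest()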


@@ -5,9 +5,9 @@ import os.path
import subprocess

class TestImport(unittest.TestCase):
    def test_import(self):
        rootDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        subprocess.check_call([sys.executable, '-c', 'import youtube_dl'], cwd=rootDir)

if __name__ == '__main__':
    unittest.main()


@@ -14,79 +14,79 @@ from youtube_dl.utils import unescapeHTML
from youtube_dl.utils import orderedSet

if sys.version_info < (3,0):
    _compat_str = lambda b: b.decode('unicode-escape')
else:
    _compat_str = lambda s: s

class TestUtil(unittest.TestCase):
    def test_timeconvert(self):
        self.assertTrue(timeconvert('') is None)
        self.assertTrue(timeconvert('bougrg') is None)

    def test_sanitize_filename(self):
        self.assertEqual(sanitize_filename('abc'), 'abc')
        self.assertEqual(sanitize_filename('abc_d-e'), 'abc_d-e')
        self.assertEqual(sanitize_filename('123'), '123')
        self.assertEqual('abc_de', sanitize_filename('abc/de'))
        self.assertFalse('/' in sanitize_filename('abc/de///'))
        self.assertEqual('abc_de', sanitize_filename('abc/<>\\*|de'))
        self.assertEqual('xxx', sanitize_filename('xxx/<>\\*|'))
        self.assertEqual('yes no', sanitize_filename('yes? no'))
        self.assertEqual('this - that', sanitize_filename('this: that'))
        self.assertEqual(sanitize_filename('AT&T'), 'AT&T')
        aumlaut = _compat_str('\xe4')
        self.assertEqual(sanitize_filename(aumlaut), aumlaut)
        tests = _compat_str('\u043a\u0438\u0440\u0438\u043b\u043b\u0438\u0446\u0430')
        self.assertEqual(sanitize_filename(tests), tests)
        forbidden = '"\0\\/'
        for fc in forbidden:
            for fbc in forbidden:
                self.assertTrue(fbc not in sanitize_filename(fc))

    def test_sanitize_filename_restricted(self):
        self.assertEqual(sanitize_filename('abc', restricted=True), 'abc')
        self.assertEqual(sanitize_filename('abc_d-e', restricted=True), 'abc_d-e')
        self.assertEqual(sanitize_filename('123', restricted=True), '123')
        self.assertEqual('abc_de', sanitize_filename('abc/de', restricted=True))
        self.assertFalse('/' in sanitize_filename('abc/de///', restricted=True))
        self.assertEqual('abc_de', sanitize_filename('abc/<>\\*|de', restricted=True))
        self.assertEqual('xxx', sanitize_filename('xxx/<>\\*|', restricted=True))
        self.assertEqual('yes_no', sanitize_filename('yes? no', restricted=True))
        self.assertEqual('this_-_that', sanitize_filename('this: that', restricted=True))
        tests =_compat_str('a\xe4b\u4e2d\u56fd\u7684c')
        self.assertEqual(sanitize_filename(tests, restricted=True), 'a_b_c')
        self.assertTrue(sanitize_filename(_compat_str('\xf6'), restricted=True) != '') # No empty filename
        forbidden = '"\0\\/&!: \'\t\n'
        for fc in forbidden:
            for fbc in forbidden:
                self.assertTrue(fbc not in sanitize_filename(fc, restricted=True))
        # Handle a common case more neatly
        self.assertEqual(sanitize_filename(_compat_str('\u5927\u58f0\u5e26 - Song'), restricted=True), 'Song')
        self.assertEqual(sanitize_filename(_compat_str('\u603b\u7edf: Speech'), restricted=True), 'Speech')
        # .. but make sure the file name is never empty
        self.assertTrue(sanitize_filename('-', restricted=True) != '')
        self.assertTrue(sanitize_filename(':', restricted=True) != '')

    def test_ordered_set(self):
        self.assertEqual(orderedSet([1,1,2,3,4,4,5,6,7,3,5]), [1,2,3,4,5,6,7])
        self.assertEqual(orderedSet([]), [])
        self.assertEqual(orderedSet([1]), [1])
        #keep the list ordered
        self.assertEqual(orderedSet([135,1,1,1]), [135,1])

    def test_unescape_html(self):
        self.assertEqual(unescapeHTML(_compat_str('%20;')), _compat_str('%20;'))

if __name__ == '__main__':
    unittest.main()
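
The assertions above pin down the two modes of sanitize_filename: the default mode keeps spaces and softens shell-unsafe characters, while restricted=True collapses anything risky to underscores. A short usage sketch, assuming youtube_dl is importable; the expected outputs are taken directly from the assertions above:

from youtube_dl.utils import sanitize_filename

print(sanitize_filename('this: that'))                   # 'this - that'  (default mode)
print(sanitize_filename('this: that', restricted=True))  # 'this_-_that' (restricted mode)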

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -10,189 +10,189 @@ from utils import *
class PostProcessor(object):
    """Post Processor class.
    PostProcessor objects can be added to downloaders with their
    add_post_processor() method. When the downloader has finished a
    successful download, it will take its internal chain of PostProcessors
    and start calling the run() method on each one of them, first with
    an initial argument and then with the returned value of the previous
    PostProcessor.
    The chain will be stopped if one of them ever returns None or the end
    of the chain is reached.
    PostProcessor objects follow a "mutual registration" process similar
    to InfoExtractor objects.
    """

    _downloader = None

    def __init__(self, downloader=None):
        self._downloader = downloader

    def set_downloader(self, downloader):
        """Sets the downloader for this PP."""
        self._downloader = downloader

    def run(self, information):
        """Run the PostProcessor.
        The "information" argument is a dictionary like the ones
        composed by InfoExtractors. The only difference is that this
        one has an extra field called "filepath" that points to the
        downloaded file.
        When this method returns None, the postprocessing chain is
        stopped. However, this method may return an information
        dictionary that will be passed to the next postprocessing
        object in the chain. It can be the one it received after
        changing some fields.
        In addition, this method may raise a PostProcessingError
        exception that will be taken into account by the downloader
        it was called from.
        """
        return information # by default, do nothing

class AudioConversionError(BaseException):
    def __init__(self, message):
        self.message = message

class FFmpegExtractAudioPP(PostProcessor):
    def __init__(self, downloader=None, preferredcodec=None, preferredquality=None, keepvideo=False):
        PostProcessor.__init__(self, downloader)
        if preferredcodec is None:
            preferredcodec = 'best'
        self._preferredcodec = preferredcodec
        self._preferredquality = preferredquality
        self._keepvideo = keepvideo
        self._exes = self.detect_executables()

    @staticmethod
    def detect_executables():
        def executable(exe):
            try:
                subprocess.Popen([exe, '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
            except OSError:
                return False
            return exe
        programs = ['avprobe', 'avconv', 'ffmpeg', 'ffprobe']
        return dict((program, executable(program)) for program in programs)

    def get_audio_codec(self, path):
        if not self._exes['ffprobe'] and not self._exes['avprobe']: return None
        try:
            cmd = [self._exes['avprobe'] or self._exes['ffprobe'], '-show_streams', '--', encodeFilename(path)]
            handle = subprocess.Popen(cmd, stderr=file(os.path.devnull, 'w'), stdout=subprocess.PIPE)
            output = handle.communicate()[0]
            if handle.wait() != 0:
                return None
        except (IOError, OSError):
            return None
        audio_codec = None
        for line in output.split('\n'):
            if line.startswith('codec_name='):
                audio_codec = line.split('=')[1].strip()
            elif line.strip() == 'codec_type=audio' and audio_codec is not None:
                return audio_codec
        return None

    def run_ffmpeg(self, path, out_path, codec, more_opts):
        if not self._exes['ffmpeg'] and not self._exes['avconv']:
            raise AudioConversionError('ffmpeg or avconv not found. Please install one.')
        if codec is None:
            acodec_opts = []
        else:
            acodec_opts = ['-acodec', codec]
        cmd = ([self._exes['avconv'] or self._exes['ffmpeg'], '-y', '-i', encodeFilename(path), '-vn']
               + acodec_opts + more_opts +
               ['--', encodeFilename(out_path)])
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        if p.returncode != 0:
            msg = stderr.strip().split('\n')[-1]
            raise AudioConversionError(msg)

    def run(self, information):
        path = information['filepath']

        filecodec = self.get_audio_codec(path)
        if filecodec is None:
            self._downloader.to_stderr(u'WARNING: unable to obtain file audio codec with ffprobe')
            return None

        more_opts = []
        if self._preferredcodec == 'best' or self._preferredcodec == filecodec or (self._preferredcodec == 'm4a' and filecodec == 'aac'):
            if self._preferredcodec == 'm4a' and filecodec == 'aac':
                # Lossless, but in another container
                acodec = 'copy'
                extension = self._preferredcodec
                more_opts = [self._exes['avconv'] and '-bsf:a' or '-absf', 'aac_adtstoasc']
            elif filecodec in ['aac', 'mp3', 'vorbis']:
                # Lossless if possible
                acodec = 'copy'
                extension = filecodec
                if filecodec == 'aac':
                    more_opts = ['-f', 'adts']
                if filecodec == 'vorbis':
                    extension = 'ogg'
            else:
                # MP3 otherwise.
                acodec = 'libmp3lame'
                extension = 'mp3'
                more_opts = []
                if self._preferredquality is not None:
                    if int(self._preferredquality) < 10:
                        more_opts += [self._exes['avconv'] and '-q:a' or '-aq', self._preferredquality]
                    else:
                        more_opts += [self._exes['avconv'] and '-b:a' or '-ab', self._preferredquality + 'k']
        else:
            # We convert the audio (lossy)
            acodec = {'mp3': 'libmp3lame', 'aac': 'aac', 'm4a': 'aac', 'vorbis': 'libvorbis', 'wav': None}[self._preferredcodec]
            extension = self._preferredcodec
            more_opts = []
            if self._preferredquality is not None:
                if int(self._preferredquality) < 10:
                    more_opts += [self._exes['avconv'] and '-q:a' or '-aq', self._preferredquality]
                else:
                    more_opts += [self._exes['avconv'] and '-b:a' or '-ab', self._preferredquality + 'k']
            if self._preferredcodec == 'aac':
                more_opts += ['-f', 'adts']
            if self._preferredcodec == 'm4a':
                more_opts += [self._exes['avconv'] and '-bsf:a' or '-absf', 'aac_adtstoasc']
            if self._preferredcodec == 'vorbis':
                extension = 'ogg'
            if self._preferredcodec == 'wav':
                extension = 'wav'
                more_opts += ['-f', 'wav']

        prefix, sep, ext = path.rpartition(u'.') # not os.path.splitext, since the latter does not work on unicode in all setups
        new_path = prefix + sep + extension
        self._downloader.to_screen(u'[' + (self._exes['avconv'] and 'avconv' or 'ffmpeg') + '] Destination: ' + new_path)
        try:
            self.run_ffmpeg(path, new_path, acodec, more_opts)
        except:
            etype,e,tb = sys.exc_info()
            if isinstance(e, AudioConversionError):
                self._downloader.to_stderr(u'ERROR: audio conversion failed: ' + e.message)
            else:
                self._downloader.to_stderr(u'ERROR: error running ' + (self._exes['avconv'] and 'avconv' or 'ffmpeg'))
            return None

        # Try to update the date time for extracted audio file.
        if information.get('filetime') is not None:
            try:
                os.utime(encodeFilename(new_path), (time.time(), information['filetime']))
            except:
                self._downloader.to_stderr(u'WARNING: Cannot update utime of audio file')

        if not self._keepvideo:
            try:
                os.remove(encodeFilename(path))
            except (IOError, OSError):
                self._downloader.to_stderr(u'WARNING: Unable to remove downloaded video file')
                return None

        information['filepath'] = new_path
        return information
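
The PostProcessor docstring above describes a simple chaining contract: the downloader hands each post processor the dictionary returned by the previous one and stops as soon as one of them returns None. A schematic sketch of that contract, using an illustrative helper name rather than the actual FileDownloader code:

def run_post_processors(post_processors, information):
    # Pass the info dict through the chain; a None return value stops it,
    # exactly as the PostProcessor.run() docstring specifies.
    for pp in post_processors:
        information = pp.run(information)
        if information is None:
            break
    return information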

File diff suppressed because it is too large Load Diff


@@ -4,4 +4,4 @@
import __init__

if __name__ == '__main__':
    __init__.main()


@@ -12,490 +12,490 @@ import email.utils
import json

try:
    import urllib.request as compat_urllib_request
except ImportError: # Python 2
    import urllib2 as compat_urllib_request

try:
    import urllib.error as compat_urllib_error
except ImportError: # Python 2
    import urllib2 as compat_urllib_error

try:
    import urllib.parse as compat_urllib_parse
except ImportError: # Python 2
    import urllib as compat_urllib_parse

try:
    import http.cookiejar as compat_cookiejar
except ImportError: # Python 2
    import cookielib as compat_cookiejar

try:
    import html.entities as compat_html_entities
except ImportError: # Python 2
    import htmlentitydefs as compat_html_entities

try:
    import html.parser as compat_html_parser
except ImportError: # Python 2
    import HTMLParser as compat_html_parser

try:
    import http.client as compat_http_client
except ImportError: # Python 2
    import httplib as compat_http_client

try:
    from urllib.parse import parse_qs as compat_parse_qs
except ImportError: # Python 2
    # HACK: The following is the correct parse_qs implementation from cpython 3's stdlib.
    # Python 2's version is apparently totally broken
    def _unquote(string, encoding='utf-8', errors='replace'):
        if string == '':
            return string
        res = string.split('%')
        if len(res) == 1:
            return string
        if encoding is None:
            encoding = 'utf-8'
        if errors is None:
            errors = 'replace'
        # pct_sequence: contiguous sequence of percent-encoded bytes, decoded
        pct_sequence = b''
        string = res[0]
        for item in res[1:]:
            try:
                if not item:
                    raise ValueError
                pct_sequence += item[:2].decode('hex')
                rest = item[2:]
                if not rest:
                    # This segment was just a single percent-encoded character.
                    # May be part of a sequence of code units, so delay decoding.
                    # (Stored in pct_sequence).
                    continue
            except ValueError:
                rest = '%' + item
            # Encountered non-percent-encoded characters. Flush the current
            # pct_sequence.
            string += pct_sequence.decode(encoding, errors) + rest
            pct_sequence = b''
        if pct_sequence:
            # Flush the final pct_sequence
            string += pct_sequence.decode(encoding, errors)
        return string

    def _parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
                   encoding='utf-8', errors='replace'):
        qs, _coerce_result = qs, unicode
        pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
        r = []
        for name_value in pairs:
            if not name_value and not strict_parsing:
                continue
            nv = name_value.split('=', 1)
            if len(nv) != 2:
                if strict_parsing:
                    raise ValueError("bad query field: %r" % (name_value,))
                # Handle case of a control-name with no equal sign
                if keep_blank_values:
                    nv.append('')
                else:
                    continue
            if len(nv[1]) or keep_blank_values:
                name = nv[0].replace('+', ' ')
                name = _unquote(name, encoding=encoding, errors=errors)
                name = _coerce_result(name)
                value = nv[1].replace('+', ' ')
                value = _unquote(value, encoding=encoding, errors=errors)
                value = _coerce_result(value)
                r.append((name, value))
        return r

    def compat_parse_qs(qs, keep_blank_values=False, strict_parsing=False,
                        encoding='utf-8', errors='replace'):
        parsed_result = {}
        pairs = _parse_qsl(qs, keep_blank_values, strict_parsing,
                           encoding=encoding, errors=errors)
        for name, value in pairs:
            if name in parsed_result:
                parsed_result[name].append(value)
            else:
                parsed_result[name] = [value]
        return parsed_result

try:
    compat_str = unicode # Python 2
except NameError:
    compat_str = str

try:
    compat_chr = unichr # Python 2
except NameError:
    compat_chr = chr

std_headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0',
    'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Encoding': 'gzip, deflate',
    'Accept-Language': 'en-us,en;q=0.5',
}

def preferredencoding():
    """Get preferred encoding.
    Returns the best encoding scheme for the system, based on
    locale.getpreferredencoding() and some further tweaks.
    """
    try:
        pref = locale.getpreferredencoding()
        u'TEST'.encode(pref)
    except:
        pref = 'UTF-8'
    return pref

if sys.version_info < (3,0):
    def compat_print(s):
        print(s.encode(preferredencoding(), 'xmlcharrefreplace'))
else:
    def compat_print(s):
        assert type(s) == type(u'')
        print(s)

def htmlentity_transform(matchobj):
    """Transforms an HTML entity to a character.
    This function receives a match object and is intended to be used with
    the re.sub() function.
    """
    entity = matchobj.group(1)
    # Known non-numeric HTML entity
    if entity in compat_html_entities.name2codepoint:
        return compat_chr(compat_html_entities.name2codepoint[entity])
    mobj = re.match(u'(?u)#(x?\\d+)', entity)
    if mobj is not None:
        numstr = mobj.group(1)
        if numstr.startswith(u'x'):
            base = 16
            numstr = u'0%s' % numstr
        else:
            base = 10
        return compat_chr(int(numstr, base))
    # Unknown entity in name, return its literal representation
    return (u'&%s;' % entity)

compat_html_parser.locatestarttagend = re.compile(r"""<[a-zA-Z][-.a-zA-Z0-9:_]*(?:\s+(?:(?<=['"\s])[^\s/>][^\s/=>]*(?:\s*=+\s*(?:'[^']*'|"[^"]*"|(?!['"])[^>\s]*))?\s*)*)?\s*""", re.VERBOSE) # backport bugfix

class IDParser(compat_html_parser.HTMLParser):
    """Modified HTMLParser that isolates a tag with the specified id"""
    def __init__(self, id):
        self.id = id
        self.result = None
        self.started = False
        self.depth = {}
        self.html = None
        self.watch_startpos = False
        self.error_count = 0
        compat_html_parser.HTMLParser.__init__(self)

    def error(self, message):
        if self.error_count > 10 or self.started:
            raise compat_html_parser.HTMLParseError(message, self.getpos())
        self.rawdata = '\n'.join(self.html.split('\n')[self.getpos()[0]:]) # skip one line
        self.error_count += 1
        self.goahead(1)

    def loads(self, html):
        self.html = html
        self.feed(html)
        self.close()

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if self.started:
            self.find_startpos(None)
        if 'id' in attrs and attrs['id'] == self.id:
            self.result = [tag]
            self.started = True
            self.watch_startpos = True
        if self.started:
            if not tag in self.depth: self.depth[tag] = 0
            self.depth[tag] += 1

    def handle_endtag(self, tag):
        if self.started:
            if tag in self.depth: self.depth[tag] -= 1
            if self.depth[self.result[0]] == 0:
                self.started = False
                self.result.append(self.getpos())

    def find_startpos(self, x):
        """Needed to put the start position of the result (self.result[1])
        after the opening tag with the requested id"""
        if self.watch_startpos:
            self.watch_startpos = False
            self.result.append(self.getpos())
    handle_entityref = handle_charref = handle_data = handle_comment = \
        handle_decl = handle_pi = unknown_decl = find_startpos

    def get_result(self):
        if self.result is None:
            return None
        if len(self.result) != 3:
            return None
        lines = self.html.split('\n')
        lines = lines[self.result[1][0]-1:self.result[2][0]]
        lines[0] = lines[0][self.result[1][1]:]
        if len(lines) == 1:
            lines[-1] = lines[-1][:self.result[2][1]-self.result[1][1]]
        lines[-1] = lines[-1][:self.result[2][1]]
        return '\n'.join(lines).strip()

def get_element_by_id(id, html):
    """Return the content of the tag with the specified id in the passed HTML document"""
    parser = IDParser(id)
    try:
        parser.loads(html)
    except compat_html_parser.HTMLParseError:
        pass
    return parser.get_result()

def clean_html(html):
    """Clean an HTML snippet into a readable string"""
    # Newline vs <br />
    html = html.replace('\n', ' ')
    html = re.sub('\s*<\s*br\s*/?\s*>\s*', '\n', html)
    # Strip html tags
    html = re.sub('<.*?>', '', html)
    # Replace html entities
    html = unescapeHTML(html)
    return html

def sanitize_open(filename, open_mode):
    """Try to open the given filename, and slightly tweak it if this fails.
    Attempts to open the given filename. If this fails, it tries to change
    the filename slightly, step by step, until it's either able to open it
    or it fails and raises a final exception, like the standard open()
    function.
    It returns the tuple (stream, definitive_file_name).
    """
    try:
        if filename == u'-':
            if sys.platform == 'win32':
                import msvcrt
                msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
            return (sys.stdout, filename)
        stream = open(encodeFilename(filename), open_mode)
        return (stream, filename)
    except (IOError, OSError) as err:
        # In case of error, try to remove win32 forbidden chars
        filename = re.sub(u'[/<>:"\\|\\\\?\\*]', u'#', filename)
        # An exception here should be caught in the caller
        stream = open(encodeFilename(filename), open_mode)
        return (stream, filename)

def timeconvert(timestr):
    """Convert RFC 2822 defined time string into system timestamp"""
    timestamp = None
    timetuple = email.utils.parsedate_tz(timestr)
    if timetuple is not None:
        timestamp = email.utils.mktime_tz(timetuple)
    return timestamp

def sanitize_filename(s, restricted=False):
    """Sanitizes a string so it could be used as part of a filename.
    If restricted is set, use a stricter subset of allowed characters.
    """
    def replace_insane(char):
        if char == '?' or ord(char) < 32 or ord(char) == 127:
            return ''
        elif char == '"':
            return '' if restricted else '\''
        elif char == ':':
            return '_-' if restricted else ' -'
        elif char in '\\/|*<>':
            return '_'
        if restricted and (char in '!&\'' or char.isspace()):
            return '_'
        if restricted and ord(char) > 127:
            return '_'
        return char

    result = u''.join(map(replace_insane, s))
    while '__' in result:
        result = result.replace('__', '_')
    result = result.strip('_')
    # Common case of "Foreign band name - English song title"
    if restricted and result.startswith('-_'):
        result = result[2:]
    if not result:
        result = '_'
    return result

def orderedSet(iterable):
    """ Remove all duplicates from the input iterable """
    res = []
    for el in iterable:
        if el not in res:
            res.append(el)
    return res

def unescapeHTML(s):
    """
    @param s a string
    """
    assert type(s) == type(u'')
    result = re.sub(u'(?u)&(.+?);', htmlentity_transform, s)
    return result

def encodeFilename(s):
    """
    @param s The name of the file
    """
    assert type(s) == type(u'')
    # Python 3 has a Unicode API
    if sys.version_info >= (3, 0):
        return s
    if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
        # Pass u'' directly to use Unicode APIs on Windows 2000 and up
        # (Detecting Windows NT 4 is tricky because 'major >= 4' would
        # match Windows 9x series as well. Besides, NT 4 is obsolete.)
        return s
    else:
        return s.encode(sys.getfilesystemencoding(), 'ignore')

class DownloadError(Exception):
    """Download Error exception.
    This exception may be thrown by FileDownloader objects if they are not
    configured to continue on errors. They will contain the appropriate
    error message.
    """
    pass

class SameFileError(Exception):
    """Same File exception.
    This exception will be thrown by FileDownloader objects if they detect
    multiple files would have to be downloaded to the same file on disk.
    """
    pass

class PostProcessingError(Exception):
    """Post Processing exception.
    This exception may be raised by PostProcessor's .run() method to
    indicate an error in the postprocessing task.
    """
    pass

class MaxDownloadsReached(Exception):
    """ --max-downloads limit has been reached. """
    pass

class UnavailableVideoError(Exception):
    """Unavailable Format exception.
    This exception will be thrown when a video is requested
    in a format that is not available for that video.
    """
    pass

class ContentTooShortError(Exception):
    """Content Too Short exception.
    This exception may be raised by FileDownloader objects when a file they
    download is too small for what the server announced first, indicating
    the connection was probably interrupted.
    """
    # Both in bytes
    downloaded = None
    expected = None

    def __init__(self, downloaded, expected):
        self.downloaded = downloaded
        self.expected = expected

class Trouble(Exception):
    """Trouble helper exception
    This is an exception to be handled with
    FileDownloader.trouble
    """

class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
    """Handler for HTTP requests and responses.
    This class, when installed with an OpenerDirector, automatically adds
    the standard headers to every HTTP request and handles gzipped and
    deflated responses from web servers. If compression is to be avoided in
    a particular request, the original request in the program code only has
    to include the HTTP header "Youtubedl-No-Compression", which will be
    removed before making the real request.
    Part of this code was copied from:
    http://techknack.net/python-urllib2-handlers/
    Andrew Rowls, the author of that code, agreed to release it to the
    public domain.
    """

    @staticmethod
    def deflate(data):
        try:
            return zlib.decompress(data, -zlib.MAX_WBITS)
        except zlib.error:
            return zlib.decompress(data)

    @staticmethod
    def addinfourl_wrapper(stream, headers, url, code):
        if hasattr(compat_urllib_request.addinfourl, 'getcode'):
            return compat_urllib_request.addinfourl(stream, headers, url, code)
        ret = compat_urllib_request.addinfourl(stream, headers, url)
        ret.code = code
        return ret

    def http_request(self, req):
        for h in std_headers:
            if h in req.headers:
                del req.headers[h]
            req.add_header(h, std_headers[h])
        if 'Youtubedl-no-compression' in req.headers:
            if 'Accept-encoding' in req.headers:
                del req.headers['Accept-encoding']
            del req.headers['Youtubedl-no-compression']
        return req

    def http_response(self, req, resp):
        old_resp = resp
        # gzip
        if resp.headers.get('Content-encoding', '') == 'gzip':
            gz = gzip.GzipFile(fileobj=io.BytesIO(resp.read()), mode='r')
            resp = self.addinfourl_wrapper(gz, old_resp.headers, old_resp.url, old_resp.code)
            resp.msg = old_resp.msg
        # deflate
        if resp.headers.get('Content-encoding', '') == 'deflate':
            gz = io.BytesIO(self.deflate(resp.read()))
            resp = self.addinfourl_wrapper(gz, old_resp.headers, old_resp.url, old_resp.code)
            resp.msg = old_resp.msg
        return resp
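
The YoutubeDLHandler docstring says the class is meant to be installed with an OpenerDirector, so that every request gets the std_headers and transparent gzip/deflate decoding. A minimal sketch of that wiring, assuming the module above has been imported; the example URL is only a placeholder:

cookie_jar = compat_cookiejar.CookieJar()
opener = compat_urllib_request.build_opener(
    compat_urllib_request.HTTPCookieProcessor(cookie_jar),
    YoutubeDLHandler())
response = opener.open('http://www.example.com/')  # headers added, response transparently decompressed
page = response.read()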