1
0
Fork 0
mirror of https://github.com/ytdl-org/youtube-dl.git synced 2024-11-10 19:41:20 +00:00

[MicrosoftStream] Fixed code style and refactor

This commit is contained in:
Nick Lai 2020-04-10 17:57:10 +08:00
parent 05698ebf9b
commit 5416301787
No known key found for this signature in database
GPG key ID: AF5E3B79EE6B1CC4

View file

@ -17,11 +17,36 @@ class MicrosoftStreamBaseIE(InfoExtractor):
if username is not None or password is not None: if username is not None or password is not None:
raise ExtractorError('MicrosoftStream Extractor does not support username/password log-in at the moment. Please use cookies log-in instead. See https://github.com/ytdl-org/youtube-dl/blob/master/README.md#how-do-i-pass-cookies-to-youtube-dl for more information') raise ExtractorError('MicrosoftStream Extractor does not support username/password log-in at the moment. Please use cookies log-in instead. See https://github.com/ytdl-org/youtube-dl/blob/master/README.md#how-do-i-pass-cookies-to-youtube-dl for more information')
"""
Extraction Helper
"""
def _extract_access_token(self, webpage):
"""
Extract the JWT access token with Regex
"""
self._ACCESS_TOKEN = self._html_search_regex(r"\"AccessToken\":\"(?P<AccessToken>.+?)\"", webpage, 'AccessToken')
return self._ACCESS_TOKEN
def _extract_api_gateway(self, webpage):
"""
Extract the API gateway with Regex
"""
self._API_GATEWAY = self._html_search_regex(r"\"ApiGatewayUri\":\"(?P<APIGateway>.+?)\"", webpage, 'APIGateway')
return self._API_GATEWAY
class MicrosoftStreamIE(MicrosoftStreamBaseIE): class MicrosoftStreamIE(MicrosoftStreamBaseIE):
"""
Extract of single Microsoft Stream video
"""
IE_NAME = 'microsoftstream' IE_NAME = 'microsoftstream'
_VALID_URL = r'https?://(?:(?:web|www)\.)?microsoftstream\.com/video/(?P<id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})' # https://regex101.com/r/K1mlgK/1/ _VALID_URL = r'https?://(?:(?:web|www)\.)?microsoftstream\.com/video/(?P<id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})' # https://regex101.com/r/K1mlgK/1/
_NETRC_MACHINE = 'microsoftstream' _NETRC_MACHINE = 'microsoftstream'
_ACCESS_TOKEN = None # A JWT token
_API_GATEWAY = None
_TEXTTRACKS_RESPONSE = None
_VIDEO_ID = None
_TEST = { _TEST = {
'url': 'https://web.microsoftstream.com/video/c883c6a5-9895-4900-9a35-62f4b5d506c9', 'url': 'https://web.microsoftstream.com/video/c883c6a5-9895-4900-9a35-62f4b5d506c9',
@ -33,42 +58,49 @@ class MicrosoftStreamIE(MicrosoftStreamBaseIE):
} }
} }
def _remap_thumbnails(self, thumbnail_dict_list): """
output = [] Getters
preference_index = ['extraSmall', 'small', 'medium', 'large']
for _, key in enumerate(thumbnail_dict_list): The following getters include helpful message to prompt developers for potential errors.
output.append({ """
'preference': preference_index.index(key), @property
'url': thumbnail_dict_list[key]['url'] def api_gateway(self):
}) if self._API_GATEWAY is None:
return output raise ExtractorError('API gateway is None. Did you forget to call "_extract_api_gateway"?')
return self._API_GATEWAY
def _remap_playback(self, master_playlist_urls, video_id, http_headers={}): @property
def access_token(self):
if self._ACCESS_TOKEN is None:
raise ExtractorError('Access token is None. Did you forget to call "_extract_access_token"?')
return self._ACCESS_TOKEN
@property
def video_id(self):
if self._VIDEO_ID is None:
raise('Variable "_VIDEO_ID" is not defined. Did you make the main extraction call?')
return self._VIDEO_ID
@property
def headers(self):
return {'Authorization': 'Bearer %s' % self.access_token}
@property
def texttrack_info_endpoint(self):
return "%s/videos/%s/texttracks?api-version=1.3-private" % (self.api_gateway, self.video_id)
@property
def media_info_endpoint(self):
return "%s/videos/%s?$expand=creator,tokens,status,liveEvent,extensions&api-version=1.3-private" % (self.api_gateway, self.video_id)
def _request_texttracks(self):
""" """
A parser for the HLS and MPD playlists from the API endpoint. Make an additional request to Microsoft Stream for the subtitle and auto-caption
""" """
output = [] # Map default variable
self._TEXTTRACKS_RESPONSE = self._download_json(self.texttrack_info_endpoint, self.video_id, headers=self.headers)['value']
for master_playlist_url in master_playlist_urls: return self._TEXTTRACKS_RESPONSE
# Handle HLS Master playlist
if self._determine_protocol(master_playlist_url['mimeType']) == 'm3u8':
varient_playlists = self._extract_m3u8_formats(master_playlist_url['playbackUrl'], video_id, headers=http_headers)
# For MPEG-DASH Master playlists
elif self._determine_protocol(master_playlist_url['mimeType']) == 'http_dash_segments':
varient_playlists = self._extract_mpd_formats(master_playlist_url['playbackUrl'], video_id, headers=http_headers)
else:
self.to_screen('Found unresolvable stream with format %s' % master_playlist_url['mimeType'])
continue
# Patching the "Authorization" header
for varient_playlist in varient_playlists:
varient_playlist['http_headers'] = http_headers
output.append(varient_playlist)
return output
def _determine_protocol(self, mime): def _determine_protocol(self, mime):
""" """
@ -81,53 +113,100 @@ class MicrosoftStreamIE(MicrosoftStreamBaseIE):
else: else:
return None return None
def _remap_texttracks(self, tracks): def _remap_thumbnails(self, thumbnail_dict_list):
output = []
preference_index = ['extraSmall', 'small', 'medium', 'large']
for _, key in enumerate(thumbnail_dict_list):
output.append({
'preference': preference_index.index(key),
'url': thumbnail_dict_list.get(key).get('url')
})
return output
def _remap_playback(self, master_playlist_urls):
""" """
A parser for the texttracks response. A parser for the HLS and MPD playlists from the API endpoint.
""" """
subtitle = {} output = []
automatic_captions = {}
for track in tracks: for master_playlist_url in master_playlist_urls:
if track['autoGenerated'] is True: protocol = self._determine_protocol(master_playlist_url['mimeType'])
if track['language'] not in automatic_captions: # Handle HLS Master playlist
automatic_captions[track['language']] = [] if protocol == 'm3u8':
automatic_captions[track['language']].append({'url': track['url']}) varient_playlists = self._extract_m3u8_formats(master_playlist_url['playbackUrl'], video_id=self.video_id, headers=self.headers)
# For MPEG-DASH Master playlists
elif protocol == 'http_dash_segments':
varient_playlists = self._extract_mpd_formats(master_playlist_url['playbackUrl'], video_id=self.video_id, headers=self.headers)
# For other Master playlists (like Microsoft Smooth Streaming)
else: else:
if track['language'] not in subtitle: self.to_screen('Found unresolvable stream with format %s' % master_playlist_url['mimeType'])
subtitle[track['language']] = [] continue
subtitle[track['language']].append({'url': track['url']})
return (subtitle, automatic_captions) # Patching the "Authorization" header
for varient_playlist in varient_playlists:
varient_playlist['http_headers'] = self.headers
output.append(varient_playlist)
return output
def _extract_subtitle(self, tracks, is_auto_generated):
"""
An internal method for filtering and remapping text tracks
"""
if type(is_auto_generated) is not bool:
raise ExtractorError('Unexpected variable "is_auto_generated" type: must be a Boolean')
subtitle_subset = {}
for track in tracks:
track_language = track.get('language') # The track language must have a language code.
if track.get('autoGenerated') is is_auto_generated:
if track_language not in subtitle_subset:
subtitle_subset[track_language] = [] # Scaffold an empty list for the object to insert into
# Since the subtitle is token protected, a get request will fire here.
data = self._download_webpage(url_or_request=track.get('url'), video_id=self.video_id, headers=self.headers)
subtitle_subset[track_language].append({'data': data, "ext": "vtt"})
return subtitle_subset
def _get_subtitles(self, tracks=None): # Fulfill abstract method
tracks = self._TEXTTRACKS_RESPONSE if tracks is None else tracks
return self._extract_subtitle(tracks, False)
def _get_automatic_captions(self, tracks=None): # Fulfill abstract method
tracks = self._TEXTTRACKS_RESPONSE if tracks is None else tracks
return self._extract_subtitle(tracks, True)
def _real_extract(self, url): def _real_extract(self, url):
video_id = self._match_id(url) self._VIDEO_ID = self._match_id(url)
webpage = self._download_webpage(url, video_id) webpage = self._download_webpage(url, self.video_id)
if not self.is_logged_in(webpage): if not self.is_logged_in(webpage):
return self.raise_login_required() return self.raise_login_required()
# Extract access token from webpage # Extract access token from webpage
accessToken = self._html_search_regex(r"\"AccessToken\":\"(?P<AccessToken>.+?)\"", webpage, 'AccessToken') self._extract_access_token(webpage)
apiGateway = self._html_search_regex(r"\"ApiGatewayUri\":\"(?P<APIGateway>.+?)\"", webpage, 'APIGateway') self._extract_api_gateway(webpage)
headers = {'Authorization': 'Bearer %s' % accessToken}
# "GET" api for video information # "GET" api for video information
apiUri = "%s/videos/%s?$expand=creator,tokens,status,liveEvent,extensions&api-version=1.3-private" % (apiGateway, video_id) apiUri = self.media_info_endpoint
apiCall = self._download_json(apiUri, video_id, headers=headers) apiCall = self._download_json(apiUri, self.video_id, headers=self.headers)
# "GET" api for subtitles and auto-captions texttracks = self._request_texttracks()
texttracksUri = "%s/videos/%s/texttracks?api-version=1.3-private" % (apiGateway, video_id)
texttracksCall = self._download_json(texttracksUri, video_id, headers=headers)['value']
subtitles, automatic_captions = self._remap_texttracks(texttracksCall)
return { return {
'id': video_id, 'id': self.video_id,
'title': apiCall['name'], 'title': apiCall['name'],
'description': apiCall['description'], 'description': apiCall.get('description'),
'uploader': apiCall['creator']['name'], 'uploader': apiCall.get('creator').get('name'),
'thumbnails': self._remap_thumbnails(apiCall['posterImage']), 'thumbnails': self._remap_thumbnails(apiCall.get('posterImage')),
'formats': self._remap_playback(apiCall['playbackUrls'], video_id, http_headers=headers), 'formats': self._remap_playback(apiCall['playbackUrls']),
'subtitles': subtitles, 'subtitles': self._get_subtitles(texttracks),
'automatic_captions': automatic_captions, 'automatic_captions': self._get_automatic_captions(texttracks),
'is_live': False, 'is_live': False,
# 'duration': apiCall['media']['duration'], # 'duration': apiCall['media']['duration'],
} }