From 1b6b43a1bb69e2bbf016c9b6dea807efe45a9796 Mon Sep 17 00:00:00 2001 From: coletdjnz Date: Sat, 14 Sep 2024 12:06:05 +1200 Subject: [PATCH] initial oauth implementation --- yt_dlp/extractor/youtube.py | 231 +++++++++++++++++++++++++++++++----- 1 file changed, 199 insertions(+), 32 deletions(-) diff --git a/yt_dlp/extractor/youtube.py b/yt_dlp/extractor/youtube.py index 343d103f6..a4194e752 100644 --- a/yt_dlp/extractor/youtube.py +++ b/yt_dlp/extractor/youtube.py @@ -18,6 +18,7 @@ import time import traceback import urllib.parse +import uuid from .common import InfoExtractor, SearchInfoExtractor from .openload import PhantomJSwrapper @@ -55,6 +56,7 @@ str_or_none, str_to_int, strftime_or_none, + time_seconds, traverse_obj, try_call, try_get, @@ -526,6 +528,8 @@ class YoutubeBaseInfoExtractor(InfoExtractor): _YT_HANDLE_RE = r'@[\w.-]{3,30}' # https://support.google.com/youtube/answer/11585688?hl=en _YT_CHANNEL_UCID_RE = r'UC[\w-]{22}' + _NETRC_MACHINE = 'youtube' + def ucid_or_none(self, ucid): return self._search_regex(rf'^({self._YT_CHANNEL_UCID_RE})$', ucid, 'UC-id', default=None) @@ -584,8 +588,144 @@ def _real_initialize(self): self._initialize_consent() self._check_login_required() + def _perform_login(self, username, password): + auth_type, sep, user = (username or '').partition('+') + + if user and sep != '+': + raise ExtractorError('Invalid username format. Expected "AUTH_TYPE+USER".', expected=True) + + if auth_type != 'oauth': + raise ExtractorError( + 'Login using username and password is not supported. ' + 'Use "--username oauth[+USER] --password \'\'" to login using an oauth, ' + f'or else {self._login_hint(method="cookies")}', expected=True) + + self._initialize_oauth(user, password) + + _OAUTH_USER = None + _OAUTH_REFRESH_TOKEN = None + _OAUTH_ACCESS_TOKEN = None + _OAUTH_ACCESS_TOKEN_EXPIRY = None + _OAUTH_ACCESS_TOKEN_TYPE = None + + # YouTube TV (TVHTML5) client + _OAUTH_CLIENT_ID = '861556708454-d6dlm3lh05idd8npek18k6be8ba3oc68.apps.googleusercontent.com' + _OAUTH_CLIENT_SECRET = 'SboVhoG9s0rNafixCSGGKXAT' + _OAUTH_SCOPE = 'http://gdata.youtube.com https://www.googleapis.com/auth/youtube' + + def _set_oauth_info(self, token_response, user): + self._OAUTH_ACCESS_TOKEN = token_response['access_token'] + self._OAUTH_ACCESS_TOKEN_TYPE = token_response['token_type'] + refresh_token = traverse_obj(token_response, 'refresh_token', {str}) + + if refresh_token: + self.cache.store(self._NETRC_MACHINE, f'oauth_refresh_token_{user}', refresh_token) + self._OAUTH_REFRESH_TOKEN = refresh_token + + self._OAUTH_ACCESS_TOKEN_EXPIRY = time_seconds( + seconds=traverse_obj(token_response, ('expires_in', {float_or_none}), default=300) - 10) + + self._OAUTH_USER = user + + def _initialize_oauth(self, user, refresh_token): + if not user: + user = 'default' + + self.write_debug(f'Logging in using oauth with user "{user}"') + + if refresh_token: + self._OAUTH_REFRESH_TOKEN = refresh_token + + if not self._OAUTH_REFRESH_TOKEN: + self._OAUTH_REFRESH_TOKEN = self.cache.load(self._NETRC_MACHINE, f'oauth_refresh_token_{user}') + + if self._OAUTH_REFRESH_TOKEN: + try: + token_response = self._refresh_token(self._OAUTH_REFRESH_TOKEN) + except ExtractorError as e: + self.report_warning(f'Failed to refresh access token: {e}. Reinitializing oauth authorization flow.') + token_response = self._oauth_authorize() + else: + token_response = self._oauth_authorize() + + self._set_oauth_info(token_response, user) + self.write_debug(f'Logged in as "{user}" using oauth') + + def _refresh_token(self, refresh_token): + token_response = self._download_json( + 'https://www.youtube.com/o/oauth2/token', + video_id='oauth', + note='Refreshing oauth token', + data=json.dumps({ + 'client_id': self._OAUTH_CLIENT_ID, + 'client_secret': self._OAUTH_CLIENT_SECRET, + 'refresh_token': refresh_token, + 'grant_type': 'refresh_token', + }).encode(), + headers={'Content-Type': 'application/json'}) + error = traverse_obj(token_response, 'error') + if error: + raise ExtractorError(f'Failed to refresh access token: {error}', expected=True) + + return token_response + + def _oauth_authorize(self): + code_response = self._download_json( + 'https://www.youtube.com/o/oauth2/device/code', + video_id='oauth', + note='Initializing oauth authorization flow', + data=json.dumps({ + 'client_id': self._OAUTH_CLIENT_ID, + 'scope': self._OAUTH_SCOPE, + 'device_id': uuid.uuid4().hex, + 'device_model': 'ytlr::', + }).encode(), + headers={'Content-Type': 'application/json'}) + + verification_url = traverse_obj(code_response, 'verification_url', {str}) + user_code = traverse_obj(code_response, 'user_code', {str}) + if not verification_url or not user_code: + raise ExtractorError('Failed to initialize oauth authorization flow') + + self.to_screen(f'To give yt-dlp access to your account, go to {verification_url} and enter code {user_code}') + + while True: + # TODO: add a retry manager to retry 3 times if there is some sort of network/http error, and then give up. + token_response = self._download_json( + 'https://www.youtube.com/o/oauth2/token', + video_id='oauth', + note=False, + data=json.dumps({ + 'client_id': self._OAUTH_CLIENT_ID, + 'client_secret': self._OAUTH_CLIENT_SECRET, + 'code': code_response['device_code'], + 'grant_type': 'http://oauth.net/grant_type/device/1.0', + }).encode(), + headers={'Content-Type': 'application/json'}) + + error = traverse_obj(token_response, 'error', {str}) + if error: + if error == 'authorization_pending': + time.sleep(code_response['interval']) + continue + elif error == 'expired_token': + raise ExtractorError('oauth authorization flow timed out', expected=True) + else: + raise ExtractorError(f'Unknown error occurred during oauth authorization flow: {error}') + + return token_response + + def _update_oauth(self): + if self._OAUTH_ACCESS_TOKEN_EXPIRY and self._OAUTH_ACCESS_TOKEN_EXPIRY > time.time(): + return + + if not self._OAUTH_REFRESH_TOKEN: + return + + self._set_oauth_info(self._refresh_token(self._OAUTH_REFRESH_TOKEN), self._OAUTH_USER) + def _check_login_required(self): - if self._LOGIN_REQUIRED and not self._cookies_passed: + if self._LOGIN_REQUIRED and not self.is_authenticated: self.raise_login_required('Login details are needed to download this content', method='cookies') _YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=' @@ -685,17 +825,6 @@ def _extract_session_index(*data): if session_index is not None: return session_index - # Deprecated? - def _extract_identity_token(self, ytcfg=None, webpage=None): - if ytcfg: - token = try_get(ytcfg, lambda x: x['ID_TOKEN'], str) - if token: - return token - if webpage: - return self._search_regex( - r'\bID_TOKEN["\']\s*:\s*["\'](.+?)["\']', webpage, - 'identity token', default=None, fatal=False) - def _data_sync_id_to_delegated_session_id(self, data_sync_id): if not data_sync_id: return @@ -742,7 +871,7 @@ def _extract_visitor_data(self, *args): @functools.cached_property def is_authenticated(self): - return bool(self._generate_sapisidhash_header()) + return self._OAUTH_ACCESS_TOKEN or bool(self._generate_sapisidhash_header()) def extract_ytcfg(self, video_id, webpage): if not webpage: @@ -752,21 +881,19 @@ def extract_ytcfg(self, video_id, webpage): r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg', default='{}'), video_id, fatal=False) or {} - def generate_api_headers( - self, *, ytcfg=None, account_syncid=None, session_index=None, - visitor_data=None, identity_token=None, api_hostname=None, default_client='web'): + def _generate_oauth_headers(self): + self._update_oauth() + if self._OAUTH_ACCESS_TOKEN: + return { + 'Authorization': f'{self._OAUTH_ACCESS_TOKEN_TYPE} {self._OAUTH_ACCESS_TOKEN}', + } + return {} - origin = 'https://' + (self._select_api_hostname(api_hostname, default_client)) - headers = { - 'X-YouTube-Client-Name': str( - self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT_CLIENT_NAME'], default_client=default_client)), - 'X-YouTube-Client-Version': self._extract_client_version(ytcfg, default_client), - 'Origin': origin, - 'X-Youtube-Identity-Token': identity_token or self._extract_identity_token(ytcfg), - 'X-Goog-PageId': account_syncid or self._extract_account_syncid(ytcfg), - 'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg), - 'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client), - } + def _generate_cookie_auth_headers(self, *, ytcfg=None, account_syncid=None, session_index=None, origin=None, **kwargs): + headers = {} + account_syncid = account_syncid or self._extract_account_syncid(ytcfg) + if account_syncid: + headers['X-Goog-AuthUser'] = account_syncid if session_index is None: session_index = self._extract_session_index(ytcfg) if account_syncid or session_index is not None: @@ -776,8 +903,32 @@ def generate_api_headers( if auth is not None: headers['Authorization'] = auth headers['X-Origin'] = origin + + return headers + + def generate_api_headers( + self, *, ytcfg=None, account_syncid=None, session_index=None, + visitor_data=None, api_hostname=None, default_client='web', **kwargs): + + origin = 'https://' + (self._select_api_hostname(api_hostname, default_client)) + headers = { + 'X-YouTube-Client-Name': str( + self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT_CLIENT_NAME'], default_client=default_client)), + 'X-YouTube-Client-Version': self._extract_client_version(ytcfg, default_client), + 'Origin': origin, + 'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg), + 'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client), + **self._generate_oauth_headers(), + **self._generate_cookie_auth_headers(ytcfg=ytcfg, account_syncid=account_syncid, session_index=session_index, origin=origin), + } return filter_dict(headers) + def generate_webpage_headers(self, url): + # Do not need to add cookie auth headers to webpage requests - surprise - it uses hte cookies instead + if not urllib.parse.urlparse(url).netloc.endswith('youtube.com'): + return {} + return self._generate_oauth_headers() + def _download_ytcfg(self, client, video_id): url = { 'web': 'https://www.youtube.com', @@ -787,7 +938,8 @@ def _download_ytcfg(self, client, video_id): if not url: return {} webpage = self._download_webpage( - url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config') + url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config', + headers=self.generate_webpage_headers(url)) return self.extract_ytcfg(video_id, webpage) or {} @staticmethod @@ -3047,7 +3199,8 @@ def _load_player(self, video_id, player_url, fatal=True): code = self._download_webpage( player_url, video_id, fatal=fatal, note='Downloading player ' + player_id, - errnote=f'Download of {player_url} failed') + errnote=f'Download of {player_url} failed', + headers=self.generate_webpage_headers(player_url)) if code: self._code_cache[player_id] = code return self._code_cache.get(player_id) @@ -3330,7 +3483,8 @@ def _mark_watched(self, video_id, player_responses): self._download_webpage( url, video_id, f'Marking {label}watched', - 'Unable to mark watched', fatal=False) + 'Unable to mark watched', fatal=False, + headers=self.generate_webpage_headers(url)) @classmethod def _extract_from_webpage(cls, url, webpage): @@ -4321,7 +4475,7 @@ def _download_player_responses(self, url, smuggled_data, video_id, webpage_url): if pp: query['pp'] = pp webpage = self._download_webpage( - webpage_url, video_id, fatal=False, query=query) + webpage_url, video_id, fatal=False, query=query, headers=self.generate_webpage_headers(webpage_url)) master_ytcfg = self.extract_ytcfg(video_id, webpage) or self._get_default_ytcfg() @@ -5593,7 +5747,7 @@ def _extract_webpage(self, url, item_id, fatal=True): webpage, data = None, None for retry in self.RetryManager(fatal=fatal): try: - webpage = self._download_webpage(url, item_id, note='Downloading webpage') + webpage = self._download_webpage(url, item_id, note='Downloading webpage', headers=self.generate_webpage_headers(url)) data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {} except ExtractorError as e: if isinstance(e.cause, network_exceptions): @@ -6967,6 +7121,7 @@ def _real_extract(self, url, smuggled_data): raise ExtractorError('Unable to recognize tab page') +# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor class YoutubePlaylistIE(InfoExtractor): IE_DESC = 'YouTube playlists' _VALID_URL = r'''(?x)(?: @@ -7081,6 +7236,7 @@ def _real_extract(self, url): return self.url_result(url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id) +# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor class YoutubeYtBeIE(InfoExtractor): IE_DESC = 'youtu.be' _VALID_URL = rf'https?://youtu\.be/(?P[0-9A-Za-z_-]{{11}})/*?.*?\blist=(?P{YoutubeBaseInfoExtractor._PLAYLIST_ID_RE})' @@ -7132,6 +7288,7 @@ def _real_extract(self, url): }), ie=YoutubeTabIE.ie_key(), video_id=playlist_id) +# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor class YoutubeLivestreamEmbedIE(InfoExtractor): IE_DESC = 'YouTube livestream embeds' _VALID_URL = r'https?://(?:\w+\.)?youtube\.com/embed/live_stream/?\?(?:[^#]+&)?channel=(?P[^&#]+)' @@ -7147,6 +7304,7 @@ def _real_extract(self, url): ie=YoutubeTabIE.ie_key(), video_id=channel_id) +# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor class YoutubeYtUserIE(InfoExtractor): IE_DESC = 'YouTube user videos; "ytuser:" prefix' IE_NAME = 'youtube:user' @@ -7433,6 +7591,8 @@ def _real_extract(self, url): title = join_nonempty(query, section, delim=' - ') return self.playlist_result(self._search_results(query, params, default_client='web_music'), title, title) + # TODO: this extractor MUST subclass YoutubeBaseInfoExtractor + class YoutubeFeedsInfoExtractor(InfoExtractor): """ @@ -7453,6 +7613,8 @@ def _real_extract(self, url): return self.url_result( f'https://www.youtube.com/feed/{self._FEED_NAME}', ie=YoutubeTabIE.ie_key()) + # TODO: this extractor MUST subclass YoutubeBaseInfoExtractor + class YoutubeWatchLaterIE(InfoExtractor): IE_NAME = 'youtube:watchlater' @@ -7508,6 +7670,7 @@ class YoutubeHistoryIE(YoutubeFeedsInfoExtractor): }] +# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor class YoutubeShortsAudioPivotIE(InfoExtractor): IE_DESC = 'YouTube Shorts audio pivot (Shorts using audio of a given video)' IE_NAME = 'youtube:shorts:pivot:audio' @@ -7531,6 +7694,8 @@ def _real_extract(self, url): f'https://www.youtube.com/feed/sfv_audio_pivot?bp={self._generate_audio_pivot_params(video_id)}', ie=YoutubeTabIE) + # TODO: this extractor MUST subclass YoutubeBaseInfoExtractor + class YoutubeTruncatedURLIE(InfoExtractor): IE_NAME = 'youtube:truncated_url' @@ -7688,6 +7853,8 @@ def _real_extract(self, url): raise ExtractorError('Invalid cookie consent redirect URL', expected=True) return self.url_result(redirect_url) + # TODO: this extractor MUST subclass YoutubeBaseInfoExtractor + class YoutubeTruncatedIDIE(InfoExtractor): IE_NAME = 'youtube:truncated_id'