initial oauth implementation

This commit is contained in:
coletdjnz 2024-09-14 12:06:05 +12:00
parent 3a3bd00037
commit 1b6b43a1bb
No known key found for this signature in database
GPG key ID: 91984263BB39894A

View file

@ -18,6 +18,7 @@
import time
import traceback
import urllib.parse
import uuid
from .common import InfoExtractor, SearchInfoExtractor
from .openload import PhantomJSwrapper
@ -55,6 +56,7 @@
str_or_none,
str_to_int,
strftime_or_none,
time_seconds,
traverse_obj,
try_call,
try_get,
@ -526,6 +528,8 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
_YT_HANDLE_RE = r'@[\w.-]{3,30}' # https://support.google.com/youtube/answer/11585688?hl=en
_YT_CHANNEL_UCID_RE = r'UC[\w-]{22}'
_NETRC_MACHINE = 'youtube'
def ucid_or_none(self, ucid):
return self._search_regex(rf'^({self._YT_CHANNEL_UCID_RE})$', ucid, 'UC-id', default=None)
@ -584,8 +588,144 @@ def _real_initialize(self):
self._initialize_consent()
self._check_login_required()
def _perform_login(self, username, password):
auth_type, sep, user = (username or '').partition('+')
if user and sep != '+':
raise ExtractorError('Invalid username format. Expected "AUTH_TYPE+USER".', expected=True)
if auth_type != 'oauth':
raise ExtractorError(
'Login using username and password is not supported. '
'Use "--username oauth[+USER] --password \'\'" to login using an oauth, '
f'or else {self._login_hint(method="cookies")}', expected=True)
self._initialize_oauth(user, password)
_OAUTH_USER = None
_OAUTH_REFRESH_TOKEN = None
_OAUTH_ACCESS_TOKEN = None
_OAUTH_ACCESS_TOKEN_EXPIRY = None
_OAUTH_ACCESS_TOKEN_TYPE = None
# YouTube TV (TVHTML5) client
_OAUTH_CLIENT_ID = '861556708454-d6dlm3lh05idd8npek18k6be8ba3oc68.apps.googleusercontent.com'
_OAUTH_CLIENT_SECRET = 'SboVhoG9s0rNafixCSGGKXAT'
_OAUTH_SCOPE = 'http://gdata.youtube.com https://www.googleapis.com/auth/youtube'
def _set_oauth_info(self, token_response, user):
self._OAUTH_ACCESS_TOKEN = token_response['access_token']
self._OAUTH_ACCESS_TOKEN_TYPE = token_response['token_type']
refresh_token = traverse_obj(token_response, 'refresh_token', {str})
if refresh_token:
self.cache.store(self._NETRC_MACHINE, f'oauth_refresh_token_{user}', refresh_token)
self._OAUTH_REFRESH_TOKEN = refresh_token
self._OAUTH_ACCESS_TOKEN_EXPIRY = time_seconds(
seconds=traverse_obj(token_response, ('expires_in', {float_or_none}), default=300) - 10)
self._OAUTH_USER = user
def _initialize_oauth(self, user, refresh_token):
if not user:
user = 'default'
self.write_debug(f'Logging in using oauth with user "{user}"')
if refresh_token:
self._OAUTH_REFRESH_TOKEN = refresh_token
if not self._OAUTH_REFRESH_TOKEN:
self._OAUTH_REFRESH_TOKEN = self.cache.load(self._NETRC_MACHINE, f'oauth_refresh_token_{user}')
if self._OAUTH_REFRESH_TOKEN:
try:
token_response = self._refresh_token(self._OAUTH_REFRESH_TOKEN)
except ExtractorError as e:
self.report_warning(f'Failed to refresh access token: {e}. Reinitializing oauth authorization flow.')
token_response = self._oauth_authorize()
else:
token_response = self._oauth_authorize()
self._set_oauth_info(token_response, user)
self.write_debug(f'Logged in as "{user}" using oauth')
def _refresh_token(self, refresh_token):
token_response = self._download_json(
'https://www.youtube.com/o/oauth2/token',
video_id='oauth',
note='Refreshing oauth token',
data=json.dumps({
'client_id': self._OAUTH_CLIENT_ID,
'client_secret': self._OAUTH_CLIENT_SECRET,
'refresh_token': refresh_token,
'grant_type': 'refresh_token',
}).encode(),
headers={'Content-Type': 'application/json'})
error = traverse_obj(token_response, 'error')
if error:
raise ExtractorError(f'Failed to refresh access token: {error}', expected=True)
return token_response
def _oauth_authorize(self):
code_response = self._download_json(
'https://www.youtube.com/o/oauth2/device/code',
video_id='oauth',
note='Initializing oauth authorization flow',
data=json.dumps({
'client_id': self._OAUTH_CLIENT_ID,
'scope': self._OAUTH_SCOPE,
'device_id': uuid.uuid4().hex,
'device_model': 'ytlr::',
}).encode(),
headers={'Content-Type': 'application/json'})
verification_url = traverse_obj(code_response, 'verification_url', {str})
user_code = traverse_obj(code_response, 'user_code', {str})
if not verification_url or not user_code:
raise ExtractorError('Failed to initialize oauth authorization flow')
self.to_screen(f'To give yt-dlp access to your account, go to {verification_url} and enter code {user_code}')
while True:
# TODO: add a retry manager to retry 3 times if there is some sort of network/http error, and then give up.
token_response = self._download_json(
'https://www.youtube.com/o/oauth2/token',
video_id='oauth',
note=False,
data=json.dumps({
'client_id': self._OAUTH_CLIENT_ID,
'client_secret': self._OAUTH_CLIENT_SECRET,
'code': code_response['device_code'],
'grant_type': 'http://oauth.net/grant_type/device/1.0',
}).encode(),
headers={'Content-Type': 'application/json'})
error = traverse_obj(token_response, 'error', {str})
if error:
if error == 'authorization_pending':
time.sleep(code_response['interval'])
continue
elif error == 'expired_token':
raise ExtractorError('oauth authorization flow timed out', expected=True)
else:
raise ExtractorError(f'Unknown error occurred during oauth authorization flow: {error}')
return token_response
def _update_oauth(self):
if self._OAUTH_ACCESS_TOKEN_EXPIRY and self._OAUTH_ACCESS_TOKEN_EXPIRY > time.time():
return
if not self._OAUTH_REFRESH_TOKEN:
return
self._set_oauth_info(self._refresh_token(self._OAUTH_REFRESH_TOKEN), self._OAUTH_USER)
def _check_login_required(self):
if self._LOGIN_REQUIRED and not self._cookies_passed:
if self._LOGIN_REQUIRED and not self.is_authenticated:
self.raise_login_required('Login details are needed to download this content', method='cookies')
_YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*='
@ -685,17 +825,6 @@ def _extract_session_index(*data):
if session_index is not None:
return session_index
# Deprecated?
def _extract_identity_token(self, ytcfg=None, webpage=None):
if ytcfg:
token = try_get(ytcfg, lambda x: x['ID_TOKEN'], str)
if token:
return token
if webpage:
return self._search_regex(
r'\bID_TOKEN["\']\s*:\s*["\'](.+?)["\']', webpage,
'identity token', default=None, fatal=False)
def _data_sync_id_to_delegated_session_id(self, data_sync_id):
if not data_sync_id:
return
@ -742,7 +871,7 @@ def _extract_visitor_data(self, *args):
@functools.cached_property
def is_authenticated(self):
return bool(self._generate_sapisidhash_header())
return self._OAUTH_ACCESS_TOKEN or bool(self._generate_sapisidhash_header())
def extract_ytcfg(self, video_id, webpage):
if not webpage:
@ -752,21 +881,19 @@ def extract_ytcfg(self, video_id, webpage):
r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg',
default='{}'), video_id, fatal=False) or {}
def generate_api_headers(
self, *, ytcfg=None, account_syncid=None, session_index=None,
visitor_data=None, identity_token=None, api_hostname=None, default_client='web'):
def _generate_oauth_headers(self):
self._update_oauth()
if self._OAUTH_ACCESS_TOKEN:
return {
'Authorization': f'{self._OAUTH_ACCESS_TOKEN_TYPE} {self._OAUTH_ACCESS_TOKEN}',
}
return {}
origin = 'https://' + (self._select_api_hostname(api_hostname, default_client))
headers = {
'X-YouTube-Client-Name': str(
self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT_CLIENT_NAME'], default_client=default_client)),
'X-YouTube-Client-Version': self._extract_client_version(ytcfg, default_client),
'Origin': origin,
'X-Youtube-Identity-Token': identity_token or self._extract_identity_token(ytcfg),
'X-Goog-PageId': account_syncid or self._extract_account_syncid(ytcfg),
'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg),
'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client),
}
def _generate_cookie_auth_headers(self, *, ytcfg=None, account_syncid=None, session_index=None, origin=None, **kwargs):
headers = {}
account_syncid = account_syncid or self._extract_account_syncid(ytcfg)
if account_syncid:
headers['X-Goog-AuthUser'] = account_syncid
if session_index is None:
session_index = self._extract_session_index(ytcfg)
if account_syncid or session_index is not None:
@ -776,8 +903,32 @@ def generate_api_headers(
if auth is not None:
headers['Authorization'] = auth
headers['X-Origin'] = origin
return headers
def generate_api_headers(
self, *, ytcfg=None, account_syncid=None, session_index=None,
visitor_data=None, api_hostname=None, default_client='web', **kwargs):
origin = 'https://' + (self._select_api_hostname(api_hostname, default_client))
headers = {
'X-YouTube-Client-Name': str(
self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT_CLIENT_NAME'], default_client=default_client)),
'X-YouTube-Client-Version': self._extract_client_version(ytcfg, default_client),
'Origin': origin,
'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg),
'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client),
**self._generate_oauth_headers(),
**self._generate_cookie_auth_headers(ytcfg=ytcfg, account_syncid=account_syncid, session_index=session_index, origin=origin),
}
return filter_dict(headers)
def generate_webpage_headers(self, url):
# Do not need to add cookie auth headers to webpage requests - surprise - it uses hte cookies instead
if not urllib.parse.urlparse(url).netloc.endswith('youtube.com'):
return {}
return self._generate_oauth_headers()
def _download_ytcfg(self, client, video_id):
url = {
'web': 'https://www.youtube.com',
@ -787,7 +938,8 @@ def _download_ytcfg(self, client, video_id):
if not url:
return {}
webpage = self._download_webpage(
url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config')
url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config',
headers=self.generate_webpage_headers(url))
return self.extract_ytcfg(video_id, webpage) or {}
@staticmethod
@ -3047,7 +3199,8 @@ def _load_player(self, video_id, player_url, fatal=True):
code = self._download_webpage(
player_url, video_id, fatal=fatal,
note='Downloading player ' + player_id,
errnote=f'Download of {player_url} failed')
errnote=f'Download of {player_url} failed',
headers=self.generate_webpage_headers(player_url))
if code:
self._code_cache[player_id] = code
return self._code_cache.get(player_id)
@ -3330,7 +3483,8 @@ def _mark_watched(self, video_id, player_responses):
self._download_webpage(
url, video_id, f'Marking {label}watched',
'Unable to mark watched', fatal=False)
'Unable to mark watched', fatal=False,
headers=self.generate_webpage_headers(url))
@classmethod
def _extract_from_webpage(cls, url, webpage):
@ -4321,7 +4475,7 @@ def _download_player_responses(self, url, smuggled_data, video_id, webpage_url):
if pp:
query['pp'] = pp
webpage = self._download_webpage(
webpage_url, video_id, fatal=False, query=query)
webpage_url, video_id, fatal=False, query=query, headers=self.generate_webpage_headers(webpage_url))
master_ytcfg = self.extract_ytcfg(video_id, webpage) or self._get_default_ytcfg()
@ -5593,7 +5747,7 @@ def _extract_webpage(self, url, item_id, fatal=True):
webpage, data = None, None
for retry in self.RetryManager(fatal=fatal):
try:
webpage = self._download_webpage(url, item_id, note='Downloading webpage')
webpage = self._download_webpage(url, item_id, note='Downloading webpage', headers=self.generate_webpage_headers(url))
data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {}
except ExtractorError as e:
if isinstance(e.cause, network_exceptions):
@ -6967,6 +7121,7 @@ def _real_extract(self, url, smuggled_data):
raise ExtractorError('Unable to recognize tab page')
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubePlaylistIE(InfoExtractor):
IE_DESC = 'YouTube playlists'
_VALID_URL = r'''(?x)(?:
@ -7081,6 +7236,7 @@ def _real_extract(self, url):
return self.url_result(url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeYtBeIE(InfoExtractor):
IE_DESC = 'youtu.be'
_VALID_URL = rf'https?://youtu\.be/(?P<id>[0-9A-Za-z_-]{{11}})/*?.*?\blist=(?P<playlist_id>{YoutubeBaseInfoExtractor._PLAYLIST_ID_RE})'
@ -7132,6 +7288,7 @@ def _real_extract(self, url):
}), ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeLivestreamEmbedIE(InfoExtractor):
IE_DESC = 'YouTube livestream embeds'
_VALID_URL = r'https?://(?:\w+\.)?youtube\.com/embed/live_stream/?\?(?:[^#]+&)?channel=(?P<id>[^&#]+)'
@ -7147,6 +7304,7 @@ def _real_extract(self, url):
ie=YoutubeTabIE.ie_key(), video_id=channel_id)
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeYtUserIE(InfoExtractor):
IE_DESC = 'YouTube user videos; "ytuser:" prefix'
IE_NAME = 'youtube:user'
@ -7433,6 +7591,8 @@ def _real_extract(self, url):
title = join_nonempty(query, section, delim=' - ')
return self.playlist_result(self._search_results(query, params, default_client='web_music'), title, title)
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeFeedsInfoExtractor(InfoExtractor):
"""
@ -7453,6 +7613,8 @@ def _real_extract(self, url):
return self.url_result(
f'https://www.youtube.com/feed/{self._FEED_NAME}', ie=YoutubeTabIE.ie_key())
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeWatchLaterIE(InfoExtractor):
IE_NAME = 'youtube:watchlater'
@ -7508,6 +7670,7 @@ class YoutubeHistoryIE(YoutubeFeedsInfoExtractor):
}]
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeShortsAudioPivotIE(InfoExtractor):
IE_DESC = 'YouTube Shorts audio pivot (Shorts using audio of a given video)'
IE_NAME = 'youtube:shorts:pivot:audio'
@ -7531,6 +7694,8 @@ def _real_extract(self, url):
f'https://www.youtube.com/feed/sfv_audio_pivot?bp={self._generate_audio_pivot_params(video_id)}',
ie=YoutubeTabIE)
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeTruncatedURLIE(InfoExtractor):
IE_NAME = 'youtube:truncated_url'
@ -7688,6 +7853,8 @@ def _real_extract(self, url):
raise ExtractorError('Invalid cookie consent redirect URL', expected=True)
return self.url_result(redirect_url)
# TODO: this extractor MUST subclass YoutubeBaseInfoExtractor
class YoutubeTruncatedIDIE(InfoExtractor):
IE_NAME = 'youtube:truncated_id'