[compat] Remove deprecated functions from core code

This commit is contained in:
pukkandan 2022-06-24 16:24:43 +05:30
parent 54007a45f1
commit 14f25df2b6
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39
30 changed files with 203 additions and 245 deletions

View file

@ -14,10 +14,10 @@
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import urllib.parse
import urllib.request
from test.helper import gettestcases
from yt_dlp.utils import compat_urllib_parse_urlparse
if len(sys.argv) > 1:
METHOD = 'LIST'
@ -38,7 +38,7 @@
RESULT = 'porn' in webpage.lower()
elif METHOD == 'LIST':
domain = compat_urllib_parse_urlparse(test['url']).netloc
domain = urllib.parse.urlparse(test['url']).netloc
if not domain:
print('\nFail: {}'.format(test['name']))
continue

View file

@ -9,7 +9,7 @@
import yt_dlp.extractor
from yt_dlp import YoutubeDL
from yt_dlp.compat import compat_os_name, compat_str
from yt_dlp.compat import compat_os_name
from yt_dlp.utils import preferredencoding, write_string
if 'pytest' in sys.modules:
@ -96,29 +96,29 @@ def gettestcases(include_onlymatching=False):
def expect_value(self, got, expected, field):
if isinstance(expected, compat_str) and expected.startswith('re:'):
if isinstance(expected, str) and expected.startswith('re:'):
match_str = expected[len('re:'):]
match_rex = re.compile(match_str)
self.assertTrue(
isinstance(got, compat_str),
f'Expected a {compat_str.__name__} object, but got {type(got).__name__} for field {field}')
isinstance(got, str),
f'Expected a {str.__name__} object, but got {type(got).__name__} for field {field}')
self.assertTrue(
match_rex.match(got),
f'field {field} (value: {got!r}) should match {match_str!r}')
elif isinstance(expected, compat_str) and expected.startswith('startswith:'):
elif isinstance(expected, str) and expected.startswith('startswith:'):
start_str = expected[len('startswith:'):]
self.assertTrue(
isinstance(got, compat_str),
f'Expected a {compat_str.__name__} object, but got {type(got).__name__} for field {field}')
isinstance(got, str),
f'Expected a {str.__name__} object, but got {type(got).__name__} for field {field}')
self.assertTrue(
got.startswith(start_str),
f'field {field} (value: {got!r}) should start with {start_str!r}')
elif isinstance(expected, compat_str) and expected.startswith('contains:'):
elif isinstance(expected, str) and expected.startswith('contains:'):
contains_str = expected[len('contains:'):]
self.assertTrue(
isinstance(got, compat_str),
f'Expected a {compat_str.__name__} object, but got {type(got).__name__} for field {field}')
isinstance(got, str),
f'Expected a {str.__name__} object, but got {type(got).__name__} for field {field}')
self.assertTrue(
contains_str in got,
f'field {field} (value: {got!r}) should contain {contains_str!r}')
@ -142,12 +142,12 @@ def expect_value(self, got, expected, field):
index, field, type_expected, type_got))
expect_value(self, item_got, item_expected, field)
else:
if isinstance(expected, compat_str) and expected.startswith('md5:'):
if isinstance(expected, str) and expected.startswith('md5:'):
self.assertTrue(
isinstance(got, compat_str),
isinstance(got, str),
f'Expected field {field} to be a unicode object, but got value {got!r} of type {type(got)!r}')
got = 'md5:' + md5(got)
elif isinstance(expected, compat_str) and re.match(r'^(?:min|max)?count:\d+', expected):
elif isinstance(expected, str) and re.match(r'^(?:min|max)?count:\d+', expected):
self.assertTrue(
isinstance(got, (list, dict)),
f'Expected field {field} to be a list or a dict, but it is of type {type(got).__name__}')
@ -236,7 +236,7 @@ def expect_info_dict(self, got_dict, expected_dict):
missing_keys = set(test_info_dict.keys()) - set(expected_dict.keys())
if missing_keys:
def _repr(v):
if isinstance(v, compat_str):
if isinstance(v, str):
return "'%s'" % v.replace('\\', '\\\\').replace("'", "\\'").replace('\n', '\\n')
elif isinstance(v, type):
return v.__name__

View file

@ -14,7 +14,7 @@
from test.helper import FakeYDL, assertRegexpMatches
from yt_dlp import YoutubeDL
from yt_dlp.compat import compat_os_name, compat_str
from yt_dlp.compat import compat_os_name
from yt_dlp.extractor import YoutubeIE
from yt_dlp.extractor.common import InfoExtractor
from yt_dlp.postprocessor.common import PostProcessor
@ -1185,7 +1185,7 @@ class PlaylistIE(InfoExtractor):
def _entries(self):
for n in range(3):
video_id = compat_str(n)
video_id = str(n)
yield {
'_type': 'url_transparent',
'ie_key': VideoIE.ie_key(),

View file

@ -15,7 +15,6 @@
from yt_dlp.compat import (
compat_etree_fromstring,
compat_expanduser,
compat_str,
compat_urllib_parse_unquote,
compat_urllib_parse_urlencode,
)
@ -82,11 +81,11 @@ def test_compat_etree_fromstring(self):
</root>
'''
doc = compat_etree_fromstring(xml.encode())
self.assertTrue(isinstance(doc.attrib['foo'], compat_str))
self.assertTrue(isinstance(doc.attrib['spam'], compat_str))
self.assertTrue(isinstance(doc.find('normal').text, compat_str))
self.assertTrue(isinstance(doc.find('chinese').text, compat_str))
self.assertTrue(isinstance(doc.find('foo/bar').text, compat_str))
self.assertTrue(isinstance(doc.attrib['foo'], str))
self.assertTrue(isinstance(doc.attrib['spam'], str))
self.assertTrue(isinstance(doc.find('normal').text, str))
self.assertTrue(isinstance(doc.find('chinese').text, str))
self.assertTrue(isinstance(doc.find('foo/bar').text, str))
def test_compat_etree_fromstring_doctype(self):
xml = '''<?xml version="1.0"?>

View file

@ -26,7 +26,6 @@
)
import yt_dlp.YoutubeDL # isort: split
from yt_dlp.compat import compat_HTTPError
from yt_dlp.extractor import get_info_extractor
from yt_dlp.utils import (
DownloadError,
@ -168,7 +167,7 @@ def try_rm_tcs_files(tcs=None):
force_generic_extractor=params.get('force_generic_extractor', False))
except (DownloadError, ExtractorError) as err:
# Check if the exception is not a network related one
if not err.exc_info[0] in (urllib.error.URLError, socket.timeout, UnavailableVideoError, http.client.BadStatusLine) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503):
if not err.exc_info[0] in (urllib.error.URLError, socket.timeout, UnavailableVideoError, http.client.BadStatusLine) or (err.exc_info[0] == urllib.error.HTTPError and err.exc_info[1].code == 503):
raise
if try_num == RETRIES:

View file

@ -13,7 +13,6 @@
import urllib.request
from test.helper import FakeYDL, get_params, is_download_test
from yt_dlp.compat import compat_str
@is_download_test
@ -102,13 +101,13 @@ def _get_ip(self, protocol):
return ydl.urlopen('http://yt-dl.org/ip').read().decode()
def test_socks4(self):
self.assertTrue(isinstance(self._get_ip('socks4'), compat_str))
self.assertTrue(isinstance(self._get_ip('socks4'), str))
def test_socks4a(self):
self.assertTrue(isinstance(self._get_ip('socks4a'), compat_str))
self.assertTrue(isinstance(self._get_ip('socks4a'), str))
def test_socks5(self):
self.assertTrue(isinstance(self._get_ip('socks5'), compat_str))
self.assertTrue(isinstance(self._get_ip('socks5'), str))
if __name__ == '__main__':

View file

@ -14,7 +14,6 @@
import urllib.request
from test.helper import FakeYDL, is_download_test
from yt_dlp.compat import compat_str
from yt_dlp.extractor import YoutubeIE
from yt_dlp.jsinterp import JSInterpreter
@ -159,7 +158,7 @@ def test_func(self):
def signature(jscode, sig_input):
func = YoutubeIE(FakeYDL())._parse_sig_js(jscode)
src_sig = (
compat_str(string.printable[:sig_input])
str(string.printable[:sig_input])
if isinstance(sig_input, int) else sig_input)
return func(src_sig)

View file

@ -26,7 +26,7 @@
from .cache import Cache
from .compat import HAS_LEGACY as compat_has_legacy
from .compat import compat_os_name, compat_shlex_quote, compat_str
from .compat import compat_os_name, compat_shlex_quote
from .cookies import load_cookies
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
from .downloader.rtmp import rtmpdump_version
@ -791,7 +791,7 @@ def _bidi_workaround(self, message):
return message
assert hasattr(self, '_output_process')
assert isinstance(message, compat_str)
assert isinstance(message, str)
line_count = message.count('\n') + 1
self._output_process.stdin.write((message + '\n').encode())
self._output_process.stdin.flush()
@ -827,7 +827,7 @@ def to_screen(self, message, skip_eol=False, quiet=None):
def to_stderr(self, message, only_once=False):
"""Print message to stderr"""
assert isinstance(message, compat_str)
assert isinstance(message, str)
if self.params.get('logger'):
self.params['logger'].error(message)
else:
@ -1562,7 +1562,7 @@ def process_ie_result(self, ie_result, download=True, extra_info=None):
additional_urls = (ie_result or {}).get('additional_urls')
if additional_urls:
# TODO: Improve MetadataParserPP to allow setting a list
if isinstance(additional_urls, compat_str):
if isinstance(additional_urls, str):
additional_urls = [additional_urls]
self.to_screen(
'[info] %s: %d additional URL(s) requested' % (ie_result['id'], len(additional_urls)))
@ -2355,10 +2355,10 @@ def report_force_conversion(field, field_not, conversion):
def sanitize_string_field(info, string_field):
field = info.get(string_field)
if field is None or isinstance(field, compat_str):
if field is None or isinstance(field, str):
return
report_force_conversion(string_field, 'a string', 'string')
info[string_field] = compat_str(field)
info[string_field] = str(field)
def sanitize_numeric_fields(info):
for numeric_field in self._NUMERIC_FIELDS:
@ -2461,7 +2461,7 @@ def is_wellformed(f):
sanitize_numeric_fields(format)
format['url'] = sanitize_url(format['url'])
if not format.get('format_id'):
format['format_id'] = compat_str(i)
format['format_id'] = str(i)
else:
# Sanitize format_id from characters used in format selector expression
format['format_id'] = re.sub(r'[\s,/+\[\]()]', '_', format['format_id'])

View file

@ -1,6 +1,7 @@
import base64
from math import ceil
from .compat import compat_b64decode, compat_ord
from .compat import compat_ord
from .dependencies import Cryptodome_AES
from .utils import bytes_to_intlist, intlist_to_bytes
@ -264,7 +265,7 @@ def aes_decrypt_text(data, password, key_size_bytes):
"""
NONCE_LENGTH_BYTES = 8
data = bytes_to_intlist(compat_b64decode(data))
data = bytes_to_intlist(base64.b64decode(data))
password = bytes_to_intlist(password.encode())
key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password))

View file

@ -1,3 +1,4 @@
import base64
import contextlib
import ctypes
import http.cookiejar
@ -18,7 +19,6 @@
aes_gcm_decrypt_and_verify_bytes,
unpad_pkcs7,
)
from .compat import compat_b64decode
from .dependencies import (
_SECRETSTORAGE_UNAVAILABLE_REASON,
secretstorage,
@ -836,7 +836,7 @@ def _get_windows_v10_key(browser_root, logger):
except KeyError:
logger.error('no encrypted key in Local State')
return None
encrypted_key = compat_b64decode(base64_key)
encrypted_key = base64.b64decode(base64_key)
prefix = b'DPAPI'
if not encrypted_key.startswith(prefix):
logger.error('invalid key')

View file

@ -6,7 +6,7 @@
import time
from .fragment import FragmentFD
from ..compat import functools # isort: split
from ..compat import functools
from ..postprocessor.ffmpeg import EXT_TO_OUT_FORMATS, FFmpegPostProcessor
from ..utils import (
Popen,

View file

@ -1,16 +1,13 @@
import base64
import io
import itertools
import struct
import time
import urllib.error
import urllib.parse
from .fragment import FragmentFD
from ..compat import (
compat_b64decode,
compat_etree_fromstring,
compat_urllib_parse_urlparse,
compat_urlparse,
)
from ..compat import compat_etree_fromstring
from ..utils import fix_xml_ampersands, xpath_text
@ -300,12 +297,12 @@ def _parse_bootstrap_node(self, node, base_url):
# 1. http://live-1-1.rutube.ru/stream/1024/HDS/SD/C2NKsS85HQNckgn5HdEmOQ/1454167650/S-s604419906/move/four/dirs/upper/1024-576p.f4m
bootstrap_url = node.get('url')
if bootstrap_url:
bootstrap_url = compat_urlparse.urljoin(
bootstrap_url = urllib.parse.urljoin(
base_url, bootstrap_url)
boot_info = self._get_bootstrap_from_url(bootstrap_url)
else:
bootstrap_url = None
bootstrap = compat_b64decode(node.text)
bootstrap = base64.b64decode(node.text)
boot_info = read_bootstrap_info(bootstrap)
return boot_info, bootstrap_url
@ -335,14 +332,14 @@ def real_download(self, filename, info_dict):
# Prefer baseURL for relative URLs as per 11.2 of F4M 3.0 spec.
man_base_url = get_base_url(doc) or man_url
base_url = compat_urlparse.urljoin(man_base_url, media.attrib['url'])
base_url = urllib.parse.urljoin(man_base_url, media.attrib['url'])
bootstrap_node = doc.find(_add_ns('bootstrapInfo'))
boot_info, bootstrap_url = self._parse_bootstrap_node(
bootstrap_node, man_base_url)
live = boot_info['live']
metadata_node = media.find(_add_ns('metadata'))
if metadata_node is not None:
metadata = compat_b64decode(metadata_node.text)
metadata = base64.b64decode(metadata_node.text)
else:
metadata = None
@ -370,7 +367,7 @@ def real_download(self, filename, info_dict):
if not live:
write_metadata_tag(dest_stream, metadata)
base_url_parsed = compat_urllib_parse_urlparse(base_url)
base_url_parsed = urllib.parse.urlparse(base_url)
self._start_frag_download(ctx, info_dict)

View file

@ -1,12 +1,12 @@
import binascii
import io
import re
import urllib.parse
from . import get_suitable_downloader
from .external import FFmpegFD
from .fragment import FragmentFD
from .. import webvtt
from ..compat import compat_urlparse
from ..dependencies import Cryptodome_AES
from ..utils import bug_reports_message, parse_m3u8_attributes, update_url_query
@ -140,7 +140,7 @@ def is_ad_fragment_end(s):
extra_query = None
extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url')
if extra_param_to_segment_url:
extra_query = compat_urlparse.parse_qs(extra_param_to_segment_url)
extra_query = urllib.parse.parse_qs(extra_param_to_segment_url)
i = 0
media_sequence = 0
decrypt_info = {'METHOD': 'NONE'}
@ -162,7 +162,7 @@ def is_ad_fragment_end(s):
frag_url = (
line
if re.match(r'^https?://', line)
else compat_urlparse.urljoin(man_url, line))
else urllib.parse.urljoin(man_url, line))
if extra_query:
frag_url = update_url_query(frag_url, extra_query)
@ -187,7 +187,7 @@ def is_ad_fragment_end(s):
frag_url = (
map_info.get('URI')
if re.match(r'^https?://', map_info.get('URI'))
else compat_urlparse.urljoin(man_url, map_info.get('URI')))
else urllib.parse.urljoin(man_url, map_info.get('URI')))
if extra_query:
frag_url = update_url_query(frag_url, extra_query)
@ -215,7 +215,7 @@ def is_ad_fragment_end(s):
if 'IV' in decrypt_info:
decrypt_info['IV'] = binascii.unhexlify(decrypt_info['IV'][2:].zfill(32))
if not re.match(r'^https?://', decrypt_info['URI']):
decrypt_info['URI'] = compat_urlparse.urljoin(
decrypt_info['URI'] = urllib.parse.urljoin(
man_url, decrypt_info['URI'])
if extra_query:
decrypt_info['URI'] = update_url_query(decrypt_info['URI'], extra_query)

View file

@ -4,7 +4,6 @@
import time
from .common import FileDownloader
from ..compat import compat_str
from ..utils import (
Popen,
check_executable,
@ -143,7 +142,7 @@ def run_rtmpdump(args):
if isinstance(conn, list):
for entry in conn:
basic_args += ['--conn', entry]
elif isinstance(conn, compat_str):
elif isinstance(conn, str):
basic_args += ['--conn', conn]
if protocol is not None:
basic_args += ['--protocol', protocol]

View file

@ -7,13 +7,13 @@
import re
import struct
import time
import urllib.parse
import urllib.request
import urllib.response
import uuid
from .common import InfoExtractor
from ..aes import aes_ecb_decrypt
from ..compat import compat_urllib_parse_urlparse
from ..utils import (
ExtractorError,
bytes_to_intlist,
@ -137,7 +137,7 @@ def _get_videokey_from_ticket(self, ticket):
def abematv_license_open(self, url):
url = request_to_url(url)
ticket = compat_urllib_parse_urlparse(url).netloc
ticket = urllib.parse.urlparse(url).netloc
response_data = self._get_videokey_from_ticket(ticket)
return urllib.response.addinfourl(io.BytesIO(response_data), headers={
'Content-Length': len(response_data),

View file

@ -1,8 +1,8 @@
import random
from .common import InfoExtractor
from ..utils import ExtractorError, try_get, compat_str, str_or_none
from ..compat import compat_urllib_parse_unquote
from ..compat import compat_str, compat_urllib_parse_unquote
from ..utils import ExtractorError, str_or_none, try_get
class AudiusBaseIE(InfoExtractor):

View file

@ -13,19 +13,12 @@
import random
import sys
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree
from ..compat import functools, re # isort: split
from ..compat import (
compat_etree_fromstring,
compat_expanduser,
compat_os_name,
compat_str,
compat_urllib_parse_unquote,
compat_urllib_parse_urlencode,
compat_urlparse,
)
from ..compat import compat_etree_fromstring, compat_expanduser, compat_os_name
from ..downloader import FileDownloader
from ..downloader.f4m import get_base_url, remove_encrypted_media
from ..utils import (
@ -834,7 +827,7 @@ def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=
"""
# Strip hashes from the URL (#1038)
if isinstance(url_or_request, (compat_str, str)):
if isinstance(url_or_request, str):
url_or_request = url_or_request.partition('#')[0]
urlh = self._request_webpage(url_or_request, video_id, note, errnote, fatal, data=data, headers=headers, query=query, expected_status=expected_status)
@ -1427,7 +1420,7 @@ def _search_json_ld(self, html, video_id, expected_type=None, *, fatal=True, def
return {}
def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
if isinstance(json_ld, compat_str):
if isinstance(json_ld, str):
json_ld = self._parse_json(json_ld, video_id, fatal=fatal)
if not json_ld:
return {}
@ -1517,7 +1510,7 @@ def extract_video_object(e):
# both types can have 'name' property(inherited from 'Thing' type). [1]
# however some websites are using 'Text' type instead.
# 1. https://schema.org/VideoObject
'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, compat_str) else None,
'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, str) else None,
'filesize': int_or_none(float_or_none(e.get('contentSize'))),
'tbr': int_or_none(e.get('bitrate')),
'width': int_or_none(e.get('width')),
@ -2166,7 +2159,7 @@ def _parse_m3u8_formats_and_subtitles(
]), m3u8_doc)
def format_url(url):
return url if re.match(r'^https?://', url) else compat_urlparse.urljoin(m3u8_url, url)
return url if re.match(r'^https?://', url) else urllib.parse.urljoin(m3u8_url, url)
if self.get_param('hls_split_discontinuity', False):
def _extract_m3u8_playlist_indices(manifest_url=None, m3u8_doc=None):
@ -2539,7 +2532,7 @@ def _parse_smil_formats(self, smil, smil_url, video_id, namespace=None, f4m_para
})
continue
src_url = src if src.startswith('http') else compat_urlparse.urljoin(base, src)
src_url = src if src.startswith('http') else urllib.parse.urljoin(base, src)
src_url = src_url.strip()
if proto == 'm3u8' or src_ext == 'm3u8':
@ -2562,7 +2555,7 @@ def _parse_smil_formats(self, smil, smil_url, video_id, namespace=None, f4m_para
'plugin': 'flowplayer-3.2.0.1',
}
f4m_url += '&' if '?' in f4m_url else '?'
f4m_url += compat_urllib_parse_urlencode(f4m_params)
f4m_url += urllib.parse.urlencode(f4m_params)
formats.extend(self._extract_f4m_formats(f4m_url, video_id, f4m_id='hds', fatal=False))
elif src_ext == 'mpd':
formats.extend(self._extract_mpd_formats(
@ -2832,7 +2825,7 @@ def extract_Initialization(source):
if re.match(r'^https?://', base_url):
break
if mpd_base_url and base_url.startswith('/'):
base_url = compat_urlparse.urljoin(mpd_base_url, base_url)
base_url = urllib.parse.urljoin(mpd_base_url, base_url)
elif mpd_base_url and not re.match(r'^https?://', base_url):
if not mpd_base_url.endswith('/'):
mpd_base_url += '/'
@ -3102,7 +3095,7 @@ def _parse_ism_formats_and_subtitles(self, ism_doc, ism_url, ism_id=None):
sampling_rate = int_or_none(track.get('SamplingRate'))
track_url_pattern = re.sub(r'{[Bb]itrate}', track.attrib['Bitrate'], url_pattern)
track_url_pattern = compat_urlparse.urljoin(ism_url, track_url_pattern)
track_url_pattern = urllib.parse.urljoin(ism_url, track_url_pattern)
fragments = []
fragment_ctx = {
@ -3121,7 +3114,7 @@ def _parse_ism_formats_and_subtitles(self, ism_doc, ism_url, ism_id=None):
fragment_ctx['duration'] = (next_fragment_time - fragment_ctx['time']) / fragment_repeat
for _ in range(fragment_repeat):
fragments.append({
'url': re.sub(r'{start[ _]time}', compat_str(fragment_ctx['time']), track_url_pattern),
'url': re.sub(r'{start[ _]time}', str(fragment_ctx['time']), track_url_pattern),
'duration': fragment_ctx['duration'] / stream_timescale,
})
fragment_ctx['time'] += fragment_ctx['duration']
@ -3365,7 +3358,7 @@ def _extract_akamai_formats_and_subtitles(self, manifest_url, video_id, hosts={}
return formats, subtitles
def _extract_wowza_formats(self, url, video_id, m3u8_entry_protocol='m3u8_native', skip_protocols=[]):
query = compat_urlparse.urlparse(url).query
query = urllib.parse.urlparse(url).query
url = re.sub(r'/(?:manifest|playlist|jwplayer)\.(?:m3u8|f4m|mpd|smil)', '', url)
mobj = re.search(
r'(?:(?:http|rtmp|rtsp)(?P<s>s)?:)?(?P<url>//[^?]+)', url)
@ -3471,7 +3464,7 @@ def _parse_jwplayer_data(self, jwplayer_data, video_id=None, require_title=True,
if not isinstance(track, dict):
continue
track_kind = track.get('kind')
if not track_kind or not isinstance(track_kind, compat_str):
if not track_kind or not isinstance(track_kind, str):
continue
if track_kind.lower() not in ('captions', 'subtitles'):
continue
@ -3544,7 +3537,7 @@ def _parse_jwplayer_formats(self, jwplayer_sources_data, video_id=None,
# Often no height is provided but there is a label in
# format like "1080p", "720p SD", or 1080.
height = int_or_none(self._search_regex(
r'^(\d{3,4})[pP]?(?:\b|$)', compat_str(source.get('label') or ''),
r'^(\d{3,4})[pP]?(?:\b|$)', str(source.get('label') or ''),
'height', default=None))
a_format = {
'url': source_url,
@ -3770,10 +3763,10 @@ def geo_verification_headers(self):
return headers
def _generic_id(self, url):
return compat_urllib_parse_unquote(os.path.splitext(url.rstrip('/').split('/')[-1])[0])
return urllib.parse.unquote(os.path.splitext(url.rstrip('/').split('/')[-1])[0])
def _generic_title(self, url):
return compat_urllib_parse_unquote(os.path.splitext(url_basename(url))[0])
return urllib.parse.unquote(os.path.splitext(url_basename(url))[0])
@staticmethod
def _availability(is_private=None, needs_premium=None, needs_subscription=None, needs_auth=None, is_unlisted=None):

View file

@ -1,5 +1,6 @@
import urllib.parse
from .common import InfoExtractor
from ..compat import compat_urlparse
class RtmpIE(InfoExtractor):
@ -23,7 +24,7 @@ def _real_extract(self, url):
'formats': [{
'url': url,
'ext': 'flv',
'format_id': compat_urlparse.urlparse(url).scheme,
'format_id': urllib.parse.urlparse(url).scheme,
}],
}

View file

@ -1,12 +1,8 @@
import re
from .common import InfoExtractor
from ..utils import (
int_or_none,
urlencode_postdata,
compat_str,
ExtractorError,
)
from ..compat import compat_str
from ..utils import ExtractorError, int_or_none, urlencode_postdata
class CuriosityStreamBaseIE(InfoExtractor):
@ -50,7 +46,7 @@ class CuriosityStreamIE(CuriosityStreamBaseIE):
IE_NAME = 'curiositystream'
_VALID_URL = r'https?://(?:app\.)?curiositystream\.com/video/(?P<id>\d+)'
_TESTS = [{
'url': 'https://app.curiositystream.com/video/2',
'url': 'http://app.curiositystream.com/video/2',
'info_dict': {
'id': '2',
'ext': 'mp4',

View file

@ -3,8 +3,8 @@
import re
import urllib.parse
from .common import InfoExtractor
from .adobepass import AdobePassIE
from .common import InfoExtractor
from .once import OnceIE
from ..utils import (
determine_ext,
@ -197,7 +197,7 @@ class ESPNArticleIE(InfoExtractor):
@classmethod
def suitable(cls, url):
return False if (ESPNIE.suitable(url) or WatchESPNIE.suitable(url)) else super(ESPNArticleIE, cls).suitable(url)
return False if (ESPNIE.suitable(url) or WatchESPNIE.suitable(url)) else super().suitable(url)
def _real_extract(self, url):
video_id = self._match_id(url)

View file

@ -1,5 +1,6 @@
import os
import re
import urllib.parse
import xml.etree.ElementTree
from .ant1newsgr import Ant1NewsGrEmbedIE
@ -106,12 +107,7 @@
from .youporn import YouPornIE
from .youtube import YoutubeIE
from .zype import ZypeIE
from ..compat import (
compat_etree_fromstring,
compat_str,
compat_urllib_parse_unquote,
compat_urlparse,
)
from ..compat import compat_etree_fromstring
from ..utils import (
KNOWN_EXTENSIONS,
ExtractorError,
@ -2703,7 +2699,7 @@ def _extract_camtasia(self, url, video_id, webpage):
title = self._html_search_meta('DC.title', webpage, fatal=True)
camtasia_url = compat_urlparse.urljoin(url, camtasia_cfg)
camtasia_url = urllib.parse.urljoin(url, camtasia_cfg)
camtasia_cfg = self._download_xml(
camtasia_url, video_id,
note='Downloading camtasia configuration',
@ -2719,7 +2715,7 @@ def _extract_camtasia(self, url, video_id, webpage):
entries.append({
'id': os.path.splitext(url_n.text.rpartition('/')[2])[0],
'title': f'{title} - {n.tag}',
'url': compat_urlparse.urljoin(url, url_n.text),
'url': urllib.parse.urljoin(url, url_n.text),
'duration': float_or_none(n.find('./duration').text),
})
@ -2771,7 +2767,7 @@ def _real_extract(self, url):
if url.startswith('//'):
return self.url_result(self.http_scheme() + url)
parsed_url = compat_urlparse.urlparse(url)
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme:
default_search = self.get_param('default_search')
if default_search is None:
@ -2847,7 +2843,7 @@ def _real_extract(self, url):
m = re.match(r'^(?P<type>audio|video|application(?=/(?:ogg$|(?:vnd\.apple\.|x-)?mpegurl)))/(?P<format_id>[^;\s]+)', content_type)
if m:
self.report_detected('direct video link')
format_id = compat_str(m.group('format_id'))
format_id = str(m.group('format_id'))
subtitles = {}
if format_id.endswith('mpegurl'):
formats, subtitles = self._extract_m3u8_formats_and_subtitles(url, video_id, 'mp4')
@ -2966,7 +2962,7 @@ def _real_extract(self, url):
# Unescaping the whole page allows to handle those cases in a generic way
# FIXME: unescaping the whole page may break URLs, commenting out for now.
# There probably should be a second run of generic extractor on unescaped webpage.
# webpage = compat_urllib_parse_unquote(webpage)
# webpage = urllib.parse.unquote(webpage)
# Unescape squarespace embeds to be detected by generic extractor,
# see https://github.com/ytdl-org/youtube-dl/issues/21294
@ -3239,7 +3235,7 @@ def _real_extract(self, url):
return self.url_result(mobj.group('url'))
mobj = re.search(r'class=["\']embedly-embed["\'][^>]src=["\'][^"\']*url=(?P<url>[^&]+)', webpage)
if mobj is not None:
return self.url_result(compat_urllib_parse_unquote(mobj.group('url')))
return self.url_result(urllib.parse.unquote(mobj.group('url')))
# Look for funnyordie embed
matches = re.findall(r'<iframe[^>]+?src="(https?://(?:www\.)?funnyordie\.com/embed/[^"]+)"', webpage)
@ -3492,7 +3488,7 @@ def _real_extract(self, url):
r'<iframe[^>]+src="(?:https?:)?(?P<url>%s)"' % UDNEmbedIE._PROTOCOL_RELATIVE_VALID_URL, webpage)
if mobj is not None:
return self.url_result(
compat_urlparse.urljoin(url, mobj.group('url')), 'UDNEmbed')
urllib.parse.urljoin(url, mobj.group('url')), 'UDNEmbed')
# Look for Senate ISVP iframe
senate_isvp_url = SenateISVPIE._search_iframe_url(webpage)
@ -3725,7 +3721,7 @@ def _real_extract(self, url):
if mediasite_urls:
entries = [
self.url_result(smuggle_url(
compat_urlparse.urljoin(url, mediasite_url),
urllib.parse.urljoin(url, mediasite_url),
{'UrlReferrer': url}), ie=MediasiteIE.ie_key())
for mediasite_url in mediasite_urls]
return self.playlist_result(entries, video_id, video_title)
@ -3920,11 +3916,11 @@ def _real_extract(self, url):
subtitles = {}
for source in sources:
src = source.get('src')
if not src or not isinstance(src, compat_str):
if not src or not isinstance(src, str):
continue
src = compat_urlparse.urljoin(url, src)
src = urllib.parse.urljoin(url, src)
src_type = source.get('type')
if isinstance(src_type, compat_str):
if isinstance(src_type, str):
src_type = src_type.lower()
ext = determine_ext(src).lower()
if src_type == 'video/youtube':
@ -3958,7 +3954,7 @@ def _real_extract(self, url):
if not src:
continue
subtitles.setdefault(dict_get(sub, ('language', 'srclang')) or 'und', []).append({
'url': compat_urlparse.urljoin(url, src),
'url': urllib.parse.urljoin(url, src),
'name': sub.get('label'),
'http_headers': {
'Referer': full_response.geturl(),
@ -3985,7 +3981,7 @@ def check_video(vurl):
return True
if RtmpIE.suitable(vurl):
return True
vpath = compat_urlparse.urlparse(vurl).path
vpath = urllib.parse.urlparse(vurl).path
vext = determine_ext(vpath, None)
return vext not in (None, 'swf', 'png', 'jpg', 'srt', 'sbv', 'sub', 'vtt', 'ttml', 'js', 'xml')
@ -4113,7 +4109,7 @@ def filter_video(urls):
if refresh_header:
found = re.search(REDIRECT_REGEX, refresh_header)
if found:
new_url = compat_urlparse.urljoin(url, unescapeHTML(found.group(1)))
new_url = urllib.parse.urljoin(url, unescapeHTML(found.group(1)))
if new_url != url:
self.report_following_redirect(new_url)
return {
@ -4139,8 +4135,8 @@ def filter_video(urls):
for video_url in orderedSet(found):
video_url = unescapeHTML(video_url)
video_url = video_url.replace('\\/', '/')
video_url = compat_urlparse.urljoin(url, video_url)
video_id = compat_urllib_parse_unquote(os.path.basename(video_url))
video_url = urllib.parse.urljoin(url, video_url)
video_id = urllib.parse.unquote(os.path.basename(video_url))
# Sometimes, jwplayer extraction will result in a YouTube URL
if YoutubeIE.suitable(video_url):

View file

@ -1,13 +1,8 @@
import itertools
from .common import InfoExtractor
from ..utils import (
qualities,
compat_str,
parse_duration,
parse_iso8601,
str_to_int,
)
from ..compat import compat_str
from ..utils import parse_duration, parse_iso8601, qualities, str_to_int
class GigaIE(InfoExtractor):

View file

@ -1,13 +1,13 @@
import re
from .common import InfoExtractor
from ..compat import compat_str
from ..utils import (
clean_html,
parse_iso8601,
determine_ext,
float_or_none,
int_or_none,
compat_str,
determine_ext,
parse_iso8601,
)

View file

@ -1,7 +1,7 @@
from .common import InfoExtractor
from ..compat import compat_str
from ..utils import (
clean_html,
compat_str,
format_field,
int_or_none,
parse_iso8601,

View file

@ -3,18 +3,17 @@
import re
from .common import InfoExtractor
from ..compat import compat_str
from ..compat import compat_HTTPError, compat_str
from ..utils import (
compat_HTTPError,
determine_ext,
ExtractorError,
determine_ext,
int_or_none,
parse_duration,
parse_iso8601,
str_or_none,
try_get,
urljoin,
url_or_none,
urljoin,
)

View file

@ -1,9 +1,6 @@
from .prosiebensat1 import ProSiebenSat1BaseIE
from ..utils import (
unified_strdate,
parse_duration,
compat_str,
)
from ..compat import compat_str
from ..utils import parse_duration, unified_strdate
class Puls4IE(ProSiebenSat1BaseIE):

View file

@ -1,6 +1,6 @@
from .common import InfoExtractor
from ..compat import compat_str
from ..utils import (
compat_str,
float_or_none,
int_or_none,
smuggle_url,

View file

@ -13,18 +13,11 @@
import threading
import time
import traceback
import urllib.error
import urllib.parse
from .common import InfoExtractor, SearchInfoExtractor
from ..compat import functools # isort: split
from ..compat import (
compat_HTTPError,
compat_parse_qs,
compat_str,
compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse,
compat_urlparse,
)
from ..compat import functools
from ..jsinterp import JSInterpreter
from ..utils import (
NO_DEFAULT,
@ -381,11 +374,11 @@ def _initialize_pref(self):
pref = {}
if pref_cookie:
try:
pref = dict(compat_urlparse.parse_qsl(pref_cookie.value))
pref = dict(urllib.parse.parse_qsl(pref_cookie.value))
except ValueError:
self.report_warning('Failed to parse user PREF cookie' + bug_reports_message())
pref.update({'hl': 'en', 'tz': 'UTC'})
self._set_cookie('.youtube.com', name='PREF', value=compat_urllib_parse_urlencode(pref))
self._set_cookie('.youtube.com', name='PREF', value=urllib.parse.urlencode(pref))
def _real_initialize(self):
self._initialize_pref()
@ -413,19 +406,19 @@ def _ytcfg_get_safe(self, ytcfg, getter, expected_type=None, default_client='web
def _extract_client_name(self, ytcfg, default_client='web'):
return self._ytcfg_get_safe(
ytcfg, (lambda x: x['INNERTUBE_CLIENT_NAME'],
lambda x: x['INNERTUBE_CONTEXT']['client']['clientName']), compat_str, default_client)
lambda x: x['INNERTUBE_CONTEXT']['client']['clientName']), str, default_client)
def _extract_client_version(self, ytcfg, default_client='web'):
return self._ytcfg_get_safe(
ytcfg, (lambda x: x['INNERTUBE_CLIENT_VERSION'],
lambda x: x['INNERTUBE_CONTEXT']['client']['clientVersion']), compat_str, default_client)
lambda x: x['INNERTUBE_CONTEXT']['client']['clientVersion']), str, default_client)
def _select_api_hostname(self, req_api_hostname, default_client=None):
return (self._configuration_arg('innertube_host', [''], ie_key=YoutubeIE.ie_key())[0]
or req_api_hostname or self._get_innertube_host(default_client or 'web'))
def _extract_api_key(self, ytcfg=None, default_client='web'):
return self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_API_KEY'], compat_str, default_client)
return self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_API_KEY'], str, default_client)
def _extract_context(self, ytcfg=None, default_client='web'):
context = get_first(
@ -497,7 +490,7 @@ def _extract_session_index(*data):
# Deprecated?
def _extract_identity_token(self, ytcfg=None, webpage=None):
if ytcfg:
token = try_get(ytcfg, lambda x: x['ID_TOKEN'], compat_str)
token = try_get(ytcfg, lambda x: x['ID_TOKEN'], str)
if token:
return token
if webpage:
@ -513,12 +506,12 @@ def _extract_account_syncid(*args):
"""
for data in args:
# ytcfg includes channel_syncid if on secondary channel
delegated_sid = try_get(data, lambda x: x['DELEGATED_SESSION_ID'], compat_str)
delegated_sid = try_get(data, lambda x: x['DELEGATED_SESSION_ID'], str)
if delegated_sid:
return delegated_sid
sync_ids = (try_get(
data, (lambda x: x['responseContext']['mainAppWebResponseContext']['datasyncId'],
lambda x: x['DATASYNC_ID']), compat_str) or '').split('||')
lambda x: x['DATASYNC_ID']), str) or '').split('||')
if len(sync_ids) >= 2 and sync_ids[1]:
# datasyncid is of the form "channel_syncid||user_syncid" for secondary channel
# and just "user_syncid||" for primary channel. We only want the channel_syncid
@ -552,7 +545,7 @@ def generate_api_headers(
origin = 'https://' + (self._select_api_hostname(api_hostname, default_client))
headers = {
'X-YouTube-Client-Name': compat_str(
'X-YouTube-Client-Name': str(
self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT_CLIENT_NAME'], default_client=default_client)),
'X-YouTube-Client-Version': self._extract_client_version(ytcfg, default_client),
'Origin': origin,
@ -612,7 +605,7 @@ def _extract_next_continuation_data(cls, renderer):
def _extract_continuation_ep_data(cls, continuation_ep: dict):
if isinstance(continuation_ep, dict):
continuation = try_get(
continuation_ep, lambda x: x['continuationCommand']['token'], compat_str)
continuation_ep, lambda x: x['continuationCommand']['token'], str)
if not continuation:
return
ctp = continuation_ep.get('clickTrackingParams')
@ -672,7 +665,7 @@ def _extract_and_report_alerts(self, data, *args, **kwargs):
def _extract_badges(self, renderer: dict):
badges = set()
for badge in try_get(renderer, lambda x: x['badges'], list) or []:
label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label'], compat_str)
label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label'], str)
if label:
badges.add(label.lower())
return badges
@ -687,7 +680,7 @@ def _get_text(data, *path_list, max_runs=None):
if not any(key is ... or isinstance(key, (list, tuple)) for key in variadic(path)):
obj = [obj]
for item in obj:
text = try_get(item, lambda x: x['simpleText'], compat_str)
text = try_get(item, lambda x: x['simpleText'], str)
if text:
return text
runs = try_get(item, lambda x: x['runs'], list) or []
@ -789,20 +782,20 @@ def _extract_response(self, item_id, query, note='Downloading API JSON', headers
note='%s%s' % (note, ' (retry #%d)' % count if count else ''))
except ExtractorError as e:
if isinstance(e.cause, network_exceptions):
if isinstance(e.cause, compat_HTTPError):
if isinstance(e.cause, urllib.error.HTTPError):
first_bytes = e.cause.read(512)
if not is_html(first_bytes):
yt_error = try_get(
self._parse_json(
self._webpage_read_content(e.cause, None, item_id, prefix=first_bytes) or '{}', item_id, fatal=False),
lambda x: x['error']['message'], compat_str)
lambda x: x['error']['message'], str)
if yt_error:
self._report_alerts([('ERROR', yt_error)], fatal=False)
# Downloading page may result in intermittent 5xx HTTP error
# Sometimes a 404 is also recieved. See: https://github.com/ytdl-org/youtube-dl/issues/28289
# We also want to catch all other network exceptions since errors in later pages can be troublesome
# See https://github.com/yt-dlp/yt-dlp/issues/507#issuecomment-880188210
if not isinstance(e.cause, compat_HTTPError) or e.cause.code not in (403, 429):
if not isinstance(e.cause, urllib.error.HTTPError) or e.cause.code not in (403, 429):
last_error = error_to_compat_str(e.cause or e.msg)
if count < retries:
continue
@ -2345,7 +2338,7 @@ def _extract_sequence_from_mpd(refresh_sequence, immediate):
# Obtain from MPD's maximum seq value
old_mpd_url = mpd_url
last_error = ctx.pop('last_error', None)
expire_fast = immediate or last_error and isinstance(last_error, compat_HTTPError) and last_error.code == 403
expire_fast = immediate or last_error and isinstance(last_error, urllib.error.HTTPError) and last_error.code == 403
mpd_url, stream_number, is_live = (mpd_feed(format_id, 5 if expire_fast else 18000)
or (mpd_url, stream_number, False))
if not refresh_sequence:
@ -2427,7 +2420,7 @@ def _extract_sequence_from_mpd(refresh_sequence, immediate):
def _extract_player_url(self, *ytcfgs, webpage=None):
player_url = traverse_obj(
ytcfgs, (..., 'PLAYER_JS_URL'), (..., 'WEB_PLAYER_CONTEXT_CONFIGS', ..., 'jsUrl'),
get_all=False, expected_type=compat_str)
get_all=False, expected_type=str)
if not player_url:
return
return urljoin('https://www.youtube.com', player_url)
@ -2444,7 +2437,7 @@ def _download_player_url(self, video_id, fatal=False):
def _signature_cache_id(self, example_sig):
""" Return a string representation of a signature """
return '.'.join(compat_str(len(part)) for part in example_sig.split('.'))
return '.'.join(str(len(part)) for part in example_sig.split('.'))
@classmethod
def _extract_player_info(cls, player_url):
@ -2526,7 +2519,7 @@ def _genslice(start, end, step):
cache_spec = [ord(c) for c in cache_res]
expr_code = ' + '.join(gen_sig_code(cache_spec))
signature_id_tuple = '(%s)' % (
', '.join(compat_str(len(p)) for p in example_sig.split('.')))
', '.join(str(len(p)) for p in example_sig.split('.')))
code = ('if tuple(len(p) for p in s.split(\'.\')) == %s:\n'
' return %s\n') % (signature_id_tuple, expr_code)
self.to_screen('Extracted signature function:\n' + code)
@ -2649,8 +2642,8 @@ def _mark_watched(self, video_id, player_responses):
if not url:
self.report_warning(f'Unable to mark {label}watched')
return
parsed_url = compat_urlparse.urlparse(url)
qs = compat_urlparse.parse_qs(parsed_url.query)
parsed_url = urllib.parse.urlparse(url)
qs = urllib.parse.parse_qs(parsed_url.query)
# cpn generation algorithm is reverse engineered from base.js.
# In fact it works even with dummy cpn.
@ -2675,8 +2668,8 @@ def _mark_watched(self, video_id, player_responses):
'et': video_length,
})
url = compat_urlparse.urlunparse(
parsed_url._replace(query=compat_urllib_parse_urlencode(qs, True)))
url = urllib.parse.urlunparse(
parsed_url._replace(query=urllib.parse.urlencode(qs, True)))
self._download_webpage(
url, video_id, f'Marking {label}watched',
@ -2793,12 +2786,12 @@ def _extract_comment(self, comment_renderer, parent=None):
timestamp, time_text = self._extract_time_text(comment_renderer, 'publishedTimeText')
author = self._get_text(comment_renderer, 'authorText')
author_id = try_get(comment_renderer,
lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], str)
votes = parse_count(try_get(comment_renderer, (lambda x: x['voteCount']['simpleText'],
lambda x: x['likeCount']), compat_str)) or 0
lambda x: x['likeCount']), str)) or 0
author_thumbnail = try_get(comment_renderer,
lambda x: x['authorThumbnail']['thumbnails'][-1]['url'], compat_str)
lambda x: x['authorThumbnail']['thumbnails'][-1]['url'], str)
author_is_uploader = try_get(comment_renderer, lambda x: x['authorIsChannelOwner'], bool)
is_favorited = 'creatorHeart' in (try_get(
@ -3178,7 +3171,7 @@ def _extract_formats(self, streaming_data, video_id, player_url, is_live, durati
fmt_url = fmt.get('url')
if not fmt_url:
sc = compat_parse_qs(fmt.get('signatureCipher'))
sc = urllib.parse.parse_qs(fmt.get('signatureCipher'))
fmt_url = url_or_none(try_get(sc, lambda x: x['url'][0]))
encrypted_sig = try_get(sc, lambda x: x['s'][0])
if not all((sc, fmt_url, player_url, encrypted_sig)):
@ -3419,12 +3412,12 @@ def _real_extract(self, url):
# Unquote should take place before split on comma (,) since textual
# fields may contain comma as well (see
# https://github.com/ytdl-org/youtube-dl/issues/8536)
feed_data = compat_parse_qs(
feed_data = urllib.parse.parse_qs(
urllib.parse.unquote_plus(feed))
def feed_entry(name):
return try_get(
feed_data, lambda x: x[name][0], compat_str)
feed_data, lambda x: x[name][0], str)
feed_id = feed_entry('id')
if not feed_id:
@ -3651,9 +3644,9 @@ def process_language(container, base_url, lang_code, sub_name, query):
info['automatic_captions'] = automatic_captions
info['subtitles'] = subtitles
parsed_url = compat_urllib_parse_urlparse(url)
parsed_url = urllib.parse.urlparse(url)
for component in [parsed_url.fragment, parsed_url.query]:
query = compat_parse_qs(component)
query = urllib.parse.parse_qs(component)
for k, v in query.items():
for d_k, s_ks in [('start', ('start', 't')), ('end', ('end',))]:
d_k += '_time'
@ -3946,7 +3939,7 @@ def _grid_entries(self, grid_renderer):
# generic endpoint URL support
ep_url = urljoin('https://www.youtube.com/', try_get(
renderer, lambda x: x['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'],
compat_str))
str))
if ep_url:
for ie in (YoutubeTabIE, YoutubePlaylistIE, YoutubeIE):
if ie.suitable(ep_url):
@ -3990,7 +3983,7 @@ def _shelf_entries_from_content(self, shelf_renderer):
def _shelf_entries(self, shelf_renderer, skip_channels=False):
ep = try_get(
shelf_renderer, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'],
compat_str)
str)
shelf_url = urljoin('https://www.youtube.com', ep)
if shelf_url:
# Skipping links to another channels, note that checking for
@ -4050,7 +4043,7 @@ def _post_thread_entries(self, post_thread_renderer):
yield entry
# playlist attachment
playlist_id = try_get(
post_renderer, lambda x: x['backstageAttachment']['playlistRenderer']['playlistId'], compat_str)
post_renderer, lambda x: x['backstageAttachment']['playlistRenderer']['playlistId'], str)
if playlist_id:
yield self.url_result(
'https://www.youtube.com/playlist?list=%s' % playlist_id,
@ -4061,7 +4054,7 @@ def _post_thread_entries(self, post_thread_renderer):
if not isinstance(run, dict):
continue
ep_url = try_get(
run, lambda x: x['navigationEndpoint']['urlEndpoint']['url'], compat_str)
run, lambda x: x['navigationEndpoint']['urlEndpoint']['url'], str)
if not ep_url:
continue
if not YoutubeIE.suitable(ep_url):
@ -4238,10 +4231,10 @@ def _extract_uploader(self, data):
uploader['uploader'] = self._search_regex(
r'^by (.+) and \d+ others?$', owner_text, 'uploader', default=owner_text)
uploader['uploader_id'] = try_get(
owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], str)
uploader['uploader_url'] = urljoin(
'https://www.youtube.com/',
try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str))
try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], str))
return {k: v for k, v in uploader.items() if v is not None}
def _extract_from_tabs(self, item_id, ytcfg, data, tabs):
@ -4369,13 +4362,13 @@ def _extract_inline_playlist(self, playlist, playlist_id, data, ytcfg):
def _extract_from_playlist(self, item_id, url, data, playlist, ytcfg):
title = playlist.get('title') or try_get(
data, lambda x: x['titleText']['simpleText'], compat_str)
data, lambda x: x['titleText']['simpleText'], str)
playlist_id = playlist.get('playlistId') or item_id
# Delegating everything except mix playlists to regular tab-based playlist URL
playlist_url = urljoin(url, try_get(
playlist, lambda x: x['endpoint']['commandMetadata']['webCommandMetadata']['url'],
compat_str))
str))
# Some playlists are unviewable but YouTube still provides a link to the (broken) playlist page [1]
# [1] MLCT, RLTDwFCb4jeqaKWnciAYM-ZVHg
@ -4446,7 +4439,7 @@ def _reload_with_unavailable_videos(self, item_id, data, ytcfg):
continue
nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
text = try_get(
nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
nav_item_renderer, lambda x: x['text']['simpleText'], str)
if not text or text.lower() != 'show unavailable videos':
continue
browse_endpoint = try_get(
@ -4488,7 +4481,7 @@ def _extract_webpage(self, url, item_id, fatal=True):
data = self.extract_yt_initial_data(item_id, webpage or '', fatal=fatal) or {}
except ExtractorError as e:
if isinstance(e.cause, network_exceptions):
if not isinstance(e.cause, compat_HTTPError) or e.cause.code not in (403, 429):
if not isinstance(e.cause, urllib.error.HTTPError) or e.cause.code not in (403, 429):
last_error = error_to_compat_str(e.cause or e.msg)
if count < retries:
continue
@ -5301,8 +5294,8 @@ def suitable(cls, url):
@YoutubeTabBaseInfoExtractor.passthrough_smuggled_data
def _real_extract(self, url, smuggled_data):
item_id = self._match_id(url)
url = compat_urlparse.urlunparse(
compat_urlparse.urlparse(url)._replace(netloc='www.youtube.com'))
url = urllib.parse.urlunparse(
urllib.parse.urlparse(url)._replace(netloc='www.youtube.com'))
compat_opts = self.get_param('compat_opts', [])
def get_mobj(url):
@ -5322,7 +5315,7 @@ def get_mobj(url):
mdata = self._extract_tab_endpoint(
f'https://music.youtube.com/channel/{item_id}', item_id, default_client='web_music')
murl = traverse_obj(mdata, ('microformat', 'microformatDataRenderer', 'urlCanonical'),
get_all=False, expected_type=compat_str)
get_all=False, expected_type=str)
if not murl:
raise ExtractorError('Failed to resolve album to playlist')
return self.url_result(murl, ie=YoutubeTabIE.ie_key())

View file

@ -1,9 +1,9 @@
import hashlib
import json
import re
import urllib.parse
from .ffmpeg import FFmpegPostProcessor
from ..compat import compat_urllib_parse_urlencode
class SponsorBlockPP(FFmpegPostProcessor):
@ -86,7 +86,7 @@ def to_chapter(s):
def _get_sponsor_segments(self, video_id, service):
hash = hashlib.sha256(video_id.encode('ascii')).hexdigest()
# SponsorBlock API recommends using first 4 hash characters.
url = f'{self._API_URL}/api/skipSegments/{hash[:4]}?' + compat_urllib_parse_urlencode({
url = f'{self._API_URL}/api/skipSegments/{hash[:4]}?' + urllib.parse.urlencode({
'service': service,
'categories': json.dumps(self._categories),
'actionTypes': json.dumps(['skip', 'poi'])

View file

@ -39,6 +39,7 @@
import time
import traceback
import types
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree
@ -49,14 +50,8 @@
compat_etree_fromstring,
compat_expanduser,
compat_HTMLParseError,
compat_HTTPError,
compat_os_name,
compat_parse_qs,
compat_shlex_quote,
compat_str,
compat_urllib_parse_urlencode,
compat_urllib_parse_urlparse,
compat_urlparse,
)
from .dependencies import brotli, certifi, websockets, xattr
from .socks import ProxyType, sockssocket
@ -67,8 +62,8 @@ def register_socks_protocols():
# In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
# URLs with protocols not in urlparse.uses_netloc are not handled correctly
for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
if scheme not in compat_urlparse.uses_netloc:
compat_urlparse.uses_netloc.append(scheme)
if scheme not in urllib.parse.uses_netloc:
urllib.parse.uses_netloc.append(scheme)
# This is not clearly defined otherwise
@ -311,7 +306,7 @@ def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
def _find_xpath(xpath):
return node.find(xpath)
if isinstance(xpath, (str, compat_str)):
if isinstance(xpath, str):
n = _find_xpath(xpath)
else:
for xp in xpath:
@ -741,10 +736,10 @@ def sanitize_url(url):
def extract_basic_auth(url):
parts = compat_urlparse.urlsplit(url)
parts = urllib.parse.urlsplit(url)
if parts.username is None:
return url, None
url = compat_urlparse.urlunsplit(parts._replace(netloc=(
url = urllib.parse.urlunsplit(parts._replace(netloc=(
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
@ -889,7 +884,7 @@ def decodeFilename(b, for_subprocess=False):
def encodeArgument(s):
# Legacy code that uses byte strings
# Uncomment the following line after fixing all post processors
# assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
# assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s))
return s if isinstance(s, str) else s.decode('ascii')
@ -903,7 +898,7 @@ def decodeOption(optval):
if isinstance(optval, bytes):
optval = optval.decode(preferredencoding())
assert isinstance(optval, compat_str)
assert isinstance(optval, str)
return optval
@ -1395,7 +1390,7 @@ def make_socks_conn_class(base_class, socks_proxy):
assert issubclass(base_class, (
http.client.HTTPConnection, http.client.HTTPSConnection))
url_components = compat_urlparse.urlparse(socks_proxy)
url_components = urllib.parse.urlparse(socks_proxy)
if url_components.scheme.lower() == 'socks5':
socks_type = ProxyType.SOCKS5
elif url_components.scheme.lower() in ('socks', 'socks4'):
@ -1639,7 +1634,7 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
m = req.get_method()
if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD")
or code in (301, 302, 303) and m == "POST")):
raise compat_HTTPError(req.full_url, code, msg, headers, fp)
raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
# Strictly (according to RFC 2616), 301 or 302 in response to
# a POST MUST NOT cause a redirection without confirmation
# from the user (of urllib.request, in this case). In practice,
@ -1739,7 +1734,7 @@ def unified_strdate(date_str, day_first=True):
with contextlib.suppress(ValueError):
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
if upload_date is not None:
return compat_str(upload_date)
return str(upload_date)
def unified_timestamp(date_str, day_first=True):
@ -1913,12 +1908,12 @@ def __str__(self):
def platform_name():
""" Returns the platform name as a compat_str """
""" Returns the platform name as a str """
res = platform.platform()
if isinstance(res, bytes):
res = res.decode(preferredencoding())
assert isinstance(res, compat_str)
assert isinstance(res, str)
return res
@ -2144,7 +2139,7 @@ def smuggle_url(url, data):
url, idata = unsmuggle_url(url, {})
data.update(idata)
sdata = compat_urllib_parse_urlencode(
sdata = urllib.parse.urlencode(
{'__youtubedl_smuggle': json.dumps(data)})
return url + '#' + sdata
@ -2153,7 +2148,7 @@ def unsmuggle_url(smug_url, default=None):
if '#__youtubedl_smuggle' not in smug_url:
return smug_url, default
url, _, sdata = smug_url.rpartition('#')
jsond = compat_parse_qs(sdata)['__youtubedl_smuggle'][0]
jsond = urllib.parse.parse_qs(sdata)['__youtubedl_smuggle'][0]
data = json.loads(jsond)
return url, data
@ -2313,7 +2308,7 @@ def parse_resolution(s, *, lenient=False):
def parse_bitrate(s):
if not isinstance(s, compat_str):
if not isinstance(s, str):
return
mobj = re.search(r'\b(\d+)\s*kbps', s)
if mobj:
@ -2350,7 +2345,7 @@ def fix_xml_ampersands(xml_str):
def setproctitle(title):
assert isinstance(title, compat_str)
assert isinstance(title, str)
# ctypes in Jython is not complete
# http://bugs.jython.org/issue2148
@ -2398,7 +2393,7 @@ def get_domain(url):
def url_basename(url):
path = compat_urlparse.urlparse(url).path
path = urllib.parse.urlparse(url).path
return path.strip('/').split('/')[-1]
@ -2409,16 +2404,16 @@ def base_url(url):
def urljoin(base, path):
if isinstance(path, bytes):
path = path.decode()
if not isinstance(path, compat_str) or not path:
if not isinstance(path, str) or not path:
return None
if re.match(r'^(?:[a-zA-Z][a-zA-Z0-9+-.]*:)?//', path):
return path
if isinstance(base, bytes):
base = base.decode()
if not isinstance(base, compat_str) or not re.match(
if not isinstance(base, str) or not re.match(
r'^(?:https?:)?//', base):
return None
return compat_urlparse.urljoin(base, path)
return urllib.parse.urljoin(base, path)
class HEADRequest(urllib.request.Request):
@ -2441,14 +2436,14 @@ def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
def str_or_none(v, default=None):
return default if v is None else compat_str(v)
return default if v is None else str(v)
def str_to_int(int_str):
""" A more relaxed version of int_or_none """
if isinstance(int_str, int):
return int_str
elif isinstance(int_str, compat_str):
elif isinstance(int_str, str):
int_str = re.sub(r'[,\.\+]', '', int_str)
return int_or_none(int_str)
@ -2467,11 +2462,11 @@ def bool_or_none(v, default=None):
def strip_or_none(v, default=None):
return v.strip() if isinstance(v, compat_str) else default
return v.strip() if isinstance(v, str) else default
def url_or_none(url):
if not url or not isinstance(url, compat_str):
if not url or not isinstance(url, str):
return None
url = url.strip()
return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None
@ -2489,7 +2484,7 @@ def strftime_or_none(timestamp, date_format, default=None):
try:
if isinstance(timestamp, (int, float)): # unix timestamp
datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
elif isinstance(timestamp, compat_str): # assume YYYYMMDD
elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
return datetime_object.strftime(date_format)
except (ValueError, TypeError, AttributeError):
@ -2592,7 +2587,7 @@ def _get_exe_version_output(exe, args, *, to_screen=None):
def detect_exe_version(output, version_re=None, unrecognized='present'):
assert isinstance(output, compat_str)
assert isinstance(output, str)
if version_re is None:
version_re = r'version\s+([-0-9._a-zA-Z]+)'
m = re.search(version_re, output)
@ -2973,7 +2968,7 @@ def escape_rfc3986(s):
def escape_url(url):
"""Escape URL as suggested by RFC 3986"""
url_parsed = compat_urllib_parse_urlparse(url)
url_parsed = urllib.parse.urlparse(url)
return url_parsed._replace(
netloc=url_parsed.netloc.encode('idna').decode('ascii'),
path=escape_rfc3986(url_parsed.path),
@ -2984,12 +2979,12 @@ def escape_url(url):
def parse_qs(url):
return compat_parse_qs(compat_urllib_parse_urlparse(url).query)
return urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
def read_batch_urls(batch_fd):
def fixup(url):
if not isinstance(url, compat_str):
if not isinstance(url, str):
url = url.decode('utf-8', 'replace')
BOM_UTF8 = ('\xef\xbb\xbf', '\ufeff')
for bom in BOM_UTF8:
@ -3007,17 +3002,17 @@ def fixup(url):
def urlencode_postdata(*args, **kargs):
return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii')
return urllib.parse.urlencode(*args, **kargs).encode('ascii')
def update_url_query(url, query):
if not query:
return url
parsed_url = compat_urlparse.urlparse(url)
qs = compat_parse_qs(parsed_url.query)
parsed_url = urllib.parse.urlparse(url)
qs = urllib.parse.parse_qs(parsed_url.query)
qs.update(query)
return compat_urlparse.urlunparse(parsed_url._replace(
query=compat_urllib_parse_urlencode(qs, True)))
return urllib.parse.urlunparse(parsed_url._replace(
query=urllib.parse.urlencode(qs, True)))
def update_Request(req, url=None, data=None, headers={}, query={}):
@ -3046,9 +3041,9 @@ def _multipart_encode_impl(data, boundary):
out = b''
for k, v in data.items():
out += b'--' + boundary.encode('ascii') + b'\r\n'
if isinstance(k, compat_str):
if isinstance(k, str):
k = k.encode()
if isinstance(v, compat_str):
if isinstance(v, str):
v = v.encode()
# RFC 2047 requires non-ASCII field names to be encoded, while RFC 7578
# suggests sending UTF-8 directly. Firefox sends UTF-8, too
@ -3129,7 +3124,7 @@ def merge_dicts(*dicts):
def encode_compat_str(string, encoding=preferredencoding(), errors='strict'):
return string if isinstance(string, compat_str) else compat_str(string, encoding, errors)
return string if isinstance(string, str) else str(string, encoding, errors)
US_RATINGS = {
@ -3509,7 +3504,7 @@ def determine_protocol(info_dict):
elif ext == 'f4m':
return 'f4m'
return compat_urllib_parse_urlparse(url).scheme
return urllib.parse.urlparse(url).scheme
def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False):
@ -4632,7 +4627,7 @@ def random_ipv4(cls, code_or_block):
addr, preflen = block.split('/')
addr_min = struct.unpack('!L', socket.inet_aton(addr))[0]
addr_max = addr_min | (0xffffffff >> int(preflen))
return compat_str(socket.inet_ntoa(
return str(socket.inet_ntoa(
struct.pack('!L', random.randint(addr_min, addr_max))))
@ -4653,7 +4648,7 @@ def proxy_open(self, req, proxy, type):
if proxy == '__noproxy__':
return None # No Proxy
if compat_urlparse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
req.add_header('Ytdl-socks-proxy', proxy)
# yt-dlp's http/https handlers do wrapping the socket with socks
return None
@ -5036,7 +5031,7 @@ def iri_to_uri(iri):
The function doesn't add an additional layer of escaping; e.g., it doesn't escape `%3C` as `%253C`. Instead, it percent-escapes characters with an underlying UTF-8 encoding *besides* those already escaped, leaving the URI intact.
"""
iri_parts = compat_urllib_parse_urlparse(iri)
iri_parts = urllib.parse.urlparse(iri)
if '[' in iri_parts.netloc:
raise ValueError('IPv6 URIs are not, yet, supported.')