[fd/external] Scope cookies

- ffmpeg: Calculate cookies from cookiejar and pass with `-cookies` arg instead of `-headers`
- aria2c, curl, wget: Write cookiejar to file and use external FD built-in cookiejar support
- httpie: Calculate cookies from cookiejar instead of `http_headers`
- axel: Calculate cookies from cookiejar and disable http redirection if cookies are passed
    - May break redirects, but axel simply don't have proper cookie support

Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj

Authored by: bashonly, coletdjnz
This commit is contained in:
bashonly 2023-07-05 15:16:28 -05:00 committed by pukkandan
parent ad8902f616
commit 1ceb657bdd
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39
3 changed files with 179 additions and 2 deletions

View file

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# Allow direct execution
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import http.cookiejar
from test.helper import FakeYDL
from yt_dlp.downloader.external import (
Aria2cFD,
AxelFD,
CurlFD,
FFmpegFD,
HttpieFD,
WgetFD,
)
TEST_COOKIE = {
'version': 0,
'name': 'test',
'value': 'ytdlp',
'port': None,
'port_specified': False,
'domain': '.example.com',
'domain_specified': True,
'domain_initial_dot': False,
'path': '/',
'path_specified': True,
'secure': False,
'expires': None,
'discard': False,
'comment': None,
'comment_url': None,
'rest': {},
}
TEST_INFO = {'url': 'http://www.example.com/'}
class TestHttpieFD(unittest.TestCase):
def test_make_cmd(self):
with FakeYDL() as ydl:
downloader = HttpieFD(ydl, {})
self.assertEqual(
downloader._make_cmd('test', TEST_INFO),
['http', '--download', '--output', 'test', 'http://www.example.com/'])
# Test cookie header is added
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
self.assertEqual(
downloader._make_cmd('test', TEST_INFO),
['http', '--download', '--output', 'test', 'http://www.example.com/', 'Cookie:test=ytdlp'])
class TestAxelFD(unittest.TestCase):
def test_make_cmd(self):
with FakeYDL() as ydl:
downloader = AxelFD(ydl, {})
self.assertEqual(
downloader._make_cmd('test', TEST_INFO),
['axel', '-o', 'test', '--', 'http://www.example.com/'])
# Test cookie header is added
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
self.assertEqual(
downloader._make_cmd('test', TEST_INFO),
['axel', '-o', 'test', 'Cookie: test=ytdlp', '--max-redirect=0', '--', 'http://www.example.com/'])
class TestWgetFD(unittest.TestCase):
def test_make_cmd(self):
with FakeYDL() as ydl:
downloader = WgetFD(ydl, {})
self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
# Test cookiejar tempfile arg is added
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
class TestCurlFD(unittest.TestCase):
def test_make_cmd(self):
with FakeYDL() as ydl:
downloader = CurlFD(ydl, {})
self.assertNotIn('--cookie-jar', downloader._make_cmd('test', TEST_INFO))
# Test cookiejar tempfile arg is added
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
self.assertIn('--cookie-jar', downloader._make_cmd('test', TEST_INFO))
class TestAria2cFD(unittest.TestCase):
def test_make_cmd(self):
with FakeYDL() as ydl:
downloader = Aria2cFD(ydl, {})
downloader._make_cmd('test', TEST_INFO)
self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
# Test cookiejar tempfile arg is added
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
cmd = downloader._make_cmd('test', TEST_INFO)
self.assertIn(f'--load-cookies={downloader._cookies_tempfile}', cmd)
@unittest.skipUnless(FFmpegFD.available(), 'ffmpeg not found')
class TestFFmpegFD(unittest.TestCase):
_args = []
def _test_cmd(self, args):
self._args = args
def test_make_cmd(self):
with FakeYDL() as ydl:
downloader = FFmpegFD(ydl, {})
downloader._debug_cmd = self._test_cmd
downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
self.assertEqual(self._args, [
'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/',
'-c', 'copy', '-f', 'mp4', 'file:test'])
# Test cookies arg is added
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
self.assertEqual(self._args, [
'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n',
'-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])
if __name__ == '__main__':
unittest.main()

View file

@ -1327,6 +1327,13 @@ def get_cookie_header(self, url):
self.add_cookie_header(cookie_req) self.add_cookie_header(cookie_req)
return cookie_req.get_header('Cookie') return cookie_req.get_header('Cookie')
def get_cookies_for_url(self, url):
"""Generate a list of Cookie objects for a given url"""
# Policy `_now` attribute must be set before calling `_cookies_for_request`
# Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360
self._policy._now = self._now = int(time.time())
return self._cookies_for_request(urllib.request.Request(escape_url(sanitize_url(url))))
def clear(self, *args, **kwargs): def clear(self, *args, **kwargs):
with contextlib.suppress(KeyError): with contextlib.suppress(KeyError):
return super().clear(*args, **kwargs) return super().clear(*args, **kwargs)

View file

@ -1,9 +1,10 @@
import enum import enum
import json import json
import os.path import os
import re import re
import subprocess import subprocess
import sys import sys
import tempfile
import time import time
import uuid import uuid
@ -42,6 +43,7 @@ class ExternalFD(FragmentFD):
def real_download(self, filename, info_dict): def real_download(self, filename, info_dict):
self.report_destination(filename) self.report_destination(filename)
tmpfilename = self.temp_name(filename) tmpfilename = self.temp_name(filename)
self._cookies_tempfile = None
try: try:
started = time.time() started = time.time()
@ -54,6 +56,9 @@ def real_download(self, filename, info_dict):
# should take place # should take place
retval = 0 retval = 0
self.to_screen('[%s] Interrupted by user' % self.get_basename()) self.to_screen('[%s] Interrupted by user' % self.get_basename())
finally:
if self._cookies_tempfile:
self.try_remove(self._cookies_tempfile)
if retval == 0: if retval == 0:
status = { status = {
@ -125,6 +130,16 @@ def _configuration_args(self, keys=None, *args, **kwargs):
self.get_basename(), self.params.get('external_downloader_args'), self.EXE_NAME, self.get_basename(), self.params.get('external_downloader_args'), self.EXE_NAME,
keys, *args, **kwargs) keys, *args, **kwargs)
def _write_cookies(self):
if not self.ydl.cookiejar.filename:
tmp_cookies = tempfile.NamedTemporaryFile(suffix='.cookies', delete=False)
tmp_cookies.close()
self._cookies_tempfile = tmp_cookies.name
self.to_screen(f'[download] Writing temporary cookies file to "{self._cookies_tempfile}"')
# real_download resets _cookies_tempfile; if it's None then save() will write to cookiejar.filename
self.ydl.cookiejar.save(self._cookies_tempfile)
return self.ydl.cookiejar.filename or self._cookies_tempfile
def _call_downloader(self, tmpfilename, info_dict): def _call_downloader(self, tmpfilename, info_dict):
""" Either overwrite this or implement _make_cmd """ """ Either overwrite this or implement _make_cmd """
cmd = [encodeArgument(a) for a in self._make_cmd(tmpfilename, info_dict)] cmd = [encodeArgument(a) for a in self._make_cmd(tmpfilename, info_dict)]
@ -184,6 +199,8 @@ class CurlFD(ExternalFD):
def _make_cmd(self, tmpfilename, info_dict): def _make_cmd(self, tmpfilename, info_dict):
cmd = [self.exe, '--location', '-o', tmpfilename, '--compressed'] cmd = [self.exe, '--location', '-o', tmpfilename, '--compressed']
if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
cmd += ['--cookie-jar', self._write_cookies()]
if info_dict.get('http_headers') is not None: if info_dict.get('http_headers') is not None:
for key, val in info_dict['http_headers'].items(): for key, val in info_dict['http_headers'].items():
cmd += ['--header', f'{key}: {val}'] cmd += ['--header', f'{key}: {val}']
@ -214,6 +231,9 @@ def _make_cmd(self, tmpfilename, info_dict):
if info_dict.get('http_headers') is not None: if info_dict.get('http_headers') is not None:
for key, val in info_dict['http_headers'].items(): for key, val in info_dict['http_headers'].items():
cmd += ['-H', f'{key}: {val}'] cmd += ['-H', f'{key}: {val}']
cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url'])
if cookie_header:
cmd += [f'Cookie: {cookie_header}', '--max-redirect=0']
cmd += self._configuration_args() cmd += self._configuration_args()
cmd += ['--', info_dict['url']] cmd += ['--', info_dict['url']]
return cmd return cmd
@ -223,7 +243,9 @@ class WgetFD(ExternalFD):
AVAILABLE_OPT = '--version' AVAILABLE_OPT = '--version'
def _make_cmd(self, tmpfilename, info_dict): def _make_cmd(self, tmpfilename, info_dict):
cmd = [self.exe, '-O', tmpfilename, '-nv', '--no-cookies', '--compression=auto'] cmd = [self.exe, '-O', tmpfilename, '-nv', '--compression=auto']
if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
cmd += ['--load-cookies', self._write_cookies()]
if info_dict.get('http_headers') is not None: if info_dict.get('http_headers') is not None:
for key, val in info_dict['http_headers'].items(): for key, val in info_dict['http_headers'].items():
cmd += ['--header', f'{key}: {val}'] cmd += ['--header', f'{key}: {val}']
@ -279,6 +301,8 @@ def _make_cmd(self, tmpfilename, info_dict):
else: else:
cmd += ['--min-split-size', '1M'] cmd += ['--min-split-size', '1M']
if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
cmd += [f'--load-cookies={self._write_cookies()}']
if info_dict.get('http_headers') is not None: if info_dict.get('http_headers') is not None:
for key, val in info_dict['http_headers'].items(): for key, val in info_dict['http_headers'].items():
cmd += ['--header', f'{key}: {val}'] cmd += ['--header', f'{key}: {val}']
@ -417,6 +441,14 @@ def _make_cmd(self, tmpfilename, info_dict):
if info_dict.get('http_headers') is not None: if info_dict.get('http_headers') is not None:
for key, val in info_dict['http_headers'].items(): for key, val in info_dict['http_headers'].items():
cmd += [f'{key}:{val}'] cmd += [f'{key}:{val}']
# httpie 3.1.0+ removes the Cookie header on redirect, so this should be safe for now. [1]
# If we ever need cookie handling for redirects, we can export the cookiejar into a session. [2]
# 1: https://github.com/httpie/httpie/security/advisories/GHSA-9w4w-cpc8-h2fq
# 2: https://httpie.io/docs/cli/sessions
cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url'])
if cookie_header:
cmd += [f'Cookie:{cookie_header}']
return cmd return cmd
@ -527,6 +559,11 @@ def _call_downloader(self, tmpfilename, info_dict):
selected_formats = info_dict.get('requested_formats') or [info_dict] selected_formats = info_dict.get('requested_formats') or [info_dict]
for i, fmt in enumerate(selected_formats): for i, fmt in enumerate(selected_formats):
cookies = self.ydl.cookiejar.get_cookies_for_url(fmt['url'])
if cookies:
args.extend(['-cookies', ''.join(
f'{cookie.name}={cookie.value}; path={cookie.path}; domain={cookie.domain};\r\n'
for cookie in cookies)])
if fmt.get('http_headers') and re.match(r'^https?://', fmt['url']): if fmt.get('http_headers') and re.match(r'^https?://', fmt['url']):
# Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv: # Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv:
# [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header. # [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header.