This commit is contained in:
Matheus Gois 2025-06-01 22:58:07 -03:00
parent a084c80f7b
commit f014773f2f

View file

@ -5,6 +5,8 @@ import re
import sys import sys
import time import time
import random import random
from dataclasses import dataclass
from typing import Optional, List, Dict, Any, Callable, Union
from ..compat import compat_os_name from ..compat import compat_os_name
from ..utils import ( from ..utils import (
@ -16,163 +18,263 @@ from ..utils import (
timeconvert, timeconvert,
) )
# Module-level constants.
# NOTE(review): none of these names is referenced in the visible part of the
# file; the comments below only record where the same values appear.

# Carries the same value as the class-level `_TEST_FILE_SIZE` attribute;
# presumably the byte count fetched when the `test` option is set -- confirm,
# and verify that no caller still reads `self._TEST_FILE_SIZE`.
TEST_FILE_SIZE = 10241
# Same value as the literal upper bound used by `best_block_size`.
MAX_BLOCK_SIZE = 4194304 # 4 MB
# Same value as the literal lower bound used by `best_block_size`.
MIN_BLOCK_SIZE = 1.0
class FileDownloader(object): @dataclass
"""File Downloader class. class DownloadProgress:
"""Classe para representar o progresso do download."""
File downloader objects are the ones responsible of downloading the status: str
actual video file and writing it to disk. downloaded_bytes: Optional[int] = None
total_bytes: Optional[int] = None
File downloaders accept a lot of parameters. In order not to saturate total_bytes_estimate: Optional[int] = None
the object constructor with arguments, it receives a dictionary of speed: Optional[float] = None
options instead. eta: Optional[int] = None
elapsed: Optional[float] = None
Available options:
verbose: Print additional info to stdout.
quiet: Do not print messages to stdout.
ratelimit: Download speed limit, in bytes/sec.
retries: Number of times to retry for HTTP error 5xx
buffersize: Size of download buffer in bytes.
noresizebuffer: Do not automatically resize the download buffer.
continuedl: Try to continue downloads if possible.
noprogress: Do not print the progress bar.
logtostderr: Log messages to stderr instead of stdout.
consoletitle: Display progress in console window's titlebar.
nopart: Do not use temporary .part files.
updatetime: Use the Last-modified header to set output file timestamps.
test: Download only first bytes to test the downloader.
min_filesize: Skip files smaller than this size
max_filesize: Skip files larger than this size
xattr_set_filesize: Set ytdl.filesize user xattribute with expected size.
external_downloader_args: A list of additional command-line arguments for the
external downloader.
hls_use_mpegts: Use the mpegts container for HLS videos.
http_chunk_size: Size of a chunk for chunk-based HTTP downloading. May be
useful for bypassing bandwidth throttling imposed by
a webserver (experimental)
Subclasses of this one must re-define the real_download method.
"""
_TEST_FILE_SIZE = 10241
params = None
def __init__(self, ydl, params):
"""Create a FileDownloader object with the given options."""
self.ydl = ydl
self._progress_hooks = []
self.params = params
self.add_progress_hook(self.report_progress)
class ProgressFormatter:
"""Classe responsável por formatar informações de progresso."""
@staticmethod @staticmethod
def format_seconds(seconds): def format_seconds(seconds: float) -> str:
(mins, secs) = divmod(seconds, 60) """Formata segundos em formato HH:MM:SS."""
(hours, mins) = divmod(mins, 60) mins, secs = divmod(seconds, 60)
hours, mins = divmod(mins, 60)
if hours > 99: if hours > 99:
return '--:--:--' return '--:--:--'
if hours == 0: if hours == 0:
return '%02d:%02d' % (mins, secs) return f'{mins:02d}:{secs:02d}'
else: return f'{hours:02d}:{mins:02d}:{secs:02d}'
return '%02d:%02d:%02d' % (hours, mins, secs)
@staticmethod @staticmethod
def calc_percent(byte_counter, data_len): def format_percent(percent: Optional[float]) -> str:
if data_len is None: """Formata porcentagem."""
return None
return float(byte_counter) / float(data_len) * 100.0
@staticmethod
def format_percent(percent):
if percent is None: if percent is None:
return '---.-%' return '---.-%'
return '%6s' % ('%3.1f%%' % percent) return f'{percent:6.1f}%'
@classmethod
def calc_eta(cls, start_or_rate, now_or_remaining, *args):
if len(args) < 2:
rate, remaining = (start_or_rate, now_or_remaining)
if None in (rate, remaining):
return None
return int(float(remaining) / rate)
start, now = (start_or_rate, now_or_remaining)
total, current = args[:2]
if total is None:
return None
if now is None:
now = time.time()
rate = cls.calc_speed(start, now, current)
return rate and int((float(total) - float(current)) / rate)
@staticmethod @staticmethod
def format_eta(eta): def format_speed(speed: Optional[float]) -> str:
if eta is None: """Formata velocidade de download."""
return '--:--'
return FileDownloader.format_seconds(eta)
@staticmethod
def calc_speed(start, now, bytes):
dif = now - start
if bytes == 0 or dif < 0.001: # One millisecond
return None
return float(bytes) / dif
@staticmethod
def format_speed(speed):
if speed is None: if speed is None:
return '%10s' % '---b/s' return '%10s' % '---b/s'
return '%10s' % ('%s/s' % format_bytes(speed)) return f'{format_bytes(speed):10s}/s'
@staticmethod @staticmethod
def format_retries(retries): def format_eta(eta: Optional[int]) -> str:
return 'inf' if retries == float('inf') else '%.0f' % retries """Formata tempo estimado."""
if eta is None:
return '--:--'
return ProgressFormatter.format_seconds(eta)
@staticmethod class FileDownloader:
def filesize_or_none(unencoded_filename): """Classe base para download de arquivos.
fn = encodeFilename(unencoded_filename)
if os.path.isfile(fn): Responsável por gerenciar o download de arquivos e fornecer feedback
return os.path.getsize(fn) sobre o progresso.
"""
@staticmethod def __init__(self, ydl: Any, params: Dict[str, Any]):
def best_block_size(elapsed_time, bytes): """Inicializa o downloader.
new_min = max(bytes / 2.0, 1.0)
new_max = min(max(bytes * 2.0, 1.0), 4194304) # Do not surpass 4 MB Args:
if elapsed_time < 0.001: ydl: Instância do YoutubeDL
return int(new_max) params: Dicionário com parâmetros de configuração
rate = bytes / elapsed_time """
if rate > new_max: self.ydl = ydl
return int(new_max) self._progress_hooks: List[Callable] = []
if rate < new_min: self.params = params
return int(new_min) self.formatter = ProgressFormatter()
return int(rate) self.add_progress_hook(self.report_progress)
@staticmethod def add_progress_hook(self, hook: Callable) -> None:
def parse_bytes(bytestr): """Adiciona um hook de progresso."""
"""Parse a string indicating a byte quantity into an integer.""" self._progress_hooks.append(hook)
matchobj = re.match(r'(?i)^(\d+(?:\.\d+)?)([kMGTPEZY]?)$', bytestr)
if matchobj is None:
return None
number = float(matchobj.group(1))
multiplier = 1024.0 ** 'bkmgtpezy'.index(matchobj.group(2).lower())
return int(round(number * multiplier))
def to_screen(self, *args, **kargs): def _hook_progress(self, status: Dict[str, Any]) -> None:
self.ydl.to_screen(*args, **kargs) """Executa todos os hooks de progresso registrados."""
for hook in self._progress_hooks:
hook(status)
def to_stderr(self, message): def report_progress(self, status: Dict[str, Any]) -> None:
"""Reporta o progresso do download."""
if status['status'] == 'finished':
self._report_finished(status)
return
if self.params.get('noprogress') or status['status'] != 'downloading':
return
self._report_downloading(status)
def _report_finished(self, status: Dict[str, Any]) -> None:
    """Print the closing progress line for a completed download.

    When the ``noprogress`` option is set, only a plain completion notice is
    printed. Otherwise a ``100%`` line is built, extended with the total size
    and the elapsed time when ``status`` carries them, and emitted as the
    last progress line.

    Args:
        status: Progress dictionary. ``total_bytes`` and ``elapsed`` are read
            when present; ``_total_bytes_str`` and ``_elapsed_str`` are
            written back into it as a side effect.
    """
    if self.params.get('noprogress', False):
        self.to_screen('[download] Download completed')
        return

    template_parts = ['100%%']

    total_bytes = status.get('total_bytes')
    if total_bytes is not None:
        status['_total_bytes_str'] = format_bytes(total_bytes)
        template_parts.append(' of %(_total_bytes_str)s')

    elapsed = status.get('elapsed')
    if elapsed is not None:
        # Elapsed time is a float, but ProgressFormatter.format_seconds
        # renders its fields with the integer-only ``d`` format code, which
        # raises ValueError for floats. Truncate to whole seconds first (the
        # same rounding the old ``%02d`` formatting applied).
        status['_elapsed_str'] = self.formatter.format_seconds(int(elapsed))
        template_parts.append(' in %(_elapsed_str)s')

    self._report_progress_status(
        ''.join(template_parts) % status, is_last_line=True)
def _report_downloading(self, status: Dict[str, Any]) -> None:
    """Print one progress line for a download that is still running.

    Fills in the display strings (``_eta_str``, ``_speed_str``,
    ``_percent_str`` and, when the total is known, ``_total_bytes_str``) on
    ``status``, picks a message template and hands the rendered line to
    ``_report_progress_status``.

    Args:
        status: Progress dictionary. ``downloaded_bytes``, ``total_bytes``,
            ``total_bytes_estimate``, ``eta`` and ``speed`` are read when
            present; the ``_*_str`` keys are written back as a side effect.
    """
    downloaded = status.get('downloaded_bytes')
    total = status.get('total_bytes')
    estimate = status.get('total_bytes_estimate')
    total_is_usable = bool(total) and downloaded is not None

    # Percentage source, in order of preference: exact total, estimated
    # total, "nothing downloaded yet" (0%), otherwise unknown (None).
    if total_is_usable:
        percent = 100 * downloaded / total
    elif estimate and downloaded is not None:
        percent = 100 * downloaded / estimate
    elif downloaded == 0:
        percent = 0
    else:
        percent = None

    # Every template that can be selected below may reference these three
    # keys, so they are always populated; leaving `_percent_str` out made
    # the `%`-formatting at the end fail with KeyError.
    status.update({
        '_eta_str': self.formatter.format_eta(status.get('eta')),
        '_speed_str': self.formatter.format_speed(status.get('speed')),
        '_percent_str': self.formatter.format_percent(percent),
    })

    if total_is_usable:
        status['_total_bytes_str'] = format_bytes(total)
        msg_template = '%(_percent_str)s of %(_total_bytes_str)s at %(_speed_str)s ETA %(_eta_str)s'
    else:
        msg_template = self._get_progress_template(status)

    self._report_progress_status(msg_template % status)
def _get_progress_template(self, status: Dict[str, Any]) -> str:
    """Return the ``%``-style template for a progress line.

    The choice depends on what ``status`` contains:

    * ``total_bytes_estimate`` present: percentage of an approximate total;
    * otherwise ``downloaded_bytes`` present: bytes so far and speed, plus
      the elapsed time when it is non-zero;
    * otherwise: percentage, speed and ETA only.

    The returned templates reference ``_percent_str``, ``_speed_str`` and
    ``_eta_str``; those keys are not set here and must already be in
    ``status`` when the template is applied.

    Args:
        status: Progress dictionary. The ``_*_str`` keys needed by the chosen
            template (other than the three listed above) are written back
            into it as a side effect.

    Returns:
        A format string meant to be applied as ``template % status``.
    """
    estimate = status.get('total_bytes_estimate')
    if estimate is not None:
        status['_total_bytes_estimate_str'] = format_bytes(estimate)
        return '%(_percent_str)s of ~%(_total_bytes_estimate_str)s at %(_speed_str)s ETA %(_eta_str)s'

    downloaded = status.get('downloaded_bytes')
    if downloaded is not None:
        status['_downloaded_bytes_str'] = format_bytes(downloaded)
        elapsed = status.get('elapsed')
        if not elapsed:
            return '%(_downloaded_bytes_str)s at %(_speed_str)s'
        # ProgressFormatter.format_seconds uses the integer-only ``d`` format
        # code, so a float elapsed time must be truncated before formatting.
        status['_elapsed_str'] = self.formatter.format_seconds(int(elapsed))
        return '%(_downloaded_bytes_str)s at %(_speed_str)s (%(_elapsed_str)s)'

    # The previous template contained a stray "% a": with a mapping operand
    # that is parsed as an `%a` conversion and prints the whole status dict.
    # `_percent_str` already ends in "%", so no extra literal is needed.
    return '%(_percent_str)s at %(_speed_str)s ETA %(_eta_str)s'
def _report_progress_status(self, msg: str, is_last_line: bool = False) -> None:
    """Write a progress message to the screen and to the console title.

    With the ``progress_with_newline`` option the message is printed as an
    ordinary line and nothing else happens. Otherwise the message overwrites
    the current terminal line and is also sent to the console title.

    Args:
        msg: Progress text without the ``[download]`` prefix.
        is_last_line: When true, the line is terminated normally instead of
            being left open for the next update to overwrite.
    """
    line = '[download] ' + msg
    if self.params.get('progress_with_newline', False):
        self.to_screen(line)
        return

    if compat_os_name != 'nt':
        # Carriage return, plus the ANSI "erase to end of line" sequence
        # when stderr is a terminal.
        prefix = '\r\x1b[K' if sys.stderr.isatty() else '\r'
    else:
        # No erase sequence is used on 'nt': pad with spaces so leftovers of
        # a longer previous line are overwritten, and remember the width.
        previous_width = getattr(self, '_report_progress_prev_line_length', 0)
        line = line.ljust(previous_width)
        self._report_progress_prev_line_length = len(line)
        prefix = '\r'

    self.to_screen(prefix + line, skip_eol=not is_last_line)
    self.to_console_title('youtube-dl ' + msg)
def download(self, filename: str, info_dict: Dict[str, Any]) -> bool:
    """Download to ``filename`` unless the file is already in place.

    Args:
        filename: Destination passed on to the skip check and to
            ``real_download``.
        info_dict: Download information, forwarded to ``real_download``
            unchanged.

    Returns:
        True when the download is skipped because the skip check reports the
        file as already present; otherwise the result of ``real_download``,
        which runs after the optional sleep interval.
    """
    already_present = self._should_skip_download(filename)
    if not already_present:
        self._handle_sleep_interval()
        return self.real_download(filename, info_dict)
    return True
def _should_skip_download(self, filename: str) -> bool:
    """Report whether ``filename`` is already on disk and can be skipped.

    A destination is skipped when it is not ``'-'`` and either

    * the ``nooverwrites`` option is set and the path exists, or
    * ``continuedl`` is enabled (the default), ``nopart`` is not set and the
      path is an existing regular file.

    When skipping, the "already downloaded" notice is printed and a
    ``finished`` status carrying the file's size is sent to the progress
    hooks.

    Args:
        filename: Destination path, or an object with a ``write`` attribute.

    Returns:
        True if the download should be skipped, False otherwise.
    """
    # Destinations exposing `write` are never treated as existing files.
    if hasattr(filename, 'write'):
        return False

    options = self.params

    blocked_by_nooverwrites = False
    if options.get('nooverwrites', False):
        blocked_by_nooverwrites = os.path.exists(encodeFilename(filename))

    finished_earlier = False
    if options.get('continuedl', True) and os.path.isfile(encodeFilename(filename)):
        finished_earlier = not options.get('nopart', False)

    if filename == '-' or not (blocked_by_nooverwrites or finished_earlier):
        return False

    self.report_file_already_downloaded(filename)
    self._hook_progress({
        'filename': filename,
        'status': 'finished',
        'total_bytes': os.path.getsize(encodeFilename(filename)),
    })
    return True
def _handle_sleep_interval(self) -> None:
    """Pause before a download when a sleep interval is configured.

    Does nothing unless the ``sleep_interval`` option is set to a non-zero
    value. Otherwise a duration is drawn uniformly between ``sleep_interval``
    and ``max_sleep_interval``, announced on screen and slept.

    A ``max_sleep_interval`` that is missing *or explicitly None* falls back
    to ``sleep_interval``, giving a fixed pause.
    """
    min_sleep_interval = self.params.get('sleep_interval')
    if not min_sleep_interval:
        return

    # `dict.get(key, default)` only applies the default when the key is
    # absent; a key present with the value None used to reach
    # random.uniform() and raise TypeError.
    max_sleep_interval = self.params.get('max_sleep_interval')
    if max_sleep_interval is None:
        max_sleep_interval = min_sleep_interval

    sleep_interval = random.uniform(min_sleep_interval, max_sleep_interval)
    # Whole-number durations are shown without decimals.
    if sleep_interval.is_integer():
        shown = int(sleep_interval)
    else:
        shown = '%.2f' % sleep_interval
    self.to_screen('[download] Sleeping %s seconds...' % shown)
    time.sleep(sleep_interval)
def real_download(self, filename: str, info_dict: Dict[str, Any]) -> bool:
    """Perform the actual transfer; subclasses must override this method.

    Args:
        filename: Destination of the download.
        info_dict: Download information.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    reason = 'This method must be implemented by subclasses'
    raise NotImplementedError(reason)
# Métodos de utilidade
def to_screen(self, *args: Any, **kwargs: Any) -> None:
    """Forward a message to ``self.ydl.to_screen``.

    All positional and keyword arguments are passed through unchanged.
    """
    emit = self.ydl.to_screen
    emit(*args, **kwargs)
def to_stderr(self, message: str) -> None:
"""Envia mensagem para stderr."""
self.ydl.to_screen(message) self.ydl.to_screen(message)
def to_console_title(self, message): def to_console_title(self, message: str) -> None:
"""Atualiza o título do console."""
self.ydl.to_console_title(message) self.ydl.to_console_title(message)
def trouble(self, *args, **kargs): def trouble(self, *args: Any, **kwargs: Any) -> None:
self.ydl.trouble(*args, **kargs) """Reporta um problema."""
self.ydl.trouble(*args, **kwargs)
def report_warning(self, *args, **kargs): def report_warning(self, *args: Any, **kwargs: Any) -> None:
self.ydl.report_warning(*args, **kargs) """Reporta um aviso."""
self.ydl.report_warning(*args, **kwargs)
def report_error(self, *args, **kargs): def report_error(self, *args: Any, **kwargs: Any) -> None:
self.ydl.report_error(*args, **kargs) """Reporta um erro."""
self.ydl.report_error(*args, **kwargs)
def report_file_already_downloaded(self, file_name):
"""Report file has already been fully downloaded."""
try:
self.to_screen('[download] %s has already been downloaded' % file_name)
except UnicodeEncodeError:
self.to_screen('[download] The file has already been downloaded')
def report_unable_to_resume(self):
"""Report it was impossible to resume download."""
self.to_screen('[download] Unable to resume')
def slow_down(self, start_time, now, byte_counter): def slow_down(self, start_time, now, byte_counter):
"""Sleep if the download speed is over the rate limit.""" """Sleep if the download speed is over the rate limit."""
@ -238,83 +340,6 @@ class FileDownloader(object):
"""Report destination filename.""" """Report destination filename."""
self.to_screen('[download] Destination: ' + filename) self.to_screen('[download] Destination: ' + filename)
def _report_progress_status(self, msg, is_last_line=False):
fullmsg = '[download] ' + msg
if self.params.get('progress_with_newline', False):
self.to_screen(fullmsg)
else:
if compat_os_name == 'nt':
prev_len = getattr(self, '_report_progress_prev_line_length',
0)
if prev_len > len(fullmsg):
fullmsg += ' ' * (prev_len - len(fullmsg))
self._report_progress_prev_line_length = len(fullmsg)
clear_line = '\r'
else:
clear_line = ('\r\x1b[K' if sys.stderr.isatty() else '\r')
self.to_screen(clear_line + fullmsg, skip_eol=not is_last_line)
self.to_console_title('youtube-dl ' + msg)
def report_progress(self, s):
if s['status'] == 'finished':
if self.params.get('noprogress', False):
self.to_screen('[download] Download completed')
else:
msg_template = '100%%'
if s.get('total_bytes') is not None:
s['_total_bytes_str'] = format_bytes(s['total_bytes'])
msg_template += ' of %(_total_bytes_str)s'
if s.get('elapsed') is not None:
s['_elapsed_str'] = self.format_seconds(s['elapsed'])
msg_template += ' in %(_elapsed_str)s'
self._report_progress_status(
msg_template % s, is_last_line=True)
if self.params.get('noprogress'):
return
if s['status'] != 'downloading':
return
if s.get('eta') is not None:
s['_eta_str'] = self.format_eta(s['eta'])
else:
s['_eta_str'] = 'Unknown ETA'
if s.get('total_bytes') and s.get('downloaded_bytes') is not None:
s['_percent_str'] = self.format_percent(100 * s['downloaded_bytes'] / s['total_bytes'])
elif s.get('total_bytes_estimate') and s.get('downloaded_bytes') is not None:
s['_percent_str'] = self.format_percent(100 * s['downloaded_bytes'] / s['total_bytes_estimate'])
else:
if s.get('downloaded_bytes') == 0:
s['_percent_str'] = self.format_percent(0)
else:
s['_percent_str'] = 'Unknown %'
if s.get('speed') is not None:
s['_speed_str'] = self.format_speed(s['speed'])
else:
s['_speed_str'] = 'Unknown speed'
if s.get('total_bytes') is not None:
s['_total_bytes_str'] = format_bytes(s['total_bytes'])
msg_template = '%(_percent_str)s of %(_total_bytes_str)s at %(_speed_str)s ETA %(_eta_str)s'
elif s.get('total_bytes_estimate') is not None:
s['_total_bytes_estimate_str'] = format_bytes(s['total_bytes_estimate'])
msg_template = '%(_percent_str)s of ~%(_total_bytes_estimate_str)s at %(_speed_str)s ETA %(_eta_str)s'
else:
if s.get('downloaded_bytes') is not None:
s['_downloaded_bytes_str'] = format_bytes(s['downloaded_bytes'])
if s.get('elapsed'):
s['_elapsed_str'] = self.format_seconds(s['elapsed'])
msg_template = '%(_downloaded_bytes_str)s at %(_speed_str)s (%(_elapsed_str)s)'
else:
msg_template = '%(_downloaded_bytes_str)s at %(_speed_str)s'
else:
msg_template = '%(_percent_str)s % at %(_speed_str)s ETA %(_eta_str)s'
self._report_progress_status(msg_template % s)
def report_resuming_byte(self, resume_len): def report_resuming_byte(self, resume_len):
"""Report attempt to resume at given byte.""" """Report attempt to resume at given byte."""
self.to_screen('[download] Resuming download at byte %s' % resume_len) self.to_screen('[download] Resuming download at byte %s' % resume_len)
@ -325,72 +350,85 @@ class FileDownloader(object):
'[download] Got server HTTP error: %s. Retrying (attempt %d of %s)...' '[download] Got server HTTP error: %s. Retrying (attempt %d of %s)...'
% (error_to_compat_str(err), count, self.format_retries(retries))) % (error_to_compat_str(err), count, self.format_retries(retries)))
def report_file_already_downloaded(self, file_name): def format_retries(self, retries):
"""Report file has already been fully downloaded.""" return 'inf' if retries == float('inf') else '%.0f' % retries
try:
self.to_screen('[download] %s has already been downloaded' % file_name)
except UnicodeEncodeError:
self.to_screen('[download] The file has already been downloaded')
def report_unable_to_resume(self): def format_eta(self, eta):
"""Report it was impossible to resume download.""" if eta is None:
self.to_screen('[download] Unable to resume') return '--:--'
return self.formatter.format_eta(eta)
def download(self, filename, info_dict): def format_speed(self, speed):
"""Download to a filename using the info from info_dict if speed is None:
Return True on success and False otherwise return '%10s' % '---b/s'
return self.formatter.format_speed(speed)
This method filters the `Cookie` header from the info_dict to prevent leaks. def format_percent(self, percent):
Downloaders have their own way of handling cookies. if percent is None:
See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj return '---.-%'
""" return self.formatter.format_percent(percent)
nooverwrites_and_exists = ( def format_seconds(self, seconds):
self.params.get('nooverwrites', False) (mins, secs) = divmod(seconds, 60)
and os.path.exists(encodeFilename(filename)) (hours, mins) = divmod(mins, 60)
) if hours > 99:
return '--:--:--'
if hours == 0:
return '%02d:%02d' % (mins, secs)
else:
return '%02d:%02d:%02d' % (hours, mins, secs)
if not hasattr(filename, 'write'): def calc_percent(self, byte_counter, data_len):
continuedl_and_exists = ( if data_len is None:
self.params.get('continuedl', True) return None
and os.path.isfile(encodeFilename(filename)) return float(byte_counter) / float(data_len) * 100.0
and not self.params.get('nopart', False)
)
# Check file already present def calc_eta(self, start_or_rate, now_or_remaining, *args):
if filename != '-' and (nooverwrites_and_exists or continuedl_and_exists): if len(args) < 2:
self.report_file_already_downloaded(filename) rate, remaining = (start_or_rate, now_or_remaining)
self._hook_progress({ if None in (rate, remaining):
'filename': filename, return None
'status': 'finished', return int(float(remaining) / rate)
'total_bytes': os.path.getsize(encodeFilename(filename)), start, now = (start_or_rate, now_or_remaining)
}) total, current = args[:2]
return True if total is None:
return None
if now is None:
now = time.time()
rate = self.calc_speed(start, now, current)
return rate and int((float(total) - float(current)) / rate)
min_sleep_interval = self.params.get('sleep_interval') def calc_speed(self, start, now, bytes):
if min_sleep_interval: dif = now - start
max_sleep_interval = self.params.get('max_sleep_interval', min_sleep_interval) if bytes == 0 or dif < 0.001: # One millisecond
sleep_interval = random.uniform(min_sleep_interval, max_sleep_interval) return None
self.to_screen( return float(bytes) / dif
'[download] Sleeping %s seconds...' % (
int(sleep_interval) if sleep_interval.is_integer()
else '%.2f' % sleep_interval))
time.sleep(sleep_interval)
return self.real_download(filename, info_dict) def filesize_or_none(self, unencoded_filename):
fn = encodeFilename(unencoded_filename)
if os.path.isfile(fn):
return os.path.getsize(fn)
def real_download(self, filename, info_dict): def best_block_size(self, elapsed_time, bytes):
"""Real download process. Redefine in subclasses.""" new_min = max(bytes / 2.0, 1.0)
raise NotImplementedError('This method must be implemented by subclasses') new_max = min(max(bytes * 2.0, 1.0), 4194304) # Do not surpass 4 MB
if elapsed_time < 0.001:
return int(new_max)
rate = bytes / elapsed_time
if rate > new_max:
return int(new_max)
if rate < new_min:
return int(new_min)
return int(rate)
def _hook_progress(self, status): def parse_bytes(self, bytestr):
for ph in self._progress_hooks: """Parse a string indicating a byte quantity into an integer."""
ph(status) matchobj = re.match(r'(?i)^(\d+(?:\.\d+)?)([kMGTPEZY]?)$', bytestr)
if matchobj is None:
def add_progress_hook(self, ph): return None
# See YoutubeDl.py (search for progress_hooks) for a description of number = float(matchobj.group(1))
# this interface multiplier = 1024.0 ** 'bkmgtpezy'.index(matchobj.group(2).lower())
self._progress_hooks.append(ph) return int(round(number * multiplier))
def _debug_cmd(self, args, exe=None): def _debug_cmd(self, args, exe=None):
if not self.params.get('verbose', False): if not self.params.get('verbose', False):