Downgraade urllib3==1.26.16

This commit is contained in:
JonnyWong16 2023-08-23 21:52:33 -07:00
commit d6b3ed178e
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
40 changed files with 4877 additions and 4445 deletions

View file

@ -1,21 +1,9 @@
from __future__ import annotations
import io
import socket
import ssl
import typing
from ..exceptions import ProxySchemeUnsupported
if typing.TYPE_CHECKING:
from typing_extensions import Literal
from .ssl_ import _TYPE_PEER_CERT_RET, _TYPE_PEER_CERT_RET_DICT
_SelfT = typing.TypeVar("_SelfT", bound="SSLTransport")
_WriteBuffer = typing.Union[bytearray, memoryview]
_ReturnValue = typing.TypeVar("_ReturnValue")
from ..packages import six
SSL_BLOCKSIZE = 16384
@ -32,7 +20,7 @@ class SSLTransport:
"""
@staticmethod
def _validate_ssl_context_for_tls_in_tls(ssl_context: ssl.SSLContext) -> None:
def _validate_ssl_context_for_tls_in_tls(ssl_context):
"""
Raises a ProxySchemeUnsupported if the provided ssl_context can't be used
for TLS in TLS.
@ -42,18 +30,20 @@ class SSLTransport:
"""
if not hasattr(ssl_context, "wrap_bio"):
raise ProxySchemeUnsupported(
"TLS in TLS requires SSLContext.wrap_bio() which isn't "
"available on non-native SSLContext"
)
if six.PY2:
raise ProxySchemeUnsupported(
"TLS in TLS requires SSLContext.wrap_bio() which isn't "
"supported on Python 2"
)
else:
raise ProxySchemeUnsupported(
"TLS in TLS requires SSLContext.wrap_bio() which isn't "
"available on non-native SSLContext"
)
def __init__(
self,
socket: socket.socket,
ssl_context: ssl.SSLContext,
server_hostname: str | None = None,
suppress_ragged_eofs: bool = True,
) -> None:
self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True
):
"""
Create an SSLTransport around socket using the provided ssl_context.
"""
@ -70,36 +60,33 @@ class SSLTransport:
# Perform initial handshake.
self._ssl_io_loop(self.sslobj.do_handshake)
def __enter__(self: _SelfT) -> _SelfT:
def __enter__(self):
return self
def __exit__(self, *_: typing.Any) -> None:
def __exit__(self, *_):
self.close()
def fileno(self) -> int:
def fileno(self):
return self.socket.fileno()
def read(self, len: int = 1024, buffer: typing.Any | None = None) -> int | bytes:
def read(self, len=1024, buffer=None):
return self._wrap_ssl_read(len, buffer)
def recv(self, buflen: int = 1024, flags: int = 0) -> int | bytes:
def recv(self, len=1024, flags=0):
if flags != 0:
raise ValueError("non-zero flags not allowed in calls to recv")
return self._wrap_ssl_read(buflen)
return self._wrap_ssl_read(len)
def recv_into(
self,
buffer: _WriteBuffer,
nbytes: int | None = None,
flags: int = 0,
) -> None | int | bytes:
def recv_into(self, buffer, nbytes=None, flags=0):
if flags != 0:
raise ValueError("non-zero flags not allowed in calls to recv_into")
if nbytes is None:
if buffer and (nbytes is None):
nbytes = len(buffer)
elif nbytes is None:
nbytes = 1024
return self.read(nbytes, buffer)
def sendall(self, data: bytes, flags: int = 0) -> None:
def sendall(self, data, flags=0):
if flags != 0:
raise ValueError("non-zero flags not allowed in calls to sendall")
count = 0
@ -109,20 +96,15 @@ class SSLTransport:
v = self.send(byte_view[count:])
count += v
def send(self, data: bytes, flags: int = 0) -> int:
def send(self, data, flags=0):
if flags != 0:
raise ValueError("non-zero flags not allowed in calls to send")
return self._ssl_io_loop(self.sslobj.write, data)
response = self._ssl_io_loop(self.sslobj.write, data)
return response
def makefile(
self,
mode: str,
buffering: int | None = None,
*,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
) -> typing.BinaryIO | typing.TextIO | socket.SocketIO:
self, mode="r", buffering=None, encoding=None, errors=None, newline=None
):
"""
Python's httpclient uses makefile and buffered io when reading HTTP
messages and we need to support it.
@ -131,7 +113,7 @@ class SSLTransport:
changes to point to the socket directly.
"""
if not set(mode) <= {"r", "w", "b"}:
raise ValueError(f"invalid mode {mode!r} (only r, w, b allowed)")
raise ValueError("invalid mode %r (only r, w, b allowed)" % (mode,))
writing = "w" in mode
reading = "r" in mode or not writing
@ -142,8 +124,8 @@ class SSLTransport:
rawmode += "r"
if writing:
rawmode += "w"
raw = socket.SocketIO(self, rawmode) # type: ignore[arg-type]
self.socket._io_refs += 1 # type: ignore[attr-defined]
raw = socket.SocketIO(self, rawmode)
self.socket._io_refs += 1
if buffering is None:
buffering = -1
if buffering < 0:
@ -152,9 +134,8 @@ class SSLTransport:
if not binary:
raise ValueError("unbuffered streams must be binary")
return raw
buffer: typing.BinaryIO
if reading and writing:
buffer = io.BufferedRWPair(raw, raw, buffering) # type: ignore[assignment]
buffer = io.BufferedRWPair(raw, raw, buffering)
elif reading:
buffer = io.BufferedReader(raw, buffering)
else:
@ -163,56 +144,46 @@ class SSLTransport:
if binary:
return buffer
text = io.TextIOWrapper(buffer, encoding, errors, newline)
text.mode = mode # type: ignore[misc]
text.mode = mode
return text
def unwrap(self) -> None:
def unwrap(self):
self._ssl_io_loop(self.sslobj.unwrap)
def close(self) -> None:
def close(self):
self.socket.close()
@typing.overload
def getpeercert(
self, binary_form: Literal[False] = ...
) -> _TYPE_PEER_CERT_RET_DICT | None:
...
def getpeercert(self, binary_form=False):
return self.sslobj.getpeercert(binary_form)
@typing.overload
def getpeercert(self, binary_form: Literal[True]) -> bytes | None:
...
def getpeercert(self, binary_form: bool = False) -> _TYPE_PEER_CERT_RET:
return self.sslobj.getpeercert(binary_form) # type: ignore[return-value]
def version(self) -> str | None:
def version(self):
return self.sslobj.version()
def cipher(self) -> tuple[str, str, int] | None:
def cipher(self):
return self.sslobj.cipher()
def selected_alpn_protocol(self) -> str | None:
def selected_alpn_protocol(self):
return self.sslobj.selected_alpn_protocol()
def selected_npn_protocol(self) -> str | None:
def selected_npn_protocol(self):
return self.sslobj.selected_npn_protocol()
def shared_ciphers(self) -> list[tuple[str, str, int]] | None:
def shared_ciphers(self):
return self.sslobj.shared_ciphers()
def compression(self) -> str | None:
def compression(self):
return self.sslobj.compression()
def settimeout(self, value: float | None) -> None:
def settimeout(self, value):
self.socket.settimeout(value)
def gettimeout(self) -> float | None:
def gettimeout(self):
return self.socket.gettimeout()
def _decref_socketios(self) -> None:
self.socket._decref_socketios() # type: ignore[attr-defined]
def _decref_socketios(self):
self.socket._decref_socketios()
def _wrap_ssl_read(self, len: int, buffer: bytearray | None = None) -> int | bytes:
def _wrap_ssl_read(self, len, buffer=None):
try:
return self._ssl_io_loop(self.sslobj.read, len, buffer)
except ssl.SSLError as e:
@ -221,32 +192,7 @@ class SSLTransport:
else:
raise
# func is sslobj.do_handshake or sslobj.unwrap
@typing.overload
def _ssl_io_loop(self, func: typing.Callable[[], None]) -> None:
...
# func is sslobj.write, arg1 is data
@typing.overload
def _ssl_io_loop(self, func: typing.Callable[[bytes], int], arg1: bytes) -> int:
...
# func is sslobj.read, arg1 is len, arg2 is buffer
@typing.overload
def _ssl_io_loop(
self,
func: typing.Callable[[int, bytearray | None], bytes],
arg1: int,
arg2: bytearray | None,
) -> bytes:
...
def _ssl_io_loop(
self,
func: typing.Callable[..., _ReturnValue],
arg1: None | bytes | int = None,
arg2: bytearray | None = None,
) -> _ReturnValue:
def _ssl_io_loop(self, func, *args):
"""Performs an I/O loop between incoming/outgoing and the socket."""
should_loop = True
ret = None
@ -254,12 +200,7 @@ class SSLTransport:
while should_loop:
errno = None
try:
if arg1 is None and arg2 is None:
ret = func()
elif arg2 is None:
ret = func(arg1)
else:
ret = func(arg1, arg2)
ret = func(*args)
except ssl.SSLError as e:
if e.errno not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
# WANT_READ, and WANT_WRITE are expected, others are not.
@ -277,4 +218,4 @@ class SSLTransport:
self.incoming.write(buf)
else:
self.incoming.write_eof()
return typing.cast(_ReturnValue, ret)
return ret