Bump requests from 2.28.2 to 2.31.0 (#2078)

* Bump requests from 2.28.2 to 2.31.0

Bumps [requests](https://github.com/psf/requests) from 2.28.2 to 2.31.0.
- [Release notes](https://github.com/psf/requests/releases)
- [Changelog](https://github.com/psf/requests/blob/main/HISTORY.md)
- [Commits](https://github.com/psf/requests/compare/v2.28.2...v2.31.0)

---
updated-dependencies:
- dependency-name: requests
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update requests==2.31.0

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2023-08-23 21:40:02 -07:00 committed by GitHub
parent 478d9e6aa5
commit 6b6d43ef43
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
54 changed files with 4861 additions and 4958 deletions

View file

@ -51,7 +51,8 @@ license and by oscrypto's:
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import absolute_import
from __future__ import annotations
import contextlib
import ctypes
@ -62,14 +63,18 @@ import socket
import ssl
import struct
import threading
import typing
import warnings
import weakref
import six
from socket import socket as socket_cls
from .. import util
from ..util.ssl_ import PROTOCOL_TLS_CLIENT
from ._securetransport.bindings import CoreFoundation, Security, SecurityConst
from ._securetransport.bindings import ( # type: ignore[attr-defined]
CoreFoundation,
Security,
)
from ._securetransport.low_level import (
SecurityConst,
_assert_no_error,
_build_tls_unknown_ca_alert,
_cert_array_from_pem,
@ -78,18 +83,19 @@ from ._securetransport.low_level import (
_temporary_keychain,
)
try: # Platform-specific: Python 2
from socket import _fileobject
except ImportError: # Platform-specific: Python 3
_fileobject = None
from ..packages.backports.makefile import backport_makefile
warnings.warn(
"'urllib3.contrib.securetransport' module is deprecated and will be removed "
"in urllib3 v2.1.0. Read more in this issue: "
"https://github.com/urllib3/urllib3/issues/2681",
category=DeprecationWarning,
stacklevel=2,
)
if typing.TYPE_CHECKING:
from typing_extensions import Literal
__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
# SNI always works
HAS_SNI = True
orig_util_HAS_SNI = util.HAS_SNI
orig_util_SSLContext = util.ssl_.SSLContext
# This dictionary is used by the read callback to obtain a handle to the
@ -108,55 +114,24 @@ orig_util_SSLContext = util.ssl_.SSLContext
#
# This is good: if we had to lock in the callbacks we'd drastically slow down
# the performance of this code.
_connection_refs = weakref.WeakValueDictionary()
_connection_refs: weakref.WeakValueDictionary[
int, WrappedSocket
] = weakref.WeakValueDictionary()
_connection_ref_lock = threading.Lock()
# Limit writes to 16kB. This is OpenSSL's limit, but we'll cargo-cult it over
# for no better reason than we need *a* limit, and this one is right there.
SSL_WRITE_BLOCKSIZE = 16384
# This is our equivalent of util.ssl_.DEFAULT_CIPHERS, but expanded out to
# individual cipher suites. We need to do this because this is how
# SecureTransport wants them.
CIPHER_SUITES = [
SecurityConst.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
SecurityConst.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
SecurityConst.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
SecurityConst.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
SecurityConst.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
SecurityConst.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
SecurityConst.TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,
SecurityConst.TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,
SecurityConst.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384,
SecurityConst.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
SecurityConst.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
SecurityConst.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
SecurityConst.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384,
SecurityConst.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
SecurityConst.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
SecurityConst.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
SecurityConst.TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,
SecurityConst.TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
SecurityConst.TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,
SecurityConst.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
SecurityConst.TLS_AES_256_GCM_SHA384,
SecurityConst.TLS_AES_128_GCM_SHA256,
SecurityConst.TLS_RSA_WITH_AES_256_GCM_SHA384,
SecurityConst.TLS_RSA_WITH_AES_128_GCM_SHA256,
SecurityConst.TLS_AES_128_CCM_8_SHA256,
SecurityConst.TLS_AES_128_CCM_SHA256,
SecurityConst.TLS_RSA_WITH_AES_256_CBC_SHA256,
SecurityConst.TLS_RSA_WITH_AES_128_CBC_SHA256,
SecurityConst.TLS_RSA_WITH_AES_256_CBC_SHA,
SecurityConst.TLS_RSA_WITH_AES_128_CBC_SHA,
]
# Basically this is simple: for PROTOCOL_SSLv23 we turn it into a low of
# TLSv1 and a high of TLSv1.2. For everything else, we pin to that version.
# TLSv1 to 1.2 are supported on macOS 10.8+
_protocol_to_min_max = {
util.PROTOCOL_TLS: (SecurityConst.kTLSProtocol1, SecurityConst.kTLSProtocol12),
PROTOCOL_TLS_CLIENT: (SecurityConst.kTLSProtocol1, SecurityConst.kTLSProtocol12),
util.ssl_.PROTOCOL_TLS: (SecurityConst.kTLSProtocol1, SecurityConst.kTLSProtocol12), # type: ignore[attr-defined]
util.ssl_.PROTOCOL_TLS_CLIENT: ( # type: ignore[attr-defined]
SecurityConst.kTLSProtocol1,
SecurityConst.kTLSProtocol12,
),
}
if hasattr(ssl, "PROTOCOL_SSLv2"):
@ -186,31 +161,38 @@ if hasattr(ssl, "PROTOCOL_TLSv1_2"):
)
def inject_into_urllib3():
_tls_version_to_st: dict[int, int] = {
ssl.TLSVersion.MINIMUM_SUPPORTED: SecurityConst.kTLSProtocol1,
ssl.TLSVersion.TLSv1: SecurityConst.kTLSProtocol1,
ssl.TLSVersion.TLSv1_1: SecurityConst.kTLSProtocol11,
ssl.TLSVersion.TLSv1_2: SecurityConst.kTLSProtocol12,
ssl.TLSVersion.MAXIMUM_SUPPORTED: SecurityConst.kTLSProtocol12,
}
def inject_into_urllib3() -> None:
"""
Monkey-patch urllib3 with SecureTransport-backed SSL-support.
"""
util.SSLContext = SecureTransportContext
util.ssl_.SSLContext = SecureTransportContext
util.HAS_SNI = HAS_SNI
util.ssl_.HAS_SNI = HAS_SNI
util.SSLContext = SecureTransportContext # type: ignore[assignment]
util.ssl_.SSLContext = SecureTransportContext # type: ignore[assignment]
util.IS_SECURETRANSPORT = True
util.ssl_.IS_SECURETRANSPORT = True
def extract_from_urllib3():
def extract_from_urllib3() -> None:
"""
Undo monkey-patching by :func:`inject_into_urllib3`.
"""
util.SSLContext = orig_util_SSLContext
util.ssl_.SSLContext = orig_util_SSLContext
util.HAS_SNI = orig_util_HAS_SNI
util.ssl_.HAS_SNI = orig_util_HAS_SNI
util.IS_SECURETRANSPORT = False
util.ssl_.IS_SECURETRANSPORT = False
def _read_callback(connection_id, data_buffer, data_length_pointer):
def _read_callback(
connection_id: int, data_buffer: int, data_length_pointer: bytearray
) -> int:
"""
SecureTransport read callback. This is called by ST to request that data
be returned from the socket.
@ -232,7 +214,7 @@ def _read_callback(connection_id, data_buffer, data_length_pointer):
while read_count < requested_length:
if timeout is None or timeout >= 0:
if not util.wait_for_read(base_socket, timeout):
raise socket.error(errno.EAGAIN, "timed out")
raise OSError(errno.EAGAIN, "timed out")
remaining = requested_length - read_count
buffer = (ctypes.c_char * remaining).from_address(
@ -244,7 +226,7 @@ def _read_callback(connection_id, data_buffer, data_length_pointer):
if not read_count:
return SecurityConst.errSSLClosedGraceful
break
except (socket.error) as e:
except OSError as e:
error = e.errno
if error is not None and error != errno.EAGAIN:
@ -265,7 +247,9 @@ def _read_callback(connection_id, data_buffer, data_length_pointer):
return SecurityConst.errSSLInternal
def _write_callback(connection_id, data_buffer, data_length_pointer):
def _write_callback(
connection_id: int, data_buffer: int, data_length_pointer: bytearray
) -> int:
"""
SecureTransport write callback. This is called by ST to request that data
actually be sent on the network.
@ -288,14 +272,14 @@ def _write_callback(connection_id, data_buffer, data_length_pointer):
while sent < bytes_to_write:
if timeout is None or timeout >= 0:
if not util.wait_for_write(base_socket, timeout):
raise socket.error(errno.EAGAIN, "timed out")
raise OSError(errno.EAGAIN, "timed out")
chunk_sent = base_socket.send(data)
sent += chunk_sent
# This has some needless copying here, but I'm not sure there's
# much value in optimising this data path.
data = data[chunk_sent:]
except (socket.error) as e:
except OSError as e:
error = e.errno
if error is not None and error != errno.EAGAIN:
@ -323,22 +307,20 @@ _read_callback_pointer = Security.SSLReadFunc(_read_callback)
_write_callback_pointer = Security.SSLWriteFunc(_write_callback)
class WrappedSocket(object):
class WrappedSocket:
"""
API-compatibility wrapper for Python's OpenSSL wrapped socket object.
Note: _makefile_refs, _drop(), and _reuse() are needed for the garbage
collector of PyPy.
"""
def __init__(self, socket):
def __init__(self, socket: socket_cls) -> None:
self.socket = socket
self.context = None
self._makefile_refs = 0
self._io_refs = 0
self._closed = False
self._exception = None
self._real_closed = False
self._exception: Exception | None = None
self._keychain = None
self._keychain_dir = None
self._keychain_dir: str | None = None
self._client_cert_chain = None
# We save off the previously-configured timeout and then set it to
@ -350,7 +332,7 @@ class WrappedSocket(object):
self.socket.settimeout(0)
@contextlib.contextmanager
def _raise_on_error(self):
def _raise_on_error(self) -> typing.Generator[None, None, None]:
"""
A context manager that can be used to wrap calls that do I/O from
SecureTransport. If any of the I/O callbacks hit an exception, this
@ -367,23 +349,10 @@ class WrappedSocket(object):
yield
if self._exception is not None:
exception, self._exception = self._exception, None
self.close()
self._real_close()
raise exception
def _set_ciphers(self):
"""
Sets up the allowed ciphers. By default this matches the set in
util.ssl_.DEFAULT_CIPHERS, at least as supported by macOS. This is done
custom and doesn't allow changing at this time, mostly because parsing
OpenSSL cipher strings is going to be a freaking nightmare.
"""
ciphers = (Security.SSLCipherSuite * len(CIPHER_SUITES))(*CIPHER_SUITES)
result = Security.SSLSetEnabledCiphers(
self.context, ciphers, len(CIPHER_SUITES)
)
_assert_no_error(result)
def _set_alpn_protocols(self, protocols):
def _set_alpn_protocols(self, protocols: list[bytes] | None) -> None:
"""
Sets up the ALPN protocols on the context.
"""
@ -396,7 +365,7 @@ class WrappedSocket(object):
finally:
CoreFoundation.CFRelease(protocols_arr)
def _custom_validate(self, verify, trust_bundle):
def _custom_validate(self, verify: bool, trust_bundle: bytes | None) -> None:
"""
Called when we have set custom validation. We do this in two cases:
first, when cert validation is entirely disabled; and second, when
@ -404,7 +373,7 @@ class WrappedSocket(object):
Raises an SSLError if the connection is not trusted.
"""
# If we disabled cert validation, just say: cool.
if not verify:
if not verify or trust_bundle is None:
return
successes = (
@ -415,10 +384,12 @@ class WrappedSocket(object):
trust_result = self._evaluate_trust(trust_bundle)
if trust_result in successes:
return
reason = "error code: %d" % (trust_result,)
reason = f"error code: {int(trust_result)}"
exc = None
except Exception as e:
# Do not trust on error
reason = "exception: %r" % (e,)
reason = f"exception: {e!r}"
exc = e
# SecureTransport does not send an alert nor shuts down the connection.
rec = _build_tls_unknown_ca_alert(self.version())
@ -428,10 +399,10 @@ class WrappedSocket(object):
# l_linger = 0, linger for 0 seoncds
opts = struct.pack("ii", 1, 0)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, opts)
self.close()
raise ssl.SSLError("certificate verify failed, %s" % reason)
self._real_close()
raise ssl.SSLError(f"certificate verify failed, {reason}") from exc
def _evaluate_trust(self, trust_bundle):
def _evaluate_trust(self, trust_bundle: bytes) -> int:
# We want data in memory, so load it up.
if os.path.isfile(trust_bundle):
with open(trust_bundle, "rb") as f:
@ -469,20 +440,20 @@ class WrappedSocket(object):
if cert_array is not None:
CoreFoundation.CFRelease(cert_array)
return trust_result.value
return trust_result.value # type: ignore[no-any-return]
def handshake(
self,
server_hostname,
verify,
trust_bundle,
min_version,
max_version,
client_cert,
client_key,
client_key_passphrase,
alpn_protocols,
):
server_hostname: bytes | str | None,
verify: bool,
trust_bundle: bytes | None,
min_version: int,
max_version: int,
client_cert: str | None,
client_key: str | None,
client_key_passphrase: typing.Any,
alpn_protocols: list[bytes] | None,
) -> None:
"""
Actually performs the TLS handshake. This is run automatically by
wrapped socket, and shouldn't be needed in user code.
@ -510,6 +481,8 @@ class WrappedSocket(object):
_assert_no_error(result)
# If we have a server hostname, we should set that too.
# RFC6066 Section 3 tells us not to use SNI when the host is an IP, but we have
# to do it anyway to match server_hostname against the server certificate
if server_hostname:
if not isinstance(server_hostname, bytes):
server_hostname = server_hostname.encode("utf-8")
@ -519,9 +492,6 @@ class WrappedSocket(object):
)
_assert_no_error(result)
# Setup the ciphers.
self._set_ciphers()
# Setup the ALPN protocols.
self._set_alpn_protocols(alpn_protocols)
@ -564,25 +534,27 @@ class WrappedSocket(object):
_assert_no_error(result)
break
def fileno(self):
def fileno(self) -> int:
return self.socket.fileno()
# Copy-pasted from Python 3.5 source code
def _decref_socketios(self):
if self._makefile_refs > 0:
self._makefile_refs -= 1
def _decref_socketios(self) -> None:
if self._io_refs > 0:
self._io_refs -= 1
if self._closed:
self.close()
def recv(self, bufsiz):
def recv(self, bufsiz: int) -> bytes:
buffer = ctypes.create_string_buffer(bufsiz)
bytes_read = self.recv_into(buffer, bufsiz)
data = buffer[:bytes_read]
return data
return typing.cast(bytes, data)
def recv_into(self, buffer, nbytes=None):
def recv_into(
self, buffer: ctypes.Array[ctypes.c_char], nbytes: int | None = None
) -> int:
# Read short on EOF.
if self._closed:
if self._real_closed:
return 0
if nbytes is None:
@ -615,7 +587,7 @@ class WrappedSocket(object):
# well. Note that we don't actually return here because in
# principle this could actually be fired along with return data.
# It's unlikely though.
self.close()
self._real_close()
else:
_assert_no_error(result)
@ -623,13 +595,13 @@ class WrappedSocket(object):
# was actually read.
return processed_bytes.value
def settimeout(self, timeout):
def settimeout(self, timeout: float) -> None:
self._timeout = timeout
def gettimeout(self):
def gettimeout(self) -> float | None:
return self._timeout
def send(self, data):
def send(self, data: bytes) -> int:
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
@ -646,36 +618,38 @@ class WrappedSocket(object):
# We sent, and probably succeeded. Tell them how much we sent.
return processed_bytes.value
def sendall(self, data):
def sendall(self, data: bytes) -> None:
total_sent = 0
while total_sent < len(data):
sent = self.send(data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE])
total_sent += sent
def shutdown(self):
def shutdown(self) -> None:
with self._raise_on_error():
Security.SSLClose(self.context)
def close(self):
def close(self) -> None:
self._closed = True
# TODO: should I do clean shutdown here? Do I have to?
if self._makefile_refs < 1:
self._closed = True
if self.context:
CoreFoundation.CFRelease(self.context)
self.context = None
if self._client_cert_chain:
CoreFoundation.CFRelease(self._client_cert_chain)
self._client_cert_chain = None
if self._keychain:
Security.SecKeychainDelete(self._keychain)
CoreFoundation.CFRelease(self._keychain)
shutil.rmtree(self._keychain_dir)
self._keychain = self._keychain_dir = None
return self.socket.close()
else:
self._makefile_refs -= 1
if self._io_refs <= 0:
self._real_close()
def getpeercert(self, binary_form=False):
def _real_close(self) -> None:
self._real_closed = True
if self.context:
CoreFoundation.CFRelease(self.context)
self.context = None
if self._client_cert_chain:
CoreFoundation.CFRelease(self._client_cert_chain)
self._client_cert_chain = None
if self._keychain:
Security.SecKeychainDelete(self._keychain)
CoreFoundation.CFRelease(self._keychain)
shutil.rmtree(self._keychain_dir)
self._keychain = self._keychain_dir = None
return self.socket.close()
def getpeercert(self, binary_form: bool = False) -> bytes | None:
# Urgh, annoying.
#
# Here's how we do this:
@ -733,7 +707,7 @@ class WrappedSocket(object):
return der_bytes
def version(self):
def version(self) -> str:
protocol = Security.SSLProtocol()
result = Security.SSLGetNegotiatedProtocolVersion(
self.context, ctypes.byref(protocol)
@ -752,55 +726,50 @@ class WrappedSocket(object):
elif protocol.value == SecurityConst.kSSLProtocol2:
return "SSLv2"
else:
raise ssl.SSLError("Unknown TLS version: %r" % protocol)
def _reuse(self):
self._makefile_refs += 1
def _drop(self):
if self._makefile_refs < 1:
self.close()
else:
self._makefile_refs -= 1
raise ssl.SSLError(f"Unknown TLS version: {protocol!r}")
if _fileobject: # Platform-specific: Python 2
def makefile(self, mode, bufsize=-1):
self._makefile_refs += 1
return _fileobject(self, mode, bufsize, close=True)
else: # Platform-specific: Python 3
def makefile(self, mode="r", buffering=None, *args, **kwargs):
# We disable buffering with SecureTransport because it conflicts with
# the buffering that ST does internally (see issue #1153 for more).
buffering = 0
return backport_makefile(self, mode, buffering, *args, **kwargs)
def makefile(
self: socket_cls,
mode: (
Literal["r"] | Literal["w"] | Literal["rw"] | Literal["wr"] | Literal[""]
) = "r",
buffering: int | None = None,
*args: typing.Any,
**kwargs: typing.Any,
) -> typing.BinaryIO | typing.TextIO:
# We disable buffering with SecureTransport because it conflicts with
# the buffering that ST does internally (see issue #1153 for more).
buffering = 0
return socket_cls.makefile(self, mode, buffering, *args, **kwargs)
WrappedSocket.makefile = makefile
WrappedSocket.makefile = makefile # type: ignore[attr-defined]
class SecureTransportContext(object):
class SecureTransportContext:
"""
I am a wrapper class for the SecureTransport library, to translate the
interface of the standard library ``SSLContext`` object to calls into
SecureTransport.
"""
def __init__(self, protocol):
self._min_version, self._max_version = _protocol_to_min_max[protocol]
def __init__(self, protocol: int) -> None:
self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
if protocol not in (None, ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_CLIENT):
self._min_version, self._max_version = _protocol_to_min_max[protocol]
self._options = 0
self._verify = False
self._trust_bundle = None
self._client_cert = None
self._client_key = None
self._trust_bundle: bytes | None = None
self._client_cert: str | None = None
self._client_key: str | None = None
self._client_key_passphrase = None
self._alpn_protocols = None
self._alpn_protocols: list[bytes] | None = None
@property
def check_hostname(self):
def check_hostname(self) -> Literal[True]:
"""
SecureTransport cannot have its hostname checking disabled. For more,
see the comment on getpeercert() in this file.
@ -808,15 +777,14 @@ class SecureTransportContext(object):
return True
@check_hostname.setter
def check_hostname(self, value):
def check_hostname(self, value: typing.Any) -> None:
"""
SecureTransport cannot have its hostname checking disabled. For more,
see the comment on getpeercert() in this file.
"""
pass
@property
def options(self):
def options(self) -> int:
# TODO: Well, crap.
#
# So this is the bit of the code that is the most likely to cause us
@ -826,19 +794,19 @@ class SecureTransportContext(object):
return self._options
@options.setter
def options(self, value):
def options(self, value: int) -> None:
# TODO: Update in line with above.
self._options = value
@property
def verify_mode(self):
def verify_mode(self) -> int:
return ssl.CERT_REQUIRED if self._verify else ssl.CERT_NONE
@verify_mode.setter
def verify_mode(self, value):
self._verify = True if value == ssl.CERT_REQUIRED else False
def verify_mode(self, value: int) -> None:
self._verify = value == ssl.CERT_REQUIRED
def set_default_verify_paths(self):
def set_default_verify_paths(self) -> None:
# So, this has to do something a bit weird. Specifically, what it does
# is nothing.
#
@ -850,15 +818,18 @@ class SecureTransportContext(object):
# ignoring it.
pass
def load_default_certs(self):
def load_default_certs(self) -> None:
return self.set_default_verify_paths()
def set_ciphers(self, ciphers):
# For now, we just require the default cipher string.
if ciphers != util.ssl_.DEFAULT_CIPHERS:
raise ValueError("SecureTransport doesn't support custom cipher strings")
def set_ciphers(self, ciphers: typing.Any) -> None:
raise ValueError("SecureTransport doesn't support custom cipher strings")
def load_verify_locations(self, cafile=None, capath=None, cadata=None):
def load_verify_locations(
self,
cafile: str | None = None,
capath: str | None = None,
cadata: bytes | None = None,
) -> None:
# OK, we only really support cadata and cafile.
if capath is not None:
raise ValueError("SecureTransport does not support cert directories")
@ -868,14 +839,19 @@ class SecureTransportContext(object):
with open(cafile):
pass
self._trust_bundle = cafile or cadata
self._trust_bundle = cafile or cadata # type: ignore[assignment]
def load_cert_chain(self, certfile, keyfile=None, password=None):
def load_cert_chain(
self,
certfile: str,
keyfile: str | None = None,
password: str | None = None,
) -> None:
self._client_cert = certfile
self._client_key = keyfile
self._client_cert_passphrase = password
def set_alpn_protocols(self, protocols):
def set_alpn_protocols(self, protocols: list[str | bytes]) -> None:
"""
Sets the ALPN protocols that will later be set on the context.
@ -885,16 +861,16 @@ class SecureTransportContext(object):
raise NotImplementedError(
"SecureTransport supports ALPN only in macOS 10.12+"
)
self._alpn_protocols = [six.ensure_binary(p) for p in protocols]
self._alpn_protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
def wrap_socket(
self,
sock,
server_side=False,
do_handshake_on_connect=True,
suppress_ragged_eofs=True,
server_hostname=None,
):
sock: socket_cls,
server_side: bool = False,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
server_hostname: bytes | str | None = None,
) -> WrappedSocket:
# So, what do we do here? Firstly, we assert some properties. This is a
# stripped down shim, so there is some functionality we don't support.
# See PEP 543 for the real deal.
@ -911,11 +887,27 @@ class SecureTransportContext(object):
server_hostname,
self._verify,
self._trust_bundle,
self._min_version,
self._max_version,
_tls_version_to_st[self._minimum_version],
_tls_version_to_st[self._maximum_version],
self._client_cert,
self._client_key,
self._client_key_passphrase,
self._alpn_protocols,
)
return wrapped_socket
@property
def minimum_version(self) -> int:
return self._minimum_version
@minimum_version.setter
def minimum_version(self, minimum_version: int) -> None:
self._minimum_version = minimum_version
@property
def maximum_version(self) -> int:
return self._maximum_version
@maximum_version.setter
def maximum_version(self, maximum_version: int) -> None:
self._maximum_version = maximum_version