Update cheroot-8.5.2

This commit is contained in:
JonnyWong16 2021-10-14 21:14:02 -07:00
parent 4ac151d7de
commit 182e5f553e
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
25 changed files with 2171 additions and 602 deletions

View file

@ -1,4 +1,4 @@
"""Tests for TLS/SSL support."""
"""Tests for TLS support."""
# -*- coding: utf-8 -*-
# vim: set fileencoding=utf-8 :
@ -6,11 +6,14 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import functools
import json
import os
import ssl
import subprocess
import sys
import threading
import time
import traceback
import OpenSSL.SSL
import pytest
@ -21,7 +24,7 @@ import trustme
from .._compat import bton, ntob, ntou
from .._compat import IS_ABOVE_OPENSSL10, IS_PYPY
from .._compat import IS_LINUX, IS_MACOS, IS_WINDOWS
from ..server import Gateway, HTTPServer, get_ssl_adapter_class
from ..server import HTTPServer, get_ssl_adapter_class
from ..testing import (
ANY_INTERFACE_IPV4,
ANY_INTERFACE_IPV6,
@ -30,9 +33,17 @@ from ..testing import (
_get_conn_data,
_probe_ipv6_sock,
)
from ..wsgi import Gateway_10
IS_GITHUB_ACTIONS_WORKFLOW = bool(os.getenv('GITHUB_WORKFLOW'))
IS_WIN2016 = (
IS_WINDOWS
# pylint: disable=unsupported-membership-test
and b'Microsoft Windows Server 2016 Datacenter' in subprocess.check_output(
('systeminfo',),
)
)
IS_LIBRESSL_BACKEND = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_PYOPENSSL_SSL_VERSION_1_0 = (
OpenSSL.SSL.SSLeay_version(OpenSSL.SSL.SSLEAY_VERSION).
@ -40,6 +51,7 @@ IS_PYOPENSSL_SSL_VERSION_1_0 = (
)
PY27 = sys.version_info[:2] == (2, 7)
PY34 = sys.version_info[:2] == (3, 4)
PY3 = not six.PY2
_stdlib_to_openssl_verify = {
@ -71,7 +83,7 @@ missing_ipv6 = pytest.mark.skipif(
)
class HelloWorldGateway(Gateway):
class HelloWorldGateway(Gateway_10):
"""Gateway responding with Hello World to root URI."""
def respond(self):
@ -83,11 +95,21 @@ class HelloWorldGateway(Gateway):
req.ensure_headers_sent()
req.write(b'Hello world!')
return
if req_uri == '/env':
req.status = b'200 OK'
req.ensure_headers_sent()
env = self.get_environ()
# drop files so that it can be json dumped
env.pop('wsgi.errors')
env.pop('wsgi.input')
print(env)
req.write(json.dumps(env).encode('utf-8'))
return
return super(HelloWorldGateway, self).respond()
def make_tls_http_server(bind_addr, ssl_adapter, request):
"""Create and start an HTTP server bound to bind_addr."""
"""Create and start an HTTP server bound to ``bind_addr``."""
httpserver = HTTPServer(
bind_addr=bind_addr,
gateway=HelloWorldGateway,
@ -128,7 +150,7 @@ def tls_ca_certificate_pem_path(ca):
def tls_certificate(ca):
"""Provide a leaf certificate via fixture."""
interface, host, port = _get_conn_data(ANY_INTERFACE_IPV4)
return ca.issue_server_cert(ntou(interface), )
return ca.issue_server_cert(ntou(interface))
@pytest.fixture
@ -145,6 +167,43 @@ def tls_certificate_private_key_pem_path(tls_certificate):
yield cert_key_pem
def _thread_except_hook(exceptions, args):
"""Append uncaught exception ``args`` in threads to ``exceptions``."""
if issubclass(args.exc_type, SystemExit):
return
# cannot store the exception, it references the thread's stack
exceptions.append((
args.exc_type,
str(args.exc_value),
''.join(
traceback.format_exception(
args.exc_type, args.exc_value, args.exc_traceback,
),
),
))
@pytest.fixture
def thread_exceptions():
"""Provide a list of uncaught exceptions from threads via a fixture.
Only catches exceptions on Python 3.8+.
The list contains: ``(type, str(value), str(traceback))``
"""
exceptions = []
# Python 3.8+
orig_hook = getattr(threading, 'excepthook', None)
if orig_hook is not None:
threading.excepthook = functools.partial(
_thread_except_hook, exceptions,
)
try:
yield exceptions
finally:
if orig_hook is not None:
threading.excepthook = orig_hook
@pytest.mark.parametrize(
'adapter_type',
(
@ -180,7 +239,7 @@ def test_ssl_adapters(
)
resp = requests.get(
'https://' + interface + ':' + str(port) + '/',
'https://{host!s}:{port!s}/'.format(host=interface, port=port),
verify=tls_ca_certificate_pem_path,
)
@ -188,7 +247,7 @@ def test_ssl_adapters(
assert resp.text == 'Hello world!'
@pytest.mark.parametrize(
@pytest.mark.parametrize( # noqa: C901 # FIXME
'adapter_type',
(
'builtin',
@ -196,7 +255,7 @@ def test_ssl_adapters(
),
)
@pytest.mark.parametrize(
'is_trusted_cert,tls_client_identity',
('is_trusted_cert', 'tls_client_identity'),
(
(True, 'localhost'), (True, '127.0.0.1'),
(True, '*.localhost'), (True, 'not_localhost'),
@ -211,7 +270,7 @@ def test_ssl_adapters(
ssl.CERT_REQUIRED, # server should validate if client cert CA is OK
),
)
def test_tls_client_auth(
def test_tls_client_auth( # noqa: C901 # FIXME
# FIXME: remove twisted logic, separate tests
mocker,
tls_http_server, adapter_type,
@ -265,7 +324,7 @@ def test_tls_client_auth(
make_https_request = functools.partial(
requests.get,
'https://' + interface + ':' + str(port) + '/',
'https://{host!s}:{port!s}/'.format(host=interface, port=port),
# Server TLS certificate verification:
verify=tls_ca_certificate_pem_path,
@ -324,36 +383,200 @@ def test_tls_client_auth(
except AttributeError:
if PY34:
pytest.xfail('OpenSSL behaves wierdly under Python 3.4')
elif not six.PY2 and IS_WINDOWS:
elif IS_WINDOWS or IS_GITHUB_ACTIONS_WORKFLOW:
err_text = str(ssl_err.value)
else:
raise
if isinstance(err_text, int):
err_text = str(ssl_err.value)
expected_substrings = (
'sslv3 alert bad certificate' if IS_LIBRESSL_BACKEND
else 'tlsv1 alert unknown ca',
)
if not six.PY2:
if IS_MACOS and IS_PYPY and adapter_type == 'pyopenssl':
expected_substrings = ('tlsv1 alert unknown ca', )
if (
IS_WINDOWS
and tls_verify_mode in (
ssl.CERT_REQUIRED,
ssl.CERT_OPTIONAL,
)
and not is_trusted_cert
and tls_client_identity == 'localhost'
):
expected_substrings += (
'bad handshake: '
"SysCallError(10054, 'WSAECONNRESET')",
"('Connection aborted.', "
'OSError("(10054, \'WSAECONNRESET\')"))',
expected_substrings = ('tlsv1 alert unknown ca',)
if (
tls_verify_mode in (
ssl.CERT_REQUIRED,
ssl.CERT_OPTIONAL,
)
and not is_trusted_cert
and tls_client_identity == 'localhost'
):
expected_substrings += (
'bad handshake: '
"SysCallError(10054, 'WSAECONNRESET')",
"('Connection aborted.', "
'OSError("(10054, \'WSAECONNRESET\')"))',
"('Connection aborted.', "
'OSError("(10054, \'WSAECONNRESET\')",))',
"('Connection aborted.', "
'error("(10054, \'WSAECONNRESET\')",))',
"('Connection aborted.', "
'ConnectionResetError(10054, '
"'An existing connection was forcibly closed "
"by the remote host', None, 10054, None))",
) if IS_WINDOWS else (
"('Connection aborted.', "
'OSError("(104, \'ECONNRESET\')"))',
"('Connection aborted.', "
'OSError("(104, \'ECONNRESET\')",))',
"('Connection aborted.', "
'error("(104, \'ECONNRESET\')",))',
"('Connection aborted.', "
"ConnectionResetError(104, 'Connection reset by peer'))",
"('Connection aborted.', "
"error(104, 'Connection reset by peer'))",
) if (
IS_GITHUB_ACTIONS_WORKFLOW
and IS_LINUX
) else (
"('Connection aborted.', "
"BrokenPipeError(32, 'Broken pipe'))",
)
assert any(e in err_text for e in expected_substrings)
@pytest.mark.parametrize( # noqa: C901 # FIXME
'adapter_type',
(
'builtin',
'pyopenssl',
),
)
@pytest.mark.parametrize(
('tls_verify_mode', 'use_client_cert'),
(
(ssl.CERT_NONE, False),
(ssl.CERT_NONE, True),
(ssl.CERT_OPTIONAL, False),
(ssl.CERT_OPTIONAL, True),
(ssl.CERT_REQUIRED, True),
),
)
def test_ssl_env( # noqa: C901 # FIXME
thread_exceptions,
recwarn,
mocker,
tls_http_server, adapter_type,
ca, tls_verify_mode, tls_certificate,
tls_certificate_chain_pem_path,
tls_certificate_private_key_pem_path,
tls_ca_certificate_pem_path,
use_client_cert,
):
"""Test the SSL environment generated by the SSL adapters."""
interface, _host, port = _get_conn_data(ANY_INTERFACE_IPV4)
with mocker.mock_module.patch(
'idna.core.ulabel',
return_value=ntob('127.0.0.1'),
):
client_cert = ca.issue_cert(ntou('127.0.0.1'))
with client_cert.private_key_and_cert_chain_pem.tempfile() as cl_pem:
tls_adapter_cls = get_ssl_adapter_class(name=adapter_type)
tls_adapter = tls_adapter_cls(
tls_certificate_chain_pem_path,
tls_certificate_private_key_pem_path,
)
if adapter_type == 'pyopenssl':
tls_adapter.context = tls_adapter.get_context()
tls_adapter.context.set_verify(
_stdlib_to_openssl_verify[tls_verify_mode],
lambda conn, cert, errno, depth, preverify_ok: preverify_ok,
)
else:
tls_adapter.context.verify_mode = tls_verify_mode
ca.configure_trust(tls_adapter.context)
tls_certificate.configure_cert(tls_adapter.context)
tlswsgiserver = tls_http_server((interface, port), tls_adapter)
interface, _host, port = _get_conn_data(tlswsgiserver.bind_addr)
resp = requests.get(
'https://' + interface + ':' + str(port) + '/env',
verify=tls_ca_certificate_pem_path,
cert=cl_pem if use_client_cert else None,
)
if PY34 and resp.status_code != 200:
pytest.xfail(
'Python 3.4 has problems with verifying client certs',
)
env = json.loads(resp.content.decode('utf-8'))
# hard coded env
assert env['wsgi.url_scheme'] == 'https'
assert env['HTTPS'] == 'on'
# ensure these are present
for key in {'SSL_VERSION_INTERFACE', 'SSL_VERSION_LIBRARY'}:
assert key in env
# pyOpenSSL generates the env before the handshake completes
if adapter_type == 'pyopenssl':
return
for key in {'SSL_PROTOCOL', 'SSL_CIPHER'}:
assert key in env
# client certificate env
if tls_verify_mode == ssl.CERT_NONE or not use_client_cert:
assert env['SSL_CLIENT_VERIFY'] == 'NONE'
else:
assert env['SSL_CLIENT_VERIFY'] == 'SUCCESS'
with open(cl_pem, 'rt') as f:
assert env['SSL_CLIENT_CERT'] in f.read()
for key in {
'SSL_CLIENT_M_VERSION', 'SSL_CLIENT_M_SERIAL',
'SSL_CLIENT_I_DN', 'SSL_CLIENT_S_DN',
}:
assert key in env
# builtin ssl environment generation may use a loopback socket
# ensure no ResourceWarning was raised during the test
# NOTE: python 2.7 does not emit ResourceWarning for ssl sockets
if IS_PYPY:
# NOTE: PyPy doesn't have ResourceWarning
# Ref: https://doc.pypy.org/en/latest/cpython_differences.html
return
for warn in recwarn:
if not issubclass(warn.category, ResourceWarning):
continue
# the tests can sporadically generate resource warnings
# due to timing issues
# all of these sporadic warnings appear to be about socket.socket
# and have been observed to come from requests connection pool
msg = str(warn.message)
if 'socket.socket' in msg:
pytest.xfail(
'\n'.join((
'Sometimes this test fails due to '
'a socket.socket ResourceWarning:',
msg,
)),
)
pytest.fail(msg)
# to perform the ssl handshake over that loopback socket,
# the builtin ssl environment generation uses a thread
for _, _, trace in thread_exceptions:
print(trace, file=sys.stderr)
assert not thread_exceptions, ': '.join((
thread_exceptions[0][0].__name__,
thread_exceptions[0][1],
))
@pytest.mark.parametrize(
'ip_addr',
(
@ -382,7 +605,16 @@ def test_https_over_http_error(http_server, ip_addr):
@pytest.mark.parametrize(
'adapter_type',
(
'builtin',
pytest.param(
'builtin',
marks=pytest.mark.xfail(
IS_WINDOWS and six.PY2,
raises=requests.exceptions.ConnectionError,
reason='Stdlib `ssl` module behaves weirdly '
'on Windows under Python 2',
strict=False,
),
),
'pyopenssl',
),
)
@ -428,16 +660,41 @@ def test_http_over_https_error(
fqdn = interface
if ip_addr is ANY_INTERFACE_IPV6:
fqdn = '[{}]'.format(fqdn)
fqdn = '[{fqdn}]'.format(**locals())
expect_fallback_response_over_plain_http = (
(adapter_type == 'pyopenssl'
and (IS_ABOVE_OPENSSL10 or not six.PY2))
(
adapter_type == 'pyopenssl'
and (IS_ABOVE_OPENSSL10 or not six.PY2)
)
or PY27
) or (
IS_GITHUB_ACTIONS_WORKFLOW
and IS_WINDOWS
and six.PY2
and not IS_WIN2016
)
if (
IS_GITHUB_ACTIONS_WORKFLOW
and IS_WINDOWS
and six.PY2
and IS_WIN2016
and adapter_type == 'builtin'
and ip_addr is ANY_INTERFACE_IPV6
):
expect_fallback_response_over_plain_http = True
if (
IS_GITHUB_ACTIONS_WORKFLOW
and IS_WINDOWS
and six.PY2
and not IS_WIN2016
and adapter_type == 'builtin'
and ip_addr is not ANY_INTERFACE_IPV6
):
expect_fallback_response_over_plain_http = False
if expect_fallback_response_over_plain_http:
resp = requests.get(
'http://' + fqdn + ':' + str(port) + '/',
'http://{host!s}:{port!s}/'.format(host=fqdn, port=port),
)
assert resp.status_code == 400
assert resp.text == (
@ -448,7 +705,7 @@ def test_http_over_https_error(
with pytest.raises(requests.exceptions.ConnectionError) as ssl_err:
requests.get( # FIXME: make stdlib ssl behave like PyOpenSSL
'http://' + fqdn + ':' + str(port) + '/',
'http://{host!s}:{port!s}/'.format(host=fqdn, port=port),
)
if IS_LINUX:
@ -468,7 +725,7 @@ def test_http_over_https_error(
underlying_error = ssl_err.value.args[0].args[-1]
err_text = str(underlying_error)
assert underlying_error.errno == expected_error_code, (
'The underlying error is {!r}'.
format(underlying_error)
'The underlying error is {underlying_error!r}'.
format(**locals())
)
assert expected_error_text in err_text