Update cherrypy-18.6.1

This commit is contained in:
JonnyWong16 2021-10-14 21:17:18 -07:00
parent b3ae6bd695
commit ebffd124f6
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
57 changed files with 1269 additions and 1509 deletions

View file

@ -1,9 +1,7 @@
"""Checker for CherryPy sites and mounted apps."""
import os
import warnings
import six
from six.moves import builtins
import builtins
import cherrypy
@ -70,14 +68,14 @@ class Checker(object):
def check_site_config_entries_in_app_config(self):
"""Check for mounted Applications that have site-scoped config."""
for sn, app in six.iteritems(cherrypy.tree.apps):
for sn, app in cherrypy.tree.apps.items():
if not isinstance(app, cherrypy.Application):
continue
msg = []
for section, entries in six.iteritems(app.config):
for section, entries in app.config.items():
if section.startswith('/'):
for key, value in six.iteritems(entries):
for key, value in entries.items():
for n in ('engine.', 'server.', 'tree.', 'checker.'):
if key.startswith(n):
msg.append('[%s] %s = %s' %

View file

@ -18,74 +18,33 @@ Instead, use unicode literals (from __future__) and bytes literals
and their .encode/.decode methods as needed.
"""
import re
import sys
import threading
import six
from six.moves import urllib
import http.client
if six.PY3:
def ntob(n, encoding='ISO-8859-1'):
"""Return the given native string as a byte string in the given
encoding.
"""
assert_native(n)
# In Python 3, the native string type is unicode
return n.encode(encoding)
def ntob(n, encoding='ISO-8859-1'):
"""Return the given native string as a byte string in the given
encoding.
"""
assert_native(n)
# In Python 3, the native string type is unicode
return n.encode(encoding)
def ntou(n, encoding='ISO-8859-1'):
"""Return the given native string as a unicode string with the given
encoding.
"""
assert_native(n)
# In Python 3, the native string type is unicode
return n
def tonative(n, encoding='ISO-8859-1'):
"""Return the given string as a native string in the given encoding."""
# In Python 3, the native string type is unicode
if isinstance(n, bytes):
return n.decode(encoding)
return n
else:
# Python 2
def ntob(n, encoding='ISO-8859-1'):
"""Return the given native string as a byte string in the given
encoding.
"""
assert_native(n)
# In Python 2, the native string type is bytes. Assume it's already
# in the given encoding, which for ISO-8859-1 is almost always what
# was intended.
return n
def ntou(n, encoding='ISO-8859-1'):
"""Return the given native string as a unicode string with the given
encoding.
"""
assert_native(n)
# In Python 3, the native string type is unicode
return n
def ntou(n, encoding='ISO-8859-1'):
"""Return the given native string as a unicode string with the given
encoding.
"""
assert_native(n)
# In Python 2, the native string type is bytes.
# First, check for the special encoding 'escape'. The test suite uses
# this to signal that it wants to pass a string with embedded \uXXXX
# escapes, but without having to prefix it with u'' for Python 2,
# but no prefix for Python 3.
if encoding == 'escape':
return six.text_type( # unicode for Python 2
re.sub(r'\\u([0-9a-zA-Z]{4})',
lambda m: six.unichr(int(m.group(1), 16)),
n.decode('ISO-8859-1')))
# Assume it's already in the given encoding, which for ISO-8859-1
# is almost always what was intended.
def tonative(n, encoding='ISO-8859-1'):
"""Return the given string as a native string in the given encoding."""
# In Python 3, the native string type is unicode
if isinstance(n, bytes):
return n.decode(encoding)
def tonative(n, encoding='ISO-8859-1'):
"""Return the given string as a native string in the given encoding."""
# In Python 2, the native string type is bytes.
if isinstance(n, six.text_type): # unicode for Python 2
return n.encode(encoding)
return n
return n
def assert_native(n):
@ -94,69 +53,7 @@ def assert_native(n):
# Some platforms don't expose HTTPSConnection, so handle it separately
HTTPSConnection = getattr(six.moves.http_client, 'HTTPSConnection', None)
HTTPSConnection = getattr(http.client, 'HTTPSConnection', None)
def _unquote_plus_compat(string, encoding='utf-8', errors='replace'):
return urllib.parse.unquote_plus(string).decode(encoding, errors)
def _unquote_compat(string, encoding='utf-8', errors='replace'):
return urllib.parse.unquote(string).decode(encoding, errors)
def _quote_compat(string, encoding='utf-8', errors='replace'):
return urllib.parse.quote(string.encode(encoding, errors))
unquote_plus = urllib.parse.unquote_plus if six.PY3 else _unquote_plus_compat
unquote = urllib.parse.unquote if six.PY3 else _unquote_compat
quote = urllib.parse.quote if six.PY3 else _quote_compat
try:
# Prefer simplejson
import simplejson as json
except ImportError:
import json
json_decode = json.JSONDecoder().decode
_json_encode = json.JSONEncoder().iterencode
if six.PY3:
# Encode to bytes on Python 3
def json_encode(value):
for chunk in _json_encode(value):
yield chunk.encode('utf-8')
else:
json_encode = _json_encode
text_or_bytes = six.text_type, bytes
if sys.version_info >= (3, 3):
Timer = threading.Timer
Event = threading.Event
else:
# Python 3.2 and earlier
Timer = threading._Timer
Event = threading._Event
# html module come in 3.2 version
try:
from html import escape
except ImportError:
from cgi import escape
# html module needed the argument quote=False because in cgi the default
# is False. With quote=True the results differ.
def escape_html(s, escape_quote=False):
"""Replace special characters "&", "<" and ">" to HTML-safe sequences.
When escape_quote=True, escape (') and (") chars.
"""
return escape(s, quote=escape_quote)
text_or_bytes = str, bytes

View file

@ -34,6 +34,7 @@ user:
POST should not raise this error
305 Use Proxy Confirm with the user
307 Temporary Redirect Confirm with the user
308 Permanent Redirect No confirmation
===== ================================= ===========
However, browsers have historically implemented these restrictions poorly;
@ -119,17 +120,15 @@ and not simply return an error message as a result.
import io
import contextlib
import urllib.parse
from sys import exc_info as _exc_info
from traceback import format_exception as _format_exception
from xml.sax import saxutils
import six
from six.moves import urllib
import html
from more_itertools import always_iterable
import cherrypy
from cherrypy._cpcompat import escape_html
from cherrypy._cpcompat import ntob
from cherrypy._cpcompat import tonative
from cherrypy._helper import classproperty
@ -256,7 +255,7 @@ class HTTPRedirect(CherryPyException):
response = cherrypy.serving.response
response.status = status = self.status
if status in (300, 301, 302, 303, 307):
if status in (300, 301, 302, 303, 307, 308):
response.headers['Content-Type'] = 'text/html;charset=utf-8'
# "The ... URI SHOULD be given by the Location field
# in the response."
@ -271,10 +270,11 @@ class HTTPRedirect(CherryPyException):
302: 'This resource resides temporarily at ',
303: 'This resource can be found at ',
307: 'This resource has moved temporarily to ',
308: 'This resource has been moved to ',
}[status]
msg += '<a href=%s>%s</a>.'
msgs = [
msg % (saxutils.quoteattr(u), escape_html(u))
msg % (saxutils.quoteattr(u), html.escape(u, quote=False))
for u in self.urls
]
response.body = ntob('<br />\n'.join(msgs), 'utf-8')
@ -496,11 +496,11 @@ def get_error_page(status, **kwargs):
if kwargs.get('version') is None:
kwargs['version'] = cherrypy.__version__
for k, v in six.iteritems(kwargs):
for k, v in kwargs.items():
if v is None:
kwargs[k] = ''
else:
kwargs[k] = escape_html(kwargs[k])
kwargs[k] = html.escape(kwargs[k], quote=False)
# Use a custom template or callable for the error page?
pages = cherrypy.serving.request.error_page
@ -520,13 +520,13 @@ def get_error_page(status, **kwargs):
if cherrypy.lib.is_iterator(result):
from cherrypy.lib.encoding import UTF8StreamEncoder
return UTF8StreamEncoder(result)
elif isinstance(result, six.text_type):
elif isinstance(result, str):
return result.encode('utf-8')
else:
if not isinstance(result, bytes):
raise ValueError(
'error page function did not '
'return a bytestring, six.text_type or an '
'return a bytestring, str or an '
'iterator - returned object of type %s.'
% (type(result).__name__))
return result

View file

@ -113,8 +113,6 @@ import logging
import os
import sys
import six
import cherrypy
from cherrypy import _cperror
@ -155,11 +153,7 @@ class LogManager(object):
access_log = None
"""The actual :class:`logging.Logger` instance for access messages."""
access_log_format = (
'{h} {l} {u} {t} "{r}" {s} {b} "{f}" "{a}"'
if six.PY3 else
'%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
)
access_log_format = '{h} {l} {u} {t} "{r}" {s} {b} "{f}" "{a}"'
logger_root = None
"""The "top-level" logger name.
@ -254,8 +248,7 @@ class LogManager(object):
status = '-'
else:
status = response.output_status.split(b' ', 1)[0]
if six.PY3:
status = status.decode('ISO-8859-1')
status = status.decode('ISO-8859-1')
atoms = {'h': remote.name or remote.ip,
'l': '-',
@ -270,45 +263,27 @@ class LogManager(object):
'i': request.unique_id,
'z': LazyRfc3339UtcTime(),
}
if six.PY3:
for k, v in atoms.items():
if not isinstance(v, str):
v = str(v)
v = v.replace('"', '\\"').encode('utf8')
# Fortunately, repr(str) escapes unprintable chars, \n, \t, etc
# and backslash for us. All we have to do is strip the quotes.
v = repr(v)[2:-1]
for k, v in atoms.items():
if not isinstance(v, str):
v = str(v)
v = v.replace('"', '\\"').encode('utf8')
# Fortunately, repr(str) escapes unprintable chars, \n, \t, etc
# and backslash for us. All we have to do is strip the quotes.
v = repr(v)[2:-1]
# in python 3.0 the repr of bytes (as returned by encode)
# uses double \'s. But then the logger escapes them yet, again
# resulting in quadruple slashes. Remove the extra one here.
v = v.replace('\\\\', '\\')
# in python 3.0 the repr of bytes (as returned by encode)
# uses double \'s. But then the logger escapes them yet, again
# resulting in quadruple slashes. Remove the extra one here.
v = v.replace('\\\\', '\\')
# Escape double-quote.
atoms[k] = v
# Escape double-quote.
atoms[k] = v
try:
self.access_log.log(
logging.INFO, self.access_log_format.format(**atoms))
except Exception:
self(traceback=True)
else:
for k, v in atoms.items():
if isinstance(v, six.text_type):
v = v.encode('utf8')
elif not isinstance(v, str):
v = str(v)
# Fortunately, repr(str) escapes unprintable chars, \n, \t, etc
# and backslash for us. All we have to do is strip the quotes.
v = repr(v)[1:-1]
# Escape double-quote.
atoms[k] = v.replace('"', '\\"')
try:
self.access_log.log(
logging.INFO, self.access_log_format % atoms)
except Exception:
self(traceback=True)
try:
self.access_log.log(
logging.INFO, self.access_log_format.format(**atoms))
except Exception:
self(traceback=True)
def time(self):
"""Return now() in Apache Common Log Format (no timezone)."""

View file

@ -61,8 +61,6 @@ import os
import re
import sys
import six
from more_itertools import always_iterable
import cherrypy
@ -197,7 +195,7 @@ def handler(req):
path = req.uri
qs = req.args or ''
reqproto = req.protocol
headers = list(six.iteritems(req.headers_in))
headers = list(req.headers_in.copy().items())
rfile = _ReadOnlyRequest(req)
prev = None

View file

@ -115,30 +115,29 @@ except ImportError:
import re
import sys
import tempfile
try:
from urllib import unquote_plus
except ImportError:
def unquote_plus(bs):
"""Bytes version of urllib.parse.unquote_plus."""
bs = bs.replace(b'+', b' ')
atoms = bs.split(b'%')
for i in range(1, len(atoms)):
item = atoms[i]
try:
pct = int(item[:2], 16)
atoms[i] = bytes([pct]) + item[2:]
except ValueError:
pass
return b''.join(atoms)
from urllib.parse import unquote
import six
import cheroot.server
import cherrypy
from cherrypy._cpcompat import ntou, unquote
from cherrypy._cpcompat import ntou
from cherrypy.lib import httputil
def unquote_plus(bs):
"""Bytes version of urllib.parse.unquote_plus."""
bs = bs.replace(b'+', b' ')
atoms = bs.split(b'%')
for i in range(1, len(atoms)):
item = atoms[i]
try:
pct = int(item[:2], 16)
atoms[i] = bytes([pct]) + item[2:]
except ValueError:
pass
return b''.join(atoms)
# ------------------------------- Processors -------------------------------- #
def process_urlencoded(entity):
@ -986,12 +985,6 @@ class RequestBody(Entity):
# add them in here.
request_params = self.request_params
for key, value in self.params.items():
# Python 2 only: keyword arguments must be byte strings (type
# 'str').
if sys.version_info < (3, 0):
if isinstance(key, six.text_type):
key = key.encode('ISO-8859-1')
if key in request_params:
if not isinstance(request_params[key], list):
request_params[key] = [request_params[key]]

View file

@ -1,11 +1,11 @@
import sys
import time
import collections
import operator
from http.cookies import SimpleCookie, CookieError
import uuid
import six
from six.moves.http_cookies import SimpleCookie, CookieError
from more_itertools import consume
import cherrypy
@ -92,28 +92,36 @@ class HookMap(dict):
def run(self, point):
"""Execute all registered Hooks (callbacks) for the given point."""
exc = None
hooks = self[point]
hooks.sort()
self.run_hooks(iter(sorted(self[point])))
@classmethod
def run_hooks(cls, hooks):
"""Execute the indicated hooks, trapping errors.
Hooks with ``.failsafe == True`` are guaranteed to run
even if others at the same hookpoint fail. In this case,
log the failure and proceed on to the next hook. The only
way to stop all processing from one of these hooks is
to raise a BaseException like SystemExit or
KeyboardInterrupt and stop the whole server.
"""
assert isinstance(hooks, collections.abc.Iterator)
quiet_errors = (
cherrypy.HTTPError,
cherrypy.HTTPRedirect,
cherrypy.InternalRedirect,
)
safe = filter(operator.attrgetter('failsafe'), hooks)
for hook in hooks:
# Some hooks are guaranteed to run even if others at
# the same hookpoint fail. We will still log the failure,
# but proceed on to the next hook. The only way
# to stop all processing from one of these hooks is
# to raise SystemExit and stop the whole server.
if exc is None or hook.failsafe:
try:
hook()
except (KeyboardInterrupt, SystemExit):
raise
except (cherrypy.HTTPError, cherrypy.HTTPRedirect,
cherrypy.InternalRedirect):
exc = sys.exc_info()[1]
except Exception:
exc = sys.exc_info()[1]
cherrypy.log(traceback=True, severity=40)
if exc:
raise exc
try:
hook()
except quiet_errors:
cls.run_hooks(safe)
raise
except Exception:
cherrypy.log(traceback=True, severity=40)
cls.run_hooks(safe)
raise
def __copy__(self):
newmap = self.__class__()
@ -141,7 +149,7 @@ def hooks_namespace(k, v):
# hookpoint per path (e.g. "hooks.before_handler.1").
# Little-known fact you only get from reading source ;)
hookpoint = k.split('.', 1)[0]
if isinstance(v, six.string_types):
if isinstance(v, str):
v = cherrypy.lib.reprconf.attributes(v)
if not isinstance(v, Hook):
v = Hook(v)
@ -704,12 +712,6 @@ class Request(object):
'strings for this resource must be encoded with %r.' %
self.query_string_encoding)
# Python 2 only: keyword arguments must be byte strings (type 'str').
if six.PY2:
for key, value in p.items():
if isinstance(key, six.text_type):
del p[key]
p[key.encode(self.query_string_encoding)] = value
self.params.update(p)
def process_headers(self):
@ -786,11 +788,11 @@ class ResponseBody(object):
def __set__(self, obj, value):
# Convert the given value to an iterable object.
if isinstance(value, six.text_type):
if isinstance(value, str):
raise ValueError(self.unicode_err)
elif isinstance(value, list):
# every item in a list must be bytes...
if any(isinstance(item, six.text_type) for item in value):
if any(isinstance(item, str) for item in value):
raise ValueError(self.unicode_err)
obj._body = encoding.prepare_iter(value)
@ -903,9 +905,9 @@ class Response(object):
if cookie:
for line in cookie.split('\r\n'):
name, value = line.split(': ', 1)
if isinstance(name, six.text_type):
if isinstance(name, str):
name = name.encode('ISO-8859-1')
if isinstance(value, six.text_type):
if isinstance(value, str):
value = headers.encode(value)
h.append((name, value))

View file

@ -1,7 +1,5 @@
"""Manage HTTP servers with CherryPy."""
import six
import cherrypy
from cherrypy.lib.reprconf import attributes
from cherrypy._cpcompat import text_or_bytes
@ -116,21 +114,12 @@ class Server(ServerAdapter):
ssl_ciphers = None
"""The ciphers list of SSL."""
if six.PY3:
ssl_module = 'builtin'
"""The name of a registered SSL adaptation module to use with
the builtin WSGI server. Builtin options are: 'builtin' (to
use the SSL library built into recent versions of Python).
You may also register your own classes in the
cheroot.server.ssl_adapters dict."""
else:
ssl_module = 'pyopenssl'
"""The name of a registered SSL adaptation module to use with the
builtin WSGI server. Builtin options are 'builtin' (to use the SSL
library built into recent versions of Python) and 'pyopenssl' (to
use the PyOpenSSL project, which you must install separately). You
may also register your own classes in the cheroot.server.ssl_adapters
dict."""
ssl_module = 'builtin'
"""The name of a registered SSL adaptation module to use with
the builtin WSGI server. Builtin options are: 'builtin' (to
use the SSL library built into recent versions of Python).
You may also register your own classes in the
cheroot.server.ssl_adapters dict."""
statistics = False
"""Turns statistics-gathering on or off for aware HTTP servers."""

View file

@ -22,8 +22,6 @@ Tools may be implemented as any object with a namespace. The builtins
are generally either modules or instances of the tools.Tool class.
"""
import six
import cherrypy
from cherrypy._helper import expose
@ -37,14 +35,9 @@ def _getargs(func):
"""Return the names of all static arguments to the given function."""
# Use this instead of importing inspect for less mem overhead.
import types
if six.PY3:
if isinstance(func, types.MethodType):
func = func.__func__
co = func.__code__
else:
if isinstance(func, types.MethodType):
func = func.im_func
co = func.func_code
if isinstance(func, types.MethodType):
func = func.__func__
co = func.__code__
return co.co_varnames[:co.co_argcount]

View file

@ -2,10 +2,7 @@
import os
import six
import cherrypy
from cherrypy._cpcompat import ntou
from cherrypy import _cpconfig, _cplogging, _cprequest, _cpwsgi, tools
from cherrypy.lib import httputil, reprconf
@ -289,8 +286,6 @@ class Tree(object):
# to '' (some WSGI servers always set SCRIPT_NAME to '').
# Try to look up the app using the full path.
env1x = environ
if six.PY2 and environ.get(ntou('wsgi.version')) == (ntou('u'), 0):
env1x = _cpwsgi.downgrade_wsgi_ux_to_1x(environ)
path = httputil.urljoin(env1x.get('SCRIPT_NAME', ''),
env1x.get('PATH_INFO', ''))
sn = self.script_name(path or '/')
@ -302,12 +297,6 @@ class Tree(object):
# Correct the SCRIPT_NAME and PATH_INFO environ entries.
environ = environ.copy()
if six.PY2 and environ.get(ntou('wsgi.version')) == (ntou('u'), 0):
# Python 2/WSGI u.0: all strings MUST be of type unicode
enc = environ[ntou('wsgi.url_encoding')]
environ[ntou('SCRIPT_NAME')] = sn.decode(enc)
environ[ntou('PATH_INFO')] = path[len(sn.rstrip('/')):].decode(enc)
else:
environ['SCRIPT_NAME'] = sn
environ['PATH_INFO'] = path[len(sn.rstrip('/')):]
environ['SCRIPT_NAME'] = sn
environ['PATH_INFO'] = path[len(sn.rstrip('/')):]
return app(environ, start_response)

View file

@ -10,8 +10,6 @@ still be translatable to bytes via the Latin-1 encoding!"
import sys as _sys
import io
import six
import cherrypy as _cherrypy
from cherrypy._cpcompat import ntou
from cherrypy import _cperror
@ -25,10 +23,10 @@ def downgrade_wsgi_ux_to_1x(environ):
env1x = {}
url_encoding = environ[ntou('wsgi.url_encoding')]
for k, v in list(environ.items()):
for k, v in environ.copy().items():
if k in [ntou('PATH_INFO'), ntou('SCRIPT_NAME'), ntou('QUERY_STRING')]:
v = v.encode(url_encoding)
elif isinstance(v, six.text_type):
elif isinstance(v, str):
v = v.encode('ISO-8859-1')
env1x[k.encode('ISO-8859-1')] = v
@ -177,10 +175,6 @@ class _TrappedResponse(object):
def __next__(self):
return self.trap(next, self.iter_response)
# todo: https://pythonhosted.org/six/#six.Iterator
if six.PY2:
next = __next__
def close(self):
if hasattr(self.response, 'close'):
self.response.close()
@ -198,7 +192,7 @@ class _TrappedResponse(object):
if not _cherrypy.request.show_tracebacks:
tb = ''
s, h, b = _cperror.bare_error(tb)
if six.PY3:
if True:
# What fun.
s = s.decode('ISO-8859-1')
h = [
@ -238,9 +232,6 @@ class AppResponse(object):
def __init__(self, environ, start_response, cpapp):
self.cpapp = cpapp
try:
if six.PY2:
if environ.get(ntou('wsgi.version')) == (ntou('u'), 0):
environ = downgrade_wsgi_ux_to_1x(environ)
self.environ = environ
self.run()
@ -262,7 +253,7 @@ class AppResponse(object):
raise TypeError(tmpl % v)
outheaders.append((k, v))
if six.PY3:
if True:
# According to PEP 3333, when using Python 3, the response
# status and headers must be bytes masquerading as unicode;
# that is, they must be of type "str" but are restricted to
@ -285,10 +276,6 @@ class AppResponse(object):
def __next__(self):
return next(self.iter_response)
# todo: https://pythonhosted.org/six/#six.Iterator
if six.PY2:
next = __next__
def close(self):
"""Close and de-reference the current request and response. (Core)"""
streaming = _cherrypy.serving.response.stream
@ -356,9 +343,6 @@ class AppResponse(object):
}
def recode_path_qs(self, path, qs):
if not six.PY3:
return
# This isn't perfect; if the given PATH_INFO is in the
# wrong encoding, it may fail to match the appropriate config
# section URI. But meh.

View file

@ -1,7 +1,6 @@
"""Helper functions for CP apps."""
import six
from six.moves import urllib
import urllib.parse
from cherrypy._cpcompat import text_or_bytes
@ -26,9 +25,6 @@ def expose(func=None, alias=None):
import sys
import types
decoratable_types = types.FunctionType, types.MethodType, type,
if six.PY2:
# Old-style classes are type types.ClassType.
decoratable_types += types.ClassType,
if isinstance(func, decoratable_types):
if alias is None:
# @expose
@ -87,53 +83,61 @@ def popargs(*args, **kwargs):
This decorator may be used in one of two ways:
As a class decorator:
@cherrypy.popargs('year', 'month', 'day')
class Blog:
def index(self, year=None, month=None, day=None):
#Process the parameters here; any url like
#/, /2009, /2009/12, or /2009/12/31
#will fill in the appropriate parameters.
def create(self):
#This link will still be available at /create. Defined functions
#take precedence over arguments.
.. code-block:: python
@cherrypy.popargs('year', 'month', 'day')
class Blog:
def index(self, year=None, month=None, day=None):
#Process the parameters here; any url like
#/, /2009, /2009/12, or /2009/12/31
#will fill in the appropriate parameters.
def create(self):
#This link will still be available at /create.
#Defined functions take precedence over arguments.
Or as a member of a class:
class Blog:
_cp_dispatch = cherrypy.popargs('year', 'month', 'day')
#...
.. code-block:: python
class Blog:
_cp_dispatch = cherrypy.popargs('year', 'month', 'day')
#...
The handler argument may be used to mix arguments with built in functions.
For instance, the following setup allows different activities at the
day, month, and year level:
class DayHandler:
def index(self, year, month, day):
#Do something with this day; probably list entries
.. code-block:: python
def delete(self, year, month, day):
#Delete all entries for this day
class DayHandler:
def index(self, year, month, day):
#Do something with this day; probably list entries
@cherrypy.popargs('day', handler=DayHandler())
class MonthHandler:
def index(self, year, month):
#Do something with this month; probably list entries
def delete(self, year, month, day):
#Delete all entries for this day
def delete(self, year, month):
#Delete all entries for this month
@cherrypy.popargs('day', handler=DayHandler())
class MonthHandler:
def index(self, year, month):
#Do something with this month; probably list entries
@cherrypy.popargs('month', handler=MonthHandler())
class YearHandler:
def index(self, year):
#Do something with this year
def delete(self, year, month):
#Delete all entries for this month
#...
@cherrypy.popargs('month', handler=MonthHandler())
class YearHandler:
def index(self, year):
#Do something with this year
@cherrypy.popargs('year', handler=YearHandler())
class Root:
def index(self):
#...
@cherrypy.popargs('year', handler=YearHandler())
class Root:
def index(self):
#...
"""
# Since keyword arg comes after *args, we have to process it ourselves
# for lower versions of python.

25
lib/cherrypy/_json.py Normal file
View file

@ -0,0 +1,25 @@
"""
JSON support.
Expose preferred json module as json and provide encode/decode
convenience functions.
"""
try:
# Prefer simplejson
import simplejson as json
except ImportError:
import json
__all__ = ['json', 'encode', 'decode']
decode = json.JSONDecoder().decode
_encode = json.JSONEncoder().iterencode
def encode(value):
"""Encode to bytes."""
for chunk in _encode(value):
yield chunk.encode('utf-8')

View file

@ -70,6 +70,11 @@ class file_generator(object):
raise StopIteration()
next = __next__
def __del__(self):
"""Close input on descturct."""
if hasattr(self.input, 'close'):
self.input.close()
def file_generator_limited(fileobj, count, chunk_size=65536):
"""Yield the given file object in chunks.

View file

@ -23,8 +23,7 @@ of plaintext passwords as the credentials store::
import time
import functools
from hashlib import md5
from six.moves.urllib.request import parse_http_list, parse_keqv_list
from urllib.request import parse_http_list, parse_keqv_list
import cherrypy
from cherrypy._cpcompat import ntob, tonative

View file

@ -37,11 +37,8 @@ import sys
import threading
import time
import six
import cherrypy
from cherrypy.lib import cptools, httputil
from cherrypy._cpcompat import Event
class Cache(object):
@ -82,7 +79,7 @@ class AntiStampedeCache(dict):
If timeout is None, no waiting is performed nor sentinels used.
"""
value = self.get(key)
if isinstance(value, Event):
if isinstance(value, threading.Event):
if timeout is None:
# Ignore the other thread and recalc it ourselves.
if debug:
@ -122,7 +119,7 @@ class AntiStampedeCache(dict):
"""Set the cached value for the given key."""
existing = self.get(key)
dict.__setitem__(self, key, value)
if isinstance(existing, Event):
if isinstance(existing, threading.Event):
# Set Event.result so other threads waiting on it have
# immediate access without needing to poll the cache again.
existing.result = value
@ -199,8 +196,7 @@ class MemoryCache(Cache):
now = time.time()
# Must make a copy of expirations so it doesn't change size
# during iteration
items = list(six.iteritems(self.expirations))
for expiration_time, objects in items:
for expiration_time, objects in self.expirations.copy().items():
if expiration_time <= now:
for obj_size, uri, sel_header_values in objects:
try:

View file

@ -25,8 +25,7 @@ import sys
import cgi
import os
import os.path
from six.moves import urllib
import urllib.parse
import cherrypy

View file

@ -193,10 +193,8 @@ import sys
import threading
import time
import six
import cherrypy
from cherrypy._cpcompat import json
from cherrypy._json import json
# ------------------------------- Statistics -------------------------------- #
@ -207,7 +205,7 @@ if not hasattr(logging, 'statistics'):
def extrapolate_statistics(scope):
"""Return an extrapolated copy of the given scope."""
c = {}
for k, v in list(scope.items()):
for k, v in scope.copy().items():
if isinstance(v, dict):
v = extrapolate_statistics(v)
elif isinstance(v, (list, tuple)):
@ -366,8 +364,8 @@ class StatsTool(cherrypy.Tool):
w['Bytes Written'] = cl
appstats['Total Bytes Written'] += cl
w['Response Status'] = getattr(
resp, 'output_status', None) or resp.status
w['Response Status'] = \
getattr(resp, 'output_status', resp.status).decode()
w['End Time'] = time.time()
p = w['End Time'] - w['Start Time']
@ -613,7 +611,7 @@ table.stats2 th {
"""Return ([headers], [rows]) for the given collection."""
# E.g., the 'Requests' dict.
headers = []
vals = six.itervalues(v)
vals = v.values()
for record in vals:
for k3 in record:
format = formatting.get(k3, missing)
@ -679,7 +677,7 @@ table.stats2 th {
def data(self):
s = extrapolate_statistics(logging.statistics)
cherrypy.response.headers['Content-Type'] = 'application/json'
return json.dumps(s, sort_keys=True, indent=4)
return json.dumps(s, sort_keys=True, indent=4).encode('utf-8')
@cherrypy.expose
def pause(self, namespace):

View file

@ -3,9 +3,7 @@
import logging
import re
from hashlib import md5
import six
from six.moves import urllib
import urllib.parse
import cherrypy
from cherrypy._cpcompat import text_or_bytes
@ -307,7 +305,7 @@ class SessionAuth(object):
def login_screen(self, from_page='..', username='', error_msg='',
**kwargs):
return (six.text_type("""<html><body>
return (str("""<html><body>
Message: %(error_msg)s
<form method="post" action="do_login">
Login: <input type="text" name="username" value="%(username)s" size="10" />
@ -406,23 +404,22 @@ Message: %(error_msg)s
def session_auth(**kwargs):
"""Session authentication hook.
Any attribute of the SessionAuth class may be overridden
via a keyword arg to this function:
""" + '\n '.join(
'{!s}: {!s}'.format(k, type(getattr(SessionAuth, k)).__name__)
for k in dir(SessionAuth)
if not k.startswith('__')
)
sa = SessionAuth()
for k, v in kwargs.items():
setattr(sa, k, v)
return sa.run()
session_auth.__doc__ = (
"""Session authentication hook.
Any attribute of the SessionAuth class may be overridden via a keyword arg
to this function:
""" + '\n'.join(['%s: %s' % (k, type(getattr(SessionAuth, k)).__name__)
for k in dir(SessionAuth) if not k.startswith('__')])
)
def log_traceback(severity=logging.ERROR, debug=False):
"""Write the last error's traceback to the cherrypy error log."""
cherrypy.log('', 'HTTP', severity=severity, traceback=True)

View file

@ -2,8 +2,6 @@ import struct
import time
import io
import six
import cherrypy
from cherrypy._cpcompat import text_or_bytes
from cherrypy.lib import file_generator
@ -11,6 +9,10 @@ from cherrypy.lib import is_closable_iterator
from cherrypy.lib import set_vary_header
_COMPRESSION_LEVEL_FAST = 1
_COMPRESSION_LEVEL_BEST = 9
def decode(encoding=None, default_encoding='utf-8'):
"""Replace or extend the list of charsets used to decode a request entity.
@ -50,7 +52,7 @@ class UTF8StreamEncoder:
def __next__(self):
res = next(self._iterator)
if isinstance(res, six.text_type):
if isinstance(res, str):
res = res.encode('utf-8')
return res
@ -99,7 +101,7 @@ class ResponseEncoder:
def encoder(body):
for chunk in body:
if isinstance(chunk, six.text_type):
if isinstance(chunk, str):
chunk = chunk.encode(encoding, self.errors)
yield chunk
self.body = encoder(self.body)
@ -112,7 +114,7 @@ class ResponseEncoder:
self.attempted_charsets.add(encoding)
body = []
for chunk in self.body:
if isinstance(chunk, six.text_type):
if isinstance(chunk, str):
try:
chunk = chunk.encode(encoding, self.errors)
except (LookupError, UnicodeError):
@ -287,13 +289,29 @@ def compress(body, compress_level):
"""Compress 'body' at the given compress_level."""
import zlib
# See http://www.gzip.org/zlib/rfc-gzip.html
# See https://tools.ietf.org/html/rfc1952
yield b'\x1f\x8b' # ID1 and ID2: gzip marker
yield b'\x08' # CM: compression method
yield b'\x00' # FLG: none set
# MTIME: 4 bytes
yield struct.pack('<L', int(time.time()) & int('FFFFFFFF', 16))
yield b'\x02' # XFL: max compression, slowest algo
# RFC 1952, section 2.3.1:
#
# XFL (eXtra FLags)
# These flags are available for use by specific compression
# methods. The "deflate" method (CM = 8) sets these flags as
# follows:
#
# XFL = 2 - compressor used maximum compression,
# slowest algorithm
# XFL = 4 - compressor used fastest algorithm
if compress_level == _COMPRESSION_LEVEL_BEST:
yield b'\x02' # XFL: max compression, slowest algo
elif compress_level == _COMPRESSION_LEVEL_FAST:
yield b'\x04' # XFL: min compression, fastest algo
else:
yield b'\x00' # XFL: compression unset/tradeoff
yield b'\xff' # OS: unknown
crc = zlib.crc32(b'')

View file

@ -10,17 +10,17 @@ to a public caning.
import functools
import email.utils
import re
import builtins
from binascii import b2a_base64
from cgi import parse_header
from email.header import decode_header
from http.server import BaseHTTPRequestHandler
from urllib.parse import unquote_plus
import six
from six.moves import range, builtins, map
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
import jaraco.collections
import cherrypy
from cherrypy._cpcompat import ntob, ntou
from cherrypy._cpcompat import unquote_plus
response_codes = BaseHTTPRequestHandler.responses.copy()
@ -143,7 +143,7 @@ class HeaderElement(object):
return self.value < other.value
def __str__(self):
p = [';%s=%s' % (k, v) for k, v in six.iteritems(self.params)]
p = [';%s=%s' % (k, v) for k, v in self.params.items()]
return str('%s%s' % (self.value, ''.join(p)))
def __bytes__(self):
@ -209,14 +209,11 @@ class AcceptElement(HeaderElement):
Ref: https://github.com/cherrypy/cherrypy/issues/1370
"""
six.raise_from(
cherrypy.HTTPError(
400,
'Malformed HTTP header: `{}`'.
format(str(self)),
),
val_err,
)
raise cherrypy.HTTPError(
400,
'Malformed HTTP header: `{}`'.
format(str(self)),
) from val_err
def __cmp__(self, other):
diff = builtins.cmp(self.qvalue, other.qvalue)
@ -283,11 +280,11 @@ def valid_status(status):
If status has no reason-phrase is supplied, a default reason-
phrase will be provided.
>>> from six.moves import http_client
>>> from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
>>> valid_status(http_client.ACCEPTED) == (
... int(http_client.ACCEPTED),
... ) + BaseHTTPRequestHandler.responses[http_client.ACCEPTED]
>>> import http.client
>>> from http.server import BaseHTTPRequestHandler
>>> valid_status(http.client.ACCEPTED) == (
... int(http.client.ACCEPTED),
... ) + BaseHTTPRequestHandler.responses[http.client.ACCEPTED]
True
"""
@ -295,7 +292,7 @@ def valid_status(status):
status = 200
code, reason = status, None
if isinstance(status, six.string_types):
if isinstance(status, str):
code, _, reason = status.partition(' ')
reason = reason.strip() or None
@ -390,77 +387,19 @@ def parse_query_string(query_string, keep_blank_values=True, encoding='utf-8'):
return pm
####
# Inlined from jaraco.collections 1.5.2
# Ref #1673
class KeyTransformingDict(dict):
"""
A dict subclass that transforms the keys before they're used.
Subclasses may override the default transform_key to customize behavior.
"""
@staticmethod
def transform_key(key):
return key
def __init__(self, *args, **kargs):
super(KeyTransformingDict, self).__init__()
# build a dictionary using the default constructs
d = dict(*args, **kargs)
# build this dictionary using transformed keys.
for item in d.items():
self.__setitem__(*item)
def __setitem__(self, key, val):
key = self.transform_key(key)
super(KeyTransformingDict, self).__setitem__(key, val)
def __getitem__(self, key):
key = self.transform_key(key)
return super(KeyTransformingDict, self).__getitem__(key)
def __contains__(self, key):
key = self.transform_key(key)
return super(KeyTransformingDict, self).__contains__(key)
def __delitem__(self, key):
key = self.transform_key(key)
return super(KeyTransformingDict, self).__delitem__(key)
def get(self, key, *args, **kwargs):
key = self.transform_key(key)
return super(KeyTransformingDict, self).get(key, *args, **kwargs)
def setdefault(self, key, *args, **kwargs):
key = self.transform_key(key)
return super(KeyTransformingDict, self).setdefault(
key, *args, **kwargs)
def pop(self, key, *args, **kwargs):
key = self.transform_key(key)
return super(KeyTransformingDict, self).pop(key, *args, **kwargs)
def matching_key_for(self, key):
"""
Given a key, return the actual key stored in self that matches.
Raise KeyError if the key isn't found.
"""
try:
return next(e_key for e_key in self.keys() if e_key == key)
except StopIteration:
raise KeyError(key)
####
class CaseInsensitiveDict(KeyTransformingDict):
class CaseInsensitiveDict(jaraco.collections.KeyTransformingDict):
"""A case-insensitive dict subclass.
Each key is changed on entry to str(key).title().
Each key is changed on entry to title case.
"""
@staticmethod
def transform_key(key):
return str(key).title()
if key is None:
# TODO(#1830): why?
return 'None'
return key.title()
# TEXT = <any OCTET except CTLs, but including LWS>
@ -499,9 +438,7 @@ class HeaderMap(CaseInsensitiveDict):
def elements(self, key):
"""Return a sorted list of HeaderElements for the given header."""
key = str(key).title()
value = self.get(key)
return header_elements(key, value)
return header_elements(self.transform_key(key), self.get(key))
def values(self, key):
"""Return a sorted list of HeaderElement.value for the given header."""
@ -518,15 +455,14 @@ class HeaderMap(CaseInsensitiveDict):
transmitting on the wire for HTTP.
"""
for k, v in header_items:
if not isinstance(v, six.string_types) and \
not isinstance(v, six.binary_type):
v = six.text_type(v)
if not isinstance(v, str) and not isinstance(v, bytes):
v = str(v)
yield tuple(map(cls.encode_header_item, (k, v)))
@classmethod
def encode_header_item(cls, item):
if isinstance(item, six.text_type):
if isinstance(item, str):
item = cls.encode(item)
# See header_translate_* constants above.

View file

@ -1,5 +1,6 @@
import cherrypy
from cherrypy._cpcompat import text_or_bytes, ntou, json_encode, json_decode
from cherrypy import _json as json
from cherrypy._cpcompat import text_or_bytes, ntou
def json_processor(entity):
@ -9,7 +10,7 @@ def json_processor(entity):
body = entity.fp.read()
with cherrypy.HTTPError.handle(ValueError, 400, 'Invalid JSON document'):
cherrypy.serving.request.json = json_decode(body.decode('utf-8'))
cherrypy.serving.request.json = json.decode(body.decode('utf-8'))
def json_in(content_type=[ntou('application/json'), ntou('text/javascript')],
@ -56,7 +57,7 @@ def json_in(content_type=[ntou('application/json'), ntou('text/javascript')],
def json_handler(*args, **kwargs):
value = cherrypy.serving.request._json_inner_handler(*args, **kwargs)
return json_encode(value)
return json.encode(value)
def json_out(content_type='application/json', debug=False,

View file

@ -18,13 +18,13 @@ by adding a named handler to Config.namespaces. The name can be any string,
and the handler must be either a callable or a context manager.
"""
from cherrypy._cpcompat import text_or_bytes
from six.moves import configparser
from six.moves import builtins
import builtins
import configparser
import operator
import sys
from cherrypy._cpcompat import text_or_bytes
class NamespaceSet(dict):
@ -36,7 +36,7 @@ class NamespaceSet(dict):
namespace removed) and the config value.
Namespace handlers may be any Python callable; they may also be
Python 2.5-style 'context managers', in which case their __enter__
context managers, in which case their __enter__
method should return a callable to be used as the handler.
See cherrypy.tools (the Toolbox class) for an example.
"""
@ -61,10 +61,10 @@ class NamespaceSet(dict):
bucket[name] = config[k]
# I chose __enter__ and __exit__ so someday this could be
# rewritten using Python 2.5's 'with' statement:
# for ns, handler in six.iteritems(self):
# rewritten using 'with' statement:
# for ns, handler in self.items():
# with handler as callable:
# for k, v in six.iteritems(ns_confs.get(ns, {})):
# for k, v in ns_confs.get(ns, {}).items():
# callable(k, v)
for ns, handler in self.items():
exit = getattr(handler, '__exit__', None)
@ -211,122 +211,7 @@ class Parser(configparser.ConfigParser):
# public domain "unrepr" implementation, found on the web and then improved.
class _Builder2:
def build(self, o):
m = getattr(self, 'build_' + o.__class__.__name__, None)
if m is None:
raise TypeError('unrepr does not recognize %s' %
repr(o.__class__.__name__))
return m(o)
def astnode(self, s):
"""Return a Python2 ast Node compiled from a string."""
try:
import compiler
except ImportError:
# Fallback to eval when compiler package is not available,
# e.g. IronPython 1.0.
return eval(s)
p = compiler.parse('__tempvalue__ = ' + s)
return p.getChildren()[1].getChildren()[0].getChildren()[1]
def build_Subscript(self, o):
expr, flags, subs = o.getChildren()
expr = self.build(expr)
subs = self.build(subs)
return expr[subs]
def build_CallFunc(self, o):
children = o.getChildren()
# Build callee from first child
callee = self.build(children[0])
# Build args and kwargs from remaining children
args = []
kwargs = {}
for child in children[1:]:
class_name = child.__class__.__name__
# None is ignored
if class_name == 'NoneType':
continue
# Keywords become kwargs
if class_name == 'Keyword':
kwargs.update(self.build(child))
# Everything else becomes args
else:
args.append(self.build(child))
return callee(*args, **kwargs)
def build_Keyword(self, o):
key, value_obj = o.getChildren()
value = self.build(value_obj)
kw_dict = {key: value}
return kw_dict
def build_List(self, o):
return map(self.build, o.getChildren())
def build_Const(self, o):
return o.value
def build_Dict(self, o):
d = {}
i = iter(map(self.build, o.getChildren()))
for el in i:
d[el] = i.next()
return d
def build_Tuple(self, o):
return tuple(self.build_List(o))
def build_Name(self, o):
name = o.name
if name == 'None':
return None
if name == 'True':
return True
if name == 'False':
return False
# See if the Name is a package or module. If it is, import it.
try:
return modules(name)
except ImportError:
pass
# See if the Name is in builtins.
try:
return getattr(builtins, name)
except AttributeError:
pass
raise TypeError('unrepr could not resolve the name %s' % repr(name))
def build_Add(self, o):
left, right = map(self.build, o.getChildren())
return left + right
def build_Mul(self, o):
left, right = map(self.build, o.getChildren())
return left * right
def build_Getattr(self, o):
parent = self.build(o.expr)
return getattr(parent, o.attrname)
def build_NoneType(self, o):
return None
def build_UnarySub(self, o):
return -self.build(o.getChildren()[0])
def build_UnaryAdd(self, o):
return self.build(o.getChildren()[0])
class _Builder3:
class _Builder:
def build(self, o):
m = getattr(self, 'build_' + o.__class__.__name__, None)
@ -441,7 +326,6 @@ class _Builder3:
# See if the Name is in builtins.
try:
import builtins
return getattr(builtins, name)
except AttributeError:
pass
@ -482,10 +366,7 @@ def unrepr(s):
"""Return a Python object compiled from a string."""
if not s:
return s
if sys.version_info < (3, 0):
b = _Builder2()
else:
b = _Builder3()
b = _Builder()
obj = b.astnode(s)
return b.build(obj)

View file

@ -106,10 +106,7 @@ import os
import time
import threading
import binascii
import six
from six.moves import cPickle as pickle
import contextlib2
import pickle
import zc.lockfile
@ -119,10 +116,6 @@ from cherrypy.lib import locking
from cherrypy.lib import is_iterator
if six.PY2:
FileNotFoundError = OSError
missing = object()
@ -410,7 +403,7 @@ class RamSession(Session):
"""Clean up expired sessions."""
now = self.now()
for _id, (data, expiration_time) in list(six.iteritems(self.cache)):
for _id, (data, expiration_time) in self.cache.copy().items():
if expiration_time <= now:
try:
del self.cache[_id]
@ -572,8 +565,6 @@ class FileSession(Session):
def release_lock(self, path=None):
"""Release the lock on the currently-loaded session data."""
self.lock.close()
with contextlib2.suppress(FileNotFoundError):
os.remove(self.lock._path)
self.locked = False
def clean_up(self):
@ -624,7 +615,7 @@ class MemcachedSession(Session):
# This is a separate set of locks per session id.
locks = {}
servers = ['127.0.0.1:11211']
servers = ['localhost:11211']
@classmethod
def setup(cls, **kwargs):

View file

@ -5,12 +5,12 @@ import platform
import re
import stat
import mimetypes
import urllib.parse
import unicodedata
from email.generator import _make_boundary as make_boundary
from io import UnsupportedOperation
from six.moves import urllib
import cherrypy
from cherrypy._cpcompat import ntob
from cherrypy.lib import cptools, httputil, file_generator_limited
@ -29,6 +29,30 @@ def _setup_mimetypes():
_setup_mimetypes()
def _make_content_disposition(disposition, file_name):
"""Create HTTP header for downloading a file with a UTF-8 filename.
This function implements the recommendations of :rfc:`6266#appendix-D`.
See this and related answers: https://stackoverflow.com/a/8996249/2173868.
"""
# As normalization algorithm for `unicodedata` is used composed form (NFC
# and NFKC) with compatibility equivalence criteria (NFK), so "NFKC" is the
# one. It first applies the compatibility decomposition, followed by the
# canonical composition. Should be displayed in the same manner, should be
# treated in the same way by applications such as alphabetizing names or
# searching, and may be substituted for each other.
# See: https://en.wikipedia.org/wiki/Unicode_equivalence.
ascii_name = (
unicodedata.normalize('NFKC', file_name).
encode('ascii', errors='ignore').decode()
)
header = '{}; filename="{}"'.format(disposition, ascii_name)
if ascii_name != file_name:
quoted_name = urllib.parse.quote(file_name)
header += '; filename*=UTF-8\'\'{}'.format(quoted_name)
return header
def serve_file(path, content_type=None, disposition=None, name=None,
debug=False):
"""Set status, headers, and body in order to serve the given path.
@ -38,9 +62,10 @@ def serve_file(path, content_type=None, disposition=None, name=None,
of the 'path' argument.
If disposition is not None, the Content-Disposition header will be set
to "<disposition>; filename=<name>". If name is None, it will be set
to the basename of path. If disposition is None, no Content-Disposition
header will be written.
to "<disposition>; filename=<name>; filename*=utf-8''<name>"
as described in :rfc:`6266#appendix-D`.
If name is None, it will be set to the basename of path.
If disposition is None, no Content-Disposition header will be written.
"""
response = cherrypy.serving.response
@ -93,7 +118,7 @@ def serve_file(path, content_type=None, disposition=None, name=None,
if disposition is not None:
if name is None:
name = os.path.basename(path)
cd = '%s; filename="%s"' % (disposition, name)
cd = _make_content_disposition(disposition, name)
response.headers['Content-Disposition'] = cd
if debug:
cherrypy.log('Content-Disposition: %r' % cd, 'TOOLS.STATIC')
@ -112,9 +137,10 @@ def serve_fileobj(fileobj, content_type=None, disposition=None, name=None,
The Content-Type header will be set to the content_type arg, if provided.
If disposition is not None, the Content-Disposition header will be set
to "<disposition>; filename=<name>". If name is None, 'filename' will
not be set. If disposition is None, no Content-Disposition header will
be written.
to "<disposition>; filename=<name>; filename*=utf-8''<name>"
as described in :rfc:`6266#appendix-D`.
If name is None, 'filename' will not be set.
If disposition is None, no Content-Disposition header will be written.
CAUTION: If the request contains a 'Range' header, one or more seek()s will
be performed on the file object. This may cause undesired behavior if
@ -150,7 +176,7 @@ def serve_fileobj(fileobj, content_type=None, disposition=None, name=None,
if name is None:
cd = disposition
else:
cd = '%s; filename="%s"' % (disposition, name)
cd = _make_content_disposition(disposition, name)
response.headers['Content-Disposition'] = cd
if debug:
cherrypy.log('Content-Disposition: %r' % cd, 'TOOLS.STATIC')

View file

@ -1,7 +1,6 @@
"""XML-RPC tool helpers."""
import sys
from six.moves.xmlrpc_client import (
from xmlrpc.client import (
loads as xmlrpc_loads, dumps as xmlrpc_dumps,
Fault as XMLRPCFault
)

View file

@ -6,11 +6,10 @@ import signal as _signal
import sys
import time
import threading
from six.moves import _thread
import _thread
from cherrypy._cpcompat import text_or_bytes
from cherrypy._cpcompat import ntob, Timer
from cherrypy._cpcompat import ntob
# _module__file__base is used by Autoreload to make
# absolute any filenames retrieved from sys.modules which are not
@ -367,7 +366,7 @@ class Daemonizer(SimplePlugin):
# "The general problem with making fork() work in a multi-threaded
# world is what to do with all of the threads..."
# So we check for active threads:
if threading.activeCount() != 1:
if threading.active_count() != 1:
self.bus.log('There are %r active threads. '
'Daemonizing now may cause strange failures.' %
threading.enumerate(), level=30)
@ -452,7 +451,7 @@ class PIDFile(SimplePlugin):
pass
class PerpetualTimer(Timer):
class PerpetualTimer(threading.Timer):
"""A responsive subclass of threading.Timer whose run() method repeats.
@ -553,7 +552,7 @@ class Monitor(SimplePlugin):
if self.thread is None:
self.thread = BackgroundTask(self.frequency, self.callback,
bus=self.bus)
self.thread.setName(threadname)
self.thread.name = threadname
self.thread.start()
self.bus.log('Started monitor thread %r.' % threadname)
else:
@ -566,8 +565,8 @@ class Monitor(SimplePlugin):
self.bus.log('No thread running for %s.' %
self.name or self.__class__.__name__)
else:
if self.thread is not threading.currentThread():
name = self.thread.getName()
if self.thread is not threading.current_thread():
name = self.thread.name
self.thread.cancel()
if not self.thread.daemon:
self.bus.log('Joining %r' % name)
@ -627,7 +626,10 @@ class Autoreloader(Monitor):
def sysfiles(self):
"""Return a Set of sys.modules filenames to monitor."""
search_mod_names = filter(re.compile(self.match).match, sys.modules)
search_mod_names = filter(
re.compile(self.match).match,
list(sys.modules.keys()),
)
mods = map(sys.modules.get, search_mod_names)
return set(filter(None, map(self._file_for_module, mods)))
@ -690,7 +692,7 @@ class Autoreloader(Monitor):
filename)
self.thread.cancel()
self.bus.log('Stopped thread %r.' %
self.thread.getName())
self.thread.name)
self.bus.restart()
return

View file

@ -178,7 +178,7 @@ class ServerAdapter(object):
import threading
t = threading.Thread(target=self._start_http_thread)
t.setName('HTTPServer ' + t.getName())
t.name = 'HTTPServer ' + t.name
t.start()
self.wait()

View file

@ -20,7 +20,7 @@ class ConsoleCtrlHandler(plugins.SimplePlugin):
def start(self):
if self.is_set:
self.bus.log('Handler for console events already set.', level=40)
self.bus.log('Handler for console events already set.', level=20)
return
result = win32api.SetConsoleCtrlHandler(self.handle, 1)
@ -28,12 +28,12 @@ class ConsoleCtrlHandler(plugins.SimplePlugin):
self.bus.log('Could not SetConsoleCtrlHandler (error %r)' %
win32api.GetLastError(), level=40)
else:
self.bus.log('Set handler for console events.', level=40)
self.bus.log('Set handler for console events.', level=20)
self.is_set = True
def stop(self):
if not self.is_set:
self.bus.log('Handler for console events already off.', level=40)
self.bus.log('Handler for console events already off.', level=20)
return
try:
@ -46,7 +46,7 @@ class ConsoleCtrlHandler(plugins.SimplePlugin):
self.bus.log('Could not remove SetConsoleCtrlHandler (error %r)' %
win32api.GetLastError(), level=40)
else:
self.bus.log('Removed handler for console events.', level=40)
self.bus.log('Removed handler for console events.', level=20)
self.is_set = False
def handle(self, event):

View file

@ -81,7 +81,7 @@ import warnings
import subprocess
import functools
import six
from more_itertools import always_iterable
# Here I save the value of os.getcwd(), which, if I am imported early enough,
@ -356,13 +356,13 @@ class Bus(object):
# implemented as a windows service and in any other case
# that another thread executes cherrypy.engine.exit()
if (
t != threading.currentThread() and
t != threading.current_thread() and
not isinstance(t, threading._MainThread) and
# Note that any dummy (external) threads are
# always daemonic.
not t.daemon
):
self.log('Waiting for thread %s.' % t.getName())
self.log('Waiting for thread %s.' % t.name)
t.join()
if self.execv:
@ -370,10 +370,7 @@ class Bus(object):
def wait(self, state, interval=0.1, channel=None):
"""Poll for the given state(s) at intervals; publish to channel."""
if isinstance(state, (tuple, list)):
states = state
else:
states = [state]
states = set(always_iterable(state))
while self.state not in states:
time.sleep(interval)
@ -436,7 +433,7 @@ class Bus(object):
:seealso: http://stackoverflow.com/a/28414807/595220
"""
try:
char_p = ctypes.c_char_p if six.PY2 else ctypes.c_wchar_p
char_p = ctypes.c_wchar_p
argv = ctypes.POINTER(char_p)()
argc = ctypes.c_int()
@ -573,7 +570,7 @@ class Bus(object):
self.wait(states.STARTED)
func(*a, **kw)
t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
t.setName('Bus Callback ' + t.getName())
t.name = 'Bus Callback ' + t.name
t.start()
self.start()

View file

@ -10,10 +10,10 @@ import sys
import time
import unittest
import warnings
import contextlib
import portend
import pytest
import six
from cheroot.test import webtest
@ -93,8 +93,7 @@ class LocalSupervisor(Supervisor):
cherrypy.engine.exit()
servers_copy = list(six.iteritems(getattr(cherrypy, 'servers', {})))
for name, server in servers_copy:
for name, server in getattr(cherrypy, 'servers', {}).copy().items():
server.unsubscribe()
del cherrypy.servers[name]
@ -311,19 +310,12 @@ class CPWebCase(webtest.WebCase):
def exit(self):
sys.exit()
def getPage(self, url, headers=None, method='GET', body=None,
protocol=None, raise_subcls=None):
"""Open the url. Return status, headers, body.
`raise_subcls` must be a tuple with the exceptions classes
or a single exception class that are not going to be considered
a socket.error regardless that they were are subclass of a
socket.error and therefore not considered for a connection retry.
def getPage(self, url, *args, **kwargs):
"""Open the url.
"""
if self.script_name:
url = httputil.urljoin(self.script_name, url)
return webtest.WebCase.getPage(self, url, headers, method, body,
protocol, raise_subcls)
return webtest.WebCase.getPage(self, url, *args, **kwargs)
def skip(self, msg='skipped '):
pytest.skip(msg)
@ -449,7 +441,7 @@ server.ssl_private_key: r'%s'
'extra': extra,
}
with io.open(self.config_file, 'w', encoding='utf-8') as f:
f.write(six.text_type(conf))
f.write(str(conf))
def start(self, imports=None):
"""Start cherryd in a subprocess."""
@ -523,20 +515,5 @@ server.ssl_private_key: r'%s'
self._proc.wait()
def _join_daemon(self):
try:
try:
# Mac, UNIX
os.wait()
except AttributeError:
# Windows
try:
pid = self.get_pid()
except IOError:
# Assume the subprocess deleted the pidfile on shutdown.
pass
else:
os.waitpid(pid, 0)
except OSError:
x = sys.exc_info()[1]
if x.args != (10, 'No child processes'):
raise
with contextlib.suppress(IOError):
os.waitpid(self.get_pid(), 0)

View file

@ -4,9 +4,9 @@ import sys
import time
from uuid import UUID
import six
import pytest
from cherrypy._cpcompat import text_or_bytes, ntob
from cherrypy._cpcompat import text_or_bytes
try:
@ -45,6 +45,7 @@ class LogCase(object):
unique enough from normal log output to use for marker identification.
"""
interactive = False
logfile = None
lastmarker = None
markerPrefix = b'test suite marker: '
@ -54,7 +55,7 @@ class LogCase(object):
print(' ERROR: %s' % msg)
if not self.interactive:
raise self.failureException(msg)
raise pytest.fail(msg)
p = (' Show: '
'[L]og [M]arker [P]attern; '
@ -86,7 +87,7 @@ class LogCase(object):
# return without raising the normal exception
return
elif i == 'R':
raise self.failureException(msg)
raise pytest.fail(msg)
elif i == 'X':
self.exit()
sys.stdout.write(p + ' ')
@ -105,7 +106,9 @@ class LogCase(object):
self.lastmarker = key
open(self.logfile, 'ab+').write(
ntob('%s%s\n' % (self.markerPrefix, key), 'utf-8'))
b'%s%s\n'
% (self.markerPrefix, key.encode('utf-8'))
)
def _read_marked_region(self, marker=None):
"""Return lines from self.logfile in the marked region.
@ -121,7 +124,7 @@ class LogCase(object):
if marker is None:
return open(logfile, 'rb').readlines()
if isinstance(marker, six.text_type):
if isinstance(marker, str):
marker = marker.encode('utf-8')
data = []
in_region = False
@ -201,7 +204,7 @@ class LogCase(object):
# Single arg. Use __getitem__ and allow lines to be str or list.
if isinstance(lines, (tuple, list)):
lines = lines[0]
if isinstance(lines, six.text_type):
if isinstance(lines, str):
lines = lines.encode('utf-8')
if lines not in data[sliceargs]:
msg = '%r not found on log line %r' % (lines, sliceargs)
@ -221,7 +224,7 @@ class LogCase(object):
start, stop = sliceargs
for line, logline in zip(lines, data[start:stop]):
if isinstance(line, six.text_type):
if isinstance(line, str):
line = line.encode('utf-8')
if line not in logline:
msg = '%r not found in log' % line

View file

@ -9,18 +9,18 @@ create a symlink to them if needed.
KNOWN BUGS
==========
##1. Apache processes Range headers automatically; CherryPy's truncated
## output is then truncated again by Apache. See test_core.testRanges.
## This was worked around in http://www.cherrypy.org/changeset/1319.
1. Apache processes Range headers automatically; CherryPy's truncated
output is then truncated again by Apache. See test_core.testRanges.
This was worked around in http://www.cherrypy.org/changeset/1319.
2. Apache does not allow custom HTTP methods like CONNECT as per the spec.
See test_core.testHTTPMethods.
3. Max request header and body settings do not work with Apache.
##4. Apache replaces status "reason phrases" automatically. For example,
## CherryPy may set "304 Not modified" but Apache will write out
## "304 Not Modified" (capital "M").
##5. Apache does not allow custom error codes as per the spec.
##6. Apache (or perhaps modpython, or modpython_gateway) unquotes %xx in the
## Request-URI too early.
4. Apache replaces status "reason phrases" automatically. For example,
CherryPy may set "304 Not modified" but Apache will write out
"304 Not Modified" (capital "M").
5. Apache does not allow custom error codes as per the spec.
6. Apache (or perhaps modpython, or modpython_gateway) unquotes %xx in the
Request-URI too early.
7. mod_wsgi will not read request bodies which use the "chunked"
transfer-coding (it passes REQUEST_CHUNKED_ERROR to ap_setup_client_block
instead of REQUEST_CHUNKED_DECHUNK, see Apache2's http_protocol.c and

View file

@ -5,8 +5,6 @@ import calendar
from datetime import datetime
import sys
import six
import cherrypy
from cherrypy.lib import sessions
@ -123,7 +121,7 @@ class Root(object):
'changemsg': '<br>'.join(changemsg),
'respcookie': cherrypy.response.cookie.output(),
'reqcookie': cherrypy.request.cookie.output(),
'sessiondata': list(six.iteritems(cherrypy.session)),
'sessiondata': list(cherrypy.session.items()),
'servertime': (
datetime.utcnow().strftime('%Y/%m/%d %H:%M') + ' UTC'
),

View file

@ -2,8 +2,6 @@
# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab:fileencoding=utf-8
import six
import cherrypy
from cherrypy.lib import auth_digest
@ -92,8 +90,7 @@ class DigestAuthTest(helper.CPWebCase):
'cnonce="1522e61005789929"')
encoded_user = username
if six.PY3:
encoded_user = encoded_user.encode('utf-8')
encoded_user = encoded_user.encode('utf-8')
encoded_user = encoded_user.decode('latin1')
auth_header = base_auth % (
encoded_user, realm, nonce, test_uri,

View file

@ -1,274 +1,327 @@
"""Publish-subscribe bus tests."""
# pylint: disable=redefined-outer-name
import os
import sys
import threading
import time
import unittest
import unittest.mock
import pytest
from cherrypy.process import wspbus
msg = 'Listener %d on channel %s: %s.'
CI_ON_MACOS = bool(os.getenv('CI')) and sys.platform == 'darwin'
msg = 'Listener %d on channel %s: %s.' # pylint: disable=invalid-name
class PublishSubscribeTests(unittest.TestCase):
def get_listener(self, channel, index):
def listener(arg=None):
self.responses.append(msg % (index, channel, arg))
return listener
def test_builtin_channels(self):
b = wspbus.Bus()
self.responses, expected = [], []
for channel in b.listeners:
for index, priority in enumerate([100, 50, 0, 51]):
b.subscribe(channel,
self.get_listener(channel, index), priority)
for channel in b.listeners:
b.publish(channel)
expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)])
b.publish(channel, arg=79347)
expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)])
self.assertEqual(self.responses, expected)
def test_custom_channels(self):
b = wspbus.Bus()
self.responses, expected = [], []
custom_listeners = ('hugh', 'louis', 'dewey')
for channel in custom_listeners:
for index, priority in enumerate([None, 10, 60, 40]):
b.subscribe(channel,
self.get_listener(channel, index), priority)
for channel in custom_listeners:
b.publish(channel, 'ah so')
expected.extend([msg % (i, channel, 'ah so')
for i in (1, 3, 0, 2)])
b.publish(channel)
expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)])
self.assertEqual(self.responses, expected)
def test_listener_errors(self):
b = wspbus.Bus()
self.responses, expected = [], []
channels = [c for c in b.listeners if c != 'log']
for channel in channels:
b.subscribe(channel, self.get_listener(channel, 1))
# This will break since the lambda takes no args.
b.subscribe(channel, lambda: None, priority=20)
for channel in channels:
self.assertRaises(wspbus.ChannelFailures, b.publish, channel, 123)
expected.append(msg % (1, channel, 123))
self.assertEqual(self.responses, expected)
@pytest.fixture
def bus():
"""Return a wspbus instance."""
return wspbus.Bus()
class BusMethodTests(unittest.TestCase):
@pytest.fixture
def log_tracker(bus):
"""Return an instance of bus log tracker."""
class LogTracker: # pylint: disable=too-few-public-methods
"""Bus log tracker."""
def log(self, bus):
self._log_entries = []
log_entries = []
def logit(msg, level):
self._log_entries.append(msg)
bus.subscribe('log', logit)
def __init__(self, bus):
def logit(msg, level): # pylint: disable=unused-argument
self.log_entries.append(msg)
bus.subscribe('log', logit)
def assertLog(self, entries):
self.assertEqual(self._log_entries, entries)
return LogTracker(bus)
def get_listener(self, channel, index):
def listener(arg=None):
self.responses.append(msg % (index, channel, arg))
return listener
def test_start(self):
b = wspbus.Bus()
self.log(b)
@pytest.fixture
def listener():
"""Return an instance of bus response tracker."""
class Listner: # pylint: disable=too-few-public-methods
"""Bus handler return value tracker."""
self.responses = []
num = 3
for index in range(num):
b.subscribe('start', self.get_listener('start', index))
responses = []
b.start()
try:
# The start method MUST call all 'start' listeners.
self.assertEqual(
set(self.responses),
set([msg % (i, 'start', None) for i in range(num)]))
# The start method MUST move the state to STARTED
# (or EXITING, if errors occur)
self.assertEqual(b.state, b.states.STARTED)
# The start method MUST log its states.
self.assertLog(['Bus STARTING', 'Bus STARTED'])
finally:
# Exit so the atexit handler doesn't complain.
b.exit()
def get_listener(self, channel, index):
"""Return an argument tracking listener."""
def listener(arg=None):
self.responses.append(msg % (index, channel, arg))
return listener
def test_stop(self):
b = wspbus.Bus()
self.log(b)
return Listner()
self.responses = []
num = 3
for index in range(num):
b.subscribe('stop', self.get_listener('stop', index))
b.stop()
def test_builtin_channels(bus, listener):
"""Test that built-in channels trigger corresponding listeners."""
expected = []
# The stop method MUST call all 'stop' listeners.
self.assertEqual(set(self.responses),
set([msg % (i, 'stop', None) for i in range(num)]))
# The stop method MUST move the state to STOPPED
self.assertEqual(b.state, b.states.STOPPED)
# The stop method MUST log its states.
self.assertLog(['Bus STOPPING', 'Bus STOPPED'])
for channel in bus.listeners:
for index, priority in enumerate([100, 50, 0, 51]):
bus.subscribe(
channel,
listener.get_listener(channel, index),
priority,
)
def test_graceful(self):
b = wspbus.Bus()
self.log(b)
for channel in bus.listeners:
bus.publish(channel)
expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)])
bus.publish(channel, arg=79347)
expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)])
self.responses = []
num = 3
for index in range(num):
b.subscribe('graceful', self.get_listener('graceful', index))
assert listener.responses == expected
b.graceful()
# The graceful method MUST call all 'graceful' listeners.
self.assertEqual(
set(self.responses),
set([msg % (i, 'graceful', None) for i in range(num)]))
# The graceful method MUST log its states.
self.assertLog(['Bus graceful'])
def test_custom_channels(bus, listener):
"""Test that custom pub-sub channels work as built-in ones."""
expected = []
def test_exit(self):
b = wspbus.Bus()
self.log(b)
custom_listeners = ('hugh', 'louis', 'dewey')
for channel in custom_listeners:
for index, priority in enumerate([None, 10, 60, 40]):
bus.subscribe(
channel,
listener.get_listener(channel, index),
priority,
)
self.responses = []
num = 3
for index in range(num):
b.subscribe('stop', self.get_listener('stop', index))
b.subscribe('exit', self.get_listener('exit', index))
for channel in custom_listeners:
bus.publish(channel, 'ah so')
expected.extend(msg % (i, channel, 'ah so') for i in (1, 3, 0, 2))
bus.publish(channel)
expected.extend(msg % (i, channel, None) for i in (1, 3, 0, 2))
b.exit()
assert listener.responses == expected
# The exit method MUST call all 'stop' listeners,
# and then all 'exit' listeners.
self.assertEqual(set(self.responses),
set([msg % (i, 'stop', None) for i in range(num)] +
[msg % (i, 'exit', None) for i in range(num)]))
# The exit method MUST move the state to EXITING
self.assertEqual(b.state, b.states.EXITING)
# The exit method MUST log its states.
self.assertLog(
def test_listener_errors(bus, listener):
"""Test that unhandled exceptions raise channel failures."""
expected = []
channels = [c for c in bus.listeners if c != 'log']
for channel in channels:
bus.subscribe(channel, listener.get_listener(channel, 1))
# This will break since the lambda takes no args.
bus.subscribe(channel, lambda: None, priority=20)
for channel in channels:
with pytest.raises(wspbus.ChannelFailures):
bus.publish(channel, 123)
expected.append(msg % (1, channel, 123))
assert listener.responses == expected
def test_start(bus, listener, log_tracker):
"""Test that bus start sequence calls all listeners."""
num = 3
for index in range(num):
bus.subscribe('start', listener.get_listener('start', index))
bus.start()
try:
# The start method MUST call all 'start' listeners.
assert (
set(listener.responses) ==
set(msg % (i, 'start', None) for i in range(num)))
# The start method MUST move the state to STARTED
# (or EXITING, if errors occur)
assert bus.state == bus.states.STARTED
# The start method MUST log its states.
assert log_tracker.log_entries == ['Bus STARTING', 'Bus STARTED']
finally:
# Exit so the atexit handler doesn't complain.
bus.exit()
def test_stop(bus, listener, log_tracker):
"""Test that bus stop sequence calls all listeners."""
num = 3
for index in range(num):
bus.subscribe('stop', listener.get_listener('stop', index))
bus.stop()
# The stop method MUST call all 'stop' listeners.
assert (set(listener.responses) ==
set(msg % (i, 'stop', None) for i in range(num)))
# The stop method MUST move the state to STOPPED
assert bus.state == bus.states.STOPPED
# The stop method MUST log its states.
assert log_tracker.log_entries == ['Bus STOPPING', 'Bus STOPPED']
def test_graceful(bus, listener, log_tracker):
"""Test that bus graceful state triggers all listeners."""
num = 3
for index in range(num):
bus.subscribe('graceful', listener.get_listener('graceful', index))
bus.graceful()
# The graceful method MUST call all 'graceful' listeners.
assert (
set(listener.responses) ==
set(msg % (i, 'graceful', None) for i in range(num)))
# The graceful method MUST log its states.
assert log_tracker.log_entries == ['Bus graceful']
def test_exit(bus, listener, log_tracker):
"""Test that bus exit sequence is correct."""
num = 3
for index in range(num):
bus.subscribe('stop', listener.get_listener('stop', index))
bus.subscribe('exit', listener.get_listener('exit', index))
bus.exit()
# The exit method MUST call all 'stop' listeners,
# and then all 'exit' listeners.
assert (set(listener.responses) ==
set([msg % (i, 'stop', None) for i in range(num)] +
[msg % (i, 'exit', None) for i in range(num)]))
# The exit method MUST move the state to EXITING
assert bus.state == bus.states.EXITING
# The exit method MUST log its states.
assert (log_tracker.log_entries ==
['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])
def test_wait(self):
b = wspbus.Bus()
def f(method):
time.sleep(0.2)
getattr(b, method)()
def test_wait(bus):
"""Test that bus wait awaits for states."""
def f(method): # pylint: disable=invalid-name
time.sleep(0.2)
getattr(bus, method)()
for method, states in [('start', [b.states.STARTED]),
('stop', [b.states.STOPPED]),
('start',
[b.states.STARTING, b.states.STARTED]),
('exit', [b.states.EXITING]),
]:
threading.Thread(target=f, args=(method,)).start()
b.wait(states)
flow = [
('start', [bus.states.STARTED]),
('stop', [bus.states.STOPPED]),
('start', [bus.states.STARTING, bus.states.STARTED]),
('exit', [bus.states.EXITING]),
]
# The wait method MUST wait for the given state(s).
if b.state not in states:
self.fail('State %r not in %r' % (b.state, states))
for method, states in flow:
threading.Thread(target=f, args=(method,)).start()
bus.wait(states)
def test_block(self):
b = wspbus.Bus()
self.log(b)
def f():
time.sleep(0.2)
b.exit()
def g():
time.sleep(0.4)
threading.Thread(target=f).start()
threading.Thread(target=g).start()
threads = [t for t in threading.enumerate() if not t.daemon]
self.assertEqual(len(threads), 3)
b.block()
# The block method MUST wait for the EXITING state.
self.assertEqual(b.state, b.states.EXITING)
# The block method MUST wait for ALL non-main, non-daemon threads to
# finish.
threads = [t for t in threading.enumerate() if not t.daemon]
self.assertEqual(len(threads), 1)
# The last message will mention an indeterminable thread name; ignore
# it
self.assertEqual(self._log_entries[:-1],
['Bus STOPPING', 'Bus STOPPED',
'Bus EXITING', 'Bus EXITED',
'Waiting for child threads to terminate...'])
def test_start_with_callback(self):
b = wspbus.Bus()
self.log(b)
try:
events = []
def f(*args, **kwargs):
events.append(('f', args, kwargs))
def g():
events.append('g')
b.subscribe('start', g)
b.start_with_callback(f, (1, 3, 5), {'foo': 'bar'})
# Give wait() time to run f()
time.sleep(0.2)
# The callback method MUST wait for the STARTED state.
self.assertEqual(b.state, b.states.STARTED)
# The callback method MUST run after all start methods.
self.assertEqual(events, ['g', ('f', (1, 3, 5), {'foo': 'bar'})])
finally:
b.exit()
def test_log(self):
b = wspbus.Bus()
self.log(b)
self.assertLog([])
# Try a normal message.
expected = []
for msg in ["O mah darlin'"] * 3 + ['Clementiiiiiiiine']:
b.log(msg)
expected.append(msg)
self.assertLog(expected)
# Try an error message
try:
foo
except NameError:
b.log('You are lost and gone forever', traceback=True)
lastmsg = self._log_entries[-1]
if 'Traceback' not in lastmsg or 'NameError' not in lastmsg:
self.fail('Last log message %r did not contain '
'the expected traceback.' % lastmsg)
else:
self.fail('NameError was not raised as expected.')
# The wait method MUST wait for the given state(s).
assert bus.state in states, 'State %r not in %r' % (bus.state, states)
if __name__ == '__main__':
unittest.main()
@pytest.mark.xfail(CI_ON_MACOS, reason='continuous integration on macOS fails')
def test_wait_publishes_periodically(bus):
"""Test that wait publishes each tick."""
callback = unittest.mock.MagicMock()
bus.subscribe('main', callback)
def set_start():
time.sleep(0.05)
bus.start()
threading.Thread(target=set_start).start()
bus.wait(bus.states.STARTED, interval=0.01, channel='main')
assert callback.call_count > 3
def test_block(bus, log_tracker):
"""Test that bus block waits for exiting."""
def f(): # pylint: disable=invalid-name
time.sleep(0.2)
bus.exit()
def g(): # pylint: disable=invalid-name
time.sleep(0.4)
threading.Thread(target=f).start()
threading.Thread(target=g).start()
threads = [t for t in threading.enumerate() if not t.daemon]
assert len(threads) == 3
bus.block()
# The block method MUST wait for the EXITING state.
assert bus.state == bus.states.EXITING
# The block method MUST wait for ALL non-main, non-daemon threads to
# finish.
threads = [t for t in threading.enumerate() if not t.daemon]
assert len(threads) == 1
# The last message will mention an indeterminable thread name; ignore
# it
expected_bus_messages = [
'Bus STOPPING',
'Bus STOPPED',
'Bus EXITING',
'Bus EXITED',
'Waiting for child threads to terminate...',
]
bus_msg_num = len(expected_bus_messages)
# If the last message mentions an indeterminable thread name then ignore it
assert log_tracker.log_entries[:bus_msg_num] == expected_bus_messages
assert len(log_tracker.log_entries[bus_msg_num:]) <= 1, (
'No more than one extra log line with the thread name expected'
)
def test_start_with_callback(bus):
"""Test that callback fires on bus start."""
try:
events = []
def f(*args, **kwargs): # pylint: disable=invalid-name
events.append(('f', args, kwargs))
def g(): # pylint: disable=invalid-name
events.append('g')
bus.subscribe('start', g)
bus.start_with_callback(f, (1, 3, 5), {'foo': 'bar'})
# Give wait() time to run f()
time.sleep(0.2)
# The callback method MUST wait for the STARTED state.
assert bus.state == bus.states.STARTED
# The callback method MUST run after all start methods.
assert events == ['g', ('f', (1, 3, 5), {'foo': 'bar'})]
finally:
bus.exit()
def test_log(bus, log_tracker):
"""Test that bus messages and errors are logged."""
assert log_tracker.log_entries == []
# Try a normal message.
expected = []
for msg_ in ["O mah darlin'"] * 3 + ['Clementiiiiiiiine']:
bus.log(msg_)
expected.append(msg_)
assert log_tracker.log_entries == expected
# Try an error message
try:
foo
except NameError:
bus.log('You are lost and gone forever', traceback=True)
lastmsg = log_tracker.log_entries[-1]
assert 'Traceback' in lastmsg and 'NameError' in lastmsg, (
'Last log message %r did not contain '
'the expected traceback.' % lastmsg
)
else:
pytest.fail('NameError was not raised as expected.')

View file

@ -3,9 +3,7 @@ from itertools import count
import os
import threading
import time
from six.moves import range
from six.moves import urllib
import urllib.parse
import pytest
@ -153,7 +151,7 @@ class CacheTest(helper.CPWebCase):
self.assertBody('visit #1')
if trial != 0:
age = int(self.assertHeader('Age'))
self.assert_(age >= elapsed)
assert age >= elapsed
elapsed = age
# POST, PUT, DELETE should not be cached.

View file

@ -1,34 +0,0 @@
"""Test Python 2/3 compatibility module."""
from __future__ import unicode_literals
import unittest
import pytest
import six
from cherrypy import _cpcompat as compat
class StringTester(unittest.TestCase):
"""Tests for string conversion."""
@pytest.mark.skipif(six.PY3, reason='Only useful on Python 2')
def test_ntob_non_native(self):
"""ntob should raise an Exception on unicode.
(Python 2 only)
See #1132 for discussion.
"""
self.assertRaises(TypeError, compat.ntob, 'fight')
class EscapeTester(unittest.TestCase):
"""Class to test escape_html function from _cpcompat."""
def test_escape_quote(self):
"""test_escape_quote - Verify the output for &<>"' chars."""
self.assertEqual(
"""xx&amp;&lt;&gt;"aa'""",
compat.escape_html("""xx&<>"aa'"""),
)

View file

@ -5,8 +5,6 @@ import os
import sys
import unittest
import six
import cherrypy
from cherrypy.test import helper
@ -16,7 +14,7 @@ localDir = os.path.join(os.getcwd(), os.path.dirname(__file__))
def StringIOFromNative(x):
return io.StringIO(six.text_type(x))
return io.StringIO(str(x))
def setup_server():
@ -82,7 +80,7 @@ def setup_server():
def wrapper():
params = cherrypy.request.params
for name, coercer in list(value.items()):
for name, coercer in value.copy().items():
try:
params[name] = coercer(params[name])
except KeyError:
@ -105,18 +103,12 @@ def setup_server():
def incr(self, num):
return num + 1
if not six.PY3:
thing3 = "thing3: unicode('test', errors='ignore')"
else:
thing3 = ''
ioconf = StringIOFromNative("""
[/]
neg: -1234
filename: os.path.join(sys.prefix, "hello.py")
thing1: cherrypy.lib.httputil.response_codes[404]
thing2: __import__('cherrypy.tutorial', globals(), locals(), ['']).thing2
%s
complex: 3+2j
mul: 6*3
ones: "11"
@ -125,7 +117,7 @@ stradd: %%(ones)s + %%(twos)s + "33"
[/favicon.ico]
tools.staticfile.filename = %r
""" % (thing3, os.path.join(localDir, 'static/dirback.jpg')))
""" % os.path.join(localDir, 'static/dirback.jpg'))
root = Root()
root.foo = Foo()
@ -203,10 +195,6 @@ class ConfigTests(helper.CPWebCase):
from cherrypy.tutorial import thing2
self.assertBody(repr(thing2))
if not six.PY3:
self.getPage('/repr?key=thing3')
self.assertBody(repr(six.text_type('test')))
self.getPage('/repr?key=complex')
self.assertBody('(3+2j)')
@ -233,8 +221,8 @@ class ConfigTests(helper.CPWebCase):
# the favicon in the page handler to be '../favicon.ico',
# but then overrode it in config to be './static/dirback.jpg'.
self.getPage('/favicon.ico')
self.assertBody(open(os.path.join(localDir, 'static/dirback.jpg'),
'rb').read())
with open(os.path.join(localDir, 'static/dirback.jpg'), 'rb') as tf:
self.assertBody(tf.read())
def test_request_body_namespace(self):
self.getPage('/plain', method='POST', headers=[

View file

@ -4,12 +4,8 @@ import errno
import socket
import sys
import time
import six
from six.moves import urllib
from six.moves.http_client import BadStatusLine, HTTPConnection, NotConnected
import pytest
import urllib.parse
from http.client import BadStatusLine, HTTPConnection, NotConnected
from cheroot.test import webtest
@ -91,7 +87,7 @@ def setup_server():
body = [body]
newbody = []
for chunk in body:
if isinstance(chunk, six.text_type):
if isinstance(chunk, str):
chunk = chunk.encode('ISO-8859-1')
newbody.append(chunk)
return newbody
@ -354,18 +350,17 @@ class PipelineTests(helper.CPWebCase):
conn._output(ntob('Host: %s' % self.HOST, 'ascii'))
conn._send_output()
response = conn.response_class(conn.sock, method='GET')
msg = (
"Writing to timed out socket didn't fail as it should have: %s")
try:
response.begin()
except Exception:
if not isinstance(sys.exc_info()[1],
(socket.error, BadStatusLine)):
self.fail("Writing to timed out socket didn't fail"
' as it should have: %s' % sys.exc_info()[1])
self.fail(msg % sys.exc_info()[1])
else:
if response.status != 408:
self.fail("Writing to timed out socket didn't fail"
' as it should have: %s' %
response.read())
self.fail(msg % response.read())
conn.close()
@ -392,12 +387,10 @@ class PipelineTests(helper.CPWebCase):
except Exception:
if not isinstance(sys.exc_info()[1],
(socket.error, BadStatusLine)):
self.fail("Writing to timed out socket didn't fail"
' as it should have: %s' % sys.exc_info()[1])
self.fail(msg % sys.exc_info()[1])
else:
self.fail("Writing to timed out socket didn't fail"
' as it should have: %s' %
response.read())
if response.status != 408:
self.fail(msg % response.read())
conn.close()
@ -441,8 +434,7 @@ class PipelineTests(helper.CPWebCase):
# ``conn.sock``. Until that bug get's fixed we will
# monkey patch the ``response`` instance.
# https://bugs.python.org/issue23377
if six.PY3:
response.fp = conn.sock.makefile('rb', 0)
response.fp = conn.sock.makefile('rb', 0)
response.begin()
body = response.read(13)
self.assertEqual(response.status, 200)
@ -784,7 +776,6 @@ socket_reset_errors += [
class LimitedRequestQueueTests(helper.CPWebCase):
setup_server = staticmethod(setup_upload_server)
@pytest.mark.xfail(reason='#1535')
def test_queue_full(self):
conns = []
overflow_conn = None

View file

@ -6,8 +6,6 @@ import os
import sys
import types
import six
import cherrypy
from cherrypy._cpcompat import ntou
from cherrypy import _cptools, tools
@ -57,7 +55,7 @@ class CoreRequestHandlingTest(helper.CPWebCase):
"""
def __init__(cls, name, bases, dct):
type.__init__(cls, name, bases, dct)
for value in six.itervalues(dct):
for value in dct.values():
if isinstance(value, types.FunctionType):
value.exposed = True
setattr(root, name.lower(), cls())
@ -387,6 +385,11 @@ class CoreRequestHandlingTest(helper.CPWebCase):
r"<a href=(['\"])(.*)somewhere%20else\1>\2somewhere%20else</a>")
self.assertStatus(307)
self.getPage('/redirect/by_code?code=308')
self.assertMatchesBody(
r"<a href=(['\"])(.*)somewhere%20else\1>\2somewhere%20else</a>")
self.assertStatus(308)
self.getPage('/redirect/nomodify')
self.assertBody('')
self.assertStatus(304)
@ -551,7 +554,7 @@ class CoreRequestHandlingTest(helper.CPWebCase):
self.assertStatus(206)
ct = self.assertHeader('Content-Type')
expected_type = 'multipart/byteranges; boundary='
self.assert_(ct.startswith(expected_type))
assert ct.startswith(expected_type)
boundary = ct[len(expected_type):]
expected_body = ('\r\n--%s\r\n'
'Content-type: text/html\r\n'

View file

@ -1,5 +1,3 @@
import six
import cherrypy
from cherrypy.test import helper
@ -79,7 +77,7 @@ def setup_server():
self.name = name
def __unicode__(self):
return six.text_type(self.name)
return str(self.name)
def __str__(self):
return str(self.name)
@ -105,7 +103,7 @@ def setup_server():
return 'POST %d' % make_user(name)
def GET(self):
return six.text_type(sorted(user_lookup.keys()))
return str(sorted(user_lookup.keys()))
def dynamic_dispatch(self, vpath):
try:
@ -130,7 +128,7 @@ def setup_server():
"""
Return the appropriate representation of the instance.
"""
return six.text_type(self.user)
return str(self.user)
def POST(self, name):
"""

View file

@ -3,9 +3,8 @@
import gzip
import io
from unittest import mock
from six.moves.http_client import IncompleteRead
from six.moves.urllib.parse import quote as url_quote
from http.client import IncompleteRead
from urllib.parse import quote as url_quote
import cherrypy
from cherrypy._cpcompat import ntob, ntou

View file

@ -6,13 +6,11 @@ import mimetypes
import socket
import sys
from unittest import mock
import six
from six.moves.http_client import HTTPConnection
from six.moves import urllib
import urllib.parse
from http.client import HTTPConnection
import cherrypy
from cherrypy._cpcompat import HTTPSConnection, quote
from cherrypy._cpcompat import HTTPSConnection
from cherrypy.test import helper
@ -36,7 +34,7 @@ def encode_filename(filename):
"""
if is_ascii(filename):
return 'filename', '"{filename}"'.format(**locals())
encoded = quote(filename, encoding='utf-8')
encoded = urllib.parse.quote(filename, encoding='utf-8')
return 'filename*', "'".join((
'UTF-8',
'', # lang
@ -105,14 +103,12 @@ class HTTPTests(helper.CPWebCase):
count += 1
else:
if count:
if six.PY3:
curchar = chr(curchar)
curchar = chr(curchar)
summary.append('%s * %d' % (curchar, count))
count = 1
curchar = c
if count:
if six.PY3:
curchar = chr(curchar)
curchar = chr(curchar)
summary.append('%s * %d' % (curchar, count))
return ', '.join(summary)
@ -189,12 +185,14 @@ class HTTPTests(helper.CPWebCase):
self.assertBody(', '.join(parts))
def test_post_filename_with_special_characters(self):
'''Testing that we can handle filenames with special characters. This
was reported as a bug in:
https://github.com/cherrypy/cherrypy/issues/1146/
https://github.com/cherrypy/cherrypy/issues/1397/
https://github.com/cherrypy/cherrypy/issues/1694/
'''
"""Testing that we can handle filenames with special characters.
This was reported as a bug in:
* https://github.com/cherrypy/cherrypy/issues/1146/
* https://github.com/cherrypy/cherrypy/issues/1397/
* https://github.com/cherrypy/cherrypy/issues/1694/
"""
# We'll upload a bunch of files with differing names.
fnames = [
'boop.csv', 'foo, bar.csv', 'bar, xxxx.csv', 'file"name.csv',

View file

@ -1,6 +1,6 @@
"""Test helpers from ``cherrypy.lib.httputil`` module."""
import pytest
from six.moves import http_client
import http.client
from cherrypy.lib import httputil
@ -49,12 +49,12 @@ EXPECTED_444 = (444, 'Non-existent reason', '')
(None, EXPECTED_200),
(200, EXPECTED_200),
('500', EXPECTED_500),
(http_client.NOT_FOUND, EXPECTED_404),
(http.client.NOT_FOUND, EXPECTED_404),
('444 Non-existent reason', EXPECTED_444),
]
)
def test_valid_status(status, expected_status):
"""Check valid int, string and http_client-constants
"""Check valid int, string and http.client-constants
statuses processing."""
assert httputil.valid_status(status) == expected_status
@ -62,19 +62,20 @@ def test_valid_status(status, expected_status):
@pytest.mark.parametrize(
'status_code,error_msg',
[
('hey', "Illegal response status from server ('hey' is non-numeric)."),
(
'hey',
r"Illegal response status from server \('hey' is non-numeric\)."
),
(
{'hey': 'hi'},
'Illegal response status from server '
"({'hey': 'hi'} is non-numeric).",
r'Illegal response status from server '
r"\(\{'hey': 'hi'\} is non-numeric\).",
),
(1, 'Illegal response status from server (1 is out of range).'),
(600, 'Illegal response status from server (600 is out of range).'),
(1, r'Illegal response status from server \(1 is out of range\).'),
(600, r'Illegal response status from server \(600 is out of range\).'),
]
)
def test_invalid_status(status_code, error_msg):
"""Check that invalid status cause certain errors."""
with pytest.raises(ValueError) as excinfo:
with pytest.raises(ValueError, match=error_msg):
httputil.valid_status(status_code)
assert error_msg in str(excinfo)

View file

@ -1,5 +1,3 @@
import six
import cherrypy
from cherrypy.test import helper
@ -88,7 +86,7 @@ class IteratorTest(helper.CPWebCase):
@cherrypy.expose
def count(self, clsname):
cherrypy.response.headers['Content-Type'] = 'text/plain'
return six.text_type(globals()[clsname].created)
return str(globals()[clsname].created)
@cherrypy.expose
def getall(self, clsname):
@ -139,7 +137,7 @@ class IteratorTest(helper.CPWebCase):
headers = response.getheaders()
for header_name, header_value in headers:
if header_name.lower() == 'content-length':
expected = six.text_type(1024 * 16 * 256)
expected = str(1024 * 16 * 256)
assert header_value == expected, header_value
break
else:

View file

@ -1,7 +1,6 @@
import cherrypy
from cherrypy.test import helper
from cherrypy._cpcompat import json
from cherrypy._json import json
json_out = cherrypy.config(**{'tools.json_out.on': True})

View file

@ -1,24 +1,51 @@
"""Basic tests for the CherryPy core: request handling."""
import os
from unittest import mock
import logging
import six
from cheroot.test import webtest
import pytest
import requests # FIXME: Temporary using it directly, better switch
import cherrypy
from cherrypy._cpcompat import ntou
from cherrypy.test import helper, logtest
from cherrypy.test.logtest import LogCase
localDir = os.path.dirname(__file__)
access_log = os.path.join(localDir, 'access.log')
error_log = os.path.join(localDir, 'error.log')
# Some unicode strings.
tartaros = ntou('\u03a4\u1f71\u03c1\u03c4\u03b1\u03c1\u03bf\u03c2', 'escape')
erebos = ntou('\u0388\u03c1\u03b5\u03b2\u03bf\u03c2.com', 'escape')
tartaros = u'\u03a4\u1f71\u03c1\u03c4\u03b1\u03c1\u03bf\u03c2'
erebos = u'\u0388\u03c1\u03b5\u03b2\u03bf\u03c2.com'
def setup_server():
@pytest.fixture
def access_log_file(tmp_path_factory):
return tmp_path_factory.mktemp('logs') / 'access.log'
@pytest.fixture
def error_log_file(tmp_path_factory):
return tmp_path_factory.mktemp('logs') / 'access.log'
@pytest.fixture
def server(configure_server):
cherrypy.engine.start()
cherrypy.engine.wait(cherrypy.engine.states.STARTED)
yield
shutdown_server()
def shutdown_server():
cherrypy.engine.exit()
cherrypy.engine.block()
for name, server in getattr(cherrypy, 'servers', {}).copy().items():
server.unsubscribe()
del cherrypy.servers[name]
@pytest.fixture
def configure_server(access_log_file, error_log_file):
class Root:
@cherrypy.expose
@ -58,152 +85,204 @@ def setup_server():
root = Root()
cherrypy.config.reset()
cherrypy.config.update({
'log.error_file': error_log,
'log.access_file': access_log,
'server.socket_host': webtest.WebCase.HOST,
'server.socket_port': webtest.WebCase.PORT,
'server.protocol_version': webtest.WebCase.PROTOCOL,
'environment': 'test_suite',
})
cherrypy.config.update({
'log.error_file': str(error_log_file),
'log.access_file': str(access_log_file),
})
cherrypy.tree.mount(root)
class AccessLogTests(helper.CPWebCase, logtest.LogCase):
setup_server = staticmethod(setup_server)
@pytest.fixture
def log_tracker(access_log_file):
class LogTracker(LogCase):
logfile = str(access_log_file)
return LogTracker()
logfile = access_log
def testNormalReturn(self):
self.markLog()
self.getPage('/as_string',
headers=[('Referer', 'http://www.cherrypy.org/'),
('User-Agent', 'Mozilla/5.0')])
self.assertBody('content')
self.assertStatus(200)
intro = '%s - - [' % self.interface()
self.assertLog(-1, intro)
if [k for k, v in self.headers if k.lower() == 'content-length']:
self.assertLog(-1, '] "GET %s/as_string HTTP/1.1" 200 7 '
'"http://www.cherrypy.org/" "Mozilla/5.0"'
% self.prefix())
else:
self.assertLog(-1, '] "GET %s/as_string HTTP/1.1" 200 - '
'"http://www.cherrypy.org/" "Mozilla/5.0"'
% self.prefix())
def testNormalYield(self):
self.markLog()
self.getPage('/as_yield')
self.assertBody('content')
self.assertStatus(200)
intro = '%s - - [' % self.interface()
self.assertLog(-1, intro)
if [k for k, v in self.headers if k.lower() == 'content-length']:
self.assertLog(-1, '] "GET %s/as_yield HTTP/1.1" 200 7 "" ""' %
self.prefix())
else:
self.assertLog(-1, '] "GET %s/as_yield HTTP/1.1" 200 - "" ""'
% self.prefix())
@mock.patch(
'cherrypy._cplogging.LogManager.access_log_format',
'{h} {l} {u} {t} "{r}" {s} {b} "{f}" "{a}" {o}'
if six.PY3 else
'%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(o)s'
def test_normal_return(log_tracker, server):
log_tracker.markLog()
host = webtest.interface(webtest.WebCase.HOST)
port = webtest.WebCase.PORT
resp = requests.get(
'http://%s:%s/as_string' % (host, port),
headers={
'Referer': 'http://www.cherrypy.org/',
'User-Agent': 'Mozilla/5.0',
},
)
def testCustomLogFormat(self):
"""Test a customized access_log_format string, which is a
feature of _cplogging.LogManager.access()."""
self.markLog()
self.getPage('/as_string', headers=[('Referer', 'REFERER'),
('User-Agent', 'USERAGENT'),
('Host', 'HOST')])
self.assertLog(-1, '%s - - [' % self.interface())
self.assertLog(-1, '] "GET /as_string HTTP/1.1" '
'200 7 "REFERER" "USERAGENT" HOST')
expected_body = 'content'
assert resp.text == expected_body
assert resp.status_code == 200
@mock.patch(
'cherrypy._cplogging.LogManager.access_log_format',
'{h} {l} {u} {z} "{r}" {s} {b} "{f}" "{a}" {o}'
if six.PY3 else
'%(h)s %(l)s %(u)s %(z)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(o)s'
intro = '%s - - [' % host
log_tracker.assertLog(-1, intro)
content_length = len(expected_body)
if not any(
k for k, v in resp.headers.items()
if k.lower() == 'content-length'
):
content_length = '-'
log_tracker.assertLog(
-1,
'] "GET /as_string HTTP/1.1" 200 %s '
'"http://www.cherrypy.org/" "Mozilla/5.0"'
% content_length,
)
def testTimezLogFormat(self):
"""Test a customized access_log_format string, which is a
feature of _cplogging.LogManager.access()."""
self.markLog()
expected_time = str(cherrypy._cplogging.LazyRfc3339UtcTime())
with mock.patch(
'cherrypy._cplogging.LazyRfc3339UtcTime',
lambda: expected_time):
self.getPage('/as_string', headers=[('Referer', 'REFERER'),
('User-Agent', 'USERAGENT'),
('Host', 'HOST')])
self.assertLog(-1, '%s - - ' % self.interface())
self.assertLog(-1, expected_time)
self.assertLog(-1, ' "GET /as_string HTTP/1.1" '
'200 7 "REFERER" "USERAGENT" HOST')
@mock.patch(
'cherrypy._cplogging.LogManager.access_log_format',
'{i}' if six.PY3 else '%(i)s'
def test_normal_yield(log_tracker, server):
log_tracker.markLog()
host = webtest.interface(webtest.WebCase.HOST)
port = webtest.WebCase.PORT
resp = requests.get(
'http://%s:%s/as_yield' % (host, port),
headers={
'User-Agent': '',
},
)
def testUUIDv4ParameterLogFormat(self):
"""Test rendering of UUID4 within access log."""
self.markLog()
self.getPage('/as_string')
self.assertValidUUIDv4()
expected_body = 'content'
assert resp.text == expected_body
assert resp.status_code == 200
def testEscapedOutput(self):
# Test unicode in access log pieces.
self.markLog()
self.getPage('/uni_code')
self.assertStatus(200)
if six.PY3:
# The repr of a bytestring in six.PY3 includes a b'' prefix
self.assertLog(-1, repr(tartaros.encode('utf8'))[2:-1])
else:
self.assertLog(-1, repr(tartaros.encode('utf8'))[1:-1])
# Test the erebos value. Included inline for your enlightenment.
# Note the 'r' prefix--those backslashes are literals.
self.assertLog(-1, r'\xce\x88\xcf\x81\xce\xb5\xce\xb2\xce\xbf\xcf\x82')
intro = '%s - - [' % host
# Test backslashes in output.
self.markLog()
self.getPage('/slashes')
self.assertStatus(200)
if six.PY3:
self.assertLog(-1, b'"GET /slashed\\path HTTP/1.1"')
else:
self.assertLog(-1, r'"GET /slashed\\path HTTP/1.1"')
log_tracker.assertLog(-1, intro)
content_length = len(expected_body)
if not any(
k for k, v in resp.headers.items()
if k.lower() == 'content-length'
):
content_length = '-'
# Test whitespace in output.
self.markLog()
self.getPage('/whitespace')
self.assertStatus(200)
# Again, note the 'r' prefix.
self.assertLog(-1, r'"Browzuh (1.0\r\n\t\t.3)"')
log_tracker.assertLog(
-1,
'] "GET /as_yield HTTP/1.1" 200 %s "" ""'
% content_length,
)
class ErrorLogTests(helper.CPWebCase, logtest.LogCase):
setup_server = staticmethod(setup_server)
def test_custom_log_format(log_tracker, monkeypatch, server):
"""Test a customized access_log_format string, which is a
feature of _cplogging.LogManager.access()."""
monkeypatch.setattr(
'cherrypy._cplogging.LogManager.access_log_format',
'{h} {l} {u} {t} "{r}" {s} {b} "{f}" "{a}" {o}',
)
log_tracker.markLog()
host = webtest.interface(webtest.WebCase.HOST)
port = webtest.WebCase.PORT
requests.get(
'http://%s:%s/as_string' % (host, port),
headers={
'Referer': 'REFERER',
'User-Agent': 'USERAGENT',
'Host': 'HOST',
},
)
log_tracker.assertLog(-1, '%s - - [' % host)
log_tracker.assertLog(
-1,
'] "GET /as_string HTTP/1.1" '
'200 7 "REFERER" "USERAGENT" HOST',
)
logfile = error_log
def testTracebacks(self):
# Test that tracebacks get written to the error log.
self.markLog()
ignore = helper.webtest.ignored_exceptions
ignore.append(ValueError)
try:
self.getPage('/error')
self.assertInBody('raise ValueError()')
self.assertLog(0, 'HTTP')
self.assertLog(1, 'Traceback (most recent call last):')
self.assertLog(-2, 'raise ValueError()')
finally:
ignore.pop()
def test_timez_log_format(log_tracker, monkeypatch, server):
"""Test a customized access_log_format string, which is a
feature of _cplogging.LogManager.access()."""
monkeypatch.setattr(
'cherrypy._cplogging.LogManager.access_log_format',
'{h} {l} {u} {z} "{r}" {s} {b} "{f}" "{a}" {o}',
)
log_tracker.markLog()
expected_time = str(cherrypy._cplogging.LazyRfc3339UtcTime())
monkeypatch.setattr(
'cherrypy._cplogging.LazyRfc3339UtcTime',
lambda: expected_time,
)
host = webtest.interface(webtest.WebCase.HOST)
port = webtest.WebCase.PORT
requests.get(
'http://%s:%s/as_string' % (host, port),
headers={
'Referer': 'REFERER',
'User-Agent': 'USERAGENT',
'Host': 'HOST',
},
)
log_tracker.assertLog(-1, '%s - - ' % host)
log_tracker.assertLog(-1, expected_time)
log_tracker.assertLog(
-1,
' "GET /as_string HTTP/1.1" '
'200 7 "REFERER" "USERAGENT" HOST',
)
def test_UUIDv4_parameter_log_format(log_tracker, monkeypatch, server):
"""Test rendering of UUID4 within access log."""
monkeypatch.setattr(
'cherrypy._cplogging.LogManager.access_log_format',
'{i}',
)
log_tracker.markLog()
host = webtest.interface(webtest.WebCase.HOST)
port = webtest.WebCase.PORT
requests.get('http://%s:%s/as_string' % (host, port))
log_tracker.assertValidUUIDv4()
def test_escaped_output(log_tracker, server):
# Test unicode in access log pieces.
log_tracker.markLog()
host = webtest.interface(webtest.WebCase.HOST)
port = webtest.WebCase.PORT
resp = requests.get('http://%s:%s/uni_code' % (host, port))
assert resp.status_code == 200
# The repr of a bytestring includes a b'' prefix
log_tracker.assertLog(-1, repr(tartaros.encode('utf8'))[2:-1])
# Test the erebos value. Included inline for your enlightenment.
# Note the 'r' prefix--those backslashes are literals.
log_tracker.assertLog(
-1,
r'\xce\x88\xcf\x81\xce\xb5\xce\xb2\xce\xbf\xcf\x82',
)
# Test backslashes in output.
log_tracker.markLog()
resp = requests.get('http://%s:%s/slashes' % (host, port))
assert resp.status_code == 200
log_tracker.assertLog(-1, b'"GET /slashed\\path HTTP/1.1"')
# Test whitespace in output.
log_tracker.markLog()
resp = requests.get('http://%s:%s/whitespace' % (host, port))
assert resp.status_code == 200
# Again, note the 'r' prefix.
log_tracker.assertLog(-1, r'"Browzuh (1.0\r\n\t\t.3)"')
def test_tracebacks(server, caplog):
host = webtest.interface(webtest.WebCase.HOST)
port = webtest.WebCase.PORT
with caplog.at_level(logging.ERROR, logger='cherrypy.error'):
resp = requests.get('http://%s:%s/error' % (host, port))
rec = caplog.records[0]
exc_cls, exc_msg = rec.exc_info[0], rec.message
assert 'raise ValueError()' in resp.text
assert 'HTTP' in exc_msg
assert exc_cls is ValueError

View file

@ -3,8 +3,7 @@
import itertools
import platform
import threading
from six.moves.http_client import HTTPConnection
from http.client import HTTPConnection
import cherrypy
from cherrypy._cpcompat import HTTPSConnection

View file

@ -5,9 +5,7 @@ import os
import sys
import types
import uuid
import six
from six.moves.http_client import IncompleteRead
from http.client import IncompleteRead
import cherrypy
from cherrypy._cpcompat import ntou
@ -243,7 +241,7 @@ class RequestObjectTests(helper.CPWebCase):
def ifmatch(self):
val = cherrypy.request.headers['If-Match']
assert isinstance(val, six.text_type)
assert isinstance(val, str)
cherrypy.response.headers['ETag'] = val
return val
@ -251,7 +249,7 @@ class RequestObjectTests(helper.CPWebCase):
def get_elements(self, headername):
e = cherrypy.request.headers.elements(headername)
return '\n'.join([six.text_type(x) for x in e])
return '\n'.join([str(x) for x in e])
class Method(Test):

View file

@ -1,25 +1,24 @@
import os
import platform
import threading
import time
import socket
import importlib
from six.moves.http_client import HTTPConnection
from http.client import HTTPConnection
from distutils.spawn import find_executable
import pytest
from path import Path
from more_itertools import consume
import portend
import cherrypy
from cherrypy._cpcompat import (
json_decode,
HTTPSConnection,
)
from cherrypy._cpcompat import HTTPSConnection
from cherrypy.lib import sessions
from cherrypy.lib import reprconf
from cherrypy.lib.httputil import response_codes
from cherrypy.test import helper
from cherrypy import _json as json
localDir = os.path.dirname(__file__)
localDir = Path(__file__).dirname()
def http_methods_allowed(methods=['GET', 'HEAD']):
@ -48,9 +47,10 @@ def setup_server():
cherrypy.session.cache.clear()
@cherrypy.expose
@cherrypy.tools.json_out()
def data(self):
cherrypy.session['aha'] = 'foo'
return repr(cherrypy.session._data)
return cherrypy.session._data
@cherrypy.expose
def testGen(self):
@ -142,14 +142,18 @@ def setup_server():
class SessionTest(helper.CPWebCase):
setup_server = staticmethod(setup_server)
def tearDown(self):
# Clean up sessions.
for fname in os.listdir(localDir):
if fname.startswith(sessions.FileSession.SESSION_PREFIX):
path = Path(localDir) / fname
path.remove_p()
@classmethod
def teardown_class(cls):
"""Clean up sessions."""
super(cls, cls).teardown_class()
consume(
file.remove_p()
for file in localDir.listdir()
if file.basename().startswith(
sessions.FileSession.SESSION_PREFIX
)
)
@pytest.mark.xfail(reason='#1534')
def test_0_Session(self):
self.getPage('/set_session_cls/cherrypy.lib.sessions.RamSession')
self.getPage('/clear')
@ -157,82 +161,81 @@ class SessionTest(helper.CPWebCase):
# Test that a normal request gets the same id in the cookies.
# Note: this wouldn't work if /data didn't load the session.
self.getPage('/data')
self.assertBody("{'aha': 'foo'}")
assert self.body == b'{"aha": "foo"}'
c = self.cookies[0]
self.getPage('/data', self.cookies)
self.assertEqual(self.cookies[0], c)
self.cookies[0] == c
self.getPage('/testStr')
self.assertBody('1')
assert self.body == b'1'
cookie_parts = dict([p.strip().split('=')
for p in self.cookies[0][1].split(';')])
# Assert there is an 'expires' param
self.assertEqual(set(cookie_parts.keys()),
set(['session_id', 'expires', 'Path']))
expected_cookie_keys = {'session_id', 'expires', 'Path', 'Max-Age'}
assert set(cookie_parts.keys()) == expected_cookie_keys
self.getPage('/testGen', self.cookies)
self.assertBody('2')
assert self.body == b'2'
self.getPage('/testStr', self.cookies)
self.assertBody('3')
assert self.body == b'3'
self.getPage('/data', self.cookies)
self.assertDictEqual(json_decode(self.body),
{'counter': 3, 'aha': 'foo'})
expected_data = {'counter': 3, 'aha': 'foo'}
assert json.decode(self.body.decode('utf-8')) == expected_data
self.getPage('/length', self.cookies)
self.assertBody('2')
assert self.body == b'2'
self.getPage('/delkey?key=counter', self.cookies)
self.assertStatus(200)
assert self.status_code == 200
self.getPage('/set_session_cls/cherrypy.lib.sessions.FileSession')
self.getPage('/testStr')
self.assertBody('1')
assert self.body == b'1'
self.getPage('/testGen', self.cookies)
self.assertBody('2')
assert self.body == b'2'
self.getPage('/testStr', self.cookies)
self.assertBody('3')
assert self.body == b'3'
self.getPage('/delkey?key=counter', self.cookies)
self.assertStatus(200)
assert self.status_code == 200
# Wait for the session.timeout (1 second)
time.sleep(2)
self.getPage('/')
self.assertBody('1')
assert self.body == b'1'
self.getPage('/length', self.cookies)
self.assertBody('1')
assert self.body == b'1'
# Test session __contains__
self.getPage('/keyin?key=counter', self.cookies)
self.assertBody('True')
assert self.body == b'True'
cookieset1 = self.cookies
# Make a new session and test __len__ again
self.getPage('/')
self.getPage('/length', self.cookies)
self.assertBody('2')
assert self.body == b'2'
# Test session delete
self.getPage('/delete', self.cookies)
self.assertBody('done')
assert self.body == b'done'
self.getPage('/delete', cookieset1)
self.assertBody('done')
assert self.body == b'done'
def f():
return [
x
for x in os.listdir(localDir)
if x.startswith('session-')
if x.startswith('session-') and not x.endswith('.lock')
]
self.assertEqual(f(), [])
assert f() == []
# Wait for the cleanup thread to delete remaining session files
self.getPage('/')
self.assertNotEqual(f(), [])
assert f() != []
time.sleep(2)
self.assertEqual(f(), [])
assert f() == []
def test_1_Ram_Concurrency(self):
self.getPage('/set_session_cls/cherrypy.lib.sessions.RamSession')
self._test_Concurrency()
@pytest.mark.xfail(reason='#1306')
def test_2_File_Concurrency(self):
self.getPage('/set_session_cls/cherrypy.lib.sessions.FileSession')
self._test_Concurrency()
@ -243,7 +246,7 @@ class SessionTest(helper.CPWebCase):
# Get initial cookie
self.getPage('/')
self.assertBody('1')
assert self.body == b'1'
cookies = self.cookies
data_dict = {}
@ -285,13 +288,14 @@ class SessionTest(helper.CPWebCase):
for e in errors:
print(e)
self.assertEqual(hitcount, expected)
assert len(errors) == 0
assert hitcount == expected
def test_3_Redirect(self):
# Start a new session
self.getPage('/testStr')
self.getPage('/iredir', self.cookies)
self.assertBody('FileSession')
assert self.body == b'FileSession'
def test_4_File_deletion(self):
# Start a new session
@ -319,9 +323,9 @@ class SessionTest(helper.CPWebCase):
# grab the cookie ID
id1 = self.cookies[0][1].split(';', 1)[0].split('=', 1)[1]
self.getPage('/regen')
self.assertBody('logged in')
assert self.body == b'logged in'
id2 = self.cookies[0][1].split(';', 1)[0].split('=', 1)[1]
self.assertNotEqual(id1, id2)
assert id1 != id2
self.getPage('/testStr')
# grab the cookie ID
@ -332,8 +336,8 @@ class SessionTest(helper.CPWebCase):
'session_id=maliciousid; '
'expires=Sat, 27 Oct 2017 04:18:28 GMT; Path=/;')])
id2 = self.cookies[0][1].split(';', 1)[0].split('=', 1)[1]
self.assertNotEqual(id1, id2)
self.assertNotEqual(id2, 'maliciousid')
assert id1 != id2
assert id2 != 'maliciousid'
def test_7_session_cookies(self):
self.getPage('/set_session_cls/cherrypy.lib.sessions.RamSession')
@ -343,18 +347,18 @@ class SessionTest(helper.CPWebCase):
cookie_parts = dict([p.strip().split('=')
for p in self.cookies[0][1].split(';')])
# Assert there is no 'expires' param
self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path']))
assert set(cookie_parts.keys()) == {'temp', 'Path'}
id1 = cookie_parts['temp']
self.assertEqual(list(sessions.RamSession.cache), [id1])
assert list(sessions.RamSession.cache) == [id1]
# Send another request in the same "browser session".
self.getPage('/session_cookie', self.cookies)
cookie_parts = dict([p.strip().split('=')
for p in self.cookies[0][1].split(';')])
# Assert there is no 'expires' param
self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path']))
self.assertBody(id1)
self.assertEqual(list(sessions.RamSession.cache), [id1])
assert set(cookie_parts.keys()) == {'temp', 'Path'}
assert self.body.decode('utf-8') == id1
assert list(sessions.RamSession.cache) == [id1]
# Simulate a browser close by just not sending the cookies
self.getPage('/session_cookie')
@ -362,12 +366,11 @@ class SessionTest(helper.CPWebCase):
cookie_parts = dict([p.strip().split('=')
for p in self.cookies[0][1].split(';')])
# Assert there is no 'expires' param
self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path']))
assert set(cookie_parts.keys()) == {'temp', 'Path'}
# Assert a new id has been generated...
id2 = cookie_parts['temp']
self.assertNotEqual(id1, id2)
self.assertEqual(set(sessions.RamSession.cache.keys()),
set([id1, id2]))
assert id1 != id2
assert set(sessions.RamSession.cache.keys()) == {id1, id2}
# Wait for the session.timeout on both sessions
time.sleep(2.5)
@ -398,115 +401,147 @@ class SessionTest(helper.CPWebCase):
t.join()
try:
importlib.import_module('memcache')
def is_memcached_present():
executable = find_executable('memcached')
return bool(executable)
host, port = '127.0.0.1', 11211
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
s = None
@pytest.fixture(scope='session')
def memcached_server_present():
is_memcached_present() or pytest.skip('memcached not available')
@pytest.fixture()
def memcached_client_present():
pytest.importorskip('memcache')
@pytest.fixture(scope='session')
def memcached_instance(request, watcher_getter, memcached_server_present):
"""
Start up an instance of memcached.
"""
port = portend.find_available_local_port()
def is_occupied():
try:
s = socket.socket(af, socktype, proto)
# See http://groups.google.com/group/cherrypy-users/
# browse_frm/thread/bbfe5eb39c904fe0
s.settimeout(1.0)
s.connect((host, port))
s.close()
except socket.error:
if s:
s.close()
raise
break
except (ImportError, socket.error):
class MemcachedSessionTest(helper.CPWebCase):
setup_server = staticmethod(setup_server)
portend.Checker().assert_free('localhost', port)
except Exception:
return True
return False
def test(self):
return self.skip('memcached not reachable ')
else:
class MemcachedSessionTest(helper.CPWebCase):
setup_server = staticmethod(setup_server)
proc = watcher_getter(
name='memcached',
arguments=['-p', str(port)],
checker=is_occupied,
request=request,
)
return locals()
def test_0_Session(self):
self.getPage('/set_session_cls/cherrypy.Sessions.MemcachedSession')
self.getPage('/testStr')
self.assertBody('1')
self.getPage('/testGen', self.cookies)
self.assertBody('2')
self.getPage('/testStr', self.cookies)
self.assertBody('3')
self.getPage('/length', self.cookies)
self.assertErrorPage(500)
self.assertInBody('NotImplementedError')
self.getPage('/delkey?key=counter', self.cookies)
self.assertStatus(200)
@pytest.fixture
def memcached_configured(
memcached_instance, monkeypatch,
memcached_client_present,
):
server = 'localhost:{port}'.format_map(memcached_instance)
monkeypatch.setattr(
sessions.MemcachedSession,
'servers',
[server],
)
# Wait for the session.timeout (1 second)
time.sleep(1.25)
self.getPage('/')
self.assertBody('1')
# Test session __contains__
self.getPage('/keyin?key=counter', self.cookies)
self.assertBody('True')
@pytest.mark.skipif(
platform.system() == 'Windows',
reason='pytest-services helper does not work under Windows',
)
@pytest.mark.usefixtures('memcached_configured')
class MemcachedSessionTest(helper.CPWebCase):
setup_server = staticmethod(setup_server)
# Test session delete
self.getPage('/delete', self.cookies)
self.assertBody('done')
def test_0_Session(self):
self.getPage(
'/set_session_cls/cherrypy.lib.sessions.MemcachedSession'
)
def test_1_Concurrency(self):
client_thread_count = 5
request_count = 30
self.getPage('/testStr')
assert self.body == b'1'
self.getPage('/testGen', self.cookies)
assert self.body == b'2'
self.getPage('/testStr', self.cookies)
assert self.body == b'3'
self.getPage('/length', self.cookies)
self.assertErrorPage(500)
assert b'NotImplementedError' in self.body
self.getPage('/delkey?key=counter', self.cookies)
assert self.status_code == 200
# Get initial cookie
self.getPage('/')
self.assertBody('1')
cookies = self.cookies
# Wait for the session.timeout (1 second)
time.sleep(1.25)
self.getPage('/')
assert self.body == b'1'
data_dict = {}
# Test session __contains__
self.getPage('/keyin?key=counter', self.cookies)
assert self.body == b'True'
def request(index):
for i in range(request_count):
self.getPage('/', cookies)
# Uncomment the following line to prove threads overlap.
# sys.stdout.write("%d " % index)
if not self.body.isdigit():
self.fail(self.body)
data_dict[index] = int(self.body)
# Test session delete
self.getPage('/delete', self.cookies)
assert self.body == b'done'
# Start <request_count> concurrent requests from
# each of <client_thread_count> clients
ts = []
for c in range(client_thread_count):
data_dict[c] = 0
t = threading.Thread(target=request, args=(c,))
ts.append(t)
t.start()
def test_1_Concurrency(self):
client_thread_count = 5
request_count = 30
for t in ts:
t.join()
# Get initial cookie
self.getPage('/')
assert self.body == b'1'
cookies = self.cookies
hitcount = max(data_dict.values())
expected = 1 + (client_thread_count * request_count)
self.assertEqual(hitcount, expected)
data_dict = {}
def test_3_Redirect(self):
# Start a new session
self.getPage('/testStr')
self.getPage('/iredir', self.cookies)
self.assertBody('memcached')
def request(index):
for i in range(request_count):
self.getPage('/', cookies)
# Uncomment the following line to prove threads overlap.
# sys.stdout.write("%d " % index)
if not self.body.isdigit():
self.fail(self.body)
data_dict[index] = int(self.body)
def test_5_Error_paths(self):
self.getPage('/unknown/page')
self.assertErrorPage(
404, "The path '/unknown/page' was not found.")
# Start <request_count> concurrent requests from
# each of <client_thread_count> clients
ts = []
for c in range(client_thread_count):
data_dict[c] = 0
t = threading.Thread(target=request, args=(c,))
ts.append(t)
t.start()
# Note: this path is *not* the same as above. The above
# takes a normal route through the session code; this one
# skips the session code's before_handler and only calls
# before_finalize (save) and on_end (close). So the session
# code has to survive calling save/close without init.
self.getPage('/restricted', self.cookies, method='POST')
self.assertErrorPage(405, response_codes[405][1])
for t in ts:
t.join()
hitcount = max(data_dict.values())
expected = 1 + (client_thread_count * request_count)
assert hitcount == expected
def test_3_Redirect(self):
# Start a new session
self.getPage('/testStr')
self.getPage('/iredir', self.cookies)
assert self.body == b'MemcachedSession'
def test_5_Error_paths(self):
self.getPage('/unknown/page')
self.assertErrorPage(
404, "The path '/unknown/page' was not found.")
# Note: this path is *not* the same as above. The above
# takes a normal route through the session code; this one
# skips the session code's before_handler and only calls
# before_finalize (save) and on_end (close). So the session
# code has to survive calling save/close without init.
self.getPage('/restricted', self.cookies, method='POST')
self.assertErrorPage(405, response_codes[405][1])

View file

@ -1,10 +1,7 @@
import os
import signal
import time
import unittest
import warnings
from six.moves.http_client import BadStatusLine
from http.client import BadStatusLine
import pytest
import portend
@ -13,6 +10,7 @@ import cherrypy
import cherrypy.process.servers
from cherrypy.test import helper
engine = cherrypy.engine
thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
@ -433,41 +431,41 @@ test_case_name: "test_signal_handler_unsubscribe"
)
class WaitTests(unittest.TestCase):
def test_safe_wait_INADDR_ANY(): # pylint: disable=invalid-name
"""
Wait on INADDR_ANY should not raise IOError
def test_safe_wait_INADDR_ANY(self):
"""
Wait on INADDR_ANY should not raise IOError
In cases where the loopback interface does not exist, CherryPy cannot
effectively determine if a port binding to INADDR_ANY was effected.
In this situation, CherryPy should assume that it failed to detect
the binding (not that the binding failed) and only warn that it could
not verify it.
"""
# At such a time that CherryPy can reliably determine one or more
# viable IP addresses of the host, this test may be removed.
In cases where the loopback interface does not exist, CherryPy cannot
effectively determine if a port binding to INADDR_ANY was effected.
In this situation, CherryPy should assume that it failed to detect
the binding (not that the binding failed) and only warn that it could
not verify it.
"""
# At such a time that CherryPy can reliably determine one or more
# viable IP addresses of the host, this test may be removed.
# Simulate the behavior we observe when no loopback interface is
# present by: finding a port that's not occupied, then wait on it.
# Simulate the behavior we observe when no loopback interface is
# present by: finding a port that's not occupied, then wait on it.
free_port = portend.find_available_local_port()
free_port = portend.find_available_local_port()
servers = cherrypy.process.servers
servers = cherrypy.process.servers
inaddr_any = '0.0.0.0'
inaddr_any = '0.0.0.0'
# Wait on the free port that's unbound
with pytest.warns(
UserWarning,
match='Unable to verify that the server is bound on ',
) as warnings:
# pylint: disable=protected-access
with servers._safe_wait(inaddr_any, free_port):
portend.occupied(inaddr_any, free_port, timeout=1)
assert len(warnings) == 1
# Wait on the free port that's unbound
with warnings.catch_warnings(record=True) as w:
with servers._safe_wait(inaddr_any, free_port):
portend.occupied(inaddr_any, free_port, timeout=1)
self.assertEqual(len(w), 1)
self.assertTrue(isinstance(w[0], warnings.WarningMessage))
self.assertTrue(
'Unable to verify that the server is bound on ' in str(w[0]))
# The wait should still raise an IO error if INADDR_ANY was
# not supplied.
with pytest.raises(IOError):
with servers._safe_wait('127.0.0.1', free_port):
portend.occupied('127.0.0.1', free_port, timeout=1)
# The wait should still raise an IO error if INADDR_ANY was
# not supplied.
with pytest.raises(IOError):
# pylint: disable=protected-access
with servers._safe_wait('127.0.0.1', free_port):
portend.occupied('127.0.0.1', free_port, timeout=1)

View file

@ -1,17 +1,17 @@
# -*- coding: utf-8 -*-
import contextlib
import io
import os
import sys
import re
import platform
import tempfile
from six import text_type as str
from six.moves import urllib
from six.moves.http_client import HTTPConnection
import urllib.parse
import unittest.mock
from http.client import HTTPConnection
import pytest
import py.path
import path
import cherrypy
from cherrypy.lib import static
@ -46,9 +46,9 @@ def ensure_unicode_filesystem():
tmpdir.remove()
curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
has_space_filepath = os.path.join(curdir, 'static', 'has space.html')
bigfile_filepath = os.path.join(curdir, 'static', 'bigfile.log')
curdir = path.Path(__file__).dirname()
has_space_filepath = curdir / 'static' / 'has space.html'
bigfile_filepath = curdir / 'static' / 'bigfile.log'
# The file size needs to be big enough such that half the size of it
# won't be socket-buffered (or server-buffered) all in one go. See
@ -58,6 +58,7 @@ BIGFILE_SIZE = 32 * MB
class StaticTest(helper.CPWebCase):
files_to_remove = []
@staticmethod
def setup_server():
@ -96,6 +97,20 @@ class StaticTest(helper.CPWebCase):
f = io.BytesIO(b'Fee\nfie\nfo\nfum')
return static.serve_fileobj(f, content_type='text/plain')
@cherrypy.expose
def serve_file_utf8_filename(self):
return static.serve_file(
__file__,
disposition='attachment',
name='has_utf-8_character_☃.html')
@cherrypy.expose
def serve_fileobj_utf8_filename(self):
return static.serve_fileobj(
io.BytesIO('\nfie\nfo\nfum'.encode('utf-8')),
disposition='attachment',
name='has_utf-8_character_☃.html')
class Static:
@cherrypy.expose
@ -157,14 +172,13 @@ class StaticTest(helper.CPWebCase):
vhost = cherrypy._cpwsgi.VirtualHost(rootApp, {'virt.net': testApp})
cherrypy.tree.graft(vhost)
@staticmethod
def teardown_server():
for f in (has_space_filepath, bigfile_filepath):
if os.path.exists(f):
try:
os.unlink(f)
except Exception:
pass
@classmethod
def teardown_class(cls):
super(cls, cls).teardown_class()
files_to_remove = has_space_filepath, bigfile_filepath
files_to_remove += tuple(cls.files_to_remove)
for file in files_to_remove:
file.remove_p()
def test_static(self):
self.getPage('/static/index.html')
@ -193,6 +207,22 @@ class StaticTest(helper.CPWebCase):
# we just check the content
self.assertMatchesBody('^Dummy stylesheet')
# Check a filename with utf-8 characters in it
ascii_fn = 'has_utf-8_character_.html'
url_quote_fn = 'has_utf-8_character_%E2%98%83.html' # %E2%98%83 == ☃
expected_content_disposition = (
'attachment; filename="{!s}"; filename*=UTF-8\'\'{!s}'.
format(ascii_fn, url_quote_fn)
)
self.getPage('/serve_file_utf8_filename')
self.assertStatus('200 OK')
self.assertHeader('Content-Disposition', expected_content_disposition)
self.getPage('/serve_fileobj_utf8_filename')
self.assertStatus('200 OK')
self.assertHeader('Content-Disposition', expected_content_disposition)
@pytest.mark.skipif(platform.system() != 'Windows', reason='Windows only')
def test_static_longpath(self):
"""Test serving of a file in subdir of a Windows long-path
@ -399,38 +429,34 @@ class StaticTest(helper.CPWebCase):
self.assertStatus(404)
self.assertInBody("I couldn't find that thing")
@unittest.mock.patch(
'http.client._contains_disallowed_url_pchar_re',
re.compile(r'[\n]'),
create=True,
)
def test_null_bytes(self):
self.getPage('/static/\x00')
self.assertStatus('404 Not Found')
@staticmethod
@contextlib.contextmanager
def unicode_file():
@classmethod
def unicode_file(cls):
filename = ntou('Слава Україні.html', 'utf-8')
filepath = os.path.join(curdir, 'static', filename)
with io.open(filepath, 'w', encoding='utf-8') as strm:
filepath = curdir / 'static' / filename
with filepath.open('w', encoding='utf-8')as strm:
strm.write(ntou('Героям Слава!', 'utf-8'))
try:
yield
finally:
os.remove(filepath)
cls.files_to_remove.append(filepath)
py27_on_windows = (
platform.system() == 'Windows' and
sys.version_info < (3,)
)
@pytest.mark.xfail(py27_on_windows, reason='#1544') # noqa: E301
def test_unicode(self):
ensure_unicode_filesystem()
with self.unicode_file():
url = ntou('/static/Слава Україні.html', 'utf-8')
# quote function requires str
url = tonative(url, 'utf-8')
url = urllib.parse.quote(url)
self.getPage(url)
self.unicode_file()
url = ntou('/static/Слава Україні.html', 'utf-8')
# quote function requires str
url = tonative(url, 'utf-8')
url = urllib.parse.quote(url)
self.getPage(url)
expected = ntou('Героям Слава!', 'utf-8')
self.assertInBody(expected)
expected = ntou('Героям Слава!', 'utf-8')
self.assertInBody(expected)
def error_page_404(status, message, traceback, version):

View file

@ -7,10 +7,7 @@ import time
import types
import unittest
import operator
import six
from six.moves import range, map
from six.moves.http_client import IncompleteRead
from http.client import IncompleteRead
import cherrypy
from cherrypy import tools
@ -18,6 +15,16 @@ from cherrypy._cpcompat import ntou
from cherrypy.test import helper, _test_decorators
*PY_VER_MINOR, _ = PY_VER_PATCH = sys.version_info[:3]
# Refs:
# bugs.python.org/issue39389
# docs.python.org/3.7/whatsnew/changelog.html#python-3-7-7-release-candidate-1
# docs.python.org/3.8/whatsnew/changelog.html#python-3-8-2-release-candidate-1
HAS_GZIP_COMPRESSION_HEADER_FIXED = PY_VER_PATCH >= (3, 8, 2) or (
PY_VER_MINOR == (3, 7) and PY_VER_PATCH >= (3, 7, 7)
)
timeout = 0.2
europoundUnicode = ntou('\x80\xa3')
@ -52,7 +59,7 @@ class ToolTests(helper.CPWebCase):
def _setup(self):
def makemap():
m = self._merged_args().get('map', {})
cherrypy.request.numerify_map = list(six.iteritems(m))
cherrypy.request.numerify_map = list(m.items())
cherrypy.request.hooks.attach('on_start_resource', makemap)
def critical():
@ -105,10 +112,7 @@ class ToolTests(helper.CPWebCase):
def __call__(self, scale):
r = cherrypy.response
r.collapse_body()
if six.PY3:
r.body = [bytes([(x + scale) % 256 for x in r.body[0]])]
else:
r.body = [chr((ord(x) + scale) % 256) for x in r.body[0]]
r.body = [bytes([(x + scale) % 256 for x in r.body[0]])]
cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator())
def stream_handler(next_handler, *args, **kwargs):
@ -179,7 +183,7 @@ class ToolTests(helper.CPWebCase):
"""
def __init__(cls, name, bases, dct):
type.__init__(cls, name, bases, dct)
for value in six.itervalues(dct):
for value in dct.values():
if isinstance(value, types.FunctionType):
cherrypy.expose(value)
setattr(root, name.lower(), cls())
@ -346,7 +350,7 @@ class ToolTests(helper.CPWebCase):
self.getPage('/demo/err_in_onstart')
self.assertErrorPage(502)
tmpl = "AttributeError: 'str' object has no attribute '{attr}'"
expected_msg = tmpl.format(attr='items' if six.PY3 else 'iteritems')
expected_msg = tmpl.format(attr='items')
self.assertInBody(expected_msg)
def testCombinedTools(self):
@ -363,6 +367,13 @@ class ToolTests(helper.CPWebCase):
('Accept-Charset', 'ISO-8859-1,utf-8;q=0.7,*;q=0.7')])
self.assertInBody(zbuf.getvalue()[:3])
if not HAS_GZIP_COMPRESSION_HEADER_FIXED:
# NOTE: CherryPy adopts a fix from the CPython bug 39389
# NOTE: introducing a variable compression XFL flag that
# NOTE: was hardcoded to "best compression" before. And so
# NOTE: we can only test it on CPython versions that also
# NOTE: implement this fix.
return
zbuf = io.BytesIO()
zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=6)
zfile.write(expectedResult)
@ -377,11 +388,7 @@ class ToolTests(helper.CPWebCase):
# but it proves the priority was changed.
self.getPage('/decorated_euro/subpath',
headers=[('Accept-Encoding', 'gzip')])
if six.PY3:
self.assertInBody(bytes([(x + 3) % 256 for x in zbuf.getvalue()]))
else:
self.assertInBody(''.join([chr((ord(x) + 3) % 256)
for x in zbuf.getvalue()]))
self.assertInBody(bytes([(x + 3) % 256 for x in zbuf.getvalue()]))
def testBareHooks(self):
content = 'bit of a pain in me gulliver'
@ -429,7 +436,7 @@ class ToolTests(helper.CPWebCase):
@cherrypy.tools.register( # noqa: F811
'before_finalize', name='renamed', priority=60,
)
def example():
def example(): # noqa: F811
pass
self.assertTrue(isinstance(cherrypy.tools.renamed, cherrypy.Tool))
self.assertEqual(cherrypy.tools.renamed._point, 'before_finalize')
@ -446,8 +453,8 @@ class SessionAuthTest(unittest.TestCase):
username and password were unicode.
"""
sa = cherrypy.lib.cptools.SessionAuth()
res = sa.login_screen(None, username=six.text_type('nobody'),
password=six.text_type('anypass'))
res = sa.login_screen(None, username=str('nobody'),
password=str('anypass'))
self.assertTrue(isinstance(res, bytes))

View file

@ -1,10 +1,6 @@
import sys
import imp
import types
import importlib
import six
import cherrypy
from cherrypy.test import helper
@ -27,7 +23,7 @@ class TutorialTest(helper.CPWebCase):
"""
target = 'cherrypy.tutorial.' + name
if target in sys.modules:
module = imp.reload(sys.modules[target])
module = importlib.reload(sys.modules[target])
else:
module = importlib.import_module(target)
return module
@ -39,8 +35,6 @@ class TutorialTest(helper.CPWebCase):
root = getattr(module, root_name)
conf = getattr(module, 'tutconf')
class_types = type,
if six.PY2:
class_types += types.ClassType,
if isinstance(root, class_types):
root = root()
cherrypy.tree.mount(root, config=conf)

View file

@ -2,8 +2,7 @@ import os
import socket
import atexit
import tempfile
from six.moves.http_client import HTTPConnection
from http.client import HTTPConnection
import pytest

View file

@ -1,54 +1,20 @@
import sys
import socket
import six
from six.moves.xmlrpc_client import (
from xmlrpc.client import (
DateTime, Fault,
ProtocolError, ServerProxy, SafeTransport
ServerProxy, SafeTransport
)
import cherrypy
from cherrypy import _cptools
from cherrypy.test import helper
if six.PY3:
HTTPSTransport = SafeTransport
HTTPSTransport = SafeTransport
# Python 3.0's SafeTransport still mistakenly checks for socket.ssl
import socket
if not hasattr(socket, 'ssl'):
socket.ssl = True
else:
class HTTPSTransport(SafeTransport):
"""Subclass of SafeTransport to fix sock.recv errors (by using file).
"""
def request(self, host, handler, request_body, verbose=0):
# issue XML-RPC request
h = self.make_connection(host)
if verbose:
h.set_debuglevel(1)
self.send_request(h, handler, request_body)
self.send_host(h, host)
self.send_user_agent(h)
self.send_content(h, request_body)
errcode, errmsg, headers = h.getreply()
if errcode != 200:
raise ProtocolError(host + handler, errcode, errmsg, headers)
self.verbose = verbose
# Here's where we differ from the superclass. It says:
# try:
# sock = h._conn.sock
# except AttributeError:
# sock = None
# return self._parse_response(h.getfile(), sock)
return self.parse_response(h.getfile())
# Python 3.0's SafeTransport still mistakenly checks for socket.ssl
if not hasattr(socket, 'ssl'):
socket.ssl = True
def setup_server():