Bump requests from 2.27.1 to 2.28.1 (#1781)

* Bump requests from 2.27.1 to 2.28.1

Bumps [requests](https://github.com/psf/requests) from 2.27.1 to 2.28.1.
- [Release notes](https://github.com/psf/requests/releases)
- [Changelog](https://github.com/psf/requests/blob/main/HISTORY.md)
- [Commits](https://github.com/psf/requests/compare/v2.27.1...v2.28.1)

---
updated-dependencies:
- dependency-name: requests
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update requests==2.28.1

* Update urllib3==1.26.12

* Update certifi==2022.9.24

* Update idna==3.4

* Update charset-normalizer==2.1.1
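
For anyone applying the same bump by hand, a minimal sketch (not part of this commit; assumes Python 3.8+ so importlib.metadata is available) to confirm that the pins above are what actually got installed:

    # Print the installed version of each package touched by this bump.
    from importlib.metadata import version

    for pkg in ("requests", "urllib3", "certifi", "idna", "charset-normalizer"):
        print(pkg, version(pkg))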

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
Authored by dependabot[bot] on 2022-11-12 17:12:19 -08:00; committed by GitHub
parent baa0e08c2a
commit af1aed0b6b
46 changed files with 3295 additions and 2709 deletions

requests/utils.py

@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
"""
requests.utils
~~~~~~~~~~~~~~
@@ -20,28 +18,46 @@ import tempfile
import warnings
import zipfile
from collections import OrderedDict
from urllib3.util import make_headers
from urllib3.util import parse_url
from .__version__ import __version__
from urllib3.util import make_headers, parse_url
from . import certs
from .__version__ import __version__
# to_native_string is unused here, but imported here for backwards compatibility
from ._internal_utils import to_native_string
from ._internal_utils import HEADER_VALIDATORS, to_native_string # noqa: F401
from .compat import (
Mapping,
basestring,
bytes,
getproxies,
getproxies_environment,
integer_types,
)
from .compat import parse_http_list as _parse_list_header
from .compat import (
quote, urlparse, bytes, str, unquote, getproxies,
proxy_bypass, urlunparse, basestring, integer_types, is_py3,
proxy_bypass_environment, getproxies_environment, Mapping)
proxy_bypass,
proxy_bypass_environment,
quote,
str,
unquote,
urlparse,
urlunparse,
)
from .cookies import cookiejar_from_dict
from .structures import CaseInsensitiveDict
from .exceptions import (
InvalidURL, InvalidHeader, FileModeWarning, UnrewindableBodyError)
FileModeWarning,
InvalidHeader,
InvalidURL,
UnrewindableBodyError,
)
from .structures import CaseInsensitiveDict
NETRC_FILES = ('.netrc', '_netrc')
NETRC_FILES = (".netrc", "_netrc")
DEFAULT_CA_BUNDLE_PATH = certs.where()
DEFAULT_PORTS = {'http': 80, 'https': 443}
DEFAULT_PORTS = {"http": 80, "https": 443}
# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
@@ -49,28 +65,25 @@ DEFAULT_ACCEPT_ENCODING = ", ".join(
)
if sys.platform == 'win32':
if sys.platform == "win32":
# provide a proxy_bypass version on Windows without DNS lookups
def proxy_bypass_registry(host):
try:
if is_py3:
import winreg
else:
import _winreg as winreg
import winreg
except ImportError:
return False
try:
internetSettings = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r'Software\Microsoft\Windows\CurrentVersion\Internet Settings')
internetSettings = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
)
# ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
proxyEnable = int(winreg.QueryValueEx(internetSettings,
'ProxyEnable')[0])
proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
# ProxyOverride is almost always a string
proxyOverride = winreg.QueryValueEx(internetSettings,
'ProxyOverride')[0]
except OSError:
proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
except (OSError, ValueError):
return False
if not proxyEnable or not proxyOverride:
return False
@@ -78,15 +91,15 @@ if sys.platform == 'win32':
# make a check value list from the registry entry: replace the
# '<local>' string by the localhost entry and the corresponding
# canonical entry.
proxyOverride = proxyOverride.split(';')
proxyOverride = proxyOverride.split(";")
# now check if we match one of the registry values.
for test in proxyOverride:
if test == '<local>':
if '.' not in host:
if test == "<local>":
if "." not in host:
return True
test = test.replace(".", r"\.") # mask dots
test = test.replace("*", r".*") # change glob sequence
test = test.replace("?", r".") # change glob char
test = test.replace(".", r"\.") # mask dots
test = test.replace("*", r".*") # change glob sequence
test = test.replace("?", r".") # change glob char
if re.match(test, host, re.I):
return True
return False
@@ -106,7 +119,7 @@ if sys.platform == 'win32':
def dict_to_sequence(d):
"""Returns an internal sequence dictionary update."""
if hasattr(d, 'items'):
if hasattr(d, "items"):
d = d.items()
return d
@@ -116,13 +129,13 @@ def super_len(o):
total_length = None
current_position = 0
if hasattr(o, '__len__'):
if hasattr(o, "__len__"):
total_length = len(o)
elif hasattr(o, 'len'):
elif hasattr(o, "len"):
total_length = o.len
elif hasattr(o, 'fileno'):
elif hasattr(o, "fileno"):
try:
fileno = o.fileno()
except (io.UnsupportedOperation, AttributeError):
@@ -135,21 +148,23 @@ def super_len(o):
# Having used fstat to determine the file length, we need to
# confirm that this file was opened up in binary mode.
if 'b' not in o.mode:
warnings.warn((
"Requests has determined the content-length for this "
"request using the binary size of the file: however, the "
"file has been opened in text mode (i.e. without the 'b' "
"flag in the mode). This may lead to an incorrect "
"content-length. In Requests 3.0, support will be removed "
"for files in text mode."),
FileModeWarning
if "b" not in o.mode:
warnings.warn(
(
"Requests has determined the content-length for this "
"request using the binary size of the file: however, the "
"file has been opened in text mode (i.e. without the 'b' "
"flag in the mode). This may lead to an incorrect "
"content-length. In Requests 3.0, support will be removed "
"for files in text mode."
),
FileModeWarning,
)
if hasattr(o, 'tell'):
if hasattr(o, "tell"):
try:
current_position = o.tell()
except (OSError, IOError):
except OSError:
# This can happen in some weird situations, such as when the file
# is actually a special file descriptor like stdin. In this
# instance, we don't know what the length is, so set it to zero and
@@ -157,7 +172,7 @@ def super_len(o):
if total_length is not None:
current_position = total_length
else:
if hasattr(o, 'seek') and total_length is None:
if hasattr(o, "seek") and total_length is None:
# StringIO and BytesIO have seek but no usable fileno
try:
# seek to end of file
@@ -167,7 +182,7 @@ def super_len(o):
# seek back to current position to support
# partially read file-like objects
o.seek(current_position or 0)
except (OSError, IOError):
except OSError:
total_length = 0
if total_length is None:
@@ -179,14 +194,14 @@ def super_len(o):
def get_netrc_auth(url, raise_errors=False):
"""Returns the Requests tuple auth for a given url from netrc."""
netrc_file = os.environ.get('NETRC')
netrc_file = os.environ.get("NETRC")
if netrc_file is not None:
netrc_locations = (netrc_file,)
else:
netrc_locations = ('~/{}'.format(f) for f in NETRC_FILES)
netrc_locations = (f"~/{f}" for f in NETRC_FILES)
try:
from netrc import netrc, NetrcParseError
from netrc import NetrcParseError, netrc
netrc_path = None
@@ -211,18 +226,18 @@ def get_netrc_auth(url, raise_errors=False):
# Strip port numbers from netloc. This weird `if...encode`` dance is
# used for Python 3.2, which doesn't support unicode literals.
splitstr = b':'
splitstr = b":"
if isinstance(url, str):
splitstr = splitstr.decode('ascii')
splitstr = splitstr.decode("ascii")
host = ri.netloc.split(splitstr)[0]
try:
_netrc = netrc(netrc_path).authenticators(host)
if _netrc:
# Return with login / password
login_i = (0 if _netrc[0] else 1)
login_i = 0 if _netrc[0] else 1
return (_netrc[login_i], _netrc[2])
except (NetrcParseError, IOError):
except (NetrcParseError, OSError):
# If there was a parsing error or a permissions issue reading the file,
# we'll just skip netrc auth unless explicitly asked to raise errors.
if raise_errors:
@@ -235,9 +250,8 @@ def get_netrc_auth(url, raise_errors=False):
def guess_filename(obj):
"""Tries to guess the filename of the given object."""
name = getattr(obj, 'name', None)
if (name and isinstance(name, basestring) and name[0] != '<' and
name[-1] != '>'):
name = getattr(obj, "name", None)
if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
return os.path.basename(name)
@@ -259,7 +273,7 @@ def extract_zipped_paths(path):
# If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),
# we _can_ end up in an infinite loop on a rare corner case affecting a small number of users
break
member = '/'.join([prefix, member])
member = "/".join([prefix, member])
if not zipfile.is_zipfile(archive):
return path
@@ -270,7 +284,7 @@ def extract_zipped_paths(path):
# we have a valid zip archive and a valid member of that archive
tmp = tempfile.gettempdir()
extracted_path = os.path.join(tmp, member.split('/')[-1])
extracted_path = os.path.join(tmp, member.split("/")[-1])
if not os.path.exists(extracted_path):
# use read + write to avoid the creating nested folders, we only want the file, avoids mkdir racing condition
with atomic_open(extracted_path) as file_handler:
@@ -281,12 +295,11 @@ def extract_zipped_paths(path):
@contextlib.contextmanager
def atomic_open(filename):
"""Write a file to the disk in an atomic fashion"""
replacer = os.rename if sys.version_info[0] == 2 else os.replace
tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
try:
with os.fdopen(tmp_descriptor, 'wb') as tmp_handler:
with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
yield tmp_handler
replacer(tmp_name, filename)
os.replace(tmp_name, filename)
except BaseException:
os.remove(tmp_name)
raise
@@ -314,7 +327,7 @@ def from_key_val_list(value):
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
raise ValueError("cannot encode objects that are not 2-tuples")
return OrderedDict(value)
@@ -340,7 +353,7 @@ def to_key_val_list(value):
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
raise ValueError("cannot encode objects that are not 2-tuples")
if isinstance(value, Mapping):
value = value.items()
@@ -405,10 +418,10 @@ def parse_dict_header(value):
"""
result = {}
for item in _parse_list_header(value):
if '=' not in item:
if "=" not in item:
result[item] = None
continue
name, value = item.split('=', 1)
name, value = item.split("=", 1)
if value[:1] == value[-1:] == '"':
value = unquote_header_value(value[1:-1])
result[name] = value
@@ -436,8 +449,8 @@ def unquote_header_value(value, is_filename=False):
# replace sequence below on a UNC path has the effect of turning
# the leading double slash into a single slash and then
# _fix_ie_filename() doesn't work correctly. See #458.
if not is_filename or value[:2] != '\\\\':
return value.replace('\\\\', '\\').replace('\\"', '"')
if not is_filename or value[:2] != "\\\\":
return value.replace("\\\\", "\\").replace('\\"', '"')
return value
@@ -472,19 +485,24 @@ def get_encodings_from_content(content):
:param content: bytestring to extract encodings from.
"""
warnings.warn((
'In requests 3.0, get_encodings_from_content will be removed. For '
'more information, please see the discussion on issue #2266. (This'
' warning should only appear once.)'),
DeprecationWarning)
warnings.warn(
(
"In requests 3.0, get_encodings_from_content will be removed. For "
"more information, please see the discussion on issue #2266. (This"
" warning should only appear once.)"
),
DeprecationWarning,
)
charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
return (charset_re.findall(content) +
pragma_re.findall(content) +
xml_re.findall(content))
return (
charset_re.findall(content)
+ pragma_re.findall(content)
+ xml_re.findall(content)
)
def _parse_content_type_header(header):
@@ -495,7 +513,7 @@ def _parse_content_type_header(header):
parameters
"""
tokens = header.split(';')
tokens = header.split(";")
content_type, params = tokens[0].strip(), tokens[1:]
params_dict = {}
items_to_strip = "\"' "
@@ -507,7 +525,7 @@ def _parse_content_type_header(header):
index_of_equals = param.find("=")
if index_of_equals != -1:
key = param[:index_of_equals].strip(items_to_strip)
value = param[index_of_equals + 1:].strip(items_to_strip)
value = param[index_of_equals + 1 :].strip(items_to_strip)
params_dict[key.lower()] = value
return content_type, params_dict
@@ -519,38 +537,37 @@ def get_encoding_from_headers(headers):
:rtype: str
"""
content_type = headers.get('content-type')
content_type = headers.get("content-type")
if not content_type:
return None
content_type, params = _parse_content_type_header(content_type)
if 'charset' in params:
return params['charset'].strip("'\"")
if "charset" in params:
return params["charset"].strip("'\"")
if 'text' in content_type:
return 'ISO-8859-1'
if "text" in content_type:
return "ISO-8859-1"
if 'application/json' in content_type:
if "application/json" in content_type:
# Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
return 'utf-8'
return "utf-8"
def stream_decode_response_unicode(iterator, r):
"""Stream decodes a iterator."""
"""Stream decodes an iterator."""
if r.encoding is None:
for item in iterator:
yield item
yield from iterator
return
decoder = codecs.getincrementaldecoder(r.encoding)(errors='replace')
decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
for chunk in iterator:
rv = decoder.decode(chunk)
if rv:
yield rv
rv = decoder.decode(b'', final=True)
rv = decoder.decode(b"", final=True)
if rv:
yield rv
@@ -561,7 +578,7 @@ def iter_slices(string, slice_length):
if slice_length is None or slice_length <= 0:
slice_length = len(string)
while pos < len(string):
yield string[pos:pos + slice_length]
yield string[pos : pos + slice_length]
pos += slice_length
@@ -577,11 +594,14 @@ def get_unicode_from_response(r):
:rtype: str
"""
warnings.warn((
'In requests 3.0, get_unicode_from_response will be removed. For '
'more information, please see the discussion on issue #2266. (This'
' warning should only appear once.)'),
DeprecationWarning)
warnings.warn(
(
"In requests 3.0, get_unicode_from_response will be removed. For "
"more information, please see the discussion on issue #2266. (This"
" warning should only appear once.)"
),
DeprecationWarning,
)
tried_encodings = []
@@ -596,14 +616,15 @@ def get_unicode_from_response(r):
# Fall back:
try:
return str(r.content, encoding, errors='replace')
return str(r.content, encoding, errors="replace")
except TypeError:
return r.content
# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~")
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)
def unquote_unreserved(uri):
@@ -612,22 +633,22 @@ def unquote_unreserved(uri):
:rtype: str
"""
parts = uri.split('%')
parts = uri.split("%")
for i in range(1, len(parts)):
h = parts[i][0:2]
if len(h) == 2 and h.isalnum():
try:
c = chr(int(h, 16))
except ValueError:
raise InvalidURL("Invalid percent-escape sequence: '%s'" % h)
raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")
if c in UNRESERVED_SET:
parts[i] = c + parts[i][2:]
else:
parts[i] = '%' + parts[i]
parts[i] = f"%{parts[i]}"
else:
parts[i] = '%' + parts[i]
return ''.join(parts)
parts[i] = f"%{parts[i]}"
return "".join(parts)
def requote_uri(uri):
@@ -660,10 +681,10 @@ def address_in_network(ip, net):
:rtype: bool
"""
ipaddr = struct.unpack('=L', socket.inet_aton(ip))[0]
netaddr, bits = net.split('/')
netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[0]
network = struct.unpack('=L', socket.inet_aton(netaddr))[0] & netmask
ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
netaddr, bits = net.split("/")
netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
return (ipaddr & netmask) == (network & netmask)
@@ -674,8 +695,8 @@ def dotted_netmask(mask):
:rtype: str
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))
bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack(">I", bits))
def is_ipv4_address(string_ip):
@@ -684,7 +705,7 @@ def is_ipv4_address(string_ip):
"""
try:
socket.inet_aton(string_ip)
except socket.error:
except OSError:
return False
return True
@@ -695,9 +716,9 @@ def is_valid_cidr(string_network):
:rtype: bool
"""
if string_network.count('/') == 1:
if string_network.count("/") == 1:
try:
mask = int(string_network.split('/')[1])
mask = int(string_network.split("/")[1])
except ValueError:
return False
@@ -705,8 +726,8 @@ def is_valid_cidr(string_network):
return False
try:
socket.inet_aton(string_network.split('/')[0])
except socket.error:
socket.inet_aton(string_network.split("/")[0])
except OSError:
return False
else:
return False
@@ -743,13 +764,14 @@ def should_bypass_proxies(url, no_proxy):
"""
# Prioritize lowercase environment variables over uppercase
# to keep a consistent behaviour with other http projects (curl, wget).
get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper())
def get_proxy(key):
return os.environ.get(key) or os.environ.get(key.upper())
# First check whether no_proxy is defined. If it is, check that the URL
# we're getting isn't in the no_proxy list.
no_proxy_arg = no_proxy
if no_proxy is None:
no_proxy = get_proxy('no_proxy')
no_proxy = get_proxy("no_proxy")
parsed = urlparse(url)
if parsed.hostname is None:
@@ -759,9 +781,7 @@ def should_bypass_proxies(url, no_proxy):
if no_proxy:
# We need to check whether we match here. We need to see if we match
# the end of the hostname, both with and without the port.
no_proxy = (
host for host in no_proxy.replace(' ', '').split(',') if host
)
no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)
if is_ipv4_address(parsed.hostname):
for proxy_ip in no_proxy:
@@ -775,7 +795,7 @@ def should_bypass_proxies(url, no_proxy):
else:
host_with_port = parsed.hostname
if parsed.port:
host_with_port += ':{}'.format(parsed.port)
host_with_port += f":{parsed.port}"
for host in no_proxy:
if parsed.hostname.endswith(host) or host_with_port.endswith(host):
@@ -783,7 +803,7 @@ def should_bypass_proxies(url, no_proxy):
# to apply the proxies on this URL.
return True
with set_environ('no_proxy', no_proxy_arg):
with set_environ("no_proxy", no_proxy_arg):
# parsed.hostname can be `None` in cases such as a file URI.
try:
bypass = proxy_bypass(parsed.hostname)
@@ -817,13 +837,13 @@ def select_proxy(url, proxies):
proxies = proxies or {}
urlparts = urlparse(url)
if urlparts.hostname is None:
return proxies.get(urlparts.scheme, proxies.get('all'))
return proxies.get(urlparts.scheme, proxies.get("all"))
proxy_keys = [
urlparts.scheme + '://' + urlparts.hostname,
urlparts.scheme + "://" + urlparts.hostname,
urlparts.scheme,
'all://' + urlparts.hostname,
'all',
"all://" + urlparts.hostname,
"all",
]
proxy = None
for proxy_key in proxy_keys:
@@ -848,13 +868,13 @@ def resolve_proxies(request, proxies, trust_env=True):
proxies = proxies if proxies is not None else {}
url = request.url
scheme = urlparse(url).scheme
no_proxy = proxies.get('no_proxy')
no_proxy = proxies.get("no_proxy")
new_proxies = proxies.copy()
if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
proxy = environ_proxies.get(scheme, environ_proxies.get('all'))
proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
if proxy:
new_proxies.setdefault(scheme, proxy)
@@ -867,19 +887,21 @@ def default_user_agent(name="python-requests"):
:rtype: str
"""
return '%s/%s' % (name, __version__)
return f"{name}/{__version__}"
def default_headers():
"""
:rtype: requests.structures.CaseInsensitiveDict
"""
return CaseInsensitiveDict({
'User-Agent': default_user_agent(),
'Accept-Encoding': DEFAULT_ACCEPT_ENCODING,
'Accept': '*/*',
'Connection': 'keep-alive',
})
return CaseInsensitiveDict(
{
"User-Agent": default_user_agent(),
"Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
"Accept": "*/*",
"Connection": "keep-alive",
}
)
def parse_header_links(value):
@@ -892,23 +914,23 @@ def parse_header_links(value):
links = []
replace_chars = ' \'"'
replace_chars = " '\""
value = value.strip(replace_chars)
if not value:
return links
for val in re.split(', *<', value):
for val in re.split(", *<", value):
try:
url, params = val.split(';', 1)
url, params = val.split(";", 1)
except ValueError:
url, params = val, ''
url, params = val, ""
link = {'url': url.strip('<> \'"')}
link = {"url": url.strip("<> '\"")}
for param in params.split(';'):
for param in params.split(";"):
try:
key, value = param.split('=')
key, value = param.split("=")
except ValueError:
break
@@ -920,7 +942,7 @@ def parse_header_links(value):
# Null bytes; no need to recreate these on each call to guess_json_utf
_null = '\x00'.encode('ascii') # encoding to ASCII for Python 3
_null = "\x00".encode("ascii") # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3
@@ -934,25 +956,25 @@ def guess_json_utf(data):
# determine the encoding. Also detect a BOM, if present.
sample = data[:4]
if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
return 'utf-32' # BOM included
return "utf-32" # BOM included
if sample[:3] == codecs.BOM_UTF8:
return 'utf-8-sig' # BOM included, MS style (discouraged)
return "utf-8-sig" # BOM included, MS style (discouraged)
if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
return 'utf-16' # BOM included
return "utf-16" # BOM included
nullcount = sample.count(_null)
if nullcount == 0:
return 'utf-8'
return "utf-8"
if nullcount == 2:
if sample[::2] == _null2: # 1st and 3rd are null
return 'utf-16-be'
if sample[::2] == _null2: # 1st and 3rd are null
return "utf-16-be"
if sample[1::2] == _null2: # 2nd and 4th are null
return 'utf-16-le'
return "utf-16-le"
# Did not detect 2 valid UTF-16 ascii-range characters
if nullcount == 3:
if sample[:3] == _null3:
return 'utf-32-be'
return "utf-32-be"
if sample[1:] == _null3:
return 'utf-32-le'
return "utf-32-le"
# Did not detect a valid UTF-32 ascii-range character
return None
@@ -977,13 +999,13 @@ def prepend_scheme_if_needed(url, new_scheme):
if auth:
# parse_url doesn't provide the netloc with auth
# so we'll add it ourselves.
netloc = '@'.join([auth, netloc])
netloc = "@".join([auth, netloc])
if scheme is None:
scheme = new_scheme
if path is None:
path = ''
path = ""
return urlunparse((scheme, netloc, path, '', query, fragment))
return urlunparse((scheme, netloc, path, "", query, fragment))
def get_auth_from_url(url):
@@ -997,35 +1019,36 @@ def get_auth_from_url(url):
try:
auth = (unquote(parsed.username), unquote(parsed.password))
except (AttributeError, TypeError):
auth = ('', '')
auth = ("", "")
return auth
# Moved outside of function to avoid recompile every call
_CLEAN_HEADER_REGEX_BYTE = re.compile(b'^\\S[^\\r\\n]*$|^$')
_CLEAN_HEADER_REGEX_STR = re.compile(r'^\S[^\r\n]*$|^$')
def check_header_validity(header):
"""Verifies that header value is a string which doesn't contain
leading whitespace or return characters. This prevents unintended
header injection.
"""Verifies that header parts don't contain leading whitespace
reserved characters, or return characters.
:param header: tuple, in the format (name, value).
"""
name, value = header
if isinstance(value, bytes):
pat = _CLEAN_HEADER_REGEX_BYTE
else:
pat = _CLEAN_HEADER_REGEX_STR
try:
if not pat.match(value):
raise InvalidHeader("Invalid return character or leading space in header: %s" % name)
except TypeError:
raise InvalidHeader("Value for header {%s: %s} must be of type str or "
"bytes, not %s" % (name, value, type(value)))
for part in header:
if type(part) not in HEADER_VALIDATORS:
raise InvalidHeader(
f"Header part ({part!r}) from {{{name!r}: {value!r}}} must be "
f"of type str or bytes, not {type(part)}"
)
_validate_header_part(name, "name", HEADER_VALIDATORS[type(name)][0])
_validate_header_part(value, "value", HEADER_VALIDATORS[type(value)][1])
def _validate_header_part(header_part, header_kind, validator):
if not validator.match(header_part):
raise InvalidHeader(
f"Invalid leading whitespace, reserved character(s), or return"
f"character(s) in header {header_kind}: {header_part!r}"
)
def urldefragauth(url):
@@ -1040,21 +1063,24 @@ def urldefragauth(url):
if not netloc:
netloc, path = path, netloc
netloc = netloc.rsplit('@', 1)[-1]
netloc = netloc.rsplit("@", 1)[-1]
return urlunparse((scheme, netloc, path, params, query, ''))
return urlunparse((scheme, netloc, path, params, query, ""))
def rewind_body(prepared_request):
"""Move file pointer back to its recorded starting position
so it can be read again on redirect.
"""
body_seek = getattr(prepared_request.body, 'seek', None)
if body_seek is not None and isinstance(prepared_request._body_position, integer_types):
body_seek = getattr(prepared_request.body, "seek", None)
if body_seek is not None and isinstance(
prepared_request._body_position, integer_types
):
try:
body_seek(prepared_request._body_position)
except (IOError, OSError):
raise UnrewindableBodyError("An error occurred when rewinding request "
"body for redirect.")
except OSError:
raise UnrewindableBodyError(
"An error occurred when rewinding request body for redirect."
)
else:
raise UnrewindableBodyError("Unable to rewind request body for redirect.")