Bump pyjwt from 2.3.0 to 2.4.0 (#1743)

* Bump pyjwt from 2.3.0 to 2.4.0

Bumps [pyjwt](https://github.com/jpadilla/pyjwt) from 2.3.0 to 2.4.0.
- [Release notes](https://github.com/jpadilla/pyjwt/releases)
- [Changelog](https://github.com/jpadilla/pyjwt/blob/master/CHANGELOG.rst)
- [Commits](https://github.com/jpadilla/pyjwt/compare/2.3.0...2.4.0)

---
updated-dependencies:
- dependency-name: pyjwt
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update pyjwt==2.4.0

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2022-05-16 20:56:13 -07:00 committed by GitHub
parent d17015de44
commit b3aeaafd00
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 171 additions and 61 deletions

View file

@ -25,19 +25,19 @@ from .exceptions import (
) )
from .jwks_client import PyJWKClient from .jwks_client import PyJWKClient
__version__ = "2.3.0" __version__ = "2.4.0"
__title__ = "PyJWT" __title__ = "PyJWT"
__description__ = "JSON Web Token implementation in Python" __description__ = "JSON Web Token implementation in Python"
__url__ = "https://pyjwt.readthedocs.io" __url__ = "https://pyjwt.readthedocs.io"
__uri__ = __url__ __uri__ = __url__
__doc__ = __description__ + " <" + __uri__ + ">" __doc__ = f"{__description__} <{__uri__}>"
__author__ = "José Padilla" __author__ = "José Padilla"
__email__ = "hello@jpadilla.com" __email__ = "hello@jpadilla.com"
__license__ = "MIT" __license__ = "MIT"
__copyright__ = "Copyright 2015-2020 José Padilla" __copyright__ = "Copyright 2015-2022 José Padilla"
__all__ = [ __all__ = [

View file

@ -9,6 +9,8 @@ from .utils import (
der_to_raw_signature, der_to_raw_signature,
force_bytes, force_bytes,
from_base64url_uint, from_base64url_uint,
is_pem_format,
is_ssh_key,
raw_to_der_signature, raw_to_der_signature,
to_base64url_uint, to_base64url_uint,
) )
@ -183,14 +185,7 @@ class HMACAlgorithm(Algorithm):
def prepare_key(self, key): def prepare_key(self, key):
key = force_bytes(key) key = force_bytes(key)
invalid_strings = [ if is_pem_format(key) or is_ssh_key(key):
b"-----BEGIN PUBLIC KEY-----",
b"-----BEGIN CERTIFICATE-----",
b"-----BEGIN RSA PUBLIC KEY-----",
b"ssh-rsa",
]
if any(string_value in key for string_value in invalid_strings):
raise InvalidKeyError( raise InvalidKeyError(
"The specified key is an asymmetric key or x509 certificate and" "The specified key is an asymmetric key or x509 certificate and"
" should not be used as an HMAC secret." " should not be used as an HMAC secret."
@ -417,6 +412,12 @@ if has_crypto:
except ValueError: except ValueError:
key = load_pem_private_key(key, password=None) key = load_pem_private_key(key, password=None)
# Explicit check the key to prevent confusing errors from cryptography
if not isinstance(key, (EllipticCurvePrivateKey, EllipticCurvePublicKey)):
raise InvalidKeyError(
"Expecting a EllipticCurvePrivateKey/EllipticCurvePublicKey. Wrong key provided for ECDSA algorithms"
)
return key return key
def sign(self, msg, key): def sign(self, msg, key):
@ -545,26 +546,28 @@ if has_crypto:
pass pass
def prepare_key(self, key): def prepare_key(self, key):
if isinstance(
key,
(Ed25519PrivateKey, Ed25519PublicKey, Ed448PrivateKey, Ed448PublicKey),
):
return key
if isinstance(key, (bytes, str)): if isinstance(key, (bytes, str)):
if isinstance(key, str): if isinstance(key, str):
key = key.encode("utf-8") key = key.encode("utf-8")
str_key = key.decode("utf-8") str_key = key.decode("utf-8")
if "-----BEGIN PUBLIC" in str_key: if "-----BEGIN PUBLIC" in str_key:
return load_pem_public_key(key) key = load_pem_public_key(key)
if "-----BEGIN PRIVATE" in str_key: elif "-----BEGIN PRIVATE" in str_key:
return load_pem_private_key(key, password=None) key = load_pem_private_key(key, password=None)
if str_key[0:4] == "ssh-": elif str_key[0:4] == "ssh-":
return load_ssh_public_key(key) key = load_ssh_public_key(key)
raise TypeError("Expecting a PEM-formatted or OpenSSH key.") # Explicit check the key to prevent confusing errors from cryptography
if not isinstance(
key,
(Ed25519PrivateKey, Ed25519PublicKey, Ed448PrivateKey, Ed448PublicKey),
):
raise InvalidKeyError(
"Expecting a EllipticCurvePrivateKey/EllipticCurvePublicKey. Wrong key provided for EdDSA algorithms"
)
return key
def sign(self, msg, key): def sign(self, msg, key):
""" """

View file

@ -11,7 +11,7 @@ class PyJWK:
kty = self._jwk_data.get("kty", None) kty = self._jwk_data.get("kty", None)
if not kty: if not kty:
raise InvalidKeyError("kty is not found: %s" % self._jwk_data) raise InvalidKeyError(f"kty is not found: {self._jwk_data}")
if not algorithm and isinstance(self._jwk_data, dict): if not algorithm and isinstance(self._jwk_data, dict):
algorithm = self._jwk_data.get("alg", None) algorithm = self._jwk_data.get("alg", None)
@ -29,25 +29,25 @@ class PyJWK:
elif crv == "secp256k1": elif crv == "secp256k1":
algorithm = "ES256K" algorithm = "ES256K"
else: else:
raise InvalidKeyError("Unsupported crv: %s" % crv) raise InvalidKeyError(f"Unsupported crv: {crv}")
elif kty == "RSA": elif kty == "RSA":
algorithm = "RS256" algorithm = "RS256"
elif kty == "oct": elif kty == "oct":
algorithm = "HS256" algorithm = "HS256"
elif kty == "OKP": elif kty == "OKP":
if not crv: if not crv:
raise InvalidKeyError("crv is not found: %s" % self._jwk_data) raise InvalidKeyError(f"crv is not found: {self._jwk_data}")
if crv == "Ed25519": if crv == "Ed25519":
algorithm = "EdDSA" algorithm = "EdDSA"
else: else:
raise InvalidKeyError("Unsupported crv: %s" % crv) raise InvalidKeyError(f"Unsupported crv: {crv}")
else: else:
raise InvalidKeyError("Unsupported kty: %s" % kty) raise InvalidKeyError(f"Unsupported kty: {kty}")
self.Algorithm = self._algorithms.get(algorithm) self.Algorithm = self._algorithms.get(algorithm)
if not self.Algorithm: if not self.Algorithm:
raise PyJWKError("Unable to find a algorithm for key: %s" % self._jwk_data) raise PyJWKError(f"Unable to find a algorithm for key: {self._jwk_data}")
self.key = self.Algorithm.from_jwk(self._jwk_data) self.key = self.Algorithm.from_jwk(self._jwk_data)
@ -95,3 +95,9 @@ class PyJWKSet:
def from_json(data): def from_json(data):
obj = json.loads(data) obj = json.loads(data)
return PyJWKSet.from_dict(obj) return PyJWKSet.from_dict(obj)
def __getitem__(self, kid):
for key in self.keys:
if key.key_id == kid:
return key
raise KeyError(f"keyset has no key for kid: {kid}")

View file

@ -80,34 +80,54 @@ class PyJWS:
algorithm: Optional[str] = "HS256", algorithm: Optional[str] = "HS256",
headers: Optional[Dict] = None, headers: Optional[Dict] = None,
json_encoder: Optional[Type[json.JSONEncoder]] = None, json_encoder: Optional[Type[json.JSONEncoder]] = None,
is_payload_detached: bool = False,
) -> str: ) -> str:
segments = [] segments = []
if algorithm is None: if algorithm is None:
algorithm = "none" algorithm = "none"
# Prefer headers["alg"] if present to algorithm parameter. # Prefer headers values if present to function parameters.
if headers and "alg" in headers and headers["alg"]: if headers:
algorithm = headers["alg"] headers_alg = headers.get("alg")
if headers_alg:
algorithm = headers["alg"]
headers_b64 = headers.get("b64")
if headers_b64 is False:
is_payload_detached = True
# Header # Header
header = {"typ": self.header_typ, "alg": algorithm} header = {"typ": self.header_typ, "alg": algorithm} # type: Dict[str, Any]
if headers: if headers:
self._validate_headers(headers) self._validate_headers(headers)
header.update(headers) header.update(headers)
if not header["typ"]:
del header["typ"] if not header["typ"]:
del header["typ"]
if is_payload_detached:
header["b64"] = False
elif "b64" in header:
# True is the standard value for b64, so no need for it
del header["b64"]
json_header = json.dumps( json_header = json.dumps(
header, separators=(",", ":"), cls=json_encoder header, separators=(",", ":"), cls=json_encoder
).encode() ).encode()
segments.append(base64url_encode(json_header)) segments.append(base64url_encode(json_header))
segments.append(base64url_encode(payload))
if is_payload_detached:
msg_payload = payload
else:
msg_payload = base64url_encode(payload)
segments.append(msg_payload)
# Segments # Segments
signing_input = b".".join(segments) signing_input = b".".join(segments)
try: try:
alg_obj = self._algorithms[algorithm] alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key) key = alg_obj.prepare_key(key)
@ -116,14 +136,15 @@ class PyJWS:
except KeyError as e: except KeyError as e:
if not has_crypto and algorithm in requires_cryptography: if not has_crypto and algorithm in requires_cryptography:
raise NotImplementedError( raise NotImplementedError(
"Algorithm '%s' could not be found. Do you have cryptography " f"Algorithm '{algorithm}' could not be found. Do you have cryptography installed?"
"installed?" % algorithm
) from e ) from e
else: raise NotImplementedError("Algorithm not supported") from e
raise NotImplementedError("Algorithm not supported") from e
segments.append(base64url_encode(signature)) segments.append(base64url_encode(signature))
# Don't put the payload content inside the encoded token when detached
if is_payload_detached:
segments[1] = b""
encoded_string = b".".join(segments) encoded_string = b".".join(segments)
return encoded_string.decode("utf-8") return encoded_string.decode("utf-8")
@ -132,8 +153,9 @@ class PyJWS:
self, self,
jwt: str, jwt: str,
key: str = "", key: str = "",
algorithms: List[str] = None, algorithms: Optional[List[str]] = None,
options: Dict = None, options: Optional[Dict] = None,
detached_payload: Optional[bytes] = None,
**kwargs, **kwargs,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
if options is None: if options is None:
@ -148,6 +170,14 @@ class PyJWS:
payload, signing_input, header, signature = self._load(jwt) payload, signing_input, header, signature = self._load(jwt)
if header.get("b64", True) is False:
if detached_payload is None:
raise DecodeError(
'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
)
payload = detached_payload
signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])
if verify_signature: if verify_signature:
self._verify_signature(signing_input, header, signature, key, algorithms) self._verify_signature(signing_input, header, signature, key, algorithms)
@ -161,8 +191,8 @@ class PyJWS:
self, self,
jwt: str, jwt: str,
key: str = "", key: str = "",
algorithms: List[str] = None, algorithms: Optional[List[str]] = None,
options: Dict = None, options: Optional[Dict] = None,
**kwargs, **kwargs,
) -> str: ) -> str:
decoded = self.decode_complete(jwt, key, algorithms, options, **kwargs) decoded = self.decode_complete(jwt, key, algorithms, options, **kwargs)
@ -200,7 +230,7 @@ class PyJWS:
try: try:
header = json.loads(header_data) header = json.loads(header_data)
except ValueError as e: except ValueError as e:
raise DecodeError("Invalid header string: %s" % e) from e raise DecodeError(f"Invalid header string: {e}") from e
if not isinstance(header, Mapping): if not isinstance(header, Mapping):
raise DecodeError("Invalid header string: must be a json object") raise DecodeError("Invalid header string: must be a json object")

View file

@ -1,4 +1,5 @@
import json import json
import warnings
from calendar import timegm from calendar import timegm
from collections.abc import Iterable, Mapping from collections.abc import Iterable, Mapping
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@ -66,14 +67,23 @@ class PyJWT:
self, self,
jwt: str, jwt: str,
key: str = "", key: str = "",
algorithms: List[str] = None, algorithms: Optional[List[str]] = None,
options: Dict = None, options: Optional[Dict] = None,
**kwargs, **kwargs,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
if options is None: options = dict(options or {}) # shallow-copy or initialize an empty dict
options = {"verify_signature": True} options.setdefault("verify_signature", True)
else:
options.setdefault("verify_signature", True) # If the user has set the legacy `verify` argument, and it doesn't match
# what the relevant `options` entry for the argument is, inform the user
# that they're likely making a mistake.
if "verify" in kwargs and kwargs["verify"] != options["verify_signature"]:
warnings.warn(
"The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
"The equivalent is setting `verify_signature` to False in the `options` dictionary. "
"This invocation has a mismatch between the kwarg and the option entry.",
category=DeprecationWarning,
)
if not options["verify_signature"]: if not options["verify_signature"]:
options.setdefault("verify_exp", False) options.setdefault("verify_exp", False)
@ -98,7 +108,7 @@ class PyJWT:
try: try:
payload = json.loads(decoded["payload"]) payload = json.loads(decoded["payload"])
except ValueError as e: except ValueError as e:
raise DecodeError("Invalid payload string: %s" % e) raise DecodeError(f"Invalid payload string: {e}")
if not isinstance(payload, dict): if not isinstance(payload, dict):
raise DecodeError("Invalid payload string: must be a json object") raise DecodeError("Invalid payload string: must be a json object")
@ -112,8 +122,8 @@ class PyJWT:
self, self,
jwt: str, jwt: str,
key: str = "", key: str = "",
algorithms: List[str] = None, algorithms: Optional[List[str]] = None,
options: Dict = None, options: Optional[Dict] = None,
**kwargs, **kwargs,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
decoded = self.decode_complete(jwt, key, algorithms, options, **kwargs) decoded = self.decode_complete(jwt, key, algorithms, options, **kwargs)

View file

@ -51,7 +51,7 @@ class MissingRequiredClaimError(InvalidTokenError):
self.claim = claim self.claim = claim
def __str__(self): def __str__(self):
return 'Token is missing the "%s" claim' % self.claim return f'Token is missing the "{self.claim}" claim'
class PyJWKError(PyJWTError): class PyJWKError(PyJWTError):

View file

@ -28,10 +28,10 @@ def info():
if implementation == "CPython": if implementation == "CPython":
implementation_version = platform.python_version() implementation_version = platform.python_version()
elif implementation == "PyPy": elif implementation == "PyPy":
implementation_version = "{}.{}.{}".format( implementation_version = (
sys.pypy_version_info.major, f"{sys.pypy_version_info.major}."
sys.pypy_version_info.minor, f"{sys.pypy_version_info.minor}."
sys.pypy_version_info.micro, f"{sys.pypy_version_info.micro}"
) )
if sys.pypy_version_info.releaselevel != "final": if sys.pypy_version_info.releaselevel != "final":
implementation_version = "".join( implementation_version = "".join(

View file

@ -1,5 +1,6 @@
import base64 import base64
import binascii import binascii
import re
from typing import Any, Union from typing import Any, Union
try: try:
@ -97,3 +98,63 @@ def raw_to_der_signature(raw_sig: bytes, curve: EllipticCurve) -> bytes:
s = bytes_to_number(raw_sig[num_bytes:]) s = bytes_to_number(raw_sig[num_bytes:])
return encode_dss_signature(r, s) return encode_dss_signature(r, s)
# Based on https://github.com/hynek/pem/blob/7ad94db26b0bc21d10953f5dbad3acfdfacf57aa/src/pem/_core.py#L224-L252
_PEMS = {
b"CERTIFICATE",
b"TRUSTED CERTIFICATE",
b"PRIVATE KEY",
b"PUBLIC KEY",
b"ENCRYPTED PRIVATE KEY",
b"OPENSSH PRIVATE KEY",
b"DSA PRIVATE KEY",
b"RSA PRIVATE KEY",
b"RSA PUBLIC KEY",
b"EC PRIVATE KEY",
b"DH PARAMETERS",
b"NEW CERTIFICATE REQUEST",
b"CERTIFICATE REQUEST",
b"SSH2 PUBLIC KEY",
b"SSH2 ENCRYPTED PRIVATE KEY",
b"X509 CRL",
}
_PEM_RE = re.compile(
b"----[- ]BEGIN ("
+ b"|".join(_PEMS)
+ b""")[- ]----\r?
.+?\r?
----[- ]END \\1[- ]----\r?\n?""",
re.DOTALL,
)
def is_pem_format(key: bytes) -> bool:
return bool(_PEM_RE.search(key))
# Based on https://github.com/pyca/cryptography/blob/bcb70852d577b3f490f015378c75cba74986297b/src/cryptography/hazmat/primitives/serialization/ssh.py#L40-L46
_CERT_SUFFIX = b"-cert-v01@openssh.com"
_SSH_PUBKEY_RC = re.compile(br"\A(\S+)[ \t]+(\S+)")
_SSH_KEY_FORMATS = [
b"ssh-ed25519",
b"ssh-rsa",
b"ssh-dss",
b"ecdsa-sha2-nistp256",
b"ecdsa-sha2-nistp384",
b"ecdsa-sha2-nistp521",
]
def is_ssh_key(key: bytes) -> bool:
if any(string_value in key for string_value in _SSH_KEY_FORMATS):
return True
ssh_pubkey_match = _SSH_PUBKEY_RC.match(key)
if ssh_pubkey_match:
key_type = ssh_pubkey_match.group(1)
if _CERT_SUFFIX == key_type[-len(_CERT_SUFFIX) :]:
return True
return False

View file

@ -30,7 +30,7 @@ paho-mqtt==1.6.1
plexapi==4.9.2 plexapi==4.9.2
portend==3.1.0 portend==3.1.0
profilehooks==1.12.0 profilehooks==1.12.0
PyJWT==2.3.0 PyJWT==2.4.0
pyparsing==3.0.9 pyparsing==3.0.9
python-dateutil==2.8.2 python-dateutil==2.8.2
python-twitter==3.5 python-twitter==3.5