mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-06 13:11:15 -07:00
Add pyjwt 1.4.0
This commit is contained in:
parent
644fea6665
commit
7c4c7bfc90
12 changed files with 1104 additions and 0 deletions
29
lib/jwt/__init__.py
Normal file
29
lib/jwt/__init__.py
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# flake8: noqa
|
||||||
|
|
||||||
|
"""
|
||||||
|
JSON Web Token implementation
|
||||||
|
|
||||||
|
Minimum implementation based on this spec:
|
||||||
|
http://self-issued.info/docs/draft-jones-json-web-token-01.html
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
__title__ = 'pyjwt'
|
||||||
|
__version__ = '1.4.0'
|
||||||
|
__author__ = 'José Padilla'
|
||||||
|
__license__ = 'MIT'
|
||||||
|
__copyright__ = 'Copyright 2015 José Padilla'
|
||||||
|
|
||||||
|
|
||||||
|
from .api_jwt import (
|
||||||
|
encode, decode, register_algorithm, unregister_algorithm,
|
||||||
|
get_unverified_header, PyJWT
|
||||||
|
)
|
||||||
|
from .api_jws import PyJWS
|
||||||
|
from .exceptions import (
|
||||||
|
InvalidTokenError, DecodeError, InvalidAudienceError,
|
||||||
|
ExpiredSignatureError, ImmatureSignatureError, InvalidIssuedAtError,
|
||||||
|
InvalidIssuerError, ExpiredSignature, InvalidAudience, InvalidIssuer,
|
||||||
|
MissingRequiredClaimError
|
||||||
|
)
|
135
lib/jwt/__main__.py
Normal file
135
lib/jwt/__main__.py
Normal file
|
@ -0,0 +1,135 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
from __future__ import absolute_import, print_function
|
||||||
|
|
||||||
|
import json
|
||||||
|
import optparse
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
from . import DecodeError, __package__, __version__, decode, encode
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
|
||||||
|
usage = '''Encodes or decodes JSON Web Tokens based on input.
|
||||||
|
|
||||||
|
%prog [options] input
|
||||||
|
|
||||||
|
Decoding examples:
|
||||||
|
|
||||||
|
%prog --key=secret json.web.token
|
||||||
|
%prog --no-verify json.web.token
|
||||||
|
|
||||||
|
Encoding requires the key option and takes space separated key/value pairs
|
||||||
|
separated by equals (=) as input. Examples:
|
||||||
|
|
||||||
|
%prog --key=secret iss=me exp=1302049071
|
||||||
|
%prog --key=secret foo=bar exp=+10
|
||||||
|
|
||||||
|
The exp key is special and can take an offset to current Unix time.\
|
||||||
|
'''
|
||||||
|
p = optparse.OptionParser(
|
||||||
|
usage=usage,
|
||||||
|
prog=__package__,
|
||||||
|
version='%s %s' % (__package__, __version__),
|
||||||
|
)
|
||||||
|
|
||||||
|
p.add_option(
|
||||||
|
'-n', '--no-verify',
|
||||||
|
action='store_false',
|
||||||
|
dest='verify',
|
||||||
|
default=True,
|
||||||
|
help='ignore signature verification on decode'
|
||||||
|
)
|
||||||
|
|
||||||
|
p.add_option(
|
||||||
|
'--key',
|
||||||
|
dest='key',
|
||||||
|
metavar='KEY',
|
||||||
|
default=None,
|
||||||
|
help='set the secret key to sign with'
|
||||||
|
)
|
||||||
|
|
||||||
|
p.add_option(
|
||||||
|
'--alg',
|
||||||
|
dest='algorithm',
|
||||||
|
metavar='ALG',
|
||||||
|
default='HS256',
|
||||||
|
help='set crypto algorithm to sign with. default=HS256'
|
||||||
|
)
|
||||||
|
|
||||||
|
options, arguments = p.parse_args()
|
||||||
|
|
||||||
|
if len(arguments) > 0 or not sys.stdin.isatty():
|
||||||
|
if len(arguments) == 1 and (not options.verify or options.key):
|
||||||
|
# Try to decode
|
||||||
|
try:
|
||||||
|
if not sys.stdin.isatty():
|
||||||
|
token = sys.stdin.read()
|
||||||
|
else:
|
||||||
|
token = arguments[0]
|
||||||
|
|
||||||
|
token = token.encode('utf-8')
|
||||||
|
data = decode(token, key=options.key, verify=options.verify)
|
||||||
|
|
||||||
|
print(json.dumps(data))
|
||||||
|
sys.exit(0)
|
||||||
|
except DecodeError as e:
|
||||||
|
print(e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Try to encode
|
||||||
|
if options.key is None:
|
||||||
|
print('Key is required when encoding. See --help for usage.')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Build payload object to encode
|
||||||
|
payload = {}
|
||||||
|
|
||||||
|
for arg in arguments:
|
||||||
|
try:
|
||||||
|
k, v = arg.split('=', 1)
|
||||||
|
|
||||||
|
# exp +offset special case?
|
||||||
|
if k == 'exp' and v[0] == '+' and len(v) > 1:
|
||||||
|
v = str(int(time.time()+int(v[1:])))
|
||||||
|
|
||||||
|
# Cast to integer?
|
||||||
|
if v.isdigit():
|
||||||
|
v = int(v)
|
||||||
|
else:
|
||||||
|
# Cast to float?
|
||||||
|
try:
|
||||||
|
v = float(v)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Cast to true, false, or null?
|
||||||
|
constants = {'true': True, 'false': False, 'null': None}
|
||||||
|
|
||||||
|
if v in constants:
|
||||||
|
v = constants[v]
|
||||||
|
|
||||||
|
payload[k] = v
|
||||||
|
except ValueError:
|
||||||
|
print('Invalid encoding input at {}'.format(arg))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
token = encode(
|
||||||
|
payload,
|
||||||
|
key=options.key,
|
||||||
|
algorithm=options.algorithm
|
||||||
|
)
|
||||||
|
|
||||||
|
print(token)
|
||||||
|
sys.exit(0)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
p.print_help()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
290
lib/jwt/algorithms.py
Normal file
290
lib/jwt/algorithms.py
Normal file
|
@ -0,0 +1,290 @@
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
|
||||||
|
from .compat import constant_time_compare, string_types, text_type
|
||||||
|
from .exceptions import InvalidKeyError
|
||||||
|
from .utils import der_to_raw_signature, raw_to_der_signature
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.serialization import (
|
||||||
|
load_pem_private_key, load_pem_public_key, load_ssh_public_key
|
||||||
|
)
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.rsa import (
|
||||||
|
RSAPrivateKey, RSAPublicKey
|
||||||
|
)
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.ec import (
|
||||||
|
EllipticCurvePrivateKey, EllipticCurvePublicKey
|
||||||
|
)
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec, padding
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.exceptions import InvalidSignature
|
||||||
|
|
||||||
|
has_crypto = True
|
||||||
|
except ImportError:
|
||||||
|
has_crypto = False
|
||||||
|
|
||||||
|
|
||||||
|
def get_default_algorithms():
|
||||||
|
"""
|
||||||
|
Returns the algorithms that are implemented by the library.
|
||||||
|
"""
|
||||||
|
default_algorithms = {
|
||||||
|
'none': NoneAlgorithm(),
|
||||||
|
'HS256': HMACAlgorithm(HMACAlgorithm.SHA256),
|
||||||
|
'HS384': HMACAlgorithm(HMACAlgorithm.SHA384),
|
||||||
|
'HS512': HMACAlgorithm(HMACAlgorithm.SHA512)
|
||||||
|
}
|
||||||
|
|
||||||
|
if has_crypto:
|
||||||
|
default_algorithms.update({
|
||||||
|
'RS256': RSAAlgorithm(RSAAlgorithm.SHA256),
|
||||||
|
'RS384': RSAAlgorithm(RSAAlgorithm.SHA384),
|
||||||
|
'RS512': RSAAlgorithm(RSAAlgorithm.SHA512),
|
||||||
|
'ES256': ECAlgorithm(ECAlgorithm.SHA256),
|
||||||
|
'ES384': ECAlgorithm(ECAlgorithm.SHA384),
|
||||||
|
'ES512': ECAlgorithm(ECAlgorithm.SHA512),
|
||||||
|
'PS256': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256),
|
||||||
|
'PS384': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384),
|
||||||
|
'PS512': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512)
|
||||||
|
})
|
||||||
|
|
||||||
|
return default_algorithms
|
||||||
|
|
||||||
|
|
||||||
|
class Algorithm(object):
|
||||||
|
"""
|
||||||
|
The interface for an algorithm used to sign and verify tokens.
|
||||||
|
"""
|
||||||
|
def prepare_key(self, key):
|
||||||
|
"""
|
||||||
|
Performs necessary validation and conversions on the key and returns
|
||||||
|
the key value in the proper format for sign() and verify().
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
"""
|
||||||
|
Returns a digital signature for the specified message
|
||||||
|
using the specified key value.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
"""
|
||||||
|
Verifies that the specified digital signature is valid
|
||||||
|
for the specified message and key values.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class NoneAlgorithm(Algorithm):
|
||||||
|
"""
|
||||||
|
Placeholder for use when no signing or verification
|
||||||
|
operations are required.
|
||||||
|
"""
|
||||||
|
def prepare_key(self, key):
|
||||||
|
if key == '':
|
||||||
|
key = None
|
||||||
|
|
||||||
|
if key is not None:
|
||||||
|
raise InvalidKeyError('When alg = "none", key value must be None.')
|
||||||
|
|
||||||
|
return key
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
return b''
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class HMACAlgorithm(Algorithm):
|
||||||
|
"""
|
||||||
|
Performs signing and verification operations using HMAC
|
||||||
|
and the specified hash function.
|
||||||
|
"""
|
||||||
|
SHA256 = hashlib.sha256
|
||||||
|
SHA384 = hashlib.sha384
|
||||||
|
SHA512 = hashlib.sha512
|
||||||
|
|
||||||
|
def __init__(self, hash_alg):
|
||||||
|
self.hash_alg = hash_alg
|
||||||
|
|
||||||
|
def prepare_key(self, key):
|
||||||
|
if not isinstance(key, string_types) and not isinstance(key, bytes):
|
||||||
|
raise TypeError('Expecting a string- or bytes-formatted key.')
|
||||||
|
|
||||||
|
if isinstance(key, text_type):
|
||||||
|
key = key.encode('utf-8')
|
||||||
|
|
||||||
|
invalid_strings = [
|
||||||
|
b'-----BEGIN PUBLIC KEY-----',
|
||||||
|
b'-----BEGIN CERTIFICATE-----',
|
||||||
|
b'ssh-rsa'
|
||||||
|
]
|
||||||
|
|
||||||
|
if any([string_value in key for string_value in invalid_strings]):
|
||||||
|
raise InvalidKeyError(
|
||||||
|
'The specified key is an asymmetric key or x509 certificate and'
|
||||||
|
' should not be used as an HMAC secret.')
|
||||||
|
|
||||||
|
return key
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
return hmac.new(key, msg, self.hash_alg).digest()
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
return constant_time_compare(sig, self.sign(msg, key))
|
||||||
|
|
||||||
|
if has_crypto:
|
||||||
|
|
||||||
|
class RSAAlgorithm(Algorithm):
|
||||||
|
"""
|
||||||
|
Performs signing and verification operations using
|
||||||
|
RSASSA-PKCS-v1_5 and the specified hash function.
|
||||||
|
"""
|
||||||
|
SHA256 = hashes.SHA256
|
||||||
|
SHA384 = hashes.SHA384
|
||||||
|
SHA512 = hashes.SHA512
|
||||||
|
|
||||||
|
def __init__(self, hash_alg):
|
||||||
|
self.hash_alg = hash_alg
|
||||||
|
|
||||||
|
def prepare_key(self, key):
|
||||||
|
if isinstance(key, RSAPrivateKey) or \
|
||||||
|
isinstance(key, RSAPublicKey):
|
||||||
|
return key
|
||||||
|
|
||||||
|
if isinstance(key, string_types):
|
||||||
|
if isinstance(key, text_type):
|
||||||
|
key = key.encode('utf-8')
|
||||||
|
|
||||||
|
try:
|
||||||
|
if key.startswith(b'ssh-rsa'):
|
||||||
|
key = load_ssh_public_key(key, backend=default_backend())
|
||||||
|
else:
|
||||||
|
key = load_pem_private_key(key, password=None, backend=default_backend())
|
||||||
|
except ValueError:
|
||||||
|
key = load_pem_public_key(key, backend=default_backend())
|
||||||
|
else:
|
||||||
|
raise TypeError('Expecting a PEM-formatted key.')
|
||||||
|
|
||||||
|
return key
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
signer = key.signer(
|
||||||
|
padding.PKCS1v15(),
|
||||||
|
self.hash_alg()
|
||||||
|
)
|
||||||
|
|
||||||
|
signer.update(msg)
|
||||||
|
return signer.finalize()
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
verifier = key.verifier(
|
||||||
|
sig,
|
||||||
|
padding.PKCS1v15(),
|
||||||
|
self.hash_alg()
|
||||||
|
)
|
||||||
|
|
||||||
|
verifier.update(msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
verifier.verify()
|
||||||
|
return True
|
||||||
|
except InvalidSignature:
|
||||||
|
return False
|
||||||
|
|
||||||
|
class ECAlgorithm(Algorithm):
|
||||||
|
"""
|
||||||
|
Performs signing and verification operations using
|
||||||
|
ECDSA and the specified hash function
|
||||||
|
"""
|
||||||
|
SHA256 = hashes.SHA256
|
||||||
|
SHA384 = hashes.SHA384
|
||||||
|
SHA512 = hashes.SHA512
|
||||||
|
|
||||||
|
def __init__(self, hash_alg):
|
||||||
|
self.hash_alg = hash_alg
|
||||||
|
|
||||||
|
def prepare_key(self, key):
|
||||||
|
if isinstance(key, EllipticCurvePrivateKey) or \
|
||||||
|
isinstance(key, EllipticCurvePublicKey):
|
||||||
|
return key
|
||||||
|
|
||||||
|
if isinstance(key, string_types):
|
||||||
|
if isinstance(key, text_type):
|
||||||
|
key = key.encode('utf-8')
|
||||||
|
|
||||||
|
# Attempt to load key. We don't know if it's
|
||||||
|
# a Signing Key or a Verifying Key, so we try
|
||||||
|
# the Verifying Key first.
|
||||||
|
try:
|
||||||
|
key = load_pem_public_key(key, backend=default_backend())
|
||||||
|
except ValueError:
|
||||||
|
key = load_pem_private_key(key, password=None, backend=default_backend())
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise TypeError('Expecting a PEM-formatted key.')
|
||||||
|
|
||||||
|
return key
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
signer = key.signer(ec.ECDSA(self.hash_alg()))
|
||||||
|
|
||||||
|
signer.update(msg)
|
||||||
|
der_sig = signer.finalize()
|
||||||
|
|
||||||
|
return der_to_raw_signature(der_sig, key.curve)
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
try:
|
||||||
|
der_sig = raw_to_der_signature(sig, key.curve)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
verifier = key.verifier(der_sig, ec.ECDSA(self.hash_alg()))
|
||||||
|
|
||||||
|
verifier.update(msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
verifier.verify()
|
||||||
|
return True
|
||||||
|
except InvalidSignature:
|
||||||
|
return False
|
||||||
|
|
||||||
|
class RSAPSSAlgorithm(RSAAlgorithm):
|
||||||
|
"""
|
||||||
|
Performs a signature using RSASSA-PSS with MGF1
|
||||||
|
"""
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
signer = key.signer(
|
||||||
|
padding.PSS(
|
||||||
|
mgf=padding.MGF1(self.hash_alg()),
|
||||||
|
salt_length=self.hash_alg.digest_size
|
||||||
|
),
|
||||||
|
self.hash_alg()
|
||||||
|
)
|
||||||
|
|
||||||
|
signer.update(msg)
|
||||||
|
return signer.finalize()
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
verifier = key.verifier(
|
||||||
|
sig,
|
||||||
|
padding.PSS(
|
||||||
|
mgf=padding.MGF1(self.hash_alg()),
|
||||||
|
salt_length=self.hash_alg.digest_size
|
||||||
|
),
|
||||||
|
self.hash_alg()
|
||||||
|
)
|
||||||
|
|
||||||
|
verifier.update(msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
verifier.verify()
|
||||||
|
return True
|
||||||
|
except InvalidSignature:
|
||||||
|
return False
|
189
lib/jwt/api_jws.py
Normal file
189
lib/jwt/api_jws.py
Normal file
|
@ -0,0 +1,189 @@
|
||||||
|
import binascii
|
||||||
|
import json
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
from collections import Mapping
|
||||||
|
|
||||||
|
from .algorithms import Algorithm, get_default_algorithms # NOQA
|
||||||
|
from .compat import text_type
|
||||||
|
from .exceptions import DecodeError, InvalidAlgorithmError
|
||||||
|
from .utils import base64url_decode, base64url_encode, merge_dict
|
||||||
|
|
||||||
|
|
||||||
|
class PyJWS(object):
|
||||||
|
header_typ = 'JWT'
|
||||||
|
|
||||||
|
def __init__(self, algorithms=None, options=None):
|
||||||
|
self._algorithms = get_default_algorithms()
|
||||||
|
self._valid_algs = (set(algorithms) if algorithms is not None
|
||||||
|
else set(self._algorithms))
|
||||||
|
|
||||||
|
# Remove algorithms that aren't on the whitelist
|
||||||
|
for key in list(self._algorithms.keys()):
|
||||||
|
if key not in self._valid_algs:
|
||||||
|
del self._algorithms[key]
|
||||||
|
|
||||||
|
if not options:
|
||||||
|
options = {}
|
||||||
|
|
||||||
|
self.options = merge_dict(self._get_default_options(), options)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_default_options():
|
||||||
|
return {
|
||||||
|
'verify_signature': True
|
||||||
|
}
|
||||||
|
|
||||||
|
def register_algorithm(self, alg_id, alg_obj):
|
||||||
|
"""
|
||||||
|
Registers a new Algorithm for use when creating and verifying tokens.
|
||||||
|
"""
|
||||||
|
if alg_id in self._algorithms:
|
||||||
|
raise ValueError('Algorithm already has a handler.')
|
||||||
|
|
||||||
|
if not isinstance(alg_obj, Algorithm):
|
||||||
|
raise TypeError('Object is not of type `Algorithm`')
|
||||||
|
|
||||||
|
self._algorithms[alg_id] = alg_obj
|
||||||
|
self._valid_algs.add(alg_id)
|
||||||
|
|
||||||
|
def unregister_algorithm(self, alg_id):
|
||||||
|
"""
|
||||||
|
Unregisters an Algorithm for use when creating and verifying tokens
|
||||||
|
Throws KeyError if algorithm is not registered.
|
||||||
|
"""
|
||||||
|
if alg_id not in self._algorithms:
|
||||||
|
raise KeyError('The specified algorithm could not be removed'
|
||||||
|
' because it is not registered.')
|
||||||
|
|
||||||
|
del self._algorithms[alg_id]
|
||||||
|
self._valid_algs.remove(alg_id)
|
||||||
|
|
||||||
|
def get_algorithms(self):
|
||||||
|
"""
|
||||||
|
Returns a list of supported values for the 'alg' parameter.
|
||||||
|
"""
|
||||||
|
return list(self._valid_algs)
|
||||||
|
|
||||||
|
def encode(self, payload, key, algorithm='HS256', headers=None,
|
||||||
|
json_encoder=None):
|
||||||
|
segments = []
|
||||||
|
|
||||||
|
if algorithm is None:
|
||||||
|
algorithm = 'none'
|
||||||
|
|
||||||
|
if algorithm not in self._valid_algs:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Header
|
||||||
|
header = {'typ': self.header_typ, 'alg': algorithm}
|
||||||
|
|
||||||
|
if headers:
|
||||||
|
header.update(headers)
|
||||||
|
|
||||||
|
json_header = json.dumps(
|
||||||
|
header,
|
||||||
|
separators=(',', ':'),
|
||||||
|
cls=json_encoder
|
||||||
|
).encode('utf-8')
|
||||||
|
|
||||||
|
segments.append(base64url_encode(json_header))
|
||||||
|
segments.append(base64url_encode(payload))
|
||||||
|
|
||||||
|
# Segments
|
||||||
|
signing_input = b'.'.join(segments)
|
||||||
|
try:
|
||||||
|
alg_obj = self._algorithms[algorithm]
|
||||||
|
key = alg_obj.prepare_key(key)
|
||||||
|
signature = alg_obj.sign(signing_input, key)
|
||||||
|
|
||||||
|
except KeyError:
|
||||||
|
raise NotImplementedError('Algorithm not supported')
|
||||||
|
|
||||||
|
segments.append(base64url_encode(signature))
|
||||||
|
|
||||||
|
return b'.'.join(segments)
|
||||||
|
|
||||||
|
def decode(self, jws, key='', verify=True, algorithms=None, options=None,
|
||||||
|
**kwargs):
|
||||||
|
payload, signing_input, header, signature = self._load(jws)
|
||||||
|
|
||||||
|
if verify:
|
||||||
|
merged_options = merge_dict(self.options, options)
|
||||||
|
if merged_options.get('verify_signature'):
|
||||||
|
self._verify_signature(payload, signing_input, header, signature,
|
||||||
|
key, algorithms)
|
||||||
|
else:
|
||||||
|
warnings.warn('The verify parameter is deprecated. '
|
||||||
|
'Please use options instead.', DeprecationWarning)
|
||||||
|
|
||||||
|
return payload
|
||||||
|
|
||||||
|
def get_unverified_header(self, jwt):
|
||||||
|
"""Returns back the JWT header parameters as a dict()
|
||||||
|
|
||||||
|
Note: The signature is not verified so the header parameters
|
||||||
|
should not be fully trusted until signature verification is complete
|
||||||
|
"""
|
||||||
|
return self._load(jwt)[2]
|
||||||
|
|
||||||
|
def _load(self, jwt):
|
||||||
|
if isinstance(jwt, text_type):
|
||||||
|
jwt = jwt.encode('utf-8')
|
||||||
|
|
||||||
|
try:
|
||||||
|
signing_input, crypto_segment = jwt.rsplit(b'.', 1)
|
||||||
|
header_segment, payload_segment = signing_input.split(b'.', 1)
|
||||||
|
except ValueError:
|
||||||
|
raise DecodeError('Not enough segments')
|
||||||
|
|
||||||
|
try:
|
||||||
|
header_data = base64url_decode(header_segment)
|
||||||
|
except (TypeError, binascii.Error):
|
||||||
|
raise DecodeError('Invalid header padding')
|
||||||
|
|
||||||
|
try:
|
||||||
|
header = json.loads(header_data.decode('utf-8'))
|
||||||
|
except ValueError as e:
|
||||||
|
raise DecodeError('Invalid header string: %s' % e)
|
||||||
|
|
||||||
|
if not isinstance(header, Mapping):
|
||||||
|
raise DecodeError('Invalid header string: must be a json object')
|
||||||
|
|
||||||
|
try:
|
||||||
|
payload = base64url_decode(payload_segment)
|
||||||
|
except (TypeError, binascii.Error):
|
||||||
|
raise DecodeError('Invalid payload padding')
|
||||||
|
|
||||||
|
try:
|
||||||
|
signature = base64url_decode(crypto_segment)
|
||||||
|
except (TypeError, binascii.Error):
|
||||||
|
raise DecodeError('Invalid crypto padding')
|
||||||
|
|
||||||
|
return (payload, signing_input, header, signature)
|
||||||
|
|
||||||
|
def _verify_signature(self, payload, signing_input, header, signature,
|
||||||
|
key='', algorithms=None):
|
||||||
|
|
||||||
|
alg = header.get('alg')
|
||||||
|
|
||||||
|
if algorithms is not None and alg not in algorithms:
|
||||||
|
raise InvalidAlgorithmError('The specified alg value is not allowed')
|
||||||
|
|
||||||
|
try:
|
||||||
|
alg_obj = self._algorithms[alg]
|
||||||
|
key = alg_obj.prepare_key(key)
|
||||||
|
|
||||||
|
if not alg_obj.verify(signing_input, key, signature):
|
||||||
|
raise DecodeError('Signature verification failed')
|
||||||
|
|
||||||
|
except KeyError:
|
||||||
|
raise InvalidAlgorithmError('Algorithm not supported')
|
||||||
|
|
||||||
|
|
||||||
|
_jws_global_obj = PyJWS()
|
||||||
|
encode = _jws_global_obj.encode
|
||||||
|
decode = _jws_global_obj.decode
|
||||||
|
register_algorithm = _jws_global_obj.register_algorithm
|
||||||
|
unregister_algorithm = _jws_global_obj.unregister_algorithm
|
||||||
|
get_unverified_header = _jws_global_obj.get_unverified_header
|
187
lib/jwt/api_jwt.py
Normal file
187
lib/jwt/api_jwt.py
Normal file
|
@ -0,0 +1,187 @@
|
||||||
|
import json
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
from calendar import timegm
|
||||||
|
from collections import Mapping
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
from .api_jws import PyJWS
|
||||||
|
from .algorithms import Algorithm, get_default_algorithms # NOQA
|
||||||
|
from .compat import string_types, timedelta_total_seconds
|
||||||
|
from .exceptions import (
|
||||||
|
DecodeError, ExpiredSignatureError, ImmatureSignatureError,
|
||||||
|
InvalidAudienceError, InvalidIssuedAtError,
|
||||||
|
InvalidIssuerError, MissingRequiredClaimError
|
||||||
|
)
|
||||||
|
from .utils import merge_dict
|
||||||
|
|
||||||
|
|
||||||
|
class PyJWT(PyJWS):
|
||||||
|
header_type = 'JWT'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_default_options():
|
||||||
|
return {
|
||||||
|
'verify_signature': True,
|
||||||
|
'verify_exp': True,
|
||||||
|
'verify_nbf': True,
|
||||||
|
'verify_iat': True,
|
||||||
|
'verify_aud': True,
|
||||||
|
'verify_iss': True,
|
||||||
|
'require_exp': False,
|
||||||
|
'require_iat': False,
|
||||||
|
'require_nbf': False
|
||||||
|
}
|
||||||
|
|
||||||
|
def encode(self, payload, key, algorithm='HS256', headers=None,
|
||||||
|
json_encoder=None):
|
||||||
|
# Check that we get a mapping
|
||||||
|
if not isinstance(payload, Mapping):
|
||||||
|
raise TypeError('Expecting a mapping object, as JWT only supports '
|
||||||
|
'JSON objects as payloads.')
|
||||||
|
|
||||||
|
# Payload
|
||||||
|
for time_claim in ['exp', 'iat', 'nbf']:
|
||||||
|
# Convert datetime to a intDate value in known time-format claims
|
||||||
|
if isinstance(payload.get(time_claim), datetime):
|
||||||
|
payload[time_claim] = timegm(payload[time_claim].utctimetuple())
|
||||||
|
|
||||||
|
json_payload = json.dumps(
|
||||||
|
payload,
|
||||||
|
separators=(',', ':'),
|
||||||
|
cls=json_encoder
|
||||||
|
).encode('utf-8')
|
||||||
|
|
||||||
|
return super(PyJWT, self).encode(
|
||||||
|
json_payload, key, algorithm, headers, json_encoder
|
||||||
|
)
|
||||||
|
|
||||||
|
def decode(self, jwt, key='', verify=True, algorithms=None, options=None,
|
||||||
|
**kwargs):
|
||||||
|
payload, signing_input, header, signature = self._load(jwt)
|
||||||
|
|
||||||
|
decoded = super(PyJWT, self).decode(jwt, key, verify, algorithms,
|
||||||
|
options, **kwargs)
|
||||||
|
|
||||||
|
try:
|
||||||
|
payload = json.loads(decoded.decode('utf-8'))
|
||||||
|
except ValueError as e:
|
||||||
|
raise DecodeError('Invalid payload string: %s' % e)
|
||||||
|
if not isinstance(payload, Mapping):
|
||||||
|
raise DecodeError('Invalid payload string: must be a json object')
|
||||||
|
|
||||||
|
if verify:
|
||||||
|
merged_options = merge_dict(self.options, options)
|
||||||
|
self._validate_claims(payload, merged_options, **kwargs)
|
||||||
|
|
||||||
|
return payload
|
||||||
|
|
||||||
|
def _validate_claims(self, payload, options, audience=None, issuer=None,
|
||||||
|
leeway=0, **kwargs):
|
||||||
|
|
||||||
|
if 'verify_expiration' in kwargs:
|
||||||
|
options['verify_exp'] = kwargs.get('verify_expiration', True)
|
||||||
|
warnings.warn('The verify_expiration parameter is deprecated. '
|
||||||
|
'Please use options instead.', DeprecationWarning)
|
||||||
|
|
||||||
|
if isinstance(leeway, timedelta):
|
||||||
|
leeway = timedelta_total_seconds(leeway)
|
||||||
|
|
||||||
|
if not isinstance(audience, (string_types, type(None))):
|
||||||
|
raise TypeError('audience must be a string or None')
|
||||||
|
|
||||||
|
self._validate_required_claims(payload, options)
|
||||||
|
|
||||||
|
now = timegm(datetime.utcnow().utctimetuple())
|
||||||
|
|
||||||
|
if 'iat' in payload and options.get('verify_iat'):
|
||||||
|
self._validate_iat(payload, now, leeway)
|
||||||
|
|
||||||
|
if 'nbf' in payload and options.get('verify_nbf'):
|
||||||
|
self._validate_nbf(payload, now, leeway)
|
||||||
|
|
||||||
|
if 'exp' in payload and options.get('verify_exp'):
|
||||||
|
self._validate_exp(payload, now, leeway)
|
||||||
|
|
||||||
|
if options.get('verify_iss'):
|
||||||
|
self._validate_iss(payload, issuer)
|
||||||
|
|
||||||
|
if options.get('verify_aud'):
|
||||||
|
self._validate_aud(payload, audience)
|
||||||
|
|
||||||
|
def _validate_required_claims(self, payload, options):
|
||||||
|
if options.get('require_exp') and payload.get('exp') is None:
|
||||||
|
raise MissingRequiredClaimError('exp')
|
||||||
|
|
||||||
|
if options.get('require_iat') and payload.get('iat') is None:
|
||||||
|
raise MissingRequiredClaimError('iat')
|
||||||
|
|
||||||
|
if options.get('require_nbf') and payload.get('nbf') is None:
|
||||||
|
raise MissingRequiredClaimError('nbf')
|
||||||
|
|
||||||
|
def _validate_iat(self, payload, now, leeway):
|
||||||
|
try:
|
||||||
|
iat = int(payload['iat'])
|
||||||
|
except ValueError:
|
||||||
|
raise DecodeError('Issued At claim (iat) must be an integer.')
|
||||||
|
|
||||||
|
if iat > (now + leeway):
|
||||||
|
raise InvalidIssuedAtError('Issued At claim (iat) cannot be in'
|
||||||
|
' the future.')
|
||||||
|
|
||||||
|
def _validate_nbf(self, payload, now, leeway):
|
||||||
|
try:
|
||||||
|
nbf = int(payload['nbf'])
|
||||||
|
except ValueError:
|
||||||
|
raise DecodeError('Not Before claim (nbf) must be an integer.')
|
||||||
|
|
||||||
|
if nbf > (now + leeway):
|
||||||
|
raise ImmatureSignatureError('The token is not yet valid (nbf)')
|
||||||
|
|
||||||
|
def _validate_exp(self, payload, now, leeway):
|
||||||
|
try:
|
||||||
|
exp = int(payload['exp'])
|
||||||
|
except ValueError:
|
||||||
|
raise DecodeError('Expiration Time claim (exp) must be an'
|
||||||
|
' integer.')
|
||||||
|
|
||||||
|
if exp < (now - leeway):
|
||||||
|
raise ExpiredSignatureError('Signature has expired')
|
||||||
|
|
||||||
|
def _validate_aud(self, payload, audience):
|
||||||
|
if audience is None and 'aud' not in payload:
|
||||||
|
return
|
||||||
|
|
||||||
|
if audience is not None and 'aud' not in payload:
|
||||||
|
# Application specified an audience, but it could not be
|
||||||
|
# verified since the token does not contain a claim.
|
||||||
|
raise MissingRequiredClaimError('aud')
|
||||||
|
|
||||||
|
audience_claims = payload['aud']
|
||||||
|
|
||||||
|
if isinstance(audience_claims, string_types):
|
||||||
|
audience_claims = [audience_claims]
|
||||||
|
if not isinstance(audience_claims, list):
|
||||||
|
raise InvalidAudienceError('Invalid claim format in token')
|
||||||
|
if any(not isinstance(c, string_types) for c in audience_claims):
|
||||||
|
raise InvalidAudienceError('Invalid claim format in token')
|
||||||
|
if audience not in audience_claims:
|
||||||
|
raise InvalidAudienceError('Invalid audience')
|
||||||
|
|
||||||
|
def _validate_iss(self, payload, issuer):
|
||||||
|
if issuer is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if 'iss' not in payload:
|
||||||
|
raise MissingRequiredClaimError('iss')
|
||||||
|
|
||||||
|
if payload['iss'] != issuer:
|
||||||
|
raise InvalidIssuerError('Invalid issuer')
|
||||||
|
|
||||||
|
|
||||||
|
_jwt_global_obj = PyJWT()
|
||||||
|
encode = _jwt_global_obj.encode
|
||||||
|
decode = _jwt_global_obj.decode
|
||||||
|
register_algorithm = _jwt_global_obj.register_algorithm
|
||||||
|
unregister_algorithm = _jwt_global_obj.unregister_algorithm
|
||||||
|
get_unverified_header = _jwt_global_obj.get_unverified_header
|
52
lib/jwt/compat.py
Normal file
52
lib/jwt/compat.py
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
"""
|
||||||
|
The `compat` module provides support for backwards compatibility with older
|
||||||
|
versions of python, and compatibility wrappers around optional packages.
|
||||||
|
"""
|
||||||
|
# flake8: noqa
|
||||||
|
import sys
|
||||||
|
import hmac
|
||||||
|
|
||||||
|
|
||||||
|
PY3 = sys.version_info[0] == 3
|
||||||
|
|
||||||
|
|
||||||
|
if PY3:
|
||||||
|
string_types = str,
|
||||||
|
text_type = str
|
||||||
|
else:
|
||||||
|
string_types = basestring,
|
||||||
|
text_type = unicode
|
||||||
|
|
||||||
|
|
||||||
|
def timedelta_total_seconds(delta):
|
||||||
|
try:
|
||||||
|
delta.total_seconds
|
||||||
|
except AttributeError:
|
||||||
|
# On Python 2.6, timedelta instances do not have
|
||||||
|
# a .total_seconds() method.
|
||||||
|
total_seconds = delta.days * 24 * 60 * 60 + delta.seconds
|
||||||
|
else:
|
||||||
|
total_seconds = delta.total_seconds()
|
||||||
|
|
||||||
|
return total_seconds
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
constant_time_compare = hmac.compare_digest
|
||||||
|
except AttributeError:
|
||||||
|
# Fallback for Python < 2.7
|
||||||
|
def constant_time_compare(val1, val2):
|
||||||
|
"""
|
||||||
|
Returns True if the two strings are equal, False otherwise.
|
||||||
|
|
||||||
|
The time taken is independent of the number of characters that match.
|
||||||
|
"""
|
||||||
|
if len(val1) != len(val2):
|
||||||
|
return False
|
||||||
|
|
||||||
|
result = 0
|
||||||
|
|
||||||
|
for x, y in zip(val1, val2):
|
||||||
|
result |= ord(x) ^ ord(y)
|
||||||
|
|
||||||
|
return result == 0
|
0
lib/jwt/contrib/__init__.py
Normal file
0
lib/jwt/contrib/__init__.py
Normal file
0
lib/jwt/contrib/algorithms/__init__.py
Normal file
0
lib/jwt/contrib/algorithms/__init__.py
Normal file
60
lib/jwt/contrib/algorithms/py_ecdsa.py
Normal file
60
lib/jwt/contrib/algorithms/py_ecdsa.py
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
# Note: This file is named py_ecdsa.py because import behavior in Python 2
|
||||||
|
# would cause ecdsa.py to squash the ecdsa library that it depends upon.
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
import ecdsa
|
||||||
|
|
||||||
|
from jwt.algorithms import Algorithm
|
||||||
|
from jwt.compat import string_types, text_type
|
||||||
|
|
||||||
|
|
||||||
|
class ECAlgorithm(Algorithm):
|
||||||
|
"""
|
||||||
|
Performs signing and verification operations using
|
||||||
|
ECDSA and the specified hash function
|
||||||
|
|
||||||
|
This class requires the ecdsa package to be installed.
|
||||||
|
|
||||||
|
This is based off of the implementation in PyJWT 0.3.2
|
||||||
|
"""
|
||||||
|
SHA256 = hashlib.sha256
|
||||||
|
SHA384 = hashlib.sha384
|
||||||
|
SHA512 = hashlib.sha512
|
||||||
|
|
||||||
|
def __init__(self, hash_alg):
|
||||||
|
self.hash_alg = hash_alg
|
||||||
|
|
||||||
|
def prepare_key(self, key):
|
||||||
|
|
||||||
|
if isinstance(key, ecdsa.SigningKey) or \
|
||||||
|
isinstance(key, ecdsa.VerifyingKey):
|
||||||
|
return key
|
||||||
|
|
||||||
|
if isinstance(key, string_types):
|
||||||
|
if isinstance(key, text_type):
|
||||||
|
key = key.encode('utf-8')
|
||||||
|
|
||||||
|
# Attempt to load key. We don't know if it's
|
||||||
|
# a Signing Key or a Verifying Key, so we try
|
||||||
|
# the Verifying Key first.
|
||||||
|
try:
|
||||||
|
key = ecdsa.VerifyingKey.from_pem(key)
|
||||||
|
except ecdsa.der.UnexpectedDER:
|
||||||
|
key = ecdsa.SigningKey.from_pem(key)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise TypeError('Expecting a PEM-formatted key.')
|
||||||
|
|
||||||
|
return key
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
return key.sign(msg, hashfunc=self.hash_alg,
|
||||||
|
sigencode=ecdsa.util.sigencode_string)
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
try:
|
||||||
|
return key.verify(sig, msg, hashfunc=self.hash_alg,
|
||||||
|
sigdecode=ecdsa.util.sigdecode_string)
|
||||||
|
except AssertionError:
|
||||||
|
return False
|
47
lib/jwt/contrib/algorithms/pycrypto.py
Normal file
47
lib/jwt/contrib/algorithms/pycrypto.py
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
import Crypto.Hash.SHA256
|
||||||
|
import Crypto.Hash.SHA384
|
||||||
|
import Crypto.Hash.SHA512
|
||||||
|
|
||||||
|
from Crypto.PublicKey import RSA
|
||||||
|
from Crypto.Signature import PKCS1_v1_5
|
||||||
|
|
||||||
|
from jwt.algorithms import Algorithm
|
||||||
|
from jwt.compat import string_types, text_type
|
||||||
|
|
||||||
|
|
||||||
|
class RSAAlgorithm(Algorithm):
|
||||||
|
"""
|
||||||
|
Performs signing and verification operations using
|
||||||
|
RSASSA-PKCS-v1_5 and the specified hash function.
|
||||||
|
|
||||||
|
This class requires PyCrypto package to be installed.
|
||||||
|
|
||||||
|
This is based off of the implementation in PyJWT 0.3.2
|
||||||
|
"""
|
||||||
|
SHA256 = Crypto.Hash.SHA256
|
||||||
|
SHA384 = Crypto.Hash.SHA384
|
||||||
|
SHA512 = Crypto.Hash.SHA512
|
||||||
|
|
||||||
|
def __init__(self, hash_alg):
|
||||||
|
self.hash_alg = hash_alg
|
||||||
|
|
||||||
|
def prepare_key(self, key):
|
||||||
|
|
||||||
|
if isinstance(key, RSA._RSAobj):
|
||||||
|
return key
|
||||||
|
|
||||||
|
if isinstance(key, string_types):
|
||||||
|
if isinstance(key, text_type):
|
||||||
|
key = key.encode('utf-8')
|
||||||
|
|
||||||
|
key = RSA.importKey(key)
|
||||||
|
else:
|
||||||
|
raise TypeError('Expecting a PEM- or RSA-formatted key.')
|
||||||
|
|
||||||
|
return key
|
||||||
|
|
||||||
|
def sign(self, msg, key):
|
||||||
|
return PKCS1_v1_5.new(key).sign(self.hash_alg.new(msg))
|
||||||
|
|
||||||
|
def verify(self, msg, key, sig):
|
||||||
|
return PKCS1_v1_5.new(key).verify(self.hash_alg.new(msg), sig)
|
48
lib/jwt/exceptions.py
Normal file
48
lib/jwt/exceptions.py
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
class InvalidTokenError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class DecodeError(InvalidTokenError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ExpiredSignatureError(InvalidTokenError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidAudienceError(InvalidTokenError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidIssuerError(InvalidTokenError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidIssuedAtError(InvalidTokenError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ImmatureSignatureError(InvalidTokenError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidKeyError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidAlgorithmError(InvalidTokenError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MissingRequiredClaimError(InvalidTokenError):
|
||||||
|
def __init__(self, claim):
|
||||||
|
self.claim = claim
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return 'Token is missing the "%s" claim' % self.claim
|
||||||
|
|
||||||
|
|
||||||
|
# Compatibility aliases (deprecated)
|
||||||
|
ExpiredSignature = ExpiredSignatureError
|
||||||
|
InvalidAudience = InvalidAudienceError
|
||||||
|
InvalidIssuer = InvalidIssuerError
|
67
lib/jwt/utils.py
Normal file
67
lib/jwt/utils.py
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
import base64
|
||||||
|
import binascii
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.utils import (
|
||||||
|
decode_rfc6979_signature, encode_rfc6979_signature
|
||||||
|
)
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def base64url_decode(input):
|
||||||
|
rem = len(input) % 4
|
||||||
|
|
||||||
|
if rem > 0:
|
||||||
|
input += b'=' * (4 - rem)
|
||||||
|
|
||||||
|
return base64.urlsafe_b64decode(input)
|
||||||
|
|
||||||
|
|
||||||
|
def base64url_encode(input):
|
||||||
|
return base64.urlsafe_b64encode(input).replace(b'=', b'')
|
||||||
|
|
||||||
|
|
||||||
|
def merge_dict(original, updates):
|
||||||
|
if not updates:
|
||||||
|
return original
|
||||||
|
|
||||||
|
try:
|
||||||
|
merged_options = original.copy()
|
||||||
|
merged_options.update(updates)
|
||||||
|
except (AttributeError, ValueError) as e:
|
||||||
|
raise TypeError('original and updates must be a dictionary: %s' % e)
|
||||||
|
|
||||||
|
return merged_options
|
||||||
|
|
||||||
|
|
||||||
|
def number_to_bytes(num, num_bytes):
|
||||||
|
padded_hex = '%0*x' % (2 * num_bytes, num)
|
||||||
|
big_endian = binascii.a2b_hex(padded_hex.encode('ascii'))
|
||||||
|
return big_endian
|
||||||
|
|
||||||
|
|
||||||
|
def bytes_to_number(string):
|
||||||
|
return int(binascii.b2a_hex(string), 16)
|
||||||
|
|
||||||
|
|
||||||
|
def der_to_raw_signature(der_sig, curve):
|
||||||
|
num_bits = curve.key_size
|
||||||
|
num_bytes = (num_bits + 7) // 8
|
||||||
|
|
||||||
|
r, s = decode_rfc6979_signature(der_sig)
|
||||||
|
|
||||||
|
return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes)
|
||||||
|
|
||||||
|
|
||||||
|
def raw_to_der_signature(raw_sig, curve):
|
||||||
|
num_bits = curve.key_size
|
||||||
|
num_bytes = (num_bits + 7) // 8
|
||||||
|
|
||||||
|
if len(raw_sig) != 2 * num_bytes:
|
||||||
|
raise ValueError('Invalid signature')
|
||||||
|
|
||||||
|
r = bytes_to_number(raw_sig[:num_bytes])
|
||||||
|
s = bytes_to_number(raw_sig[num_bytes:])
|
||||||
|
|
||||||
|
return encode_rfc6979_signature(r, s)
|
Loading…
Add table
Add a link
Reference in a new issue