Update requests-oauthlib-1.3.0

This commit is contained in:
JonnyWong16 2021-10-14 23:47:27 -07:00
commit f165d2d080
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
15 changed files with 552 additions and 257 deletions

View file

@ -1,7 +1,10 @@
from __future__ import absolute_import
from .facebook import facebook_compliance_fix
from .fitbit import fitbit_compliance_fix
from .linkedin import linkedin_compliance_fix
from .slack import slack_compliance_fix
from .instagram import instagram_compliance_fix
from .mailchimp import mailchimp_compliance_fix
from .weibo import weibo_compliance_fix
from .plentymarkets import plentymarkets_compliance_fix

View file

@ -4,15 +4,14 @@ from oauthlib.common import to_unicode
def douban_compliance_fix(session):
def fix_token_type(r):
token = json.loads(r.text)
token.setdefault('token_type', 'Bearer')
token.setdefault("token_type", "Bearer")
fixed_token = json.dumps(token)
r._content = to_unicode(fixed_token).encode('utf-8')
r._content = to_unicode(fixed_token).encode("utf-8")
return r
session._client_default_token_placement = 'query'
session.register_compliance_hook('access_token_response', fix_token_type)
session._client_default_token_placement = "query"
session.register_compliance_hook("access_token_response", fix_token_type)
return session

View file

@ -1,4 +1,5 @@
from json import dumps
try:
from urlparse import parse_qsl
except ImportError:
@ -8,26 +9,25 @@ from oauthlib.common import to_unicode
def facebook_compliance_fix(session):
def _compliance_fix(r):
# if Facebook claims to be sending us json, let's trust them.
if 'application/json' in r.headers.get('content-type', {}):
if "application/json" in r.headers.get("content-type", {}):
return r
# Facebook returns a content-type of text/plain when sending their
# x-www-form-urlencoded responses, along with a 200. If not, let's
# assume we're getting JSON and bail on the fix.
if 'text/plain' in r.headers.get('content-type', {}) and r.status_code == 200:
if "text/plain" in r.headers.get("content-type", {}) and r.status_code == 200:
token = dict(parse_qsl(r.text, keep_blank_values=True))
else:
return r
expires = token.get('expires')
expires = token.get("expires")
if expires is not None:
token['expires_in'] = expires
token['token_type'] = 'Bearer'
r._content = to_unicode(dumps(token)).encode('UTF-8')
token["expires_in"] = expires
token["token_type"] = "Bearer"
r._content = to_unicode(dumps(token)).encode("UTF-8")
return r
session.register_compliance_hook('access_token_response', _compliance_fix)
session.register_compliance_hook("access_token_response", _compliance_fix)
return session

View file

@ -0,0 +1,25 @@
"""
The Fitbit API breaks from the OAuth2 RFC standard by returning an "errors"
object list, rather than a single "error" string. This puts hooks in place so
that oauthlib can process an error in the results from access token and refresh
token responses. This is necessary to prevent getting the generic red herring
MissingTokenError.
"""
from json import loads, dumps
from oauthlib.common import to_unicode
def fitbit_compliance_fix(session):
def _missing_error(r):
token = loads(r.text)
if "errors" in token:
# Set the error to the first one we have
token["error"] = token["errors"][0]["errorType"]
r._content = to_unicode(dumps(token)).encode("UTF-8")
return r
session.register_compliance_hook("access_token_response", _missing_error)
session.register_compliance_hook("refresh_token_response", _missing_error)
return session

View file

@ -0,0 +1,26 @@
try:
from urlparse import urlparse, parse_qs
except ImportError:
from urllib.parse import urlparse, parse_qs
from oauthlib.common import add_params_to_uri
def instagram_compliance_fix(session):
def _non_compliant_param_name(url, headers, data):
# If the user has already specified the token in the URL
# then there's nothing to do.
# If the specified token is different from ``session.access_token``,
# we assume the user intends to override the access token.
url_query = dict(parse_qs(urlparse(url).query))
token = url_query.get("access_token")
if token:
# Nothing to do, just return.
return url, headers, data
token = [("access_token", session.access_token)]
url = add_params_to_uri(url, token)
return url, headers, data
session.register_compliance_hook("protected_request", _non_compliant_param_name)
return session

View file

@ -4,21 +4,18 @@ from oauthlib.common import add_params_to_uri, to_unicode
def linkedin_compliance_fix(session):
def _missing_token_type(r):
token = loads(r.text)
token['token_type'] = 'Bearer'
r._content = to_unicode(dumps(token)).encode('UTF-8')
token["token_type"] = "Bearer"
r._content = to_unicode(dumps(token)).encode("UTF-8")
return r
def _non_compliant_param_name(url, headers, data):
token = [('oauth2_access_token', session.access_token)]
token = [("oauth2_access_token", session.access_token)]
url = add_params_to_uri(url, token)
return url, headers, data
session._client.default_token_placement = 'query'
session.register_compliance_hook('access_token_response',
_missing_token_type)
session.register_compliance_hook('protected_request',
_non_compliant_param_name)
session._client.default_token_placement = "query"
session.register_compliance_hook("access_token_response", _missing_token_type)
session.register_compliance_hook("protected_request", _non_compliant_param_name)
return session

View file

@ -2,21 +2,22 @@ import json
from oauthlib.common import to_unicode
def mailchimp_compliance_fix(session):
def _null_scope(r):
token = json.loads(r.text)
if 'scope' in token and token['scope'] is None:
token.pop('scope')
r._content = to_unicode(json.dumps(token)).encode('utf-8')
if "scope" in token and token["scope"] is None:
token.pop("scope")
r._content = to_unicode(json.dumps(token)).encode("utf-8")
return r
def _non_zero_expiration(r):
token = json.loads(r.text)
if 'expires_in' in token and token['expires_in'] == 0:
token['expires_in'] = 3600
r._content = to_unicode(json.dumps(token)).encode('utf-8')
if "expires_in" in token and token["expires_in"] == 0:
token["expires_in"] = 3600
r._content = to_unicode(json.dumps(token)).encode("utf-8")
return r
session.register_compliance_hook('access_token_response', _null_scope)
session.register_compliance_hook('access_token_response', _non_zero_expiration)
session.register_compliance_hook("access_token_response", _null_scope)
session.register_compliance_hook("access_token_response", _non_zero_expiration)
return session

View file

@ -0,0 +1,29 @@
from json import dumps, loads
import re
from oauthlib.common import to_unicode
def plentymarkets_compliance_fix(session):
def _to_snake_case(n):
return re.sub("(.)([A-Z][a-z]+)", r"\1_\2", n).lower()
def _compliance_fix(r):
# Plenty returns the Token in CamelCase instead of _
if (
"application/json" in r.headers.get("content-type", {})
and r.status_code == 200
):
token = loads(r.text)
else:
return r
fixed_token = {}
for k, v in token.items():
fixed_token[_to_snake_case(k)] = v
r._content = to_unicode(dumps(fixed_token)).encode("UTF-8")
return r
session.register_compliance_hook("access_token_response", _compliance_fix)
return session

View file

@ -29,9 +29,9 @@ def slack_compliance_fix(session):
# ``data`` is something other than a dict: maybe a stream,
# maybe a file object, maybe something else. We can't easily
# modify it, so we'll set the token by modifying the URL instead.
token = [('token', session.access_token)]
token = [("token", session.access_token)]
url = add_params_to_uri(url, token)
return url, headers, data
session.register_compliance_hook('protected_request', _non_compliant_param_name)
session.register_compliance_hook("protected_request", _non_compliant_param_name)
return session

View file

@ -4,14 +4,12 @@ from oauthlib.common import to_unicode
def weibo_compliance_fix(session):
def _missing_token_type(r):
token = loads(r.text)
token['token_type'] = 'Bearer'
r._content = to_unicode(dumps(token)).encode('UTF-8')
token["token_type"] = "Bearer"
r._content = to_unicode(dumps(token)).encode("UTF-8")
return r
session._client.default_token_placement = 'query'
session.register_compliance_hook('access_token_response',
_missing_token_type)
session._client.default_token_placement = "query"
session.register_compliance_hook("access_token_response", _missing_token_type)
return session