Update cloudinary-1.26.0

This commit is contained in:
JonnyWong16 2021-10-14 21:18:46 -07:00
parent ebffd124f6
commit 4b28040d59
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
17 changed files with 1169 additions and 307 deletions

View file

@ -1,13 +1,15 @@
from __future__ import absolute_import
import abc
from copy import deepcopy
import hashlib
import os
import re
import logging
import numbers
import certifi
from math import ceil
from six import python_2_unicode_compatible
from six import python_2_unicode_compatible, add_metaclass
logger = logging.getLogger("Cloudinary")
ch = logging.StreamHandler()
@ -34,8 +36,9 @@ AKAMAI_SHARED_CDN = "res.cloudinary.com"
SHARED_CDN = AKAMAI_SHARED_CDN
CL_BLANK = ""
URI_SCHEME = "cloudinary"
API_VERSION = "v1_1"
VERSION = "1.20.0"
VERSION = "1.26.0"
USER_AGENT = "CloudinaryPython/{} (Python {})".format(VERSION, python_version())
""" :const: USER_AGENT """
@ -94,54 +97,20 @@ def import_django_settings():
return None
class Config(object):
@add_metaclass(abc.ABCMeta)
class BaseConfig(object):
def __init__(self):
django_settings = import_django_settings()
if django_settings:
self.update(**django_settings)
elif os.environ.get("CLOUDINARY_CLOUD_NAME"):
self.update(
cloud_name=os.environ.get("CLOUDINARY_CLOUD_NAME"),
api_key=os.environ.get("CLOUDINARY_API_KEY"),
api_secret=os.environ.get("CLOUDINARY_API_SECRET"),
secure_distribution=os.environ.get("CLOUDINARY_SECURE_DISTRIBUTION"),
private_cdn=os.environ.get("CLOUDINARY_PRIVATE_CDN") == 'true',
api_proxy=os.environ.get("CLOUDINARY_API_PROXY"),
)
elif os.environ.get("CLOUDINARY_URL"):
cloudinary_url = os.environ.get("CLOUDINARY_URL")
self._parse_cloudinary_url(cloudinary_url)
def _parse_cloudinary_url(self, cloudinary_url):
uri = urlparse(cloudinary_url)
if not self._is_url_scheme_valid(uri):
raise ValueError("Invalid CLOUDINARY_URL scheme. Expecting to start with 'cloudinary://'")
for k, v in parse_qs(uri.query).items():
if self._is_nested_key(k):
self._put_nested_key(k, v)
else:
self.__dict__[k] = v[0]
self.update(
cloud_name=uri.hostname,
api_key=uri.username,
api_secret=uri.password,
private_cdn=uri.path != ''
)
if uri.path != '':
self.update(secure_distribution=uri.path[1:])
self._load_config_from_env()
def __getattr__(self, i):
if i in self.__dict__:
return self.__dict__[i]
else:
return None
return self.__dict__.get(i)
def update(self, **keywords):
for k, v in keywords.items():
self.__dict__[k] = v
def _is_nested_key(self, key):
@staticmethod
def _is_nested_key(key):
return re.match(r'\w+\[\w+\]', key)
def _put_nested_key(self, key, value):
@ -160,8 +129,7 @@ class Config(object):
value = value[0]
outer[last_key] = value
@staticmethod
def _is_url_scheme_valid(url):
def _is_url_scheme_valid(self, url):
"""
Helper function. Validates url scheme
@ -169,9 +137,81 @@ class Config(object):
:return: bool True on success or False on failure
"""
if not url.scheme or url.scheme.lower() != URI_SCHEME:
return False
return True
return url.scheme.lower() == self._uri_scheme
@staticmethod
def _parse_cloudinary_url(cloudinary_url):
return urlparse(cloudinary_url)
@abc.abstractmethod
def _config_from_parsed_url(self, parsed_url):
"""Extract additional config from the parsed URL."""
raise NotImplementedError()
def _setup_from_parsed_url(self, parsed_url):
config_from_parsed_url = self._config_from_parsed_url(parsed_url)
self.update(**config_from_parsed_url)
for k, v in parse_qs(parsed_url.query).items():
if self._is_nested_key(k):
self._put_nested_key(k, v)
else:
self.__dict__[k] = v[0]
def _load_from_url(self, url):
parsed_url = self._parse_cloudinary_url(url)
return self._setup_from_parsed_url(parsed_url)
@abc.abstractmethod
def _load_config_from_env(self):
"""Load config from environment variables or URL."""
raise NotImplementedError()
def update(self, **keywords):
for k, v in keywords.items():
self.__dict__[k] = v
class Config(BaseConfig):
def __init__(self):
self._uri_scheme = URI_SCHEME
super(Config, self).__init__()
if not self.signature_algorithm:
self.signature_algorithm = utils.SIGNATURE_SHA1
def _config_from_parsed_url(self, parsed_url):
if not self._is_url_scheme_valid(parsed_url):
raise ValueError("Invalid CLOUDINARY_URL scheme. Expecting to start with 'cloudinary://'")
is_private_cdn = parsed_url.path != ""
result = {
"cloud_name": parsed_url.hostname,
"api_key": parsed_url.username,
"api_secret": parsed_url.password,
"private_cdn": is_private_cdn,
}
if is_private_cdn:
result.update({"secure_distribution": parsed_url.path[1:]})
return result
def _load_config_from_env(self):
if os.environ.get("CLOUDINARY_CLOUD_NAME"):
config_keys = [key for key in os.environ.keys()
if key.startswith("CLOUDINARY_") and key != "CLOUDINARY_URL"]
for full_key in config_keys:
conf_key = full_key[len("CLOUDINARY_"):].lower()
conf_val = os.environ[full_key]
if conf_val in ["true", "false"]:
conf_val = conf_val == "true"
self.update(**{conf_key: conf_val})
elif os.environ.get("CLOUDINARY_URL"):
self._load_from_url(os.environ.get("CLOUDINARY_URL"))
_config = Config()
@ -257,7 +297,8 @@ class CloudinaryResource(object):
return self.get_prep_value() + '#' + self.get_expected_signature()
def get_expected_signature(self):
return utils.api_sign_request({"public_id": self.public_id, "version": self.version}, config().api_secret)
return utils.api_sign_request({"public_id": self.public_id, "version": self.version}, config().api_secret,
config().signature_algorithm)
@property
def url(self):
@ -377,7 +418,7 @@ class CloudinaryResource(object):
max_images = srcset_data.get("max_images", 20)
transformation = srcset_data.get("transformation")
kbytes_step = int(ceil(float(bytes_step)/1024))
kbytes_step = int(ceil(float(bytes_step) / 1024))
breakpoints_width_param = "auto:breakpoints_{min_width}_{max_width}_{kbytes_step}_{max_images}:json".format(
min_width=min_width, max_width=max_width, kbytes_step=kbytes_step, max_images=max_images)

View file

@ -1,5 +1,6 @@
# Copyright Cloudinary
import datetime
import email.utils
import json
import socket
@ -10,6 +11,11 @@ from urllib3.exceptions import HTTPError
import cloudinary
from cloudinary import utils
from cloudinary.api_client.call_api import (
call_api,
call_metadata_api,
call_json_api
)
from cloudinary.exceptions import (
BadRequest,
AuthorizationRequired,
@ -20,37 +26,32 @@ from cloudinary.exceptions import (
GeneralError
)
logger = cloudinary.logger
EXCEPTION_CODES = {
400: BadRequest,
401: AuthorizationRequired,
403: NotAllowed,
404: NotFound,
409: AlreadyExists,
420: RateLimited,
500: GeneralError
}
class Response(dict):
def __init__(self, result, response, **kwargs):
super(Response, self).__init__(**kwargs)
self.update(result)
self.rate_limit_allowed = int(response.headers["x-featureratelimit-limit"])
self.rate_limit_reset_at = email.utils.parsedate(response.headers["x-featureratelimit-reset"])
self.rate_limit_remaining = int(response.headers["x-featureratelimit-remaining"])
_http = utils.get_http_connector(cloudinary.config(), cloudinary.CERT_KWARGS)
def ping(**options):
return call_api("get", ["ping"], {}, **options)
def usage(**options):
return call_api("get", ["usage"], {}, **options)
"""Get account usage details.
Get a report on the status of your Cloudinary account usage details, including storage, credits, bandwidth,
requests, number of resources, and add-on usage. Note that numbers are updated periodically.
See: `Get account usage details
<https://cloudinary.com/documentation/admin_api#get_account_usage_details>`_
:param options: Additional options
:type options: dict, optional
:return: Detailed usage information
:rtype: Response
"""
date = options.pop("date", None)
uri = ["usage"]
if date:
if isinstance(date, datetime.date):
date = utils.encode_date_to_usage_api_format(date)
uri.append(date)
return call_api("get", uri, {}, **options)
def resource_types(**options):
@ -64,7 +65,7 @@ def resources(**options):
if upload_type:
uri.append(upload_type)
params = only(options, "next_cursor", "max_results", "prefix", "tags",
"context", "moderations", "direction", "start_at")
"context", "moderations", "direction", "start_at", "metadata")
return call_api("get", uri, params, **options)
@ -72,7 +73,7 @@ def resources_by_tag(tag, **options):
resource_type = options.pop("resource_type", "image")
uri = ["resources", resource_type, "tags", tag]
params = only(options, "next_cursor", "max_results", "tags",
"context", "moderations", "direction")
"context", "moderations", "direction", "metadata")
return call_api("get", uri, params, **options)
@ -80,7 +81,7 @@ def resources_by_moderation(kind, status, **options):
resource_type = options.pop("resource_type", "image")
uri = ["resources", resource_type, "moderations", kind, status]
params = only(options, "next_cursor", "max_results", "tags",
"context", "moderations", "direction")
"context", "moderations", "direction", "metadata")
return call_api("get", uri, params, **options)
@ -92,12 +93,39 @@ def resources_by_ids(public_ids, **options):
return call_api("get", uri, params, **options)
def resources_by_context(key, value=None, **options):
"""Retrieves resources (assets) with a specified context key.
This method does not return deleted assets even if they have been backed up.
See: `Get resources by context API reference
<https://cloudinary.com/documentation/admin_api#get_resources_by_context>`_
:param key: Only assets with this context key are returned
:type key: str
:param value: Only assets with this value for the context key are returned
:type value: str, optional
:param options: Additional options
:type options: dict, optional
:return: Resources (assets) with a specified context key
:rtype: Response
"""
resource_type = options.pop("resource_type", "image")
uri = ["resources", resource_type, "context"]
params = only(options, "next_cursor", "max_results", "tags",
"context", "moderations", "direction", "metadata")
params["key"] = key
if value is not None:
params["value"] = value
return call_api("get", uri, params, **options)
def resource(public_id, **options):
resource_type = options.pop("resource_type", "image")
upload_type = options.pop("type", "upload")
uri = ["resources", resource_type, upload_type, public_id]
params = only(options, "exif", "faces", "colors", "image_metadata", "cinemagraph_analysis",
"pages", "phash", "coordinates", "max_results", "quality_analysis", "derived_next_cursor")
"pages", "phash", "coordinates", "max_results", "quality_analysis", "derived_next_cursor",
"accessibility_analysis", "versions")
return call_api("get", uri, params, **options)
@ -327,8 +355,8 @@ def restore(public_ids, **options):
resource_type = options.pop("resource_type", "image")
upload_type = options.pop("type", "upload")
uri = ["resources", resource_type, upload_type, "restore"]
params = dict(public_ids=public_ids)
return call_api("post", uri, params, **options)
params = dict(public_ids=public_ids, **only(options, "versions"))
return call_json_api("post", uri, params, **options)
def upload_mappings(**options):
@ -390,90 +418,6 @@ def update_streaming_profile(name, **options):
return call_api('PUT', uri, params, **options)
def call_json_api(method, uri, jsonBody, **options):
logger.debug(jsonBody)
data = json.dumps(jsonBody).encode('utf-8')
return _call_api(method, uri, body=data,
headers={'Content-Type': 'application/json'}, **options)
def call_api(method, uri, params, **options):
return _call_api(method, uri, params=params, **options)
def call_metadata_api(method, uri, params, **options):
"""Private function that assists with performing an API call to the
metadata_fields part of the Admin API
:param method: The HTTP method. Valid methods: get, post, put, delete
:param uri: REST endpoint of the API (without 'metadata_fields')
:param params: Query/body parameters passed to the method
:param options: Additional options
:rtype: Response
"""
uri = ["metadata_fields"] + (uri or [])
return call_json_api(method, uri, params, **options)
def _call_api(method, uri, params=None, body=None, headers=None, **options):
prefix = options.pop("upload_prefix",
cloudinary.config().upload_prefix) or "https://api.cloudinary.com"
cloud_name = options.pop("cloud_name", cloudinary.config().cloud_name)
if not cloud_name:
raise Exception("Must supply cloud_name")
api_key = options.pop("api_key", cloudinary.config().api_key)
if not api_key:
raise Exception("Must supply api_key")
api_secret = options.pop("api_secret", cloudinary.config().api_secret)
if not cloud_name:
raise Exception("Must supply api_secret")
api_url = "/".join([prefix, "v1_1", cloud_name] + uri)
processed_params = None
if isinstance(params, dict):
processed_params = {}
for key, value in params.items():
if isinstance(value, list) or isinstance(value, tuple):
value_list = {"{}[{}]".format(key, i): i_value for i, i_value in enumerate(value)}
processed_params.update(value_list)
elif value:
processed_params[key] = value
# Add authentication
req_headers = urllib3.make_headers(
basic_auth="{0}:{1}".format(api_key, api_secret),
user_agent=cloudinary.get_user_agent()
)
if headers is not None:
req_headers.update(headers)
kw = {}
if 'timeout' in options:
kw['timeout'] = options['timeout']
if body is not None:
kw['body'] = body
try:
response = _http.request(method.upper(), api_url, processed_params, req_headers, **kw)
body = response.data
except HTTPError as e:
raise GeneralError("Unexpected error {0}", e.message)
except socket.error as e:
raise GeneralError("Socket Error: %s" % (str(e)))
try:
result = json.loads(body.decode('utf-8'))
except Exception as e:
# Error is parsing json
raise GeneralError("Error parsing server response (%d) - %s. Got - %s" % (response.status, body, e))
if "error" in result:
exception_class = EXCEPTION_CODES.get(response.status) or Exception
exception_class = exception_class
raise exception_class("Error {0} - {1}".format(response.status, result["error"]["message"]))
return Response(result, response)
def only(source, *keys):
return {key: source[key] for key in keys if key in source}

View file

View file

@ -0,0 +1,35 @@
import cloudinary
from cloudinary.api_client.execute_request import execute_request
from cloudinary.provisioning.account_config import account_config
from cloudinary.utils import get_http_connector
PROVISIONING_SUB_PATH = "provisioning"
ACCOUNT_SUB_PATH = "accounts"
_http = get_http_connector(account_config(), cloudinary.CERT_KWARGS)
def _call_account_api(method, uri, params=None, headers=None, **options):
prefix = options.pop("upload_prefix",
cloudinary.config().upload_prefix) or "https://api.cloudinary.com"
account_id = options.pop("account_id", account_config().account_id)
if not account_id:
raise Exception("Must supply account_id")
provisioning_api_key = options.pop("provisioning_api_key", account_config().provisioning_api_key)
if not provisioning_api_key:
raise Exception("Must supply provisioning_api_key")
provisioning_api_secret = options.pop("provisioning_api_secret",
account_config().provisioning_api_secret)
if not provisioning_api_secret:
raise Exception("Must supply provisioning_api_secret")
provisioning_api_url = "/".join(
[prefix, cloudinary.API_VERSION, PROVISIONING_SUB_PATH, ACCOUNT_SUB_PATH, account_id] + uri)
auth = {"key": provisioning_api_key, "secret": provisioning_api_secret}
return execute_request(http_connector=_http,
method=method,
params=params,
headers=headers,
auth=auth,
api_url=provisioning_api_url,
**options)

View file

@ -0,0 +1,70 @@
import json
import cloudinary
from cloudinary.api_client.execute_request import execute_request
from cloudinary.utils import get_http_connector
logger = cloudinary.logger
_http = get_http_connector(cloudinary.config(), cloudinary.CERT_KWARGS)
def call_metadata_api(method, uri, params, **options):
"""Private function that assists with performing an API call to the
metadata_fields part of the Admin API
:param method: The HTTP method. Valid methods: get, post, put, delete
:param uri: REST endpoint of the API (without 'metadata_fields')
:param params: Query/body parameters passed to the method
:param options: Additional options
:rtype: Response
"""
uri = ["metadata_fields"] + (uri or [])
return call_json_api(method, uri, params, **options)
def call_json_api(method, uri, json_body, **options):
data = json.dumps(json_body).encode('utf-8')
return _call_api(method, uri, body=data, headers={'Content-Type': 'application/json'}, **options)
def call_api(method, uri, params, **options):
return _call_api(method, uri, params=params, **options)
def _call_api(method, uri, params=None, body=None, headers=None, **options):
prefix = options.pop("upload_prefix",
cloudinary.config().upload_prefix) or "https://api.cloudinary.com"
cloud_name = options.pop("cloud_name", cloudinary.config().cloud_name)
if not cloud_name:
raise Exception("Must supply cloud_name")
api_key = options.pop("api_key", cloudinary.config().api_key)
api_secret = options.pop("api_secret", cloudinary.config().api_secret)
oauth_token = options.pop("oauth_token", cloudinary.config().oauth_token)
_validate_authorization(api_key, api_secret, oauth_token)
api_url = "/".join([prefix, cloudinary.API_VERSION, cloud_name] + uri)
auth = {"key": api_key, "secret": api_secret, "oauth_token": oauth_token}
if body is not None:
options["body"] = body
return execute_request(http_connector=_http,
method=method,
params=params,
headers=headers,
auth=auth,
api_url=api_url,
**options)
def _validate_authorization(api_key, api_secret, oauth_token):
if oauth_token:
return
if not api_key:
raise Exception("Must supply api_key")
if not api_secret:
raise Exception("Must supply api_secret")

View file

@ -0,0 +1,86 @@
import email.utils
import json
import socket
import urllib3
from urllib3.exceptions import HTTPError
import cloudinary
from cloudinary.exceptions import (
BadRequest,
AuthorizationRequired,
NotAllowed,
NotFound,
AlreadyExists,
RateLimited,
GeneralError
)
from cloudinary.utils import process_params, safe_cast, smart_escape, unquote
EXCEPTION_CODES = {
400: BadRequest,
401: AuthorizationRequired,
403: NotAllowed,
404: NotFound,
409: AlreadyExists,
420: RateLimited,
500: GeneralError
}
class Response(dict):
def __init__(self, result, response, **kwargs):
super(Response, self).__init__(**kwargs)
self.update(result)
self.rate_limit_allowed = safe_cast(response.headers.get("x-featureratelimit-limit"), int)
self.rate_limit_reset_at = safe_cast(response.headers.get("x-featureratelimit-reset"), email.utils.parsedate)
self.rate_limit_remaining = safe_cast(response.headers.get("x-featureratelimit-remaining"), int)
def execute_request(http_connector, method, params, headers, auth, api_url, **options):
# authentication
key = auth.get("key")
secret = auth.get("secret")
oauth_token = auth.get("oauth_token")
req_headers = urllib3.make_headers(
user_agent=cloudinary.get_user_agent()
)
if oauth_token:
req_headers["authorization"] = "Bearer {}".format(oauth_token)
else:
req_headers.update(urllib3.make_headers(basic_auth="{0}:{1}".format(key, secret)))
if headers is not None:
req_headers.update(headers)
kw = {}
if "timeout" in options:
kw["timeout"] = options["timeout"]
if "body" in options:
kw["body"] = options["body"]
processed_params = process_params(params)
api_url = smart_escape(unquote(api_url))
try:
response = http_connector.request(method.upper(), api_url, processed_params, req_headers, **kw)
body = response.data
except HTTPError as e:
raise GeneralError("Unexpected error {0}", e.message)
except socket.error as e:
raise GeneralError("Socket Error: %s" % (str(e)))
try:
result = json.loads(body.decode('utf-8'))
except Exception as e:
# Error is parsing json
raise GeneralError("Error parsing server response (%d) - %s. Got - %s" % (response.status, body, e))
if "error" in result:
exception_class = EXCEPTION_CODES.get(response.status) or Exception
exception_class = exception_class
raise exception_class("Error {0} - {1}".format(response.status, result["error"]["message"]))
return Response(result, response)

View file

@ -20,6 +20,9 @@ def generate(url=None, acl=None, start_time=None, duration=None,
else:
raise Exception("Must provide either expiration or duration")
if url is None and acl is None:
raise Exception("Must provide either acl or url")
token_parts = []
if ip is not None:
token_parts.append("ip=" + ip)

View file

@ -5,7 +5,7 @@ import cloudinary.uploader
import cloudinary.utils
from cloudinary import CloudinaryResource
from django import forms
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
def cl_init_js_callbacks(form, request):
@ -52,7 +52,7 @@ class CloudinaryInput(forms.TextInput):
class CloudinaryJsFileField(forms.Field):
default_error_messages = {
'required': _(u"No file selected!")
"required": _("No file selected!")
}
def __init__(self, attrs=None, options=None, autosave=True, *args, **kwargs):
@ -121,7 +121,7 @@ class CloudinaryUnsignedJsFileField(CloudinaryJsFileField):
class CloudinaryFileField(forms.FileField):
my_default_error_messages = {
'required': _(u"No file selected!")
"required": _("No file selected!")
}
default_error_messages = forms.FileField.default_error_messages.copy()
default_error_messages.update(my_default_error_messages)

View file

@ -107,6 +107,8 @@ class CloudinaryField(models.Field):
if isinstance(value, UploadedFile):
options = {"type": self.type, "resource_type": self.resource_type}
options.update(self.options)
if hasattr(value, 'seekable') and value.seekable():
value.seek(0)
instance_value = uploader.upload_resource(value, **options)
setattr(model_instance, self.attname, instance_value)
if self.width_field:

View file

@ -0,0 +1,5 @@
from .account_config import AccountConfig, account_config, reset_config
from .account import (sub_accounts, create_sub_account, delete_sub_account, sub_account, update_sub_account,
user_groups, create_user_group, update_user_group, delete_user_group, user_group,
add_user_to_group, remove_user_from_group, user_group_users, user_in_user_groups,
users, create_user, delete_user, user, update_user, Role)

View file

@ -0,0 +1,366 @@
from cloudinary.api_client.call_account_api import _call_account_api
from cloudinary.utils import encode_list
SUB_ACCOUNTS_SUB_PATH = "sub_accounts"
USERS_SUB_PATH = "users"
USER_GROUPS_SUB_PATH = "user_groups"
class Role(object):
"""
A user role to use in the user management API (create/update user).
"""
MASTER_ADMIN = "master_admin"
ADMIN = "admin"
BILLING = "billing"
TECHNICAL_ADMIN = "technical_admin"
REPORTS = "reports"
MEDIA_LIBRARY_ADMIN = "media_library_admin"
MEDIA_LIBRARY_USER = "media_library_user"
def sub_accounts(enabled=None, ids=None, prefix=None, **options):
"""
List all sub accounts
:param enabled: Whether to only return enabled sub-accounts (true) or disabled accounts (false).
Default: all accounts are returned (both enabled and disabled).
:type enabled: bool, optional
:param ids: List of sub-account IDs. Up to 100. When provided, other filters are ignored.
:type ids: list, optional
:param prefix: Search by prefix of the sub-account name. Case-insensitive.
:type prefix: str, optional
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: A list of sub accounts
:rtype: dict
"""
uri = [SUB_ACCOUNTS_SUB_PATH]
params = {"ids": ids, "enabled": enabled, "prefix": prefix}
return _call_account_api("GET", uri, params=params, **options)
def create_sub_account(name, cloud_name=None, custom_attributes=None, enabled=None,
base_account=None, **options):
"""
Create a new sub account
:param name: Name of the new sub account
:type name: str
:param cloud_name: A case-insensitive cloud name comprised of alphanumeric and underscore characters.
* Generates an error if the cloud name is not unique across all Cloudinary accounts.
:type cloud_name: str, optional
:param custom_attributes: Any custom attributes you want to associate with the sub-account
:type custom_attributes: dict, optional
:param enabled: Whether to create the account as enabled (default is enabled).
:type enabled: bool, optional
:param base_account: ID of sub-account from which to copy settings
:type base_account: str, optional
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: The created sub account
:rtype: dict
"""
uri = [SUB_ACCOUNTS_SUB_PATH]
params = {"name": name,
"cloud_name": cloud_name,
"custom_attributes": custom_attributes,
"enabled": enabled,
"base_account": base_account}
return _call_account_api("POST", uri, params=params, **options)
def delete_sub_account(sub_account_id, **options):
"""
Delete a sub account
:param sub_account_id: The id of the sub account
:type sub_account_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: Result message
:rtype: dict
"""
uri = [SUB_ACCOUNTS_SUB_PATH, sub_account_id]
return _call_account_api("delete", uri, {}, **options)
def sub_account(sub_account_id, **options):
"""
Get information of a sub account
:param sub_account_id: The id of the sub account
:type sub_account_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: A sub account
:rtype: dict
"""
uri = [SUB_ACCOUNTS_SUB_PATH, sub_account_id]
return _call_account_api("get", uri, {}, **options)
def update_sub_account(sub_account_id, name=None, cloud_name=None, custom_attributes=None,
enabled=None, base_account=None,
**options):
"""
Update a sub account
:param sub_account_id: The id of the sub account
:type sub_account_id: str
:param name: Name of the account
:type name: str, optional
:param cloud_name: Unique cloud name
:type cloud_name: str, optional
:param custom_attributes: Any custom attributes you want to associate with the sub-account.
:type custom_attributes: dict, optional
:param enabled: Whether to create the account as enabled (default is enabled).
:type enabled: bool, optional
:param base_account: ID of sub-account from which to copy settings
:type base_account: str, optional
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: Updated sub account
:rtype: dict
"""
uri = [SUB_ACCOUNTS_SUB_PATH, sub_account_id]
params = {"name": name,
"cloud_name": cloud_name,
"custom_attributes": custom_attributes,
"enabled": enabled,
"base_account": base_account}
return _call_account_api("put", uri, params=params, **options)
def users(user_ids=None, sub_account_id=None, pending=None, prefix=None, **options):
"""
List all users
:param user_ids: The ids of the users to fetch
:type user_ids: list, optional
:param sub_account_id: The id of a sub account
:type sub_account_id: str, optional
:param pending: Limit results to pending users (True),
users that are not pending (False),
or all users (None, the default).
:type pending: bool, optional
:param prefix: User prefix
:type prefix: str, optional
:param options: Generic advanced options dict, see online documentation.
:type options: dict, optional
:return: List of users associated with the account
:rtype: dict
"""
uri = [USERS_SUB_PATH]
user_ids = encode_list(user_ids)
params = {"ids": user_ids,
"sub_account_id": sub_account_id,
"pending": pending,
"prefix": prefix}
return _call_account_api("get", uri, params=params, **options)
def create_user(name, email, role, sub_account_ids=None, **options):
"""
Create a user
:param name: Username
:type name: str
:param email: User's email
:type email: str
:param role: User's role
:type role: str
:param sub_account_ids: Optional. Sub accounts to associate with the user
:type sub_account_ids: list, optional
:param options: Generic advanced options dict, see online documentation.
:type options: dict, optional
:return: Details of created user
:rtype: dict
"""
uri = [USERS_SUB_PATH]
params = {"name": name,
"email": email,
"role": role,
"sub_account_ids": sub_account_ids}
return _call_account_api("post", uri, params=params, **options)
def delete_user(user_id, **options):
"""
Delete a user
:param user_id: The id of user to delete
:type user_id: str
:param options: Generic advanced options dict, see online documentation.
:type options: dict, optional
:return: Result message
:rtype: dict
"""
uri = [USERS_SUB_PATH, user_id]
return _call_account_api("delete", uri, {}, **options)
def user(user_id, **options):
"""
Get information of a user
:param user_id: The id of the user
:type user_id: str
:param options: Generic advanced options dict, see online documentation.
:type options: dict, optional
:return: A user
:rtype: dict
"""
uri = [USERS_SUB_PATH, user_id]
return _call_account_api("get", uri, {}, **options)
def update_user(user_id, name=None, email=None, role=None, sub_account_ids=None, **options):
"""
Update a user
:param user_id: The id of the user to update
:type user_id: str
:param name: Username
:type name: str, optional
:param email: User's email
:type email: str, optional
:param role: User's role
:type role: Role, optional
:param sub_account_ids: The list of sub-account IDs that this user can access.
Note: This parameter is ignored if the role is specified as master_admin.
:type sub_account_ids: list, optional
:param options: Generic advanced options dict, see online documentation.
:type options: dict, optional
:return: The updated user
:rtype: dict
"""
uri = [USERS_SUB_PATH, user_id]
params = {"name": name,
"email": email,
"role": role,
"sub_account_ids": sub_account_ids}
return _call_account_api("put", uri, params=params, **options)
def user_groups(**options):
"""
List all user groups
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: List of user groups
:rtype: ProvisioningAPIRespose
"""
uri = [USER_GROUPS_SUB_PATH]
return _call_account_api("get", uri, {}, **options)
def create_user_group(name, **options):
"""
Create a new user group
:param name: Name of the user group
:type name: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: The newly created group
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH]
params = {"name": name}
return _call_account_api("post", uri, params, **options)
def update_user_group(user_group_id, name, **options):
"""
Update a user group
:param user_group_id: The id of the user group to update
:type user_group_id: str
:param name: Name of the user group
:type name: str, optional
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: The updated group
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH, user_group_id]
params = {"name": name}
return _call_account_api("put", uri, params, **options)
def delete_user_group(user_group_id, **options):
"""
Delete a user group
:param user_group_id: The id of the user group to delete
:type user_group_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: The result message
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH, user_group_id]
return _call_account_api("delete", uri, {}, **options)
def user_group(user_group_id, **options):
"""
Get information of a user group
:param user_group_id: The id of the user group
:type user_group_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: Details of the group
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH, user_group_id]
return _call_account_api("get", uri, {}, **options)
def add_user_to_group(user_group_id, user_id, **options):
"""
Add a user to a user group
:param user_group_id: The id of the user group to add the user to
:type user_group_id: str
:param user_id: The user id to add
:type user_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: List of users in the group
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH, user_group_id, "users", user_id]
return _call_account_api("post", uri, {}, **options)
def remove_user_from_group(user_group_id, user_id, **options):
"""
Remove a user from a user group
:param user_group_id: The id of the user group to remove the user from
:type user_group_id: str
:param user_id: The id of the user to remove
:type user_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: List of users in the group
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH, user_group_id, "users", user_id]
return _call_account_api("delete", uri, {}, **options)
def user_group_users(user_group_id, **options):
"""
Get all users in a user group
:param user_group_id: The id of user group to get list of users
:type user_group_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: List of users in the group
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH, user_group_id, "users"]
return _call_account_api("get", uri, {}, **options)
def user_in_user_groups(user_id, **options):
"""
Get all user groups a user belongs to
:param user_id: The id of user
:param user_id: str
:param options: Generic advanced options dict, see online documentation
:type options: dict, optional
:return: List of groups user is in
:rtype: dict
"""
uri = [USER_GROUPS_SUB_PATH, user_id]
return _call_account_api("get", uri, {}, **options)

View file

@ -0,0 +1,42 @@
from __future__ import absolute_import
import os
from cloudinary import BaseConfig, import_django_settings
ACCOUNT_URI_SCHEME = "account"
class AccountConfig(BaseConfig):
def __init__(self):
self._uri_scheme = ACCOUNT_URI_SCHEME
super(AccountConfig, self).__init__()
def _config_from_parsed_url(self, parsed_url):
if not self._is_url_scheme_valid(parsed_url):
raise ValueError("Invalid CLOUDINARY_ACCOUNT_URL scheme. URL should begin with 'account://'")
return {
"account_id": parsed_url.hostname,
"provisioning_api_key": parsed_url.username,
"provisioning_api_secret": parsed_url.password,
}
def _load_config_from_env(self):
if os.environ.get("CLOUDINARY_ACCOUNT_URL"):
self._load_from_url(os.environ.get("CLOUDINARY_ACCOUNT_URL"))
def account_config(**keywords):
global _account_config
_account_config.update(**keywords)
return _account_config
def reset_config():
global _account_config
_account_config = AccountConfig()
_account_config = AccountConfig()

View file

@ -1,7 +1,7 @@
import json
from copy import deepcopy
from cloudinary.api import call_json_api
from cloudinary.api_client.call_api import call_json_api
class Search:

View file

@ -1403,7 +1403,8 @@ var slice = [].slice,
"*": "mul",
"/": "div",
"+": "add",
"-": "sub"
"-": "sub",
"^": "pow",
};
@ -1472,30 +1473,37 @@ var slice = [].slice,
return new this(expressionStr);
};
/**
* Normalize a string expression
* @function Cloudinary#normalize
* @param {string} expression a expression, e.g. "w gt 100", "width_gt_100", "width > 100"
* @return {string} the normalized form of the value expression, e.g. "w_gt_100"
*/
Expression.normalize = function(expression) {
var operators, pattern, replaceRE;
var operators, operatorsPattern, operatorsReplaceRE, predefinedVarsPattern, predefinedVarsReplaceRE;
if (expression == null) {
return expression;
}
expression = String(expression);
operators = "\\|\\||>=|<=|&&|!=|>|=|<|/|-|\\+|\\*";
pattern = "((" + operators + ")(?=[ _])|" + Object.keys(Expression.PREDEFINED_VARS).join("|") + ")";
replaceRE = new RegExp(pattern, "g");
expression = expression.replace(replaceRE, function(match) {
return Expression.OPERATORS[match] || Expression.PREDEFINED_VARS[match];
operators = "\\|\\||>=|<=|&&|!=|>|=|<|/|-|\\+|\\*|\\^";
// operators
operatorsPattern = "((" + operators + ")(?=[ _]))";
operatorsReplaceRE = new RegExp(operatorsPattern, "g");
expression = expression.replace(operatorsReplaceRE, function (match) {
return Expression.OPERATORS[match];
});
// predefined variables
predefinedVarsPattern = "(" + Object.keys(Expression.PREDEFINED_VARS).join("|") + ")";
predefinedVarsReplaceRE = new RegExp(predefinedVarsPattern, "g");
expression = expression.replace(predefinedVarsReplaceRE, function(match, p1, offset){
return (expression[offset - 1] === '$' ? match : Expression.PREDEFINED_VARS[match]);
});
return expression.replace(/[ _]+/g, '_');
};
/**
* Serialize the expression
* @return {string} the expression as a string
@ -4294,12 +4302,20 @@ var slice = [].slice,
switch (false) {
case !/w_auto:breakpoints/.test(dataSrc):
requiredWidth = maxWidth(containerWidth, tag);
if (requiredWidth) {
dataSrc = dataSrc.replace(/w_auto:breakpoints([_0-9]*)(:[0-9]+)?/, "w_auto:breakpoints$1:" + requiredWidth);
} else {
setUrl = false;
}
break;
case !(match = /w_auto(:(\d+))?/.exec(dataSrc)):
requiredWidth = applyBreakpoints.call(this, tag, containerWidth, match[2], options);
requiredWidth = maxWidth(requiredWidth, tag);
if (requiredWidth) {
dataSrc = dataSrc.replace(/w_auto[^,\/]*/g, "w_" + requiredWidth);
} else {
setUrl = false;
}
}
Util.removeAttribute(tag, 'width');
if (!options.responsive_preserve_height) {

View file

@ -1,4 +1,4 @@
{% load staticfiles %}
{% load static %}
<script src="{% static "js/jquery.ui.widget.js" %}" type="text/javascript"></script>
<script src="{% static "js/jquery.iframe-transport.js" %}" type="text/javascript"></script>

View file

@ -58,7 +58,7 @@ def upload_image(file, **options):
def upload_resource(file, **options):
result = upload(file, **options)
result = upload_large(file, **options)
return cloudinary.CloudinaryResource(
result["public_id"], version=str(result["version"]),
format=result.get("format"), type=result["type"],
@ -131,7 +131,9 @@ def rename(from_public_id, to_public_id, **options):
"invalidate": options.get("invalidate"),
"from_public_id": from_public_id,
"to_public_id": to_public_id,
"to_type": options.get("to_type")
"to_type": options.get("to_type"),
"context": options.get("context"),
"metadata": options.get("metadata")
}
return call_api("rename", params, **options)
@ -180,30 +182,81 @@ def create_zip(**options):
return create_archive(target_format="zip", **options)
def generate_sprite(tag, **options):
params = {
"timestamp": utils.now(),
"tag": tag,
"async": options.get("async"),
"notification_url": options.get("notification_url"),
"transformation": utils.generate_transformation_string(
fetch_format=options.get("format"), **options)[0]
}
def generate_sprite(tag=None, urls=None, **options):
"""
Generates sprites by merging multiple images into a single large image.
See: `Sprite method API reference
<https://cloudinary.com/documentation/image_upload_api_reference#sprite_method>`_
:param tag: The sprite is created from all images with this tag. If not set - `urls` parameter is required
:type tag: str
:param urls: List of URLs to create a sprite from. Can only be used if `tag` is not set
:type urls: list
:param options: Additional options
:type options: dict, optional
:return: Dictionary with meta information URLs of generated sprite resources
:rtype: dict
"""
params = utils.build_multi_and_sprite_params(tag=tag, urls=urls, **options)
return call_api("sprite", params, **options)
def multi(tag, **options):
params = {
"timestamp": utils.now(),
"tag": tag,
"format": options.get("format"),
"async": options.get("async"),
"notification_url": options.get("notification_url"),
"transformation": utils.generate_transformation_string(**options)[0]
}
def download_generated_sprite(tag=None, urls=None, **options):
"""
Returns signed URL for the sprite endpoint with `mode=download`
:param tag: The sprite is created from all images with this tag. If not set - `urls` parameter is required
:type tag: str
:param urls: List of URLs to create a sprite from. Can only be used if `tag` is not set
:type urls: list
:param options: Additional options
:type options: dict, optional
:return: The signed URL to download sprite
:rtype: str
"""
params = utils.build_multi_and_sprite_params(tag=tag, urls=urls, **options)
return utils.cloudinary_api_download_url(action="sprite", params=params, **options)
def multi(tag=None, urls=None, **options):
"""
Creates either a single animated image, video or a PDF.
See: `Upload method API reference
<https://cloudinary.com/documentation/image_upload_api_reference#multi_method>`_
:param tag: The animated image, video or PDF is created from all images with this tag.
If not set - `urls` parameter is required
:type tag: str
:param urls: List of URLs to create an animated image, video or PDF from. Can only be used if `tag` is not set
:type urls: list
:param options: Additional options
:type options: dict, optional
:return: Dictionary with meta information URLs of the generated file
:rtype: dict
"""
params = utils.build_multi_and_sprite_params(tag=tag, urls=urls, **options)
return call_api("multi", params, **options)
def download_multi(tag=None, urls=None, **options):
"""
Returns signed URL for the multi endpoint with `mode=download`
:param tag: The sprite is created from all images with this tag. If not set - `urls` parameter is required
:type tag: str
:param urls: List of URLs to create a sprite from. Can only be used if `tag` is not set
:type urls: list
:param options: Additional options
:type options: dict, optional
:return: The signed URL to download multi
:rtype: str
"""
params = utils.build_multi_and_sprite_params(tag=tag, urls=urls, **options)
return utils.cloudinary_api_download_url(action="multi", params=params, **options)
def explode(public_id, **options):
params = {
"timestamp": utils.now(),
@ -347,24 +400,30 @@ def call_cacheable_api(action, params, http_headers=None, return_error=False, un
def call_api(action, params, http_headers=None, return_error=False, unsigned=False, file=None, timeout=None, **options):
if http_headers is None:
http_headers = {}
file_io = None
try:
if unsigned:
params = utils.cleanup_params(params)
else:
headers = {"User-Agent": cloudinary.get_user_agent()}
if http_headers is not None:
headers.update(http_headers)
oauth_token = options.get("oauth_token", cloudinary.config().oauth_token)
if oauth_token:
headers["authorization"] = "Bearer {}".format(oauth_token)
elif not unsigned:
params = utils.sign_request(params, options)
param_list = OrderedDict()
param_list = []
for k, v in params.items():
if isinstance(v, list):
for i in range(len(v)):
param_list["{0}[{1}]".format(k, i)] = v[i]
for i in v:
param_list.append(("{0}[]".format(k), i))
elif v:
param_list[k] = v
param_list.append((k, v))
api_url = utils.cloudinary_api_url(action, **options)
if file:
filename = options.get("filename") # Custom filename provided by user (relevant only for streams and files)
@ -389,10 +448,7 @@ def call_api(action, params, http_headers=None, return_error=False, unsigned=Fal
name = filename or "file"
data = file
param_list["file"] = (name, data) if name else data
headers = {"User-Agent": cloudinary.get_user_agent()}
headers.update(http_headers)
param_list.append(("file", (name, data) if name else data))
kw = {}
if timeout is not None:
@ -421,6 +477,3 @@ def call_api(action, params, http_headers=None, return_error=False, unsigned=Fal
raise Error(result["error"]["message"])
return result
finally:
if file_io:
file_io.close()

View file

@ -15,10 +15,10 @@ from collections import OrderedDict
from datetime import datetime, date
from fractions import Fraction
from numbers import Number
from urllib3 import ProxyManager, PoolManager
import six.moves.urllib.parse
from six import iteritems
from six import iteritems, string_types
from urllib3 import ProxyManager, PoolManager
import cloudinary
from cloudinary import auth_token
@ -37,7 +37,7 @@ DEFAULT_RESPONSIVE_WIDTH_TRANSFORMATION = {"width": "auto", "crop": "limit"}
RANGE_VALUE_RE = r'^(?P<value>(\d+\.)?\d+)(?P<modifier>[%pP])?$'
RANGE_RE = r'^(\d+\.)?\d+[%pP]?\.\.(\d+\.)?\d+[%pP]?$'
FLOAT_RE = r'^(\d+)\.(\d+)?$'
REMOTE_URL_RE = r'ftp:|https?:|s3:|gs:|data:([\w-]+\/[\w-]+)?(;[\w-]+=[\w-]+)*;base64,([a-zA-Z0-9\/+\n=]+)$'
REMOTE_URL_RE = r'ftp:|https?:|s3:|gs:|data:([\w-]+\/[\w-]+(\+[\w-]+)?)?(;[\w-]+=[\w-]+)*;base64,([a-zA-Z0-9\/+\n=]+)$'
__LAYER_KEYWORD_PARAMS = [("font_weight", "normal"),
("font_style", "normal"),
("text_decoration", "none"),
@ -63,7 +63,9 @@ __URL_KEYS = [
'type',
'url_suffix',
'use_root_path',
'version'
'version',
'long_url_signature',
'signature_algorithm',
]
__SIMPLE_UPLOAD_PARAMS = [
@ -79,10 +81,12 @@ __SIMPLE_UPLOAD_PARAMS = [
"use_filename",
"unique_filename",
"discard_original_filename",
"filename_override",
"invalidate",
"notification_url",
"eager_notification_url",
"eager_async",
"eval",
"proxy",
"folder",
"overwrite",
@ -101,6 +105,7 @@ __SIMPLE_UPLOAD_PARAMS = [
"auto_tagging",
"async",
"cinemagraph_analysis",
"accessibility_analysis",
]
__SERIALIZED_UPLOAD_PARAMS = [
@ -121,16 +126,32 @@ __SERIALIZED_UPLOAD_PARAMS = [
upload_params = __SIMPLE_UPLOAD_PARAMS + __SERIALIZED_UPLOAD_PARAMS
SHORT_URL_SIGNATURE_LENGTH = 8
LONG_URL_SIGNATURE_LENGTH = 32
def compute_hex_hash(s):
SIGNATURE_SHA1 = "sha1"
SIGNATURE_SHA256 = "sha256"
signature_algorithms = {
SIGNATURE_SHA1: hashlib.sha1,
SIGNATURE_SHA256: hashlib.sha256,
}
def compute_hex_hash(s, algorithm=SIGNATURE_SHA1):
"""
Compute hash and convert the result to HEX string
Computes string hash using specified algorithm and return HEX string representation of hash.
:param s: string to process
:param s: String to compute hash for
:param algorithm: The name of algorithm to use for computing hash
:return: HEX string
:return: HEX string of computed hash value
"""
return hashlib.sha1(to_bytes(s)).hexdigest()
try:
hash_fn = signature_algorithms[algorithm]
except KeyError:
raise ValueError('Unsupported hash algorithm: {}'.format(algorithm))
return hash_fn(to_bytes(s)).hexdigest()
def build_array(arg):
@ -189,7 +210,7 @@ def encode_double_array(array):
if len(array) > 0 and isinstance(array[0], list):
return "|".join([",".join([str(i) for i in build_array(inner)]) for inner in array])
else:
return ",".join([str(i) for i in array])
return encode_list([str(i) for i in array])
def encode_dict(arg):
@ -203,16 +224,37 @@ def encode_dict(arg):
return arg
def encode_context(context):
def normalize_context_value(value):
"""
:param context: dict of context to be encoded
:return: a joined string of all keys and values properly escaped and separated by a pipe character
Escape "=" and "|" delimiter characters and json encode lists
:param value: Value to escape
:type value: int or str or list or tuple
:return: The normalized value
:rtype: str
"""
if isinstance(value, (list, tuple)):
value = json_encode(value)
return str(value).replace("=", "\\=").replace("|", "\\|")
def encode_context(context):
"""
Encode metadata fields based on incoming value.
List and tuple values are encoded to json strings.
:param context: dict of context to be encoded
:return: a joined string of all keys and values properly escaped and separated by a pipe character
"""
if not isinstance(context, dict):
return context
return "|".join(("{}={}".format(k, v.replace("=", "\\=").replace("|", "\\|"))) for k, v in iteritems(context))
return "|".join(("{}={}".format(k, normalize_context_value(v))) for k, v in iteritems(context))
def json_encode(value):
@ -226,6 +268,17 @@ def json_encode(value):
return json.dumps(value, default=__json_serializer, separators=(',', ':'))
def encode_date_to_usage_api_format(date_obj):
"""
Encodes date object to `dd-mm-yyyy` format string
:param date_obj: datetime.date object to encode
:return: Encoded date as a string
"""
return date_obj.strftime('%d-%m-%Y')
def patch_fetch_format(options):
"""
When upload type is fetch, remove the format options.
@ -488,6 +541,19 @@ def process_radius(param):
return str(param)
def process_params(params):
processed_params = None
if isinstance(params, dict):
processed_params = {}
for key, value in params.items():
if isinstance(value, list) or isinstance(value, tuple):
value_list = {"{}[{}]".format(key, i): i_value for i, i_value in enumerate(value)}
processed_params.update(value_list)
elif value is not None:
processed_params[key] = value
return processed_params
def cleanup_params(params):
return dict([(k, __safe_value(v)) for (k, v) in params.items() if v is not None and not v == ""])
@ -499,18 +565,19 @@ def sign_request(params, options):
api_secret = options.get("api_secret", cloudinary.config().api_secret)
if not api_secret:
raise ValueError("Must supply api_secret")
signature_algorithm = options.get("signature_algorithm", cloudinary.config().signature_algorithm)
params = cleanup_params(params)
params["signature"] = api_sign_request(params, api_secret)
params["signature"] = api_sign_request(params, api_secret, signature_algorithm)
params["api_key"] = api_key
return params
def api_sign_request(params_to_sign, api_secret):
def api_sign_request(params_to_sign, api_secret, algorithm=SIGNATURE_SHA1):
params = [(k + "=" + (",".join(v) if isinstance(v, list) else str(v))) for k, v in params_to_sign.items() if v]
to_sign = "&".join(sorted(params))
return compute_hex_hash(to_sign + api_secret)
return compute_hex_hash(to_sign + api_secret, algorithm)
def breakpoint_settings_mapper(breakpoint_settings):
@ -667,6 +734,8 @@ def cloudinary_url(source, **options):
url_suffix = options.pop("url_suffix", None)
use_root_path = options.pop("use_root_path", cloudinary.config().use_root_path)
auth_token = options.pop("auth_token", None)
long_url_signature = options.pop("long_url_signature", cloudinary.config().long_url_signature)
signature_algorithm = options.pop("signature_algorithm", cloudinary.config().signature_algorithm)
if auth_token is not False:
auth_token = merge(cloudinary.config().auth_token, auth_token)
@ -692,9 +761,18 @@ def cloudinary_url(source, **options):
signature = None
if sign_url and not auth_token:
to_sign = "/".join(__compact([transformation, source_to_sign]))
if long_url_signature:
# Long signature forces SHA256
signature_algorithm = SIGNATURE_SHA256
chars_length = LONG_URL_SIGNATURE_LENGTH
else:
chars_length = SHORT_URL_SIGNATURE_LENGTH
if signature_algorithm not in signature_algorithms:
raise ValueError("Unsupported signature algorithm '{}'".format(signature_algorithm))
hash_fn = signature_algorithms[signature_algorithm]
signature = "s--" + to_string(
base64.urlsafe_b64encode(
hashlib.sha1(to_bytes(to_sign + api_secret)).digest())[0:8]) + "--"
hash_fn(to_bytes(to_sign + api_secret)).digest())[0:chars_length]) + "--"
prefix = unsigned_download_url_prefix(
source, cloud_name, private_cdn, cdn_subdomain, secure_cdn_subdomain,
@ -708,15 +786,30 @@ def cloudinary_url(source, **options):
return source, options
def cloudinary_api_url(action='upload', **options):
cloudinary_prefix = options.get("upload_prefix", cloudinary.config().upload_prefix)\
def base_api_url(path, **options):
cloudinary_prefix = options.get("upload_prefix", cloudinary.config().upload_prefix) \
or "https://api.cloudinary.com"
cloud_name = options.get("cloud_name", cloudinary.config().cloud_name)
if not cloud_name:
raise ValueError("Must supply cloud_name")
path = build_array(path)
return encode_unicode_url("/".join([cloudinary_prefix, cloudinary.API_VERSION, cloud_name] + path))
def cloudinary_api_url(action='upload', **options):
resource_type = options.get("resource_type", "image")
return encode_unicode_url("/".join([cloudinary_prefix, "v1_1", cloud_name, resource_type, action]))
return base_api_url([resource_type, action], **options)
def cloudinary_api_download_url(action, params, **options):
params = params.copy()
params["mode"] = "download"
cloudinary_params = sign_request(params, options)
return cloudinary_api_url(action, **options) + "?" + urlencode(bracketize_seq(cloudinary_params), True)
def cloudinary_scaled_url(source, width, transformation, options):
@ -817,11 +910,7 @@ def bracketize_seq(params):
def download_archive_url(**options):
params = options.copy()
params.update(mode="download")
cloudinary_params = sign_request(archive_params(**params), options)
return cloudinary_api_url("generate_archive", **options) + "?" + \
urlencode(bracketize_seq(cloudinary_params), True)
return cloudinary_api_download_url(action="generate_archive", params=archive_params(**options), **options)
def download_zip_url(**options):
@ -830,6 +919,47 @@ def download_zip_url(**options):
return download_archive_url(**new_options)
def download_folder(folder_path, **options):
"""
Creates and returns a URL that when invoked creates an archive of a folder.
:param folder_path: The full path from the root that is used to generate download url.
:type folder_path: str
:param options: Additional options.
:type options: dict, optional
:return: Signed URL to download the folder.
:rtype: str
"""
options["prefixes"] = folder_path
options.setdefault("resource_type", "all")
return download_archive_url(**options)
def download_backedup_asset(asset_id, version_id, **options):
"""
The returned url allows downloading the backedup asset based on the the asset ID and the version ID.
Parameters asset_id and version_id are returned with api.resource(<PUBLIC_ID1>, versions=True) API call.
:param asset_id: The asset ID of the asset.
:type asset_id: str
:param version_id: The version ID of the asset.
:type version_id: str
:param options: Additional options.
:type options: dict, optional
:return:The signed URL for downloading backup version of the asset.
:rtype: str
"""
params = {
"timestamp": options.get("timestamp", now()),
"asset_id": asset_id,
"version_id": version_id
}
cloudinary_params = sign_request(params, options)
return base_api_url("download_backup", **options) + "?" + urlencode(bracketize_seq(cloudinary_params), True)
def generate_auth_token(**options):
token_options = merge(cloudinary.config().auth_token, options)
return auth_token.generate(**token_options)
@ -919,8 +1049,8 @@ def build_upload_params(**options):
"transformation": generate_transformation_string(**options)[0],
"headers": build_custom_headers(options.get("headers")),
"eager": build_eager(options.get("eager")),
"tags": options.get("tags") and ",".join(build_array(options["tags"])),
"allowed_formats": options.get("allowed_formats") and ",".join(build_array(options["allowed_formats"])),
"tags": options.get("tags") and encode_list(build_array(options["tags"])),
"allowed_formats": options.get("allowed_formats") and encode_list(build_array(options["allowed_formats"])),
"face_coordinates": encode_double_array(options.get("face_coordinates")),
"custom_coordinates": encode_double_array(options.get("custom_coordinates")),
"context": encode_context(options.get("context")),
@ -938,6 +1068,26 @@ def build_upload_params(**options):
return params
def build_multi_and_sprite_params(**options):
"""
Build params for multi, download_multi, generate_sprite, and download_generated_sprite methods
"""
tag = options.get("tag")
urls = options.get("urls")
if bool(tag) == bool(urls):
raise ValueError("Either 'tag' or 'urls' parameter has to be set but not both")
params = {
"mode": options.get("mode"),
"timestamp": now(),
"async": options.get("async"),
"notification_url": options.get("notification_url"),
"tag": tag,
"urls": urls,
"transformation": generate_transformation_string(fetch_format=options.get("format"), **options)[0]
}
return params
def __process_text_options(layer, layer_parameter):
font_family = layer.get("font_family")
font_size = layer.get("font_size")
@ -1067,22 +1217,38 @@ IF_OPERATORS = {
PREDEFINED_VARS = {
"aspect_ratio": "ar",
"aspectRatio": "ar",
"current_page": "cp",
"currentPage": "cp",
"face_count": "fc",
"faceCount": "fc",
"height": "h",
"initial_aspect_ratio": "iar",
"initialAspectRatio": "iar",
"trimmed_aspect_ratio": "tar",
"trimmedAspectRatio": "tar",
"initial_height": "ih",
"initialHeight": "ih",
"initial_width": "iw",
"initialWidth": "iw",
"page_count": "pc",
"pageCount": "pc",
"page_x": "px",
"pageX": "px",
"page_y": "py",
"pageY": "py",
"tags": "tags",
"width": "w",
"duration": "du",
"initial_duration": "idu",
"initialDuration": "idu",
"illustration_score": "ils",
"illustrationScore": "ils",
"context": "ctx"
}
replaceRE = "((\\|\\||>=|<=|&&|!=|>|=|<|/|-|\\+|\\*|\^)(?=[ _])|(?<!\$)(" + '|'.join(PREDEFINED_VARS.keys()) + "))"
replaceRE = "((\\|\\||>=|<=|&&|!=|>|=|<|/|-|\\+|\\*|\\^)(?=[ _])|(\\$_*[^_ ]+)|(?<!\\$)(" + \
'|'.join(PREDEFINED_VARS.keys()) + "))"
def translate_if(match):
@ -1277,13 +1443,15 @@ def check_property_enabled(f):
return wrapper
def verify_api_response_signature(public_id, version, signature):
def verify_api_response_signature(public_id, version, signature, algorithm=None):
"""
Verifies the authenticity of an API response signature
:param public_id: The public id of the asset as returned in the API response
:param version: The version of the asset as returned in the API response
:param signature: Actual signature. Can be retrieved from the X-Cld-Signature header
:param algorithm: Name of hashing algorithm to use for calculation of HMACs.
By default uses `cloudinary.config().signature_algorithm`
:return: Boolean result of the validation
"""
@ -1293,10 +1461,14 @@ def verify_api_response_signature(public_id, version, signature):
parameters_to_sign = {'public_id': public_id,
'version': version}
return signature == api_sign_request(parameters_to_sign, cloudinary.config().api_secret)
return signature == api_sign_request(
parameters_to_sign,
cloudinary.config().api_secret,
algorithm or cloudinary.config().signature_algorithm
)
def verify_notification_signature(body, timestamp, signature, valid_for=7200):
def verify_notification_signature(body, timestamp, signature, valid_for=7200, algorithm=None):
"""
Verifies the authenticity of a notification signature
@ -1304,6 +1476,8 @@ def verify_notification_signature(body, timestamp, signature, valid_for=7200):
:param timestamp: Unix timestamp. Can be retrieved from the X-Cld-Timestamp header
:param signature: Actual signature. Can be retrieved from the X-Cld-Signature header
:param valid_for: The desired time in seconds for considering the request valid
:param algorithm: Name of hashing algorithm to use for calculation of HMACs.
By default uses `cloudinary.config().signature_algorithm`
:return: Boolean result of the validation
"""
@ -1316,7 +1490,9 @@ def verify_notification_signature(body, timestamp, signature, valid_for=7200):
if not isinstance(body, str):
raise ValueError('Body should be type of string')
return signature == compute_hex_hash('{}{}{}'.format(body, timestamp, cloudinary.config().api_secret))
return signature == compute_hex_hash(
'{}{}{}'.format(body, timestamp, cloudinary.config().api_secret),
algorithm or cloudinary.config().signature_algorithm)
def get_http_connector(conf, options):
@ -1332,3 +1508,26 @@ def get_http_connector(conf, options):
return ProxyManager(conf.api_proxy, **options)
else:
return PoolManager(**options)
def encode_list(obj):
if isinstance(obj, list):
return ",".join(obj)
return obj
def safe_cast(val, casting_fn, default=None):
"""
Attempts to cast a value to another using a given casting function
Will return a default value if casting fails (configurable, defaults to None)
:param val: The value to cast
:param casting_fn: The casting function that will receive the value to cast
:param default: The return value if casting fails
:return: Result of casting the value or the value of the default parameter
"""
try:
return casting_fn(val)
except (ValueError, TypeError):
return default