diff --git a/lib/musicbrainzngs/__init__.py b/lib/musicbrainzngs/__init__.py new file mode 100644 index 00000000..22fed80d --- /dev/null +++ b/lib/musicbrainzngs/__init__.py @@ -0,0 +1,2 @@ +from musicbrainzngs.musicbrainz import * +from musicbrainzngs.caa import * diff --git a/lib/musicbrainzngs/caa.py b/lib/musicbrainzngs/caa.py new file mode 100644 index 00000000..caa0c5f3 --- /dev/null +++ b/lib/musicbrainzngs/caa.py @@ -0,0 +1,187 @@ +# This file is part of the musicbrainzngs library +# Copyright (C) Alastair Porter, Wieland Hoffmann, and others +# This file is distributed under a BSD-2-Clause type license. +# See the COPYING file for more information. + +__all__ = [ + 'set_caa_hostname', 'get_image_list', 'get_release_group_image_list', + 'get_release_group_image_front', 'get_image_front', 'get_image_back', + 'get_image' + ] + +import json + +from musicbrainzngs import compat +from musicbrainzngs import musicbrainz +from musicbrainzngs.util import _unicode + +hostname = "coverartarchive.org" +https = True + + +def set_caa_hostname(new_hostname, use_https=False): + """Set the base hostname for Cover Art Archive requests. + Defaults to 'coverartarchive.org', accessing over https. + For backwards compatibility, `use_https` is False by default. + + :param str new_hostname: The hostname (and port) of the CAA server to connect to + :param bool use_https: `True` if the host should be accessed using https. Default is `False` +""" + global hostname + global https + hostname = new_hostname + https = use_https + + +def _caa_request(mbid, imageid=None, size=None, entitytype="release"): + """ Make a CAA request. + + :param imageid: ``front``, ``back`` or a number from the listing obtained + with :meth:`get_image_list`. + :type imageid: str + + :param size: "250", "500", "1200" + :type size: str or None + + :param entitytype: ``release`` or ``release-group`` + :type entitytype: str + """ + # Construct the full URL for the request, including hostname and + # query string. + path = [entitytype, mbid] + if imageid and size: + path.append("%s-%s" % (imageid, size)) + elif imageid: + path.append(imageid) + url = compat.urlunparse(( + 'https' if https else 'http', + hostname, + '/%s' % '/'.join(path), + '', + '', + '' + )) + musicbrainz._log.debug("GET request for %s" % (url, )) + + # Set up HTTP request handler and URL opener. + httpHandler = compat.HTTPHandler(debuglevel=0) + handlers = [httpHandler] + + opener = compat.build_opener(*handlers) + + # Make request. + req = musicbrainz._MusicbrainzHttpRequest("GET", url, None) + # Useragent isn't needed for CAA, but we'll add it if it exists + if musicbrainz._useragent != "": + req.add_header('User-Agent', musicbrainz._useragent) + musicbrainz._log.debug("requesting with UA %s" % musicbrainz._useragent) + + resp = musicbrainz._safe_read(opener, req, None) + + # TODO: The content type declared by the CAA for JSON files is + # 'applicaiton/octet-stream'. This is not useful to detect whether the + # content is JSON, so default to decoding JSON if no imageid was supplied. + # http://tickets.musicbrainz.org/browse/CAA-75 + if imageid: + # If we asked for an image, return the image + return resp + else: + # Otherwise it's json + data = _unicode(resp) + return json.loads(data) + + +def get_image_list(releaseid): + """Get the list of cover art associated with a release. + + The return value is the deserialized response of the `JSON listing + `_ + returned by the Cover Art Archive API. + + If an error occurs then a :class:`~musicbrainzngs.ResponseError` will + be raised with one of the following HTTP codes: + + * 400: `Releaseid` is not a valid UUID + * 404: No release exists with an MBID of `releaseid` + * 503: Ratelimit exceeded + """ + return _caa_request(releaseid) + + +def get_release_group_image_list(releasegroupid): + """Get the list of cover art associated with a release group. + + The return value is the deserialized response of the `JSON listing + `_ + returned by the Cover Art Archive API. + + If an error occurs then a :class:`~musicbrainzngs.ResponseError` will + be raised with one of the following HTTP codes: + + * 400: `Releaseid` is not a valid UUID + * 404: No release exists with an MBID of `releaseid` + * 503: Ratelimit exceeded + """ + return _caa_request(releasegroupid, entitytype="release-group") + + +def get_release_group_image_front(releasegroupid, size=None): + """Download the front cover art for a release group. + The `size` argument and the possible error conditions are the same as for + :meth:`get_image`. + """ + return get_image(releasegroupid, "front", size=size, + entitytype="release-group") + + +def get_image_front(releaseid, size=None): + """Download the front cover art for a release. + The `size` argument and the possible error conditions are the same as for + :meth:`get_image`. + """ + return get_image(releaseid, "front", size=size) + + +def get_image_back(releaseid, size=None): + """Download the back cover art for a release. + The `size` argument and the possible error conditions are the same as for + :meth:`get_image`. + """ + return get_image(releaseid, "back", size=size) + + +def get_image(mbid, coverid, size=None, entitytype="release"): + """Download cover art for a release. The coverart file to download + is specified by the `coverid` argument. + + If `size` is not specified, download the largest copy present, which can be + very large. + + If an error occurs then a :class:`~musicbrainzngs.ResponseError` + will be raised with one of the following HTTP codes: + + * 400: `Releaseid` is not a valid UUID or `coverid` is invalid + * 404: No release exists with an MBID of `releaseid` + * 503: Ratelimit exceeded + + :param coverid: ``front``, ``back`` or a number from the listing obtained with + :meth:`get_image_list` + :type coverid: int or str + + :param size: "250", "500", "1200" or None. If it is None, the largest + available picture will be downloaded. If the image originally + uploaded to the Cover Art Archive was smaller than the + requested size, only the original image will be returned. + :type size: str or None + + :param entitytype: The type of entity for which to download the cover art. + This is either ``release`` or ``release-group``. + :type entitytype: str + :return: The binary image data + :type: str + """ + if isinstance(coverid, int): + coverid = "%d" % (coverid, ) + if isinstance(size, int): + size = "%d" % (size, ) + return _caa_request(mbid, coverid, size=size, entitytype=entitytype) diff --git a/lib/musicbrainzngs/compat.py b/lib/musicbrainzngs/compat.py new file mode 100644 index 00000000..689e0834 --- /dev/null +++ b/lib/musicbrainzngs/compat.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2012 Kenneth Reitz. + +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. + +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +""" +pythoncompat +""" + + +import sys + +# ------- +# Pythons +# ------- + +# Syntax sugar. +_ver = sys.version_info + +#: Python 2.x? +is_py2 = (_ver[0] == 2) + +#: Python 3.x? +is_py3 = (_ver[0] == 3) + +# --------- +# Specifics +# --------- + +if is_py2: + from StringIO import StringIO + from urllib2 import HTTPPasswordMgr, HTTPDigestAuthHandler, Request,\ + HTTPHandler, build_opener, HTTPError, URLError + from httplib import BadStatusLine, HTTPException + from urlparse import urlunparse + from urllib import urlencode, quote_plus + + bytes = str + unicode = unicode + basestring = basestring +elif is_py3: + from io import StringIO + from urllib.request import HTTPPasswordMgr, HTTPDigestAuthHandler, Request,\ + HTTPHandler, build_opener + from urllib.error import HTTPError, URLError + from http.client import HTTPException, BadStatusLine + from urllib.parse import urlunparse, urlencode, quote_plus + + unicode = str + bytes = bytes + basestring = (str,bytes) diff --git a/lib/musicbrainzngs/mbxml.py b/lib/musicbrainzngs/mbxml.py new file mode 100644 index 00000000..e85fe7cb --- /dev/null +++ b/lib/musicbrainzngs/mbxml.py @@ -0,0 +1,817 @@ +# This file is part of the musicbrainzngs library +# Copyright (C) Alastair Porter, Adrian Sampson, and others +# This file is distributed under a BSD-2-Clause type license. +# See the COPYING file for more information. + +import re +import xml.etree.ElementTree as ET +import logging + +from . import util + + +def fixtag(tag, namespaces): + # given a decorated tag (of the form {uri}tag), return prefixed + # tag and namespace declaration, if any + if isinstance(tag, ET.QName): + tag = tag.text + namespace_uri, tag = tag[1:].split("}", 1) + prefix = namespaces.get(namespace_uri) + if prefix is None: + prefix = "ns%d" % len(namespaces) + namespaces[namespace_uri] = prefix + if prefix == "xml": + xmlns = None + else: + xmlns = ("xmlns:%s" % prefix, namespace_uri) + else: + xmlns = None + return "%s:%s" % (prefix, tag), xmlns + + +NS_MAP = {"http://musicbrainz.org/ns/mmd-2.0#": "ws2", + "http://musicbrainz.org/ns/ext#-2.0": "ext"} +_log = logging.getLogger("musicbrainzngs") + +def get_error_message(error): + """ Given an error XML message from the webservice containing + xy, return a list + of [x, y]""" + try: + tree = util.bytes_to_elementtree(error) + root = tree.getroot() + errors = [] + if root.tag == "error": + for ch in root: + if ch.tag == "text": + errors.append(ch.text) + return errors + except ET.ParseError: + return None + +def make_artist_credit(artists): + names = [] + for artist in artists: + if isinstance(artist, dict): + if "name" in artist: + names.append(artist.get("name", "")) + else: + names.append(artist.get("artist", {}).get("name", "")) + else: + names.append(artist) + return "".join(names) + +def parse_elements(valid_els, inner_els, element): + """ Extract single level subelements from an element. + For example, given the element: + + Text + + and a list valid_els that contains "subelement", + return a dict {'subelement': 'Text'} + + Delegate the parsing of multi-level subelements to another function. + For example, given the element: + + + FooBar + + + and a dictionary {'subelement': parse_subelement}, + call parse_subelement() and + return a dict {'subelement': } + if parse_subelement returns a tuple of the form + (True, {'subelement-key': }) + then merge the second element of the tuple into the + result (which may have a key other than 'subelement' or + more than 1 key) + """ + result = {} + for sub in element: + t = fixtag(sub.tag, NS_MAP)[0] + if ":" in t: + t = t.split(":")[1] + if t in valid_els: + result[t] = sub.text or "" + elif t in inner_els.keys(): + inner_result = inner_els[t](sub) + if isinstance(inner_result, tuple) and inner_result[0]: + result.update(inner_result[1]) + else: + result[t] = inner_result + # add counts for lists when available + m = re.match(r'([a-z0-9-]+)-list', t) + if m and "count" in sub.attrib: + result["%s-count" % m.group(1)] = int(sub.attrib["count"]) + else: + _log.info("in <%s>, uncaught <%s>", + fixtag(element.tag, NS_MAP)[0], t) + return result + +def parse_attributes(attributes, element): + """ Extract attributes from an element. + For example, given the element: + + and a list attributes that contains "type", + return a dict {'type': 'Group'} + """ + result = {} + for attr in element.attrib: + if "{" in attr: + a = fixtag(attr, NS_MAP)[0] + else: + a = attr + if a in attributes: + result[a] = element.attrib[attr] + else: + _log.info("in <%s>, uncaught attribute %s", fixtag(element.tag, NS_MAP)[0], attr) + + return result + +def parse_message(message): + tree = util.bytes_to_elementtree(message) + root = tree.getroot() + result = {} + valid_elements = {"area": parse_area, + "artist": parse_artist, + "instrument": parse_instrument, + "label": parse_label, + "place": parse_place, + "event": parse_event, + "release": parse_release, + "release-group": parse_release_group, + "series": parse_series, + "recording": parse_recording, + "work": parse_work, + "url": parse_url, + + "disc": parse_disc, + "cdstub": parse_cdstub, + "isrc": parse_isrc, + + "annotation-list": parse_annotation_list, + "area-list": parse_area_list, + "artist-list": parse_artist_list, + "label-list": parse_label_list, + "place-list": parse_place_list, + "event-list": parse_event_list, + "instrument-list": parse_instrument_list, + "release-list": parse_release_list, + "release-group-list": parse_release_group_list, + "series-list": parse_series_list, + "recording-list": parse_recording_list, + "work-list": parse_work_list, + "url-list": parse_url_list, + + "collection-list": parse_collection_list, + "collection": parse_collection, + + "message": parse_response_message + } + result.update(parse_elements([], valid_elements, root)) + return result + +def parse_response_message(message): + return parse_elements(["text"], {}, message) + +def parse_collection_list(cl): + return [parse_collection(c) for c in cl] + +def parse_collection(collection): + result = {} + attribs = ["id", "type", "entity-type"] + elements = ["name", "editor"] + inner_els = {"release-list": parse_release_list, + "artist-list": parse_artist_list, + "event-list": parse_event_list, + "place-list": parse_place_list, + "recording-list": parse_recording_list, + "work-list": parse_work_list} + result.update(parse_attributes(attribs, collection)) + result.update(parse_elements(elements, inner_els, collection)) + + return result + +def parse_annotation_list(al): + return [parse_annotation(a) for a in al] + +def parse_annotation(annotation): + result = {} + attribs = ["type", "ext:score"] + elements = ["entity", "name", "text"] + result.update(parse_attributes(attribs, annotation)) + result.update(parse_elements(elements, {}, annotation)) + return result + +def parse_lifespan(lifespan): + parts = parse_elements(["begin", "end", "ended"], {}, lifespan) + + return parts + +def parse_area_list(al): + return [parse_area(a) for a in al] + +def parse_area(area): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["name", "sort-name", "disambiguation"] + inner_els = {"life-span": parse_lifespan, + "alias-list": parse_alias_list, + "relation-list": parse_relation_list, + "annotation": parse_annotation, + "iso-3166-1-code-list": parse_element_list, + "iso-3166-2-code-list": parse_element_list, + "iso-3166-3-code-list": parse_element_list} + + result.update(parse_attributes(attribs, area)) + result.update(parse_elements(elements, inner_els, area)) + + return result + +def parse_artist_list(al): + return [parse_artist(a) for a in al] + +def parse_artist(artist): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["name", "sort-name", "country", "user-rating", + "disambiguation", "gender", "ipi"] + inner_els = {"area": parse_area, + "begin-area": parse_area, + "end-area": parse_area, + "life-span": parse_lifespan, + "recording-list": parse_recording_list, + "relation-list": parse_relation_list, + "release-list": parse_release_list, + "release-group-list": parse_release_group_list, + "work-list": parse_work_list, + "tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "rating": parse_rating, + "ipi-list": parse_element_list, + "isni-list": parse_element_list, + "alias-list": parse_alias_list, + "annotation": parse_annotation} + + result.update(parse_attributes(attribs, artist)) + result.update(parse_elements(elements, inner_els, artist)) + + return result + +def parse_coordinates(c): + return parse_elements(['latitude', 'longitude'], {}, c) + +def parse_place_list(pl): + return [parse_place(p) for p in pl] + +def parse_place(place): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["name", "address", + "ipi", "disambiguation"] + inner_els = {"area": parse_area, + "coordinates": parse_coordinates, + "life-span": parse_lifespan, + "tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "alias-list": parse_alias_list, + "relation-list": parse_relation_list, + "annotation": parse_annotation} + + result.update(parse_attributes(attribs, place)) + result.update(parse_elements(elements, inner_els, place)) + + return result + +def parse_event_list(el): + return [parse_event(e) for e in el] + +def parse_event(event): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["name", "time", "setlist", "cancelled", "disambiguation", "user-rating"] + inner_els = {"life-span": parse_lifespan, + "relation-list": parse_relation_list, + "alias-list": parse_alias_list, + "tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "rating": parse_rating} + + result.update(parse_attributes(attribs, event)) + result.update(parse_elements(elements, inner_els, event)) + + return result + +def parse_instrument(instrument): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["name", "description", "disambiguation"] + inner_els = {"relation-list": parse_relation_list, + "tag-list": parse_tag_list, + "alias-list": parse_alias_list, + "annotation": parse_annotation} + result.update(parse_attributes(attribs, instrument)) + result.update(parse_elements(elements, inner_els, instrument)) + + return result + +def parse_label_list(ll): + return [parse_label(l) for l in ll] + +def parse_label(label): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["name", "sort-name", "country", "label-code", "user-rating", + "ipi", "disambiguation"] + inner_els = {"area": parse_area, + "life-span": parse_lifespan, + "release-list": parse_release_list, + "tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "rating": parse_rating, + "ipi-list": parse_element_list, + "alias-list": parse_alias_list, + "relation-list": parse_relation_list, + "annotation": parse_annotation} + + result.update(parse_attributes(attribs, label)) + result.update(parse_elements(elements, inner_els, label)) + + return result + +def parse_relation_target(tgt): + attributes = parse_attributes(['id'], tgt) + if 'id' in attributes: + return (True, {'target-id': attributes['id']}) + else: + return (True, {'target-id': tgt.text}) + +def parse_relation_list(rl): + attribs = ["target-type"] + ttype = parse_attributes(attribs, rl) + key = "%s-relation-list" % ttype["target-type"] + return (True, {key: [parse_relation(r) for r in rl]}) + +def parse_relation(relation): + result = {} + attribs = ["type", "type-id"] + elements = ["target", "direction", "begin", "end", "ended", "ordering-key"] + inner_els = {"area": parse_area, + "artist": parse_artist, + "instrument": parse_instrument, + "label": parse_label, + "place": parse_place, + "event": parse_event, + "recording": parse_recording, + "release": parse_release, + "release-group": parse_release_group, + "series": parse_series, + "attribute-list": parse_element_list, + "work": parse_work, + "target": parse_relation_target + } + result.update(parse_attributes(attribs, relation)) + result.update(parse_elements(elements, inner_els, relation)) + # We parse attribute-list again to get attributes that have both + # text and attribute values + result.update(parse_elements(['target-credit'], {"attribute-list": parse_relation_attribute_list}, relation)) + + return result + +def parse_relation_attribute_list(attributelist): + ret = [] + for attribute in attributelist: + ret.append(parse_relation_attribute_element(attribute)) + return (True, {"attributes": ret}) + +def parse_relation_attribute_element(element): + # Parses an attribute into a dictionary containing an element + # {"attribute": } and also an additional element + # containing any xml attributes. + # e.g number + # -> {"attribute": "number", "value": "BuxWV 1"} + result = {} + for attr in element.attrib: + if "{" in attr: + a = fixtag(attr, NS_MAP)[0] + else: + a = attr + result[a] = element.attrib[attr] + result["attribute"] = element.text + return result + +def parse_release(release): + result = {} + attribs = ["id", "ext:score"] + elements = ["title", "status", "disambiguation", "quality", "country", + "barcode", "date", "packaging", "asin"] + inner_els = {"text-representation": parse_text_representation, + "artist-credit": parse_artist_credit, + "label-info-list": parse_label_info_list, + "medium-list": parse_medium_list, + "release-group": parse_release_group, + "tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "relation-list": parse_relation_list, + "annotation": parse_annotation, + "cover-art-archive": parse_caa, + "release-event-list": parse_release_event_list} + + result.update(parse_attributes(attribs, release)) + result.update(parse_elements(elements, inner_els, release)) + if "artist-credit" in result: + result["artist-credit-phrase"] = make_artist_credit( + result["artist-credit"]) + + return result + +def parse_medium_list(ml): + """medium-list results from search have an additional + element containing the number of tracks + over all mediums. Optionally add this""" + medium_list = [] + track_count = None + for m in ml: + tag = fixtag(m.tag, NS_MAP)[0] + if tag == "ws2:medium": + medium_list.append(parse_medium(m)) + elif tag == "ws2:track-count": + track_count = int(m.text) + ret = {"medium-list": medium_list} + if track_count is not None: + ret["medium-track-count"] = track_count + + return (True, ret) + +def parse_release_event_list(rel): + return [parse_release_event(re) for re in rel] + +def parse_release_event(event): + result = {} + elements = ["date"] + inner_els = {"area": parse_area} + + result.update(parse_elements(elements, inner_els, event)) + return result + +def parse_medium(medium): + result = {} + elements = ["position", "format", "title"] + inner_els = {"disc-list": parse_disc_list, + "pregap": parse_track, + "track-list": parse_track_list, + "data-track-list": parse_track_list} + + result.update(parse_elements(elements, inner_els, medium)) + return result + +def parse_disc_list(dl): + return [parse_disc(d) for d in dl] + +def parse_text_representation(textr): + return parse_elements(["language", "script"], {}, textr) + +def parse_release_group(rg): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["title", "user-rating", "first-release-date", "primary-type", + "disambiguation"] + inner_els = {"artist-credit": parse_artist_credit, + "release-list": parse_release_list, + "tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "secondary-type-list": parse_element_list, + "relation-list": parse_relation_list, + "rating": parse_rating, + "annotation": parse_annotation} + + result.update(parse_attributes(attribs, rg)) + result.update(parse_elements(elements, inner_els, rg)) + if "artist-credit" in result: + result["artist-credit-phrase"] = make_artist_credit(result["artist-credit"]) + + return result + +def parse_recording(recording): + result = {} + attribs = ["id", "ext:score"] + elements = ["title", "length", "user-rating", "disambiguation", "video"] + inner_els = {"artist-credit": parse_artist_credit, + "release-list": parse_release_list, + "tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "rating": parse_rating, + "isrc-list": parse_external_id_list, + "relation-list": parse_relation_list, + "annotation": parse_annotation} + + result.update(parse_attributes(attribs, recording)) + result.update(parse_elements(elements, inner_els, recording)) + if "artist-credit" in result: + result["artist-credit-phrase"] = make_artist_credit(result["artist-credit"]) + + return result + +def parse_series_list(sl): + return [parse_series(s) for s in sl] + +def parse_series(series): + result = {} + attribs = ["id", "type", "ext:score"] + elements = ["name", "disambiguation"] + inner_els = {"alias-list": parse_alias_list, + "relation-list": parse_relation_list, + "annotation": parse_annotation} + + result.update(parse_attributes(attribs, series)) + result.update(parse_elements(elements, inner_els, series)) + + return result + +def parse_external_id_list(pl): + return [parse_attributes(["id"], p)["id"] for p in pl] + +def parse_element_list(el): + return [e.text for e in el] + +def parse_work_list(wl): + return [parse_work(w) for w in wl] + +def parse_work(work): + result = {} + attribs = ["id", "ext:score", "type"] + elements = ["title", "user-rating", "language", "iswc", "disambiguation"] + inner_els = {"tag-list": parse_tag_list, + "user-tag-list": parse_tag_list, + "rating": parse_rating, + "alias-list": parse_alias_list, + "iswc-list": parse_element_list, + "relation-list": parse_relation_list, + "annotation": parse_response_message, + "attribute-list": parse_work_attribute_list + } + + result.update(parse_attributes(attribs, work)) + result.update(parse_elements(elements, inner_els, work)) + + return result + +def parse_work_attribute_list(wal): + return [parse_work_attribute(wa) for wa in wal] + +def parse_work_attribute(wa): + attribs = ["type"] + typeinfo = parse_attributes(attribs, wa) + result = {} + if typeinfo: + result = {"attribute": typeinfo["type"], + "value": wa.text} + + return result + + +def parse_url_list(ul): + return [parse_url(u) for u in ul] + +def parse_url(url): + result = {} + attribs = ["id"] + elements = ["resource"] + inner_els = {"relation-list": parse_relation_list} + + result.update(parse_attributes(attribs, url)) + result.update(parse_elements(elements, inner_els, url)) + + return result + +def parse_disc(disc): + result = {} + attribs = ["id"] + elements = ["sectors"] + inner_els = {"release-list": parse_release_list, + "offset-list": parse_offset_list + } + + result.update(parse_attributes(attribs, disc)) + result.update(parse_elements(elements, inner_els, disc)) + + return result + +def parse_cdstub(cdstub): + result = {} + attribs = ["id"] + elements = ["title", "artist", "barcode"] + inner_els = {"track-list": parse_track_list} + + result.update(parse_attributes(attribs, cdstub)) + result.update(parse_elements(elements, inner_els, cdstub)) + + return result + +def parse_offset_list(ol): + return [int(o.text) for o in ol] + +def parse_instrument_list(rl): + result = [] + for r in rl: + result.append(parse_instrument(r)) + return result + +def parse_release_list(rl): + result = [] + for r in rl: + result.append(parse_release(r)) + return result + +def parse_release_group_list(rgl): + result = [] + for rg in rgl: + result.append(parse_release_group(rg)) + return result + +def parse_isrc(isrc): + result = {} + attribs = ["id"] + inner_els = {"recording-list": parse_recording_list} + + result.update(parse_attributes(attribs, isrc)) + result.update(parse_elements([], inner_els, isrc)) + + return result + +def parse_recording_list(recs): + result = [] + for r in recs: + result.append(parse_recording(r)) + return result + +def parse_artist_credit(ac): + result = [] + for namecredit in ac: + result.append(parse_name_credit(namecredit)) + join = parse_attributes(["joinphrase"], namecredit) + if "joinphrase" in join: + result.append(join["joinphrase"]) + return result + +def parse_name_credit(nc): + result = {} + elements = ["name"] + inner_els = {"artist": parse_artist} + + result.update(parse_elements(elements, inner_els, nc)) + + return result + +def parse_label_info_list(lil): + result = [] + + for li in lil: + result.append(parse_label_info(li)) + return result + +def parse_label_info(li): + result = {} + elements = ["catalog-number"] + inner_els = {"label": parse_label} + + result.update(parse_elements(elements, inner_els, li)) + return result + +def parse_track_list(tl): + result = [] + for t in tl: + result.append(parse_track(t)) + return result + +def parse_track(track): + result = {} + attribs = ["id"] + elements = ["number", "position", "title", "length"] + inner_els = {"recording": parse_recording, + "artist-credit": parse_artist_credit} + + result.update(parse_attributes(attribs, track)) + result.update(parse_elements(elements, inner_els, track)) + if "artist-credit" in result.get("recording", {}) and "artist-credit" not in result: + result["artist-credit"] = result["recording"]["artist-credit"] + if "artist-credit" in result: + result["artist-credit-phrase"] = make_artist_credit(result["artist-credit"]) + # Make a length field that contains track length or recording length + track_or_recording = None + if "length" in result: + track_or_recording = result["length"] + elif result.get("recording", {}).get("length"): + track_or_recording = result.get("recording", {}).get("length") + if track_or_recording: + result["track_or_recording_length"] = track_or_recording + return result + +def parse_tag_list(tl): + return [parse_tag(t) for t in tl] + +def parse_tag(tag): + result = {} + attribs = ["count"] + elements = ["name"] + + result.update(parse_attributes(attribs, tag)) + result.update(parse_elements(elements, {}, tag)) + + return result + +def parse_rating(rating): + result = {} + attribs = ["votes-count"] + + result.update(parse_attributes(attribs, rating)) + result["rating"] = rating.text + + return result + +def parse_alias_list(al): + return [parse_alias(a) for a in al] + +def parse_alias(alias): + result = {} + attribs = ["locale", "sort-name", "type", "primary", + "begin-date", "end-date"] + + result.update(parse_attributes(attribs, alias)) + result["alias"] = alias.text + + return result + +def parse_caa(caa_element): + result = {} + elements = ["artwork", "count", "front", "back", "darkened"] + + result.update(parse_elements(elements, {}, caa_element)) + return result + + +### + +def make_barcode_request(release2barcode): + NS = "http://musicbrainz.org/ns/mmd-2.0#" + root = ET.Element("{%s}metadata" % NS) + rel_list = ET.SubElement(root, "{%s}release-list" % NS) + for release, barcode in release2barcode.items(): + rel_xml = ET.SubElement(rel_list, "{%s}release" % NS) + bar_xml = ET.SubElement(rel_xml, "{%s}barcode" % NS) + rel_xml.set("{%s}id" % NS, release) + bar_xml.text = barcode + + return ET.tostring(root, "utf-8") + +def make_tag_request(**kwargs): + NS = "http://musicbrainz.org/ns/mmd-2.0#" + root = ET.Element("{%s}metadata" % NS) + for entity_type in ['artist', 'label', 'place', 'recording', 'release', 'release_group', 'work']: + entity_tags = kwargs.pop(entity_type + '_tags', None) + if entity_tags is not None: + e_list = ET.SubElement(root, "{%s}%s-list" % (NS, entity_type.replace('_', '-'))) + for e, tags in entity_tags.items(): + e_xml = ET.SubElement(e_list, "{%s}%s" % (NS, entity_type.replace('_', '-'))) + e_xml.set("{%s}id" % NS, e) + taglist = ET.SubElement(e_xml, "{%s}user-tag-list" % NS) + for tag in tags: + usertag_xml = ET.SubElement(taglist, "{%s}user-tag" % NS) + name_xml = ET.SubElement(usertag_xml, "{%s}name" % NS) + name_xml.text = tag + if kwargs.keys(): + raise TypeError("make_tag_request() got an unexpected keyword argument '%s'" % kwargs.popitem()[0]) + + return ET.tostring(root, "utf-8") + +def make_rating_request(**kwargs): + NS = "http://musicbrainz.org/ns/mmd-2.0#" + root = ET.Element("{%s}metadata" % NS) + for entity_type in ['artist', 'label', 'recording', 'release_group', 'work']: + entity_ratings = kwargs.pop(entity_type + '_ratings', None) + if entity_ratings is not None: + e_list = ET.SubElement(root, "{%s}%s-list" % (NS, entity_type.replace('_', '-'))) + for e, rating in entity_ratings.items(): + e_xml = ET.SubElement(e_list, "{%s}%s" % (NS, entity_type.replace('_', '-'))) + e_xml.set("{%s}id" % NS, e) + rating_xml = ET.SubElement(e_xml, "{%s}user-rating" % NS) + rating_xml.text = str(rating) + if kwargs.keys(): + raise TypeError("make_rating_request() got an unexpected keyword argument '%s'" % kwargs.popitem()[0]) + + return ET.tostring(root, "utf-8") + +def make_isrc_request(recording2isrcs): + NS = "http://musicbrainz.org/ns/mmd-2.0#" + root = ET.Element("{%s}metadata" % NS) + rec_list = ET.SubElement(root, "{%s}recording-list" % NS) + for rec, isrcs in recording2isrcs.items(): + if len(isrcs) > 0: + rec_xml = ET.SubElement(rec_list, "{%s}recording" % NS) + rec_xml.set("{%s}id" % NS, rec) + isrc_list_xml = ET.SubElement(rec_xml, "{%s}isrc-list" % NS) + isrc_list_xml.set("{%s}count" % NS, str(len(isrcs))) + for isrc in isrcs: + isrc_xml = ET.SubElement(isrc_list_xml, "{%s}isrc" % NS) + isrc_xml.set("{%s}id" % NS, isrc) + return ET.tostring(root, "utf-8") diff --git a/lib/musicbrainzngs/musicbrainz.py b/lib/musicbrainzngs/musicbrainz.py new file mode 100644 index 00000000..5540db95 --- /dev/null +++ b/lib/musicbrainzngs/musicbrainz.py @@ -0,0 +1,1312 @@ +# This file is part of the musicbrainzngs library +# Copyright (C) Alastair Porter, Adrian Sampson, and others +# This file is distributed under a BSD-2-Clause type license. +# See the COPYING file for more information. + +import re +import threading +import time +import logging +import socket +import hashlib +import locale +import sys +import json +import xml.etree.ElementTree as etree +from xml.parsers import expat +from warnings import warn + +from musicbrainzngs import mbxml +from musicbrainzngs import util +from musicbrainzngs import compat + +_version = "0.7dev" +_log = logging.getLogger("musicbrainzngs") + +LUCENE_SPECIAL = r'([+\-&|!(){}\[\]\^"~*?:\\\/])' + +# Constants for validation. + +RELATABLE_TYPES = ['area', 'artist', 'label', 'place', 'event', 'recording', 'release', 'release-group', 'series', 'url', 'work', 'instrument'] +RELATION_INCLUDES = [entity + '-rels' for entity in RELATABLE_TYPES] +TAG_INCLUDES = ["tags", "user-tags"] +RATING_INCLUDES = ["ratings", "user-ratings"] + +VALID_INCLUDES = { + 'area' : ["aliases", "annotation"] + RELATION_INCLUDES, + 'artist': [ + "recordings", "releases", "release-groups", "works", # Subqueries + "various-artists", "discids", "media", "isrcs", + "aliases", "annotation" + ] + RELATION_INCLUDES + TAG_INCLUDES + RATING_INCLUDES, + 'annotation': [ + + ], + 'instrument': ["aliases", "annotation" + ] + RELATION_INCLUDES + TAG_INCLUDES, + 'label': [ + "releases", # Subqueries + "discids", "media", + "aliases", "annotation" + ] + RELATION_INCLUDES + TAG_INCLUDES + RATING_INCLUDES, + 'place' : ["aliases", "annotation"] + RELATION_INCLUDES + TAG_INCLUDES, + 'event' : ["aliases"] + RELATION_INCLUDES + TAG_INCLUDES + RATING_INCLUDES, + 'recording': [ + "artists", "releases", # Subqueries + "discids", "media", "artist-credits", "isrcs", + "work-level-rels", "annotation", "aliases" + ] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'release': [ + "artists", "labels", "recordings", "release-groups", "media", + "artist-credits", "discids", "isrcs", + "recording-level-rels", "work-level-rels", "annotation", "aliases" + ] + TAG_INCLUDES + RELATION_INCLUDES, + 'release-group': [ + "artists", "releases", "discids", "media", + "artist-credits", "annotation", "aliases" + ] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'series': [ + "annotation", "aliases" + ] + RELATION_INCLUDES, + 'work': [ + "aliases", "annotation" + ] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'url': RELATION_INCLUDES, + 'discid': [ # Discid should be the same as release + "artists", "labels", "recordings", "release-groups", "media", + "artist-credits", "discids", "isrcs", + "recording-level-rels", "work-level-rels", "annotation", "aliases" + ] + RELATION_INCLUDES, + 'isrc': ["artists", "releases", "isrcs"], + 'iswc': ["artists"], + 'collection': ['releases'], +} +VALID_BROWSE_INCLUDES = { + 'artist': ["aliases"] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'event': ["aliases"] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'label': ["aliases"] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'recording': ["artist-credits", "isrcs"] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'release': ["artist-credits", "labels", "recordings", "isrcs", + "release-groups", "media", "discids"] + RELATION_INCLUDES, + 'place': ["aliases"] + TAG_INCLUDES + RELATION_INCLUDES, + 'release-group': ["artist-credits"] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, + 'url': RELATION_INCLUDES, + 'work': ["aliases", "annotation"] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES, +} + +#: These can be used to filter whenever releases are includes or browsed +VALID_RELEASE_TYPES = [ + "nat", + "album", "single", "ep", "broadcast", "other", # primary types + "compilation", "soundtrack", "spokenword", "interview", "audiobook", + "live", "remix", "dj-mix", "mixtape/street", # secondary types +] +#: These can be used to filter whenever releases or release-groups are involved +VALID_RELEASE_STATUSES = ["official", "promotion", "bootleg", "pseudo-release"] +VALID_SEARCH_FIELDS = { + 'annotation': [ + 'entity', 'name', 'text', 'type' + ], + 'area': [ + 'aid', 'alias', 'area', 'areaaccent', 'begin', 'comment', 'end', + 'ended', 'iso', 'iso1', 'iso2', 'iso3', 'sortname', 'tag', 'type' + ], + 'artist': [ + 'alias', 'area', 'arid', 'artist', 'artistaccent', 'begin', 'beginarea', + 'comment', 'country', 'end', 'endarea', 'ended', 'gender', + 'ipi', 'isni', 'primary_alias', 'sortname', 'tag', 'type' + ], + 'event': [ + 'aid', 'alias', 'area', 'arid', 'artist', 'begin', 'comment', 'eid', + 'end', 'ended', 'event', 'eventaccent', 'pid', 'place', 'tag', 'type' + ], + 'instrument': [ + 'alias', 'comment', 'description', 'iid', 'instrument', + 'instrumentaccent', 'tag', 'type' + ], + 'label': [ + 'alias', 'area', 'begin', 'code', 'comment', 'country', 'end', 'ended', + 'ipi', 'label', 'labelaccent', 'laid', 'release_count', 'sortname', + 'tag', 'type' + ], + 'place': [ + 'address', 'alias', 'area', 'begin', 'comment', 'end', 'ended', 'lat', 'long', + 'pid', 'place', 'placeaccent', 'type' + ], + 'recording': [ + 'alias', 'arid', 'artist', 'artistname', 'comment', 'country', + 'creditname', 'date', 'dur', 'format', 'isrc', 'number', 'position', + 'primarytype', 'qdur', 'recording', 'recordingaccent', 'reid', + 'release', 'rgid', 'rid', 'secondarytype', 'status', 'tag', 'tid', + 'tnum', 'tracks', 'tracksrelease', 'type', 'video'], + + 'release-group': [ + 'alias', 'arid', 'artist', 'artistname', 'comment', 'creditname', + 'primarytype', 'reid', 'release', 'releasegroup', 'releasegroupaccent', + 'releases', 'rgid', 'secondarytype', 'status', 'tag', 'type' + ], + 'release': [ + 'alias', 'arid', 'artist', 'artistname', 'asin', 'barcode', 'catno', + 'comment', 'country', 'creditname', 'date', 'discids', 'discidsmedium', + 'format', 'label', 'laid', 'lang', 'mediums', 'primarytype', 'quality', + 'reid', 'release', 'releaseaccent', 'rgid', 'script', 'secondarytype', + 'status', 'tag', 'tracks', 'tracksmedium', 'type' + ], + 'series': [ + 'alias', 'comment', 'orderingattribute', 'series', 'seriesaccent', + 'sid', 'tag', 'type' + ], + 'work': [ + 'alias', 'arid', 'artist', 'comment', 'iswc', 'lang', 'recording', + 'recording_count', 'rid', 'tag', 'type', 'wid', 'work', 'workaccent' + ] +} + +# Constants +class AUTH_YES: pass +class AUTH_NO: pass +class AUTH_IFSET: pass + + +# Exceptions. + +class MusicBrainzError(Exception): + """Base class for all exceptions related to MusicBrainz.""" + pass + +class UsageError(MusicBrainzError): + """Error related to misuse of the module API.""" + pass + +class InvalidSearchFieldError(UsageError): + pass + +class InvalidIncludeError(UsageError): + def __init__(self, msg='Invalid Includes', reason=None): + super(InvalidIncludeError, self).__init__(self) + self.msg = msg + self.reason = reason + + def __str__(self): + return self.msg + +class InvalidFilterError(UsageError): + def __init__(self, msg='Invalid Includes', reason=None): + super(InvalidFilterError, self).__init__(self) + self.msg = msg + self.reason = reason + + def __str__(self): + return self.msg + +class WebServiceError(MusicBrainzError): + """Error related to MusicBrainz API requests.""" + def __init__(self, message=None, cause=None): + """Pass ``cause`` if this exception was caused by another + exception. + """ + self.message = message + self.cause = cause + + def __str__(self): + if self.message: + msg = "%s, " % self.message + else: + msg = "" + msg += "caused by: %s" % str(self.cause) + return msg + +class NetworkError(WebServiceError): + """Problem communicating with the MB server.""" + pass + +class ResponseError(WebServiceError): + """Bad response sent by the MB server.""" + pass + +class AuthenticationError(WebServiceError): + """Received a HTTP 401 response while accessing a protected resource.""" + pass + + +# Helpers for validating and formatting allowed sets. + +def _check_includes_impl(includes, valid_includes): + for i in includes: + if i not in valid_includes: + raise InvalidIncludeError("Bad includes: " + "%s is not a valid include" % i) +def _check_includes(entity, inc): + _check_includes_impl(inc, VALID_INCLUDES[entity]) + +def _check_filter(values, valid): + for v in values: + if v not in valid: + raise InvalidFilterError(v) + +def _check_filter_and_make_params(entity, includes, release_status=[], release_type=[]): + """Check that the status or type values are valid. Then, check that + the filters can be used with the given includes. Return a params + dict that can be passed to _do_mb_query. + """ + if isinstance(release_status, compat.basestring): + release_status = [release_status] + if isinstance(release_type, compat.basestring): + release_type = [release_type] + _check_filter(release_status, VALID_RELEASE_STATUSES) + _check_filter(release_type, VALID_RELEASE_TYPES) + + if (release_status + and "releases" not in includes and entity != "release"): + raise InvalidFilterError("Can't have a status with no release include") + if (release_type + and "release-groups" not in includes and "releases" not in includes + and entity not in ["release-group", "release"]): + raise InvalidFilterError("Can't have a release type " + "with no releases or release-groups involved") + + # Build parameters. + params = {} + if len(release_status): + params["status"] = "|".join(release_status) + if len(release_type): + params["type"] = "|".join(release_type) + return params + +def _docstring_get(entity): + includes = list(VALID_INCLUDES.get(entity, [])) + return _docstring_impl("includes", includes) + +def _docstring_browse(entity): + includes = list(VALID_BROWSE_INCLUDES.get(entity, [])) + return _docstring_impl("includes", includes) + +def _docstring_search(entity): + search_fields = list(VALID_SEARCH_FIELDS.get(entity, [])) + return _docstring_impl("fields", search_fields) + +def _docstring_impl(name, values): + def _decorator(func): + vstr = ", ".join(values) + args = {name: vstr} + if func.__doc__: + func.__doc__ = func.__doc__.format(**args) + return func + + return _decorator + + +# Global authentication and endpoint details. + +user = password = "" +hostname = "musicbrainz.org" +https = True +_client = "" +_useragent = "" + +def auth(u, p): + """Set the username and password to be used in subsequent queries to + the MusicBrainz XML API that require authentication. + """ + global user, password + user = u + password = p + +def set_useragent(app, version, contact=None): + """Set the User-Agent to be used for requests to the MusicBrainz webservice. + This must be set before requests are made.""" + global _useragent, _client + if not app or not version: + raise ValueError("App and version can not be empty") + if contact is not None: + _useragent = "%s/%s python-musicbrainzngs/%s ( %s )" % (app, version, _version, contact) + else: + _useragent = "%s/%s python-musicbrainzngs/%s" % (app, version, _version) + _client = "%s-%s" % (app, version) + _log.debug("set user-agent to %s" % _useragent) + + +def set_hostname(new_hostname, use_https=False): + """Set the hostname for MusicBrainz webservice requests. + Defaults to 'musicbrainz.org', accessing over https. + For backwards compatibility, `use_https` is False by default. + + :param str new_hostname: The hostname (and port) of the MusicBrainz server to connect to + :param bool use_https: `True` if the host should be accessed using https. Default is `False` + + Specify a non-standard port by adding it to the hostname, + for example 'localhost:8000'.""" + global hostname + global https + hostname = new_hostname + https = use_https + +# Rate limiting. + +limit_interval = 1.0 +limit_requests = 1 +do_rate_limit = True + +def set_rate_limit(limit_or_interval=1.0, new_requests=1): + """Sets the rate limiting behavior of the module. Must be invoked + before the first Web service call. + If the `limit_or_interval` parameter is set to False then + rate limiting will be disabled. If it is a number then only + a set number of requests (`new_requests`) will be made per + given interval (`limit_or_interval`). + """ + global limit_interval + global limit_requests + global do_rate_limit + if isinstance(limit_or_interval, bool): + do_rate_limit = limit_or_interval + else: + if limit_or_interval <= 0.0: + raise ValueError("limit_or_interval can't be less than 0") + if new_requests <= 0: + raise ValueError("new_requests can't be less than 0") + do_rate_limit = True + limit_interval = limit_or_interval + limit_requests = new_requests + +class _rate_limit(object): + """A decorator that limits the rate at which the function may be + called. The rate is controlled by the `limit_interval` and + `limit_requests` global variables. The limiting is thread-safe; + only one thread may be in the function at a time (acts like a + monitor in this sense). The globals must be set before the first + call to the limited function. + """ + def __init__(self, fun): + self.fun = fun + self.last_call = 0.0 + self.lock = threading.Lock() + self.remaining_requests = None # Set on first invocation. + + def _update_remaining(self): + """Update remaining requests based on the elapsed time since + they were last calculated. + """ + # On first invocation, we have the maximum number of requests + # available. + if self.remaining_requests is None: + self.remaining_requests = float(limit_requests) + + else: + since_last_call = time.time() - self.last_call + self.remaining_requests += since_last_call * \ + (limit_requests / limit_interval) + self.remaining_requests = min(self.remaining_requests, + float(limit_requests)) + + self.last_call = time.time() + + def __call__(self, *args, **kwargs): + with self.lock: + if do_rate_limit: + self._update_remaining() + + # Delay if necessary. + while self.remaining_requests < 0.999: + time.sleep((1.0 - self.remaining_requests) * + (limit_requests / limit_interval)) + self._update_remaining() + + # Call the original function, "paying" for this call. + self.remaining_requests -= 1.0 + return self.fun(*args, **kwargs) + +# From pymb2 +class _RedirectPasswordMgr(compat.HTTPPasswordMgr): + def __init__(self): + self._realms = { } + + def find_user_password(self, realm, uri): + # ignoring the uri parameter intentionally + try: + return self._realms[realm] + except KeyError: + return (None, None) + + def add_password(self, realm, uri, username, password): + # ignoring the uri parameter intentionally + self._realms[realm] = (username, password) + +class _DigestAuthHandler(compat.HTTPDigestAuthHandler): + def get_authorization (self, req, chal): + qop = chal.get ('qop', None) + if qop and ',' in qop and 'auth' in qop.split (','): + chal['qop'] = 'auth' + + return compat.HTTPDigestAuthHandler.get_authorization (self, req, chal) + + def _encode_utf8(self, msg): + """The MusicBrainz server also accepts UTF-8 encoded passwords.""" + encoding = sys.stdin.encoding or locale.getpreferredencoding() + try: + # This works on Python 2 (msg in bytes) + msg = msg.decode(encoding) + except AttributeError: + # on Python 3 (msg is already in unicode) + pass + return msg.encode("utf-8") + + def get_algorithm_impls(self, algorithm): + # algorithm should be case-insensitive according to RFC2617 + algorithm = algorithm.upper() + # lambdas assume digest modules are imported at the top level + if algorithm == 'MD5': + H = lambda x: hashlib.md5(self._encode_utf8(x)).hexdigest() + elif algorithm == 'SHA': + H = lambda x: hashlib.sha1(self._encode_utf8(x)).hexdigest() + # XXX MD5-sess + KD = lambda s, d: H("%s:%s" % (s, d)) + return H, KD + +class _MusicbrainzHttpRequest(compat.Request): + """ A custom request handler that allows DELETE and PUT""" + def __init__(self, method, url, data=None): + compat.Request.__init__(self, url, data) + allowed_m = ["GET", "POST", "DELETE", "PUT"] + if method not in allowed_m: + raise ValueError("invalid method: %s" % method) + self.method = method + + def get_method(self): + return self.method + + +# Core (internal) functions for calling the MB API. + +def _safe_read(opener, req, body=None, max_retries=8, retry_delay_delta=2.0): + """Open an HTTP request with a given URL opener and (optionally) a + request body. Transient errors lead to retries. Permanent errors + and repeated errors are translated into a small set of handleable + exceptions. Return a bytestring. + """ + last_exc = None + for retry_num in range(max_retries): + if retry_num: # Not the first try: delay an increasing amount. + _log.info("retrying after delay (#%i)" % retry_num) + time.sleep(retry_num * retry_delay_delta) + + try: + if body: + f = opener.open(req, body) + else: + f = opener.open(req) + return f.read() + + except compat.HTTPError as exc: + if exc.code in (400, 404, 411): + # Bad request, not found, etc. + raise ResponseError(cause=exc) + elif exc.code in (503, 502, 500): + # Rate limiting, internal overloading... + _log.info("HTTP error %i" % exc.code) + elif exc.code in (401, ): + raise AuthenticationError(cause=exc) + else: + # Other, unknown error. Should handle more cases, but + # retrying for now. + _log.info("unknown HTTP error %i" % exc.code) + last_exc = exc + except compat.BadStatusLine as exc: + _log.info("bad status line") + last_exc = exc + except compat.HTTPException as exc: + _log.info("miscellaneous HTTP exception: %s" % str(exc)) + last_exc = exc + except compat.URLError as exc: + if isinstance(exc.reason, socket.error): + code = exc.reason.errno + if code == 104: # "Connection reset by peer." + continue + raise NetworkError(cause=exc) + except socket.timeout as exc: + _log.info("socket timeout") + last_exc = exc + except socket.error as exc: + if exc.errno == 104: + continue + raise NetworkError(cause=exc) + except IOError as exc: + raise NetworkError(cause=exc) + + # Out of retries! + raise NetworkError("retried %i times" % max_retries, last_exc) + +# Get the XML parsing exceptions to catch. The behavior chnaged with Python 2.7 +# and ElementTree 1.3. +if hasattr(etree, 'ParseError'): + ETREE_EXCEPTIONS = (etree.ParseError, expat.ExpatError) +else: + ETREE_EXCEPTIONS = (expat.ExpatError) + + +# Parsing setup + +def mb_parser_null(resp): + """Return the raw response (XML)""" + return resp + +def mb_parser_xml(resp): + """Return a Python dict representing the XML response""" + # Parse the response. + try: + return mbxml.parse_message(resp) + except UnicodeError as exc: + raise ResponseError(cause=exc) + except Exception as exc: + if isinstance(exc, ETREE_EXCEPTIONS): + raise ResponseError(cause=exc) + else: + raise + +# Defaults +parser_fun = mb_parser_xml +ws_format = "xml" + +def set_parser(new_parser_fun=None): + """Sets the function used to parse the response from the + MusicBrainz web service. + + If no parser is given, the parser is reset to the default parser + :func:`mb_parser_xml`. + """ + global parser_fun + if new_parser_fun is None: + new_parser_fun = mb_parser_xml + if not callable(new_parser_fun): + raise ValueError("new_parser_fun must be callable") + parser_fun = new_parser_fun + +def set_format(fmt="xml"): + """Sets the format that should be returned by the Web Service. + The server currently supports `xml` and `json`. + + This method will set a default parser for the specified format, + but you can modify it with :func:`set_parser`. + + .. warning:: The json format used by the server is different from + the json format returned by the `musicbrainzngs` internal parser + when using the `xml` format! This format may change at any time. + """ + global ws_format + if fmt == "xml": + ws_format = fmt + set_parser() # set to default + elif fmt == "json": + ws_format = fmt + warn("The json format is non-official and may change at any time") + set_parser(json.loads) + else: + raise ValueError("invalid format: %s" % fmt) + + +@_rate_limit +def _mb_request(path, method='GET', auth_required=AUTH_NO, + client_required=False, args=None, data=None, body=None): + """Makes a request for the specified `path` (endpoint) on /ws/2 on + the globally-specified hostname. Parses the responses and returns + the resulting object. `auth_required` and `client_required` control + whether exceptions should be raised if the username/password and + client are left unspecified, respectively. + """ + global parser_fun + + if args is None: + args = {} + else: + args = dict(args) or {} + + if _useragent == "": + raise UsageError("set a proper user-agent with " + "set_useragent(\"application name\", \"application version\", \"contact info (preferably URL or email for your application)\")") + + if client_required: + args["client"] = _client + + if ws_format != "xml": + args["fmt"] = ws_format + + # Convert args from a dictionary to a list of tuples + # so that the ordering of elements is stable for easy + # testing (in this case we order alphabetically) + # Encode Unicode arguments using UTF-8. + newargs = [] + for key, value in sorted(args.items()): + if isinstance(value, compat.unicode): + value = value.encode('utf8') + newargs.append((key, value)) + + # Construct the full URL for the request, including hostname and + # query string. + url = compat.urlunparse(( + 'https' if https else 'http', + hostname, + '/ws/2/%s' % path, + '', + compat.urlencode(newargs), + '' + )) + _log.debug("%s request for %s" % (method, url)) + + # Set up HTTP request handler and URL opener. + httpHandler = compat.HTTPHandler(debuglevel=0) + handlers = [httpHandler] + + # Add credentials if required. + add_auth = False + if auth_required == AUTH_YES: + _log.debug("Auth required for %s" % url) + if not user: + raise UsageError("authorization required; " + "use auth(user, pass) first") + add_auth = True + + if auth_required == AUTH_IFSET and user: + _log.debug("Using auth for %s because user and pass is set" % url) + add_auth = True + + if add_auth: + passwordMgr = _RedirectPasswordMgr() + authHandler = _DigestAuthHandler(passwordMgr) + authHandler.add_password("musicbrainz.org", (), user, password) + handlers.append(authHandler) + + opener = compat.build_opener(*handlers) + + # Make request. + req = _MusicbrainzHttpRequest(method, url, data) + req.add_header('User-Agent', _useragent) + _log.debug("requesting with UA %s" % _useragent) + if body: + req.add_header('Content-Type', 'application/xml; charset=UTF-8') + elif not data and not req.has_header('Content-Length'): + # Explicitly indicate zero content length if no request data + # will be sent (avoids HTTP 411 error). + req.add_header('Content-Length', '0') + resp = _safe_read(opener, req, body) + + return parser_fun(resp) + +def _get_auth_type(entity, id, includes): + """ Some calls require authentication. This returns + True if a call does, False otherwise + """ + if "user-tags" in includes or "user-ratings" in includes: + return AUTH_YES + elif entity.startswith("collection"): + if not id: + return AUTH_YES + else: + return AUTH_IFSET + else: + return AUTH_NO + +def _do_mb_query(entity, id, includes=[], params={}): + """Make a single GET call to the MusicBrainz XML API. `entity` is a + string indicated the type of object to be retrieved. The id may be + empty, in which case the query is a search. `includes` is a list + of strings that must be valid includes for the entity type. `params` + is a dictionary of additional parameters for the API call. The + response is parsed and returned. + """ + # Build arguments. + if not isinstance(includes, list): + includes = [includes] + _check_includes(entity, includes) + auth_required = _get_auth_type(entity, id, includes) + args = dict(params) + if len(includes) > 0: + inc = " ".join(includes) + args["inc"] = inc + + # Build the endpoint components. + path = '%s/%s' % (entity, id) + return _mb_request(path, 'GET', auth_required, args=args) + +def _do_mb_search(entity, query='', fields={}, + limit=None, offset=None, strict=False): + """Perform a full-text search on the MusicBrainz search server. + `query` is a lucene query string when no fields are set, + but is escaped when any fields are given. `fields` is a dictionary + of key/value query parameters. They keys in `fields` must be valid + for the given entity type. + """ + # Encode the query terms as a Lucene query string. + query_parts = [] + if query: + clean_query = util._unicode(query) + if fields: + clean_query = re.sub(LUCENE_SPECIAL, r'\\\1', + clean_query) + if strict: + query_parts.append('"%s"' % clean_query) + else: + query_parts.append(clean_query.lower()) + else: + query_parts.append(clean_query) + for key, value in fields.items(): + # Ensure this is a valid search field. + if key not in VALID_SEARCH_FIELDS[entity]: + raise InvalidSearchFieldError( + '%s is not a valid search field for %s' % (key, entity) + ) + + # Escape Lucene's special characters. + value = util._unicode(value) + value = re.sub(LUCENE_SPECIAL, r'\\\1', value) + if value: + if strict: + query_parts.append('%s:"%s"' % (key, value)) + else: + value = value.lower() # avoid AND / OR + query_parts.append('%s:(%s)' % (key, value)) + if strict: + full_query = ' AND '.join(query_parts).strip() + else: + full_query = ' '.join(query_parts).strip() + + if not full_query: + raise ValueError('at least one query term is required') + + # Additional parameters to the search. + params = {'query': full_query} + if limit: + params['limit'] = str(limit) + if offset: + params['offset'] = str(offset) + + return _do_mb_query(entity, '', [], params) + +def _do_mb_delete(path): + """Send a DELETE request for the specified object. + """ + return _mb_request(path, 'DELETE', AUTH_YES, True) + +def _do_mb_put(path): + """Send a PUT request for the specified object. + """ + return _mb_request(path, 'PUT', AUTH_YES, True) + +def _do_mb_post(path, body): + """Perform a single POST call for an endpoint with a specified + request body. + """ + return _mb_request(path, 'POST', AUTH_YES, True, body=body) + + +# The main interface! + +# Single entity by ID + +@_docstring_get("area") +def get_area_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the area with the MusicBrainz `id` as a dict with an 'area' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("area", includes, + release_status, release_type) + return _do_mb_query("area", id, includes, params) + +@_docstring_get("artist") +def get_artist_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the artist with the MusicBrainz `id` as a dict with an 'artist' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("artist", includes, + release_status, release_type) + return _do_mb_query("artist", id, includes, params) + +@_docstring_get("instrument") +def get_instrument_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the instrument with the MusicBrainz `id` as a dict with an 'artist' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("instrument", includes, + release_status, release_type) + return _do_mb_query("instrument", id, includes, params) + +@_docstring_get("label") +def get_label_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the label with the MusicBrainz `id` as a dict with a 'label' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("label", includes, + release_status, release_type) + return _do_mb_query("label", id, includes, params) + +@_docstring_get("place") +def get_place_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the place with the MusicBrainz `id` as a dict with an 'place' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("place", includes, + release_status, release_type) + return _do_mb_query("place", id, includes, params) + +@_docstring_get("event") +def get_event_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the event with the MusicBrainz `id` as a dict with an 'event' key. + + The event dict has the following keys: + `id`, `type`, `name`, `time`, `disambiguation` and `life-span`. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("event", includes, + release_status, release_type) + return _do_mb_query("event", id, includes, params) + +@_docstring_get("recording") +def get_recording_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the recording with the MusicBrainz `id` as a dict + with a 'recording' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("recording", includes, + release_status, release_type) + return _do_mb_query("recording", id, includes, params) + +@_docstring_get("release") +def get_release_by_id(id, includes=[], release_status=[], release_type=[]): + """Get the release with the MusicBrainz `id` as a dict with a 'release' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("release", includes, + release_status, release_type) + return _do_mb_query("release", id, includes, params) + +@_docstring_get("release-group") +def get_release_group_by_id(id, includes=[], + release_status=[], release_type=[]): + """Get the release group with the MusicBrainz `id` as a dict + with a 'release-group' key. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("release-group", includes, + release_status, release_type) + return _do_mb_query("release-group", id, includes, params) + +@_docstring_get("series") +def get_series_by_id(id, includes=[]): + """Get the series with the MusicBrainz `id` as a dict with a 'series' key. + + *Available includes*: {includes}""" + return _do_mb_query("series", id, includes) + +@_docstring_get("work") +def get_work_by_id(id, includes=[]): + """Get the work with the MusicBrainz `id` as a dict with a 'work' key. + + *Available includes*: {includes}""" + return _do_mb_query("work", id, includes) + +@_docstring_get("url") +def get_url_by_id(id, includes=[]): + """Get the url with the MusicBrainz `id` as a dict with a 'url' key. + + *Available includes*: {includes}""" + return _do_mb_query("url", id, includes) + + +# Searching + +@_docstring_search("annotation") +def search_annotations(query='', limit=None, offset=None, strict=False, **fields): + """Search for annotations and return a dict with an 'annotation-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('annotation', query, fields, limit, offset, strict) + +@_docstring_search("area") +def search_areas(query='', limit=None, offset=None, strict=False, **fields): + """Search for areas and return a dict with an 'area-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('area', query, fields, limit, offset, strict) + +@_docstring_search("artist") +def search_artists(query='', limit=None, offset=None, strict=False, **fields): + """Search for artists and return a dict with an 'artist-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('artist', query, fields, limit, offset, strict) + +@_docstring_search("event") +def search_events(query='', limit=None, offset=None, strict=False, **fields): + """Search for events and return a dict with an 'event-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('event', query, fields, limit, offset, strict) + +@_docstring_search("instrument") +def search_instruments(query='', limit=None, offset=None, strict=False, **fields): + """Search for instruments and return a dict with a 'instrument-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('instrument', query, fields, limit, offset, strict) + +@_docstring_search("label") +def search_labels(query='', limit=None, offset=None, strict=False, **fields): + """Search for labels and return a dict with a 'label-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('label', query, fields, limit, offset, strict) + +@_docstring_search("place") +def search_places(query='', limit=None, offset=None, strict=False, **fields): + """Search for places and return a dict with a 'place-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('place', query, fields, limit, offset, strict) + +@_docstring_search("recording") +def search_recordings(query='', limit=None, offset=None, + strict=False, **fields): + """Search for recordings and return a dict with a 'recording-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('recording', query, fields, limit, offset, strict) + +@_docstring_search("release") +def search_releases(query='', limit=None, offset=None, strict=False, **fields): + """Search for recordings and return a dict with a 'recording-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('release', query, fields, limit, offset, strict) + +@_docstring_search("release-group") +def search_release_groups(query='', limit=None, offset=None, + strict=False, **fields): + """Search for release groups and return a dict + with a 'release-group-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('release-group', query, fields, limit, offset, strict) + +@_docstring_search("series") +def search_series(query='', limit=None, offset=None, strict=False, **fields): + """Search for series and return a dict with a 'series-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('series', query, fields, limit, offset, strict) + +@_docstring_search("work") +def search_works(query='', limit=None, offset=None, strict=False, **fields): + """Search for works and return a dict with a 'work-list' key. + + *Available search fields*: {fields}""" + return _do_mb_search('work', query, fields, limit, offset, strict) + + +# Lists of entities +@_docstring_get("discid") +def get_releases_by_discid(id, includes=[], toc=None, cdstubs=True, media_format=None): + """Search for releases with a :musicbrainz:`Disc ID` or table of contents. + + When a `toc` is provided and no release with the disc ID is found, + a fuzzy search by the toc is done. + The `toc` should have to same format as :attr:`discid.Disc.toc_string`. + When a `toc` is provided, the format of the discid itself is not + checked server-side, so any value may be passed if searching by only + `toc` is desired. + + If no toc matches in musicbrainz but a :musicbrainz:`CD Stub` does, + the CD Stub will be returned. Prevent this from happening by + passing `cdstubs=False`. + + By default only results that match a format that allows discids + (e.g. CD) are included. To include all media formats, pass + `media_format='all'`. + + The result is a dict with either a 'disc' , a 'cdstub' key + or a 'release-list' (fuzzy match with TOC). + A 'disc' has an 'offset-count', an 'offset-list' and a 'release-list'. + A 'cdstub' key has direct 'artist' and 'title' keys. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("discid", includes, release_status=[], + release_type=[]) + if toc: + params["toc"] = toc + if not cdstubs: + params["cdstubs"] = "no" + if media_format: + params["media-format"] = media_format + return _do_mb_query("discid", id, includes, params) + + +@_docstring_get("recording") +def get_recordings_by_isrc(isrc, includes=[], release_status=[], + release_type=[]): + """Search for recordings with an :musicbrainz:`ISRC`. + The result is a dict with an 'isrc' key, + which again includes a 'recording-list'. + + *Available includes*: {includes}""" + params = _check_filter_and_make_params("isrc", includes, + release_status, release_type) + return _do_mb_query("isrc", isrc, includes, params) + +@_docstring_get("work") +def get_works_by_iswc(iswc, includes=[]): + """Search for works with an :musicbrainz:`ISWC`. + The result is a dict with a`work-list`. + + *Available includes*: {includes}""" + return _do_mb_query("iswc", iswc, includes) + + +def _browse_impl(entity, includes, limit, offset, params, release_status=[], release_type=[]): + includes = includes if isinstance(includes, list) else [includes] + valid_includes = VALID_BROWSE_INCLUDES[entity] + _check_includes_impl(includes, valid_includes) + p = {} + for k,v in params.items(): + if v: + p[k] = v + if len(p) > 1: + raise Exception("Can't have more than one of " + ", ".join(params.keys())) + if limit: p["limit"] = limit + if offset: p["offset"] = offset + filterp = _check_filter_and_make_params(entity, includes, release_status, release_type) + p.update(filterp) + return _do_mb_query(entity, "", includes, p) + +# Browse methods +# Browse include are a subset of regular get includes, so we check them here +# and the test in _do_mb_query will pass anyway. +@_docstring_browse("artist") +def browse_artists(recording=None, release=None, release_group=None, + work=None, includes=[], limit=None, offset=None): + """Get all artists linked to a recording, a release or a release group. + You need to give one MusicBrainz ID. + + *Available includes*: {includes}""" + params = {"recording": recording, + "release": release, + "release-group": release_group, + "work": work} + return _browse_impl("artist", includes, limit, offset, params) + +@_docstring_browse("event") +def browse_events(area=None, artist=None, place=None, + includes=[], limit=None, offset=None): + """Get all events linked to a area, a artist or a place. + You need to give one MusicBrainz ID. + + *Available includes*: {includes}""" + params = {"area": area, + "artist": artist, + "place": place} + return _browse_impl("event", includes, limit, offset, params) + +@_docstring_browse("label") +def browse_labels(release=None, includes=[], limit=None, offset=None): + """Get all labels linked to a relase. You need to give a MusicBrainz ID. + + *Available includes*: {includes}""" + params = {"release": release} + return _browse_impl("label", includes, limit, offset, params) + +@_docstring_browse("place") +def browse_places(area=None, includes=[], limit=None, offset=None): + """Get all places linked to an area. You need to give a MusicBrainz ID. + + *Available includes*: {includes}""" + params = {"area": area} + return _browse_impl("place", includes, limit, offset, params) + +@_docstring_browse("recording") +def browse_recordings(artist=None, release=None, includes=[], + limit=None, offset=None): + """Get all recordings linked to an artist or a release. + You need to give one MusicBrainz ID. + + *Available includes*: {includes}""" + params = {"artist": artist, + "release": release} + return _browse_impl("recording", includes, limit, offset, params) + +@_docstring_browse("release") +def browse_releases(artist=None, track_artist=None, label=None, recording=None, + release_group=None, release_status=[], release_type=[], + includes=[], limit=None, offset=None): + """Get all releases linked to an artist, a label, a recording + or a release group. You need to give one MusicBrainz ID. + + You can also browse by `track_artist`, which gives all releases where some + tracks are attributed to that artist, but not the whole release. + + You can filter by :data:`musicbrainz.VALID_RELEASE_TYPES` or + :data:`musicbrainz.VALID_RELEASE_STATUSES`. + + *Available includes*: {includes}""" + # track_artist param doesn't work yet + params = {"artist": artist, + "track_artist": track_artist, + "label": label, + "recording": recording, + "release-group": release_group} + return _browse_impl("release", includes, limit, offset, + params, release_status, release_type) + +@_docstring_browse("release-group") +def browse_release_groups(artist=None, release=None, release_type=[], + includes=[], limit=None, offset=None): + """Get all release groups linked to an artist or a release. + You need to give one MusicBrainz ID. + + You can filter by :data:`musicbrainz.VALID_RELEASE_TYPES`. + + *Available includes*: {includes}""" + params = {"artist": artist, + "release": release} + return _browse_impl("release-group", includes, limit, + offset, params, [], release_type) + +@_docstring_browse("url") +def browse_urls(resource=None, includes=[], limit=None, offset=None): + """Get urls by actual URL string. + You need to give a URL string as 'resource' + + *Available includes*: {includes}""" + params = {"resource": resource} + return _browse_impl("url", includes, limit, offset, params) + +@_docstring_browse("work") +def browse_works(artist=None, includes=[], limit=None, offset=None): + """Get all works linked to an artist + + *Available includes*: {includes}""" + params = {"artist": artist} + return _browse_impl("work", includes, limit, offset, params) + +# Collections +def get_collections(): + """List the collections for the currently :func:`authenticated ` user + as a dict with a 'collection-list' key.""" + # Missing the count in the reply + return _do_mb_query("collection", '') + +def _do_collection_query(collection, collection_type, limit, offset): + params = {} + if limit: params["limit"] = limit + if offset: params["offset"] = offset + return _do_mb_query("collection", "%s/%s" % (collection, collection_type), [], params) + +def get_artists_in_collection(collection, limit=None, offset=None): + """List the artists in a collection. + Returns a dict with a 'collection' key, which again has a 'artist-list'. + + See `Browsing`_ for how to use `limit` and `offset`. + """ + return _do_collection_query(collection, "artists", limit, offset) + +def get_releases_in_collection(collection, limit=None, offset=None): + """List the releases in a collection. + Returns a dict with a 'collection' key, which again has a 'release-list'. + + See `Browsing`_ for how to use `limit` and `offset`. + """ + return _do_collection_query(collection, "releases", limit, offset) + +def get_events_in_collection(collection, limit=None, offset=None): + """List the events in a collection. + Returns a dict with a 'collection' key, which again has a 'event-list'. + + See `Browsing`_ for how to use `limit` and `offset`. + """ + return _do_collection_query(collection, "events", limit, offset) + +def get_places_in_collection(collection, limit=None, offset=None): + """List the places in a collection. + Returns a dict with a 'collection' key, which again has a 'place-list'. + + See `Browsing`_ for how to use `limit` and `offset`. + """ + return _do_collection_query(collection, "places", limit, offset) + +def get_recordings_in_collection(collection, limit=None, offset=None): + """List the recordings in a collection. + Returns a dict with a 'collection' key, which again has a 'recording-list'. + + See `Browsing`_ for how to use `limit` and `offset`. + """ + return _do_collection_query(collection, "recordings", limit, offset) + +def get_works_in_collection(collection, limit=None, offset=None): + """List the works in a collection. + Returns a dict with a 'collection' key, which again has a 'work-list'. + + See `Browsing`_ for how to use `limit` and `offset`. + """ + return _do_collection_query(collection, "works", limit, offset) + + +# Submission methods + +def submit_barcodes(release_barcode): + """Submits a set of {release_id1: barcode, ...}""" + query = mbxml.make_barcode_request(release_barcode) + return _do_mb_post("release", query) + + +def submit_isrcs(recording_isrcs): + """Submit ISRCs. + Submits a set of {recording-id1: [isrc1, ...], ...} + or {recording_id1: isrc, ...}. + """ + rec2isrcs = dict() + for (rec, isrcs) in recording_isrcs.items(): + rec2isrcs[rec] = isrcs if isinstance(isrcs, list) else [isrcs] + query = mbxml.make_isrc_request(rec2isrcs) + return _do_mb_post("recording", query) + +def submit_tags(**kwargs): + """Submit user tags. + Takes parameters named e.g. 'artist_tags', 'recording_tags', etc., + and of the form: + {entity_id1: [tag1, ...], ...} + If you only have one tag for an entity you can use a string instead + of a list. + + The user's tags for each entity will be set to that list, adding or + removing tags as necessary. Submitting an empty list for an entity + will remove all tags for that entity by the user. + """ + for k, v in kwargs.items(): + for id, tags in v.items(): + kwargs[k][id] = tags if isinstance(tags, list) else [tags] + + query = mbxml.make_tag_request(**kwargs) + return _do_mb_post("tag", query) + +def submit_ratings(**kwargs): + """Submit user ratings. + Takes parameters named e.g. 'artist_ratings', 'recording_ratings', etc., + and of the form: + {entity_id1: rating, ...} + + Ratings are numbers from 0-100, at intervals of 20 (20 per 'star'). + Submitting a rating of 0 will remove the user's rating. + """ + query = mbxml.make_rating_request(**kwargs) + return _do_mb_post("rating", query) + +def add_releases_to_collection(collection, releases=[]): + """Add releases to a collection. + Collection and releases should be identified by their MBIDs + """ + # XXX: Maximum URI length of 16kb means we should only allow ~400 releases + releaselist = ";".join(releases) + return _do_mb_put("collection/%s/releases/%s" % (collection, releaselist)) + +def remove_releases_from_collection(collection, releases=[]): + """Remove releases from a collection. + Collection and releases should be identified by their MBIDs + """ + releaselist = ";".join(releases) + return _do_mb_delete("collection/%s/releases/%s" % (collection, releaselist)) diff --git a/lib/musicbrainzngs/util.py b/lib/musicbrainzngs/util.py new file mode 100644 index 00000000..37316f53 --- /dev/null +++ b/lib/musicbrainzngs/util.py @@ -0,0 +1,44 @@ +# This file is part of the musicbrainzngs library +# Copyright (C) Alastair Porter, Adrian Sampson, and others +# This file is distributed under a BSD-2-Clause type license. +# See the COPYING file for more information. + +import sys +import locale +import xml.etree.ElementTree as ET + +from . import compat + +def _unicode(string, encoding=None): + """Try to decode byte strings to unicode. + This can only be a guess, but this might be better than failing. + It is safe to use this on numbers or strings that are already unicode. + """ + if isinstance(string, compat.unicode): + unicode_string = string + elif isinstance(string, compat.bytes): + # use given encoding, stdin, preferred until something != None is found + if encoding is None: + encoding = sys.stdin.encoding + if encoding is None: + encoding = locale.getpreferredencoding() + unicode_string = string.decode(encoding, "ignore") + else: + unicode_string = compat.unicode(string) + return unicode_string.replace('\x00', '').strip() + +def bytes_to_elementtree(bytes_or_file): + """Given a bytestring or a file-like object that will produce them, + parse and return an ElementTree. + """ + if isinstance(bytes_or_file, compat.basestring): + s = bytes_or_file + else: + s = bytes_or_file.read() + + if compat.is_py3: + s = _unicode(s, "utf-8") + + f = compat.StringIO(s) + tree = ET.ElementTree(file=f) + return tree