Initial Commit

This commit is contained in:
Tim 2015-02-22 18:32:50 +02:00
commit 88daa3fb91
1311 changed files with 256240 additions and 0 deletions

965
lib/mutagen/mp4/__init__.py Normal file
View file

@ -0,0 +1,965 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Read and write MPEG-4 audio files with iTunes metadata.
This module will read MPEG-4 audio information and metadata,
as found in Apple's MP4 (aka M4A, M4B, M4P) files.
There is no official specification for this format. The source code
for TagLib, FAAD, and various MPEG specifications at
* http://developer.apple.com/documentation/QuickTime/QTFF/
* http://www.geocities.com/xhelmboyx/quicktime/formats/mp4-layout.txt
* http://standards.iso.org/ittf/PubliclyAvailableStandards/\
c041828_ISO_IEC_14496-12_2005(E).zip
* http://wiki.multimedia.cx/index.php?title=Apple_QuickTime
were all consulted.
"""
import struct
import sys
from mutagen import FileType, Metadata, StreamInfo
from mutagen._constants import GENRES
from mutagen._util import (cdata, insert_bytes, DictProxy, MutagenError,
hashable, enum)
from mutagen._compat import (reraise, PY2, string_types, text_type, chr_,
iteritems, PY3, cBytesIO)
from ._atom import Atoms, Atom, AtomError
from ._util import parse_full_atom
from ._as_entry import AudioSampleEntry, ASEntryError
class error(IOError, MutagenError):
    """Base class of every exception raised by this module."""


class MP4MetadataError(error):
    """Raised when the iTunes metadata (``ilst``) cannot be read or parsed."""


class MP4StreamInfoError(error):
    """Raised when the audio stream information cannot be read or parsed."""


class MP4MetadataValueError(ValueError, MP4MetadataError):
    """Raised when a tag value is invalid for the atom it is stored in."""
__all__ = ['MP4', 'Open', 'delete', 'MP4Cover', 'MP4FreeForm', 'AtomDataType']
@enum
class AtomDataType(object):
    """Enum for `dataformat` attribute of MP4FreeForm.

    Each member is followed by a short description of the payload it
    identifies.

    .. versionadded:: 1.25
    """

    IMPLICIT = 0
    """for use with tags for which no type needs to be indicated because
       only one type is allowed"""

    UTF8 = 1
    """without any count or null terminator"""

    UTF16 = 2
    """also known as UTF-16BE"""

    SJIS = 3
    """deprecated unless it is needed for special Japanese characters"""

    HTML = 6
    """the HTML file header specifies which HTML version"""

    XML = 7
    """the XML header must identify the DTD or schemas"""

    UUID = 8
    """also known as GUID; stored as 16 bytes in binary (valid as an ID)"""

    ISRC = 9
    """stored as UTF-8 text (valid as an ID)"""

    MI3P = 10
    """stored as UTF-8 text (valid as an ID)"""

    GIF = 12
    """(deprecated) a GIF image"""

    JPEG = 13
    """a JPEG image"""

    PNG = 14
    """PNG image"""

    URL = 15
    """absolute, in UTF-8 characters"""

    DURATION = 16
    """in milliseconds, 32-bit integer"""

    DATETIME = 17
    """in UTC, counting seconds since midnight, January 1, 1904;
       32 or 64-bits"""

    GENRES = 18
    """a list of enumerated values"""

    INTEGER = 21
    """a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"""

    RIAA_PA = 24
    """RIAA parental advisory; { -1=no, 1=yes, 0=unspecified },
       8-bit integer"""

    UPC = 25
    """Universal Product Code, in text UTF-8 format (valid as an ID)"""

    BMP = 27
    """Windows bitmap image"""
@hashable
class MP4Cover(bytes):
    """A cover artwork.

    The instance itself is the raw image data (it is a ``bytes``
    subclass) tagged with the format of that data.

    Attributes:

    * imageformat -- format of the image (either FORMAT_JPEG or FORMAT_PNG)
    """

    FORMAT_JPEG = AtomDataType.JPEG
    FORMAT_PNG = AtomDataType.PNG

    def __new__(cls, data, *args, **kwargs):
        # Only the payload goes to bytes; the remaining arguments are
        # consumed by __init__.
        return bytes.__new__(cls, data)

    def __init__(self, data, imageformat=FORMAT_JPEG):
        self.imageformat = imageformat

    # Defining __eq__ would otherwise drop the inherited hash on Python 3.
    __hash__ = bytes.__hash__

    def __eq__(self, other):
        """Covers are equal when both the data and the format match."""
        if not isinstance(other, MP4Cover):
            return NotImplemented

        same_payload = bytes.__eq__(self, other)
        if not same_payload:
            return False
        return not (self.imageformat != other.imageformat)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        cls_name = type(self).__name__
        payload = bytes(self)
        fmt = AtomDataType(self.imageformat)
        return "%s(%r, %r)" % (cls_name, payload, fmt)
@hashable
class MP4FreeForm(bytes):
    """A freeform value.

    The instance itself is the raw value (it is a ``bytes`` subclass)
    tagged with the data format and version it was stored with.

    Attributes:

    * dataformat -- format of the data (see AtomDataType)
    """

    FORMAT_DATA = AtomDataType.IMPLICIT  # deprecated
    FORMAT_TEXT = AtomDataType.UTF8  # deprecated

    def __new__(cls, data, *args, **kwargs):
        # Only the payload goes to bytes; the remaining arguments are
        # consumed by __init__.
        return bytes.__new__(cls, data)

    def __init__(self, data, dataformat=AtomDataType.UTF8, version=0):
        self.dataformat = dataformat
        self.version = version

    # Defining __eq__ would otherwise drop the inherited hash on Python 3.
    __hash__ = bytes.__hash__

    def __eq__(self, other):
        """Values are equal when data, format and version all match."""
        if not isinstance(other, MP4FreeForm):
            return NotImplemented

        same_payload = bytes.__eq__(self, other)
        if not same_payload:
            return False
        if self.dataformat != other.dataformat:
            return False
        return not (self.version != other.version)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        cls_name = type(self).__name__
        payload = bytes(self)
        fmt = AtomDataType(self.dataformat)
        return "%s(%r, %r)" % (cls_name, payload, fmt)
def _name2key(name):
    """Convert an atom name (bytes) to the key type used by MP4Tags.

    On Python 2 the name is returned unchanged; otherwise it is decoded
    as latin-1.
    """
    return name if PY2 else name.decode("latin-1")
def _key2name(key):
    """Convert an MP4Tags key back to the atom name (bytes).

    On Python 2 the key is returned unchanged; otherwise it is encoded
    as latin-1.
    """
    return key if PY2 else key.encode("latin-1")
class MP4Tags(DictProxy, Metadata):
r"""Dictionary containing Apple iTunes metadata list key/values.
Keys are four byte identifiers, except for freeform ('----')
keys. Values are usually unicode strings, but some atoms have a
special structure:
Text values (multiple values per key are supported):
* '\\xa9nam' -- track title
* '\\xa9alb' -- album
* '\\xa9ART' -- artist
* 'aART' -- album artist
* '\\xa9wrt' -- composer
* '\\xa9day' -- year
* '\\xa9cmt' -- comment
* 'desc' -- description (usually used in podcasts)
* 'purd' -- purchase date
* '\\xa9grp' -- grouping
* '\\xa9gen' -- genre
* '\\xa9lyr' -- lyrics
* 'purl' -- podcast URL
* 'egid' -- podcast episode GUID
* 'catg' -- podcast category
* 'keyw' -- podcast keywords
* '\\xa9too' -- encoded by
* 'cprt' -- copyright
* 'soal' -- album sort order
* 'soaa' -- album artist sort order
* 'soar' -- artist sort order
* 'sonm' -- title sort order
* 'soco' -- composer sort order
* 'sosn' -- show sort order
* 'tvsh' -- show name
Boolean values:
* 'cpil' -- part of a compilation
* 'pgap' -- part of a gapless album
* 'pcst' -- podcast (iTunes reads this only on import)
Tuples of ints (multiple values per key are supported):
* 'trkn' -- track number, total tracks
* 'disk' -- disc number, total discs
Others:
* 'tmpo' -- tempo/BPM, 16 bit int
* 'covr' -- cover artwork, list of MP4Cover objects (which are
tagged strs)
* 'gnre' -- ID3v1 genre. Not supported, use '\\xa9gen' instead.
The freeform '----' frames use a key in the format '----:mean:name'
where 'mean' is usually 'com.apple.iTunes' and 'name' is a unique
identifier for this frame. The value is a str, but is probably
text that can be decoded as UTF-8. Multiple values per key are
supported.
MP4 tag data cannot exist outside of the structure of an MP4 file,
so this class should not be manually instantiated.
Unknown non-text tags and tags that failed to parse will be written
back as is.
"""
def __init__(self, *args, **kwargs):
    """Set up the store for unparsable atoms, then defer to the bases."""
    # key -> list of raw atom payloads that failed to parse; they are
    # written back unchanged on save.
    self._failed_atoms = dict()
    super(MP4Tags, self).__init__(*args, **kwargs)
def load(self, atoms, fileobj):
    """Read all children of ``moov.udta.meta.ilst`` into this mapping.

    Raises MP4MetadataError if the ilst atom is missing or a child atom
    cannot be read completely. Children that fail to *parse* are kept
    in ``_failed_atoms`` instead.
    """
    try:
        ilst = atoms[b"moov.udta.meta.ilst"]
    except KeyError as key:
        raise MP4MetadataError(key)

    for atom in ilst.children:
        ok, data = atom.read(fileobj)
        if not ok:
            raise MP4MetadataError("Not enough data")

        handlers = self.__atoms.get(atom.name)
        try:
            if handlers is None:
                # unknown atom, try as text
                self.__parse_text(atom, data, implicit=False)
            else:
                parse = handlers[0]
                parse(self, atom, data)
        except MP4MetadataError:
            # parsing failed, save them so we can write them back
            failed_key = _name2key(atom.name)
            self._failed_atoms.setdefault(failed_key, []).append(data)
def __setitem__(self, key, value):
    """Store *value* under *key*; raises TypeError unless key is a str."""
    if isinstance(key, str):
        super(MP4Tags, self).__setitem__(key, value)
    else:
        raise TypeError("key has to be str")
@classmethod
def _can_load(cls, atoms):
    """Return whether *atoms* contains the ilst atom that load() needs."""
    ilst_path = b"moov.udta.meta.ilst"
    return ilst_path in atoms
@staticmethod
def __key_sort(item):
    """Sort key for ``(key, value)`` items, mimicking the iTunes order.

    Returns a tuple ``(rank, len(repr(value)), repr(value))`` where
    rank is the position of the atom name in the known order, or the
    number of known names for everything else.
    """
    (key, v) = item
    # iTunes always writes the tags in order of "relevance", try
    # to copy it as closely as possible.
    order = (b"\xa9nam", b"\xa9ART", b"\xa9wrt", b"\xa9alb",
             b"\xa9gen", b"gnre", b"trkn", b"disk",
             b"\xa9day", b"cpil", b"pgap", b"pcst", b"tmpo",
             b"\xa9too", b"----", b"covr", b"\xa9lyr")
    rank = {name: index for index, name in enumerate(order)}
    last = len(order)
    # The table holds atom names (bytes). Convert the key the same way
    # save() does so the lookup can match; comparing the raw key only
    # works where the key type is already bytes.
    atom_name = _key2name(key)[:4]
    # If there's no key-based way to distinguish, order by length.
    # If there's still no way, go by string comparison on the
    # values, so we at least have something deterministic.
    return (rank.get(atom_name, last), len(repr(v)), repr(v))
def save(self, filename):
    """Save the metadata to the given filename.

    Renders every tag (plus the atoms that failed to parse on load)
    into a new ilst atom and writes it into the file, replacing an
    existing ilst or creating the surrounding atoms as needed.

    Raises MP4MetadataValueError if a value cannot be rendered and
    error if the file's atoms cannot be parsed.
    """
    values = []
    for key, value in sorted(self.items(), key=self.__key_sort):
        atom_name = _key2name(key)[:4]
        if atom_name not in self.__atoms:
            render_func = type(self).__render_text
        else:
            render_func = self.__atoms[atom_name][1]

        try:
            rendered = render_func(self, key, value)
            values.append(rendered)
        except (TypeError, ValueError) as s:
            reraise(MP4MetadataValueError, s, sys.exc_info()[2])

    for atom_name, failed in iteritems(self._failed_atoms):
        # don't write atoms back if we have added a new one with
        # the same name, this excludes freeform which can have
        # multiple atoms with the same key (most parsers seem to be able
        # to handle that)
        if atom_name in self:
            assert atom_name != b"----"
            continue
        values.extend(
            Atom.render(_key2name(atom_name), payload)
            for payload in failed)

    data = Atom.render(b"ilst", b"".join(values))

    # Find the old atoms.
    with open(filename, "rb+") as fileobj:
        try:
            atoms = Atoms(fileobj)
        except AtomError as err:
            reraise(error, err, sys.exc_info()[2])

        try:
            path = atoms.path(b"moov", b"udta", b"meta", b"ilst")
        except KeyError:
            self.__save_new(fileobj, atoms, data)
        else:
            self.__save_existing(fileobj, atoms, path, data)
def __pad_ilst(self, data, length=None):
    """Render a zero-filled ``free`` atom to be placed after *data*.

    Without an explicit *length* the payload is sized so that
    ``len(data)`` plus the payload is a multiple of 1024.
    """
    if length is None:
        # distance from len(data) up to the next multiple of 1024
        length = -len(data) % 1024
    padding = b"\x00" * length
    return Atom.render(b"free", padding)
def __save_new(self, fileobj, atoms, ilst):
    """Insert a brand new meta atom (wrapped in udta if needed).

    Used when the file has no ``moov.udta.meta.ilst`` yet. The new data
    is inserted right after the 8 byte header of the parent atom, then
    parent sizes and chunk offsets are adjusted.
    """
    hdlr = Atom.render(b"hdlr", b"\x00" * 8 + b"mdirappl" + b"\x00" * 9)
    meta_payload = b"\x00\x00\x00\x00" + hdlr + ilst + self.__pad_ilst(ilst)
    meta = Atom.render(b"meta", meta_payload)

    try:
        path = atoms.path(b"moov", b"udta")
    except KeyError:
        # moov.udta not found -- create one
        path = atoms.path(b"moov")
        meta = Atom.render(b"udta", meta)

    parent = path[-1]
    offset = parent.offset + 8
    added = len(meta)

    insert_bytes(fileobj, added, offset)
    fileobj.seek(offset)
    fileobj.write(meta)

    self.__update_parents(fileobj, path, len(meta))
    self.__update_offsets(fileobj, atoms, len(meta), offset)
def __save_existing(self, fileobj, atoms, path, data):
    """Replace the existing ilst atom (and adjacent padding) with *data*.

    *path* is the atom path ending in the ilst atom; the ilst entry is
    popped from it so that only the parents remain. ``free`` atoms
    directly before and after the ilst are treated as padding that can
    be reused. If the new data does not fit, the file is grown;
    otherwise the remainder is filled with a new ``free`` atom.
    """
    # Replace the old ilst atom.
    ilst = path.pop()
    offset = ilst.offset
    length = ilst.length

    # Check for padding "free" atoms
    meta = path[-1]
    siblings = meta.children
    index = siblings.index(ilst)

    # Only look backwards when there is a preceding sibling: index - 1
    # would otherwise be -1 and silently select the *last* child, moving
    # the write offset behind the ilst and counting a trailing "free"
    # atom twice.
    if index > 0:
        before = siblings[index - 1]
        if before.name == b"free":
            offset = before.offset
            length += before.length

    if index + 1 < len(siblings):
        after = siblings[index + 1]
        if after.name == b"free":
            length += after.length

    delta = len(data) - length
    if delta > 0 or (delta < 0 and delta > -8):
        # Either too big, or the leftover space is smaller than the
        # 8 byte header of a "free" atom: pad and grow the file.
        data += self.__pad_ilst(data)
        delta = len(data) - length
        insert_bytes(fileobj, delta, offset)
    elif delta < 0:
        # Fits with room for a "free" atom filling the exact remainder
        # (-delta bytes including its 8 byte header).
        data += self.__pad_ilst(data, -delta - 8)
        delta = 0

    fileobj.seek(offset)
    fileobj.write(data)
    self.__update_parents(fileobj, path, delta)
    self.__update_offsets(fileobj, atoms, delta, offset)
def __update_parents(self, fileobj, path, delta):
    """Update all parent atoms with the new size.

    Adds *delta* to the size field of every atom in *path*, handling
    both the 32 bit size and the 64 bit extended size (marked by a
    32 bit size of 1).
    """
    for parent in path:
        start = parent.offset
        fileobj.seek(start)
        size = cdata.uint_be(fileobj.read(4))

        if size != 1:
            # 32bit
            fileobj.seek(start)
            fileobj.write(cdata.to_uint_be(size + delta))
            continue

        # 64bit: skip name (4B) and read size (8B)
        name_and_size = fileobj.read(12)
        size = cdata.ulonglong_be(name_and_size[4:])
        fileobj.seek(start + 8)
        fileobj.write(cdata.to_ulonglong_be(size + delta))
def __update_offset_table(self, fileobj, fmt, atom, delta, offset):
    """Update offset table in the specified atom.

    *fmt* is a struct format template taking the entry count (e.g.
    ``">%dI"``). Every table entry greater than *offset* is shifted by
    *delta*; the atom's own offset is shifted first if it lies behind
    *offset*.
    """
    if atom.offset > offset:
        atom.offset += delta

    # skip size, name and version/flags (12 bytes)
    fileobj.seek(atom.offset + 12)
    data = fileobj.read(atom.length - 12)

    count = cdata.uint_be(data[:4])
    fmt = fmt % count
    old_offsets = struct.unpack(fmt, data[4:])
    new_offsets = [
        (entry + delta) if offset < entry else (entry + 0)
        for entry in old_offsets]

    # the entries start right after the 4 byte count
    fileobj.seek(atom.offset + 16)
    fileobj.write(struct.pack(fmt, *new_offsets))
def __update_tfhd(self, fileobj, atom, delta, offset):
    """Shift the 64 bit offset stored in a tfhd atom, if it has one.

    The value is only present when the lowest flag bit is set; it is
    shifted by *delta* when greater than *offset*. The atom's own
    offset is shifted first if it lies behind *offset*.
    """
    if atom.offset > offset:
        atom.offset += delta

    # skip size, name and the version byte (9 bytes)
    fileobj.seek(atom.offset + 9)
    data = fileobj.read(atom.length - 9)
    flags = cdata.uint_be(b"\x00" + data[:3])

    if not flags & 1:
        return

    # 3 bytes flags + 4 bytes that are skipped, then the 8 byte value
    base_offset = cdata.ulonglong_be(data[7:15])
    if base_offset > offset:
        base_offset += delta
    fileobj.seek(atom.offset + 16)
    fileobj.write(cdata.to_ulonglong_be(base_offset))
def __update_offsets(self, fileobj, atoms, delta, offset):
    """Update offset tables in all 'stco' and 'co64' atoms.

    Also updates the 'tfhd' atoms below 'moof' if the file has a 'moof'
    atom. Does nothing when *delta* is zero.
    """
    if delta == 0:
        return

    moov = atoms[b"moov"]
    tables = ((b'stco', ">%dI"), (b'co64', ">%dQ"))
    for table_name, fmt in tables:
        for atom in moov.findall(table_name, True):
            self.__update_offset_table(fileobj, fmt, atom, delta, offset)

    try:
        for atom in atoms[b"moof"].findall(b'tfhd', True):
            self.__update_tfhd(fileobj, atom, delta, offset)
    except KeyError:
        pass
def __parse_data(self, atom, data):
    """Yield ``(version, flags, payload)`` for each 'data' child atom.

    *data* is the payload of *atom*. Raises MP4MetadataError if a
    child is truncated or is not a 'data' atom.
    """
    end = atom.length - 8
    pos = 0
    while pos < end:
        head = data[pos:pos + 12]
        if len(head) != 12:
            raise MP4MetadataError("truncated atom % r" % atom.name)

        length, name = struct.unpack(">I4s", head[:8])
        version = ord(head[8:9])
        (flags,) = struct.unpack(">I", b"\x00" + head[9:12])
        if name != b"data":
            raise MP4MetadataError(
                "unexpected atom %r inside %r" % (name, atom.name))

        # the payload starts after the 16 byte 'data' header
        chunk = data[pos + 16:pos + length]
        if len(chunk) != length - 16:
            raise MP4MetadataError("truncated atom % r" % atom.name)

        yield version, flags, chunk
        pos += length
def __add(self, key, value, single=False):
    """Store parsed data under *key*.

    With *single* the value replaces whatever is stored; otherwise
    *value* is an iterable whose items are appended to the list stored
    under *key* (created if missing).
    """
    assert isinstance(key, str)

    if not single:
        self.setdefault(key, []).extend(value)
        return
    self[key] = value
def __render_data(self, key, version, flags, value):
    """Render an atom named after *key* holding one 'data' atom per item.

    Each item of *value* (bytes) is prefixed with the packed
    version/flags word and a zero word.
    """
    atom_name = _key2name(key)
    children = []
    for data in value:
        header = struct.pack(">2I", version << 24 | flags, 0)
        children.append(Atom.render(b"data", header + data))
    return Atom.render(atom_name, b"".join(children))
def __parse_freeform(self, atom, data):
    """Parse a freeform ('----') atom into MP4FreeForm values.

    The payload is expected to hold a 'mean' atom, a 'name' atom and
    then any number of 'data' atoms. The values are stored under the
    key ``'----:<mean>:<name>'``.

    Raises MP4MetadataError for truncated or malformed content, so the
    caller can keep the raw atom instead of failing the whole file.
    """

    def read_size(pos):
        # Size field of the child atom at pos; refuse short reads
        # instead of letting the unpacking fail with a low level error.
        field = data[pos:pos + 4]
        if len(field) != 4:
            raise MP4MetadataError("truncated atom %r" % atom.name)
        return cdata.uint_be(field)

    length = read_size(0)
    mean = data[12:length]
    pos = length
    length = read_size(pos)
    name = data[pos + 12:pos + length]
    pos += length

    value = []
    while pos < atom.length - 8:
        head = data[pos:pos + 12]
        if len(head) != 12:
            raise MP4MetadataError("truncated atom %r" % atom.name)
        length, atom_name = struct.unpack(">I4s", head[:8])
        if atom_name != b"data":
            raise MP4MetadataError(
                "unexpected atom %r inside %r" % (atom_name, atom.name))
        if length < 16:
            # A 'data' atom has a 16 byte header. A smaller size is
            # invalid, and a size of 0 would never advance pos, looping
            # forever while the value list grows.
            raise MP4MetadataError(
                "invalid data atom size %d inside %r" % (length, atom.name))

        version = ord(head[8:9])
        flags = struct.unpack(">I", b"\x00" + head[9:12])[0]
        value.append(MP4FreeForm(data[pos + 16:pos + length],
                                 dataformat=flags, version=version))
        pos += length

    key = _name2key(atom.name + b":" + mean + b":" + name)
    self.__add(key, value)
def __render_freeform(self, key, value):
    """Render a freeform ('----') atom for a ``'----:mean:name'`` key.

    *value* is a single bytes value or an iterable of them. Plain
    values are written as UTF8/version 0; MP4FreeForm values carry
    their own dataformat and version.
    """
    if isinstance(value, bytes):
        value = [value]

    dummy, mean, name = _key2name(key).split(b":", 2)
    mean = struct.pack(">I4sI", len(mean) + 12, b"mean", 0) + mean
    name = struct.pack(">I4sI", len(name) + 12, b"name", 0) + name

    pieces = []
    for v in value:
        if isinstance(v, MP4FreeForm):
            flags, version = v.dataformat, v.version
        else:
            flags, version = AtomDataType.UTF8, 0

        pieces.append(struct.pack(
            ">I4s2I", len(v) + 16, b"data", version << 24 | flags, 0))
        pieces.append(v)
    data = b"".join(pieces)

    return Atom.render(b"----", mean + name + data)
def __parse_pair(self, atom, data):
    """Parse an atom holding pairs of 16 bit integers.

    Each 'data' payload contributes one tuple read from bytes 2 to 6.
    Raises MP4MetadataValueError if a payload is too short, so the
    caller can keep the raw atom instead of failing the whole file.
    """
    key = _name2key(atom.name)
    values = []
    for version, flags, d in self.__parse_data(atom, data):
        packed = d[2:6]
        if len(packed) != 4:
            # unpacking a short payload would raise a low level error
            # that the metadata loader does not handle per atom
            raise MP4MetadataValueError(
                "invalid numeric pair in %r" % atom.name)
        values.append(struct.unpack(">2H", packed))
    self.__add(key, values)
def __render_pair(self, key, value):
    """Render pairs of 16 bit integers, each followed by a zero word.

    Raises MP4MetadataValueError if a number is out of range.
    """
    limit = 1 << 16
    data = []
    for (track, total) in value:
        if not (0 <= track < limit and 0 <= total < limit):
            raise MP4MetadataValueError(
                "invalid numeric pair %r" % ((track, total),))
        data.append(struct.pack(">4H", 0, track, total, 0))
    return self.__render_data(key, 0, AtomDataType.IMPLICIT, data)
def __render_pair_no_trailing(self, key, value):
    """Render pairs of 16 bit integers without a trailing zero word.

    Raises MP4MetadataValueError if a number is out of range.
    """
    limit = 1 << 16
    data = []
    for (track, total) in value:
        if not (0 <= track < limit and 0 <= total < limit):
            raise MP4MetadataValueError(
                "invalid numeric pair %r" % ((track, total),))
        data.append(struct.pack(">3H", 0, track, total))
    return self.__render_data(key, 0, AtomDataType.IMPLICIT, data)
def __parse_genre(self, atom, data):
    """Parse a 'gnre' atom and store the genre names under '\\xa9gen'.

    Each payload is a 2 byte, 1-based index into GENRES. Raises
    MP4MetadataValueError for malformed or unknown values, which makes
    the caller keep the raw atom so it is written back unchanged.
    """
    values = []
    for version, flags, data in self.__parse_data(atom, data):
        # version = 0, flags = 0
        if len(data) != 2:
            raise MP4MetadataValueError("invalid genre")
        genre = cdata.short_be(data)
        # The index is 1-based. Anything below 1 would become a negative
        # list index and silently pick a genre from the end of the list.
        if genre < 1:
            raise MP4MetadataValueError("unknown genre")
        # Translate to a freeform genre.
        try:
            genre = GENRES[genre - 1]
        except IndexError:
            # this will make us write it back at least
            raise MP4MetadataValueError("unknown genre")
        values.append(genre)
    key = _name2key(b"\xa9gen")
    self.__add(key, values)
def __parse_tempo(self, atom, data):
    """Parse an atom holding 2 byte big-endian integers.

    Raises MP4MetadataValueError if a payload is not exactly 2 bytes.
    """
    tempos = []
    for version, flags, payload in self.__parse_data(atom, data):
        # version = 0, flags = 0 or 21
        if len(payload) != 2:
            raise MP4MetadataValueError("invalid tempo")
        tempos.append(cdata.ushort_be(payload))
    self.__add(_name2key(atom.name), tempos)
def __render_tempo(self, key, value):
    """Render a sequence of 16 bit unsigned integers.

    An empty sequence renders an atom without any 'data' children.
    Raises MP4MetadataValueError if *value* is not a sequence of
    integers or a number is out of range.
    """
    try:
        if len(value) == 0:
            return self.__render_data(key, 0, AtomDataType.INTEGER, b"")

        if (min(value) < 0) or (max(value) >= 2 ** 16):
            # Wrap in a tuple: formatting a tuple value directly would
            # spread its items over the format arguments and fail.
            raise MP4MetadataValueError(
                "invalid 16 bit integers: %r" % (value,))
    except TypeError:
        raise MP4MetadataValueError(
            "tmpo must be a list of 16 bit integers")

    values = [cdata.to_ushort_be(v) for v in value]
    return self.__render_data(key, 0, AtomDataType.INTEGER, values)
def __parse_bool(self, atom, data):
    """Parse an atom holding single byte flags.

    Each 'data' payload replaces the value stored for the atom, so the
    last one wins. Raises MP4MetadataValueError if a payload is not
    exactly one byte.
    """
    for version, flags, payload in self.__parse_data(atom, data):
        if len(payload) != 1:
            raise MP4MetadataValueError("invalid bool")

        flag = bool(ord(payload))
        self.__add(_name2key(atom.name), flag, single=True)
def __render_bool(self, key, value):
    """Render the truth value of *value* as a single byte."""
    payload = chr_(bool(value))
    return self.__render_data(key, 0, AtomDataType.INTEGER, [payload])
def __parse_cover(self, atom, data):
    """Parse a 'covr' atom into a list of MP4Cover objects.

    'name' children are skipped; any other child that is not a 'data'
    atom is an error. Image formats other than JPEG/PNG are treated as
    JPEG.

    Raises MP4MetadataError for truncated or malformed content, so the
    caller can keep the raw atom instead of failing the whole file.
    """
    values = []
    pos = 0
    while pos < atom.length - 8:
        head = data[pos:pos + 12]
        if len(head) != 12:
            raise MP4MetadataError("truncated atom %r" % atom.name)
        length, name, imageformat = struct.unpack(">I4sI", head)

        if name != b"data":
            if name == b"name":
                if length < 8:
                    # smaller than an atom header; a size of 0 would
                    # never advance pos and loop forever
                    raise MP4MetadataError(
                        "invalid name atom size %d inside 'covr'" % length)
                pos += length
                continue
            raise MP4MetadataError(
                "unexpected atom %r inside 'covr'" % name)

        if length < 16:
            # A 'data' atom has a 16 byte header; a size of 0 would
            # never advance pos and loop forever.
            raise MP4MetadataError(
                "invalid data atom size %d inside 'covr'" % length)

        if imageformat not in (MP4Cover.FORMAT_JPEG, MP4Cover.FORMAT_PNG):
            # Sometimes AtomDataType.IMPLICIT or simply wrong.
            # In all cases it was jpeg, so default to it
            imageformat = MP4Cover.FORMAT_JPEG

        cover = MP4Cover(data[pos + 16:pos + length], imageformat)
        values.append(cover)
        pos += length

    key = _name2key(atom.name)
    self.__add(key, values)
def __render_cover(self, key, value):
    """Render a list of covers, one 'data' atom per cover.

    Items without an ``imageformat`` attribute (e.g. plain bytes) are
    written as JPEG.
    """
    rendered = []
    for cover in value:
        imageformat = getattr(cover, "imageformat", MP4Cover.FORMAT_JPEG)
        header = struct.pack(">2I", imageformat, 0)
        rendered.append(Atom.render(b"data", header + cover))
    return Atom.render(_key2name(key), b"".join(rendered))
def __parse_text(self, atom, data, implicit=True):
    """Parse an atom holding UTF-8 text values.

    With *implicit* (used for known text atoms) both the IMPLICIT and
    the UTF8 data type are accepted; without it (used for unknown
    atoms) only UTF8 is. Raises MP4MetadataError for any other type or
    if the data is not valid UTF-8.
    """
    texts = []
    for version, flags, atom_data in self.__parse_data(atom, data):
        if not implicit:
            if flags != AtomDataType.UTF8:
                raise MP4MetadataError(
                    "%r is not text, ignore" % atom.name)
        elif flags not in (AtomDataType.IMPLICIT, AtomDataType.UTF8):
            raise MP4MetadataError(
                "Unknown atom type %r for %r" % (flags, atom.name))

        try:
            texts.append(atom_data.decode("utf-8"))
        except UnicodeDecodeError as e:
            raise MP4MetadataError("%s: %s" % (atom.name, e))

    self.__add(_name2key(atom.name), texts)
def __render_text(self, key, value, flags=AtomDataType.UTF8):
    """Render one or more text values as UTF-8 'data' atoms.

    A single string is treated as a list of one. On Python 3 non-text
    items raise TypeError; on Python 2 they are decoded as UTF-8 first.
    """
    if isinstance(value, string_types):
        value = [value]

    encoded = []
    for v in value:
        if isinstance(v, text_type):
            text = v
        elif PY3:
            raise TypeError("%r not str" % v)
        else:
            text = v.decode("utf-8")
        encoded.append(text.encode("utf-8"))

    return self.__render_data(key, 0, flags, encoded)
def delete(self, filename):
    """Remove the metadata from the given filename.

    Drops all tags, including the atoms kept from failed parsing, and
    saves the now empty metadata.
    """
    self._failed_atoms.clear()
    self.clear()
    self.save(filename)
# Maps atom names to a (parse function, render function) pair. Atoms
# without an entry are handled as text. 'gnre' has no render function.
__atoms = {
    b"----": (__parse_freeform, __render_freeform),
    b"trkn": (__parse_pair, __render_pair),
    b"disk": (__parse_pair, __render_pair_no_trailing),
    b"gnre": (__parse_genre, None),
    b"tmpo": (__parse_tempo, __render_tempo),
    b"cpil": (__parse_bool, __render_bool),
    b"pgap": (__parse_bool, __render_bool),
    b"pcst": (__parse_bool, __render_bool),
    b"covr": (__parse_cover, __render_cover),
    b"purl": (__parse_text, __render_text),
    b"egid": (__parse_text, __render_text),
}

# these allow implicit flags and parse as text
for name in (
        b"\xa9nam", b"\xa9alb", b"\xa9ART", b"aART", b"\xa9wrt",
        b"\xa9day", b"\xa9cmt", b"desc", b"purd", b"\xa9grp",
        b"\xa9gen", b"\xa9lyr", b"catg", b"keyw", b"\xa9too",
        b"cprt", b"soal", b"soaa", b"soar", b"sonm", b"soco",
        b"sosn", b"tvsh"):
    __atoms[name] = (__parse_text, __render_text)
def pprint(self):
    """Return the tags as text, one ``key=value`` line per tag.

    Cover data is summarised by its size and list values are joined
    with ``" / "``.
    """
    lines = []
    for key, value in iteritems(self):
        if not isinstance(key, text_type):
            key = key.decode("latin-1")

        if key == "covr":
            sizes = ["[%d bytes of data]" % len(data) for data in value]
            shown = ", ".join(sizes)
        elif isinstance(value, list):
            shown = " / ".join(map(text_type, value))
        else:
            shown = value
        lines.append("%s=%s" % (key, shown))
    return "\n".join(lines)
class MP4Info(StreamInfo):
    """MPEG-4 stream information.

    Attributes:

    * bitrate -- bitrate in bits per second, as an int
    * length -- file length in seconds, as a float
    * channels -- number of audio channels
    * sample_rate -- audio sampling rate in Hz
    * bits_per_sample -- bits per sample
    * codec (string):
        * if starting with ``"mp4a"`` uses an mp4a audio codec
          (see the codec parameter in rfc6381 for details e.g.
          ``"mp4a.40.2"``)
        * for everything else see a list of possible values at
          http://www.mp4ra.org/codecs.html

        e.g. ``"mp4a"``, ``"alac"``, ``"mp4a.40.2"``, ``"ac-3"`` etc.
    * codec_description (string):
        Name of the codec used (ALAC, AAC LC, AC-3...). Values might change in
        the future, use for display purposes only.
    """

    bitrate = 0
    channels = 0
    sample_rate = 0
    bits_per_sample = 0
    codec = u""
    # Kept for backwards compatibility; nothing in this class sets it.
    codec_name = u""
    # Default for files without a usable sample description. Without it
    # pprint() fails with AttributeError for such files.
    codec_description = u""

    def __init__(self, atoms, fileobj):
        """Read the stream information of the first audio track.

        Raises MP4StreamInfoError if the file has no 'moov' atom, no
        audio track, or the required atoms are truncated or unsupported.
        """
        try:
            moov = atoms[b"moov"]
        except KeyError:
            raise MP4StreamInfoError("not a MP4 file")

        # Find the first track whose handler type is 'soun'.
        for trak in moov.findall(b"trak"):
            hdlr = trak[b"mdia", b"hdlr"]
            ok, data = hdlr.read(fileobj)
            if not ok:
                raise MP4StreamInfoError("Not enough data")
            if data[8:12] == b"soun":
                break
        else:
            raise MP4StreamInfoError("track has no audio data")

        mdhd = trak[b"mdia", b"mdhd"]
        ok, data = mdhd.read(fileobj)
        if not ok:
            raise MP4StreamInfoError("Not enough data")

        try:
            version, flags, data = parse_full_atom(data)
        except ValueError as e:
            raise MP4StreamInfoError(e)

        # The time scale and duration follow two time stamps whose
        # width depends on the version.
        if version == 0:
            offset = 8
            fmt = ">2I"
        elif version == 1:
            offset = 16
            fmt = ">IQ"
        else:
            raise MP4StreamInfoError("Unknown mdhd version %d" % version)

        end = offset + struct.calcsize(fmt)
        unit, length = struct.unpack(fmt, data[offset:end])
        try:
            self.length = float(length) / unit
        except ZeroDivisionError:
            self.length = 0

        # The sample description is optional; without it the class
        # level defaults stay in place.
        try:
            atom = trak[b"mdia", b"minf", b"stbl", b"stsd"]
        except KeyError:
            pass
        else:
            self._parse_stsd(atom, fileobj)

    def _parse_stsd(self, atom, fileobj):
        """Sets channels, bits_per_sample, sample_rate and optionally bitrate.

        Also sets codec and codec_description. Only the first entry of
        the sample description is looked at; nothing is set if there is
        none.

        Can raise MP4StreamInfoError.
        """
        assert atom.name == b"stsd"

        ok, data = atom.read(fileobj)
        if not ok:
            raise MP4StreamInfoError("Invalid stsd")

        try:
            version, flags, data = parse_full_atom(data)
        except ValueError as e:
            raise MP4StreamInfoError(e)

        if version != 0:
            raise MP4StreamInfoError("Unsupported stsd version")

        try:
            num_entries, offset = cdata.uint32_be_from(data, 0)
        except cdata.error as e:
            raise MP4StreamInfoError(e)

        if num_entries == 0:
            return

        # look at the first entry if there is one
        entry_fileobj = cBytesIO(data[offset:])
        try:
            entry_atom = Atom(entry_fileobj)
        except AtomError as e:
            raise MP4StreamInfoError(e)

        try:
            entry = AudioSampleEntry(entry_atom, entry_fileobj)
        except ASEntryError as e:
            raise MP4StreamInfoError(e)
        else:
            self.channels = entry.channels
            self.bits_per_sample = entry.sample_size
            self.sample_rate = entry.sample_rate
            self.bitrate = entry.bitrate
            self.codec = entry.codec
            self.codec_description = entry.codec_description

    def pprint(self):
        """Return a one line, human readable summary of the stream."""
        return "MPEG-4 audio (%s), %.2f seconds, %d bps" % (
            self.codec_description, self.length, self.bitrate)
class MP4(FileType):
    """An MPEG-4 audio file, probably containing AAC.

    If more than one track is present in the file, the first is used.
    Only audio ('soun') tracks will be read.

    :ivar info: :class:`MP4Info`
    :ivar tags: :class:`MP4Tags`
    """

    MP4Tags = MP4Tags

    _mimes = ["audio/mp4", "audio/x-m4a", "audio/mpeg4", "audio/aac"]

    def load(self, filename):
        """Read stream information and tags from *filename*.

        ``tags`` is set to None if the file has no metadata. Errors of
        this module are passed through; any other exception is re-raised
        as MP4StreamInfoError or MP4MetadataError, depending on the part
        that failed.
        """
        self.filename = filename
        with open(filename, "rb") as fileobj:
            try:
                atoms = Atoms(fileobj)
            except AtomError as err:
                reraise(error, err, sys.exc_info()[2])

            try:
                self.info = MP4Info(atoms, fileobj)
            except error:
                raise
            except Exception as err:
                reraise(MP4StreamInfoError, err, sys.exc_info()[2])

            if MP4Tags._can_load(atoms):
                try:
                    self.tags = self.MP4Tags(atoms, fileobj)
                except error:
                    raise
                except Exception as err:
                    reraise(MP4MetadataError, err, sys.exc_info()[2])
            else:
                self.tags = None

    def add_tags(self):
        """Attach an empty tag object; raises error if tags already exist."""
        if self.tags is not None:
            raise error("an MP4 tag already exists")
        self.tags = self.MP4Tags()

    @staticmethod
    def score(filename, fileobj, header_data):
        """Rate how likely the data is MP4: one point per marker found."""
        has_ftyp = b"ftyp" in header_data
        has_mp4 = b"mp4" in header_data
        return has_ftyp + has_mp4
Open = MP4
def delete(filename):
    """Remove tags from a file."""
    audio = MP4(filename)
    audio.delete()

View file

@ -0,0 +1,541 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
from mutagen._compat import cBytesIO, xrange
from mutagen.aac import ProgramConfigElement
from mutagen._util import BitReader, BitReaderError, cdata, text_type
from ._util import parse_full_atom
from ._atom import Atom, AtomError
class ASEntryError(Exception):
    """Raised when an AudioSampleEntry atom cannot be parsed."""
class AudioSampleEntry(object):
    """Parses an AudioSampleEntry atom.

    Private API.

    Attrs:
        channels (int): number of channels
        sample_size (int): sample size in bits
        sample_rate (int): sample rate in Hz
        bitrate (int): bits per second (0 means unknown)
        codec (string):
            audio codec, either 'mp4a[.*][.*]' (rfc6381) or 'alac'
        codec_description (string): descriptive codec name e.g. "AAC LC+SBR"

    Can raise ASEntryError.
    """

    channels = 0
    sample_size = 0
    sample_rate = 0
    bitrate = 0
    codec = None
    codec_description = None

    def __init__(self, atom, fileobj):
        """Read the entry from *atom*, then its codec specific child atom."""
        ok, data = atom.read(fileobj)
        if not ok:
            raise ASEntryError("too short %r atom" % atom.name)

        # From here on everything is read from the atom's own payload.
        fileobj = cBytesIO(data)
        reader = BitReader(fileobj)

        try:
            # SampleEntry
            reader.skip(6 * 8)  # reserved
            reader.skip(2 * 8)  # data_ref_index

            # AudioSampleEntry
            reader.skip(8 * 8)  # reserved
            self.channels = reader.bits(16)
            self.sample_size = reader.bits(16)
            reader.skip(2 * 8)  # pre_defined
            reader.skip(2 * 8)  # reserved
            # only the upper 16 bits of the 32 bit field are used
            self.sample_rate = reader.bits(32) >> 16
        except BitReaderError as e:
            raise ASEntryError(e)

        assert reader.is_aligned()

        try:
            extra = Atom(fileobj)
        except AtomError as e:
            raise ASEntryError(e)

        self.codec = atom.name.decode("latin-1")
        self.codec_description = None

        # (entry name, child atom name) -> parser for the child atom
        parsers = {
            (b"mp4a", b"esds"): self._parse_esds,
            (b"alac", b"alac"): self._parse_alac,
            (b"ac-3", b"dac3"): self._parse_dac3,
        }
        parser = parsers.get((atom.name, extra.name))
        if parser is not None:
            parser(extra, fileobj)

        if self.codec_description is None:
            self.codec_description = self.codec.upper()

    def _parse_dac3(self, atom, fileobj):
        """Update channels and bitrate from a 'dac3' atom."""
        # ETSI TS 102 366

        assert atom.name == b"dac3"

        ok, data = atom.read(fileobj)
        if not ok:
            raise ASEntryError("truncated %s atom" % atom.name)
        reader = BitReader(cBytesIO(data))

        # sample_rate in AudioSampleEntry covers values in
        # fscod2 and not just fscod, so ignore fscod here.
        try:
            reader.skip(2 + 5 + 3)  # fscod, bsid, bsmod
            acmod = reader.bits(3)
            lfeon = reader.bits(1)
            bit_rate_code = reader.bits(5)
            reader.skip(5)  # reserved
        except BitReaderError as e:
            raise ASEntryError(e)

        channels_by_acmod = [2, 1, 2, 3, 3, 4, 4, 5]
        self.channels = channels_by_acmod[acmod] + lfeon

        kbps_by_code = [
            32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192,
            224, 256, 320, 384, 448, 512, 576, 640]
        try:
            self.bitrate = kbps_by_code[bit_rate_code] * 1000
        except IndexError:
            # codes beyond the table leave the bitrate unchanged
            pass

    def _parse_alac(self, atom, fileobj):
        """Update sample size, channels, bitrate and rate from 'alac'."""
        # https://alac.macosforge.org/trac/browser/trunk/
        #    ALACMagicCookieDescription.txt

        assert atom.name == b"alac"

        ok, data = atom.read(fileobj)
        if not ok:
            raise ASEntryError("truncated %s atom" % atom.name)

        try:
            version, flags, data = parse_full_atom(data)
        except ValueError as e:
            raise ASEntryError(e)

        if version != 0:
            raise ASEntryError("Unsupported version %d" % version)

        reader = BitReader(cBytesIO(data))

        try:
            # for some files the AudioSampleEntry values default to 44100/2chan
            # and the real info is in the alac cookie, so prefer it
            reader.skip(32)  # frameLength
            compatibleVersion = reader.bits(8)
            if compatibleVersion != 0:
                return
            self.sample_size = reader.bits(8)
            reader.skip(8 + 8 + 8)
            self.channels = reader.bits(8)
            reader.skip(16 + 32)
            self.bitrate = reader.bits(32)
            self.sample_rate = reader.bits(32)
        except BitReaderError as e:
            raise ASEntryError(e)

    def _parse_esds(self, esds, fileobj):
        """Update bitrate, codec, description, channels, rate from 'esds'."""
        assert esds.name == b"esds"

        ok, data = esds.read(fileobj)
        if not ok:
            raise ASEntryError("truncated %s atom" % esds.name)

        try:
            version, flags, data = parse_full_atom(data)
        except ValueError as e:
            raise ASEntryError(e)

        if version != 0:
            raise ASEntryError("Unsupported version %d" % version)

        fileobj = cBytesIO(data)
        reader = BitReader(fileobj)

        try:
            tag = reader.bits(8)
            if tag != ES_Descriptor.TAG:
                raise ASEntryError("unexpected descriptor: %d" % tag)
            assert reader.is_aligned()
        except BitReaderError as e:
            raise ASEntryError(e)

        try:
            es_descriptor = ES_Descriptor.parse(fileobj)
        except DescriptorError as e:
            raise ASEntryError(e)
        dec_conf_desc = es_descriptor.decConfigDescr

        self.bitrate = dec_conf_desc.avgBitrate
        self.codec += dec_conf_desc.codec_param
        self.codec_description = dec_conf_desc.codec_desc

        specific_info = dec_conf_desc.decSpecificInfo
        if specific_info is None:
            return
        # a value of 0 keeps what the sample entry itself provided
        if specific_info.channels != 0:
            self.channels = specific_info.channels
        if specific_info.sample_rate != 0:
            self.sample_rate = specific_info.sample_rate
class DescriptorError(Exception):
    """Raised when a descriptor cannot be parsed."""
class BaseDescriptor(object):
    """Base class of the descriptors; provides the shared parsing entry.

    Subclasses set TAG and take ``(fileobj, length)`` in their
    constructor.
    """

    TAG = None

    @classmethod
    def _parse_desc_length_file(cls, fileobj):
        """Read a descriptor length of at most 4 bytes from *fileobj*.

        Each byte contributes its lower 7 bits; a cleared high bit marks
        the last byte.

        May raise ValueError
        """
        value = 0
        for _ in xrange(4):
            try:
                byte = cdata.uint8(fileobj.read(1))
            except cdata.error as e:
                raise ValueError(e)
            value = (value << 7) | (byte & 0x7f)
            has_more = byte >> 7
            if not has_more:
                break
        else:
            # all four bytes asked for a continuation
            raise ValueError("invalid descriptor length")
        return value

    @classmethod
    def parse(cls, fileobj):
        """Returns a parsed instance of the called type.

        The file position is right after the descriptor after this returns.

        Raises DescriptorError
        """
        try:
            length = cls._parse_desc_length_file(fileobj)
        except ValueError as e:
            raise DescriptorError(e)

        start = fileobj.tell()
        instance = cls(fileobj, length)
        consumed = fileobj.tell() - start
        left = length - consumed
        if left < 0:
            raise DescriptorError("descriptor parsing read too much data")
        # skip whatever the constructor did not read
        fileobj.seek(left, 1)
        return instance
class ES_Descriptor(BaseDescriptor):
    """Parses the descriptor with tag 0x3 and its decoder configuration.

    The nested DecoderConfigDescriptor is available as
    ``decConfigDescr``.
    """

    TAG = 0x3

    def __init__(self, fileobj, length):
        """Raises DescriptorError"""
        reader = BitReader(fileobj)
        try:
            self.ES_ID = reader.bits(16)
            self.streamDependenceFlag = reader.bits(1)
            self.URL_Flag = reader.bits(1)
            self.OCRstreamFlag = reader.bits(1)
            self.streamPriority = reader.bits(5)

            # the following fields are only present if flagged above
            if self.streamDependenceFlag:
                self.dependsOn_ES_ID = reader.bits(16)
            if self.URL_Flag:
                URLlength = reader.bits(8)
                self.URLstring = reader.bytes(URLlength)
            if self.OCRstreamFlag:
                self.OCR_ES_Id = reader.bits(16)

            tag = reader.bits(8)
        except BitReaderError as e:
            raise DescriptorError(e)

        if tag != DecoderConfigDescriptor.TAG:
            raise DescriptorError("unexpected DecoderConfigDescrTag %d" % tag)

        assert reader.is_aligned()
        self.decConfigDescr = DecoderConfigDescriptor.parse(fileobj)
class DecoderConfigDescriptor(BaseDescriptor):
    """Parses the descriptor with tag 0x4.

    The nested DecoderSpecificInfo is only read when
    objectTypeIndication is 0x40 and streamType is 0x5.
    """

    TAG = 0x4

    decSpecificInfo = None
    """A DecoderSpecificInfo, optional"""

    def __init__(self, fileobj, length):
        """Raises DescriptorError"""
        reader = BitReader(fileobj)

        try:
            self.objectTypeIndication = reader.bits(8)
            self.streamType = reader.bits(6)
            self.upStream = reader.bits(1)
            self.reserved = reader.bits(1)
            self.bufferSizeDB = reader.bits(24)
            self.maxBitrate = reader.bits(32)
            self.avgBitrate = reader.bits(32)

            kind = (self.objectTypeIndication, self.streamType)
            if kind != (0x40, 0x5):
                return

            # all from here is optional
            if length * 8 == reader.get_position():
                return

            tag = reader.bits(8)
        except BitReaderError as e:
            raise DescriptorError(e)

        if tag == DecoderSpecificInfo.TAG:
            assert reader.is_aligned()
            self.decSpecificInfo = DecoderSpecificInfo.parse(fileobj)

    @property
    def codec_param(self):
        """string"""
        parts = [u".%X" % self.objectTypeIndication]
        info = self.decSpecificInfo
        if info is not None:
            parts.append(u".%d" % info.audioObjectType)
        return u"".join(parts)

    @property
    def codec_desc(self):
        """string or None"""
        info = self.decSpecificInfo
        if info is None:
            return None
        return info.description
class DecoderSpecificInfo(BaseDescriptor):
    """Descriptor with tag 0x5 describing the audio stream.

    After parsing, `audioObjectType`, `samplingFrequency` and
    `channelConfiguration` are always set. `sbrPresentFlag` and
    `psPresentFlag` are -1 if the data did not say, otherwise 0 or 1.
    """

    TAG = 0x5

    # Indexed by audioObjectType; None marks types without a name.
    _TYPE_NAMES = [
        None, "AAC MAIN", "AAC LC", "AAC SSR", "AAC LTP", "SBR",
        "AAC scalable", "TwinVQ", "CELP", "HVXC", None, None, "TTSI",
        "Main synthetic", "Wavetable synthesis", "General MIDI",
        "Algorithmic Synthesis and Audio FX", "ER AAC LC", None, "ER AAC LTP",
        "ER AAC scalable", "ER Twin VQ", "ER BSAC", "ER AAC LD", "ER CELP",
        "ER HVXC", "ER HILN", "ER Parametric", "SSC", "PS", "MPEG Surround",
        None, "Layer-1", "Layer-2", "Layer-3", "DST", "ALS", "SLS",
        "SLS non-core", "ER AAC ELD", "SMR Simple", "SMR Main", "USAC",
        "SAOC", "LD MPEG Surround", "USAC"
    ]

    # Indexed by the 4 bit sampling frequency index.
    _FREQS = [
        96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000,
        12000, 11025, 8000, 7350,
    ]

    @property
    def description(self):
        """string or None if unknown"""

        try:
            label = self._TYPE_NAMES[self.audioObjectType]
        except IndexError:
            return None

        if label is None:
            return None

        if self.sbrPresentFlag == 1:
            label += "+SBR"
        if self.psPresentFlag == 1:
            label += "+PS"
        return text_type(label)

    @property
    def sample_rate(self):
        """0 means unknown"""

        sbr = self.sbrPresentFlag
        if sbr == 1:
            return self.extensionSamplingFrequency
        if sbr == 0:
            return self.samplingFrequency

        # SBR presence is unknown from here on.
        # these are all types that support SBR
        if self.audioObjectType not in (1, 2, 3, 4, 6, 17, 19, 20, 22):
            return self.samplingFrequency

        # there shouldn't be SBR for > 48KHz
        if self.samplingFrequency > 24000:
            return self.samplingFrequency

        # either samplingFrequency or samplingFrequency * 2
        return 0

    @property
    def channels(self):
        """channel count or 0 for unknown"""

        # from ProgramConfigElement()
        if hasattr(self, "pce_channels"):
            return self.pce_channels

        conf = getattr(
            self, "extensionChannelConfiguration", self.channelConfiguration)

        if conf == 1:
            ps = self.psPresentFlag
            if ps == -1:
                return 0
            return 2 if ps == 1 else 1
        if conf == 7:
            return 8
        if conf > 7:
            return 0
        return conf

    def _get_audio_object_type(self, r):
        """Read an audio object type, following the escape value 31.

        Raises BitReaderError
        """

        aot = r.bits(5)
        if aot != 31:
            return aot
        return 32 + r.bits(6)

    def _get_sampling_freq(self, r):
        """Read a sampling frequency; yields 0 for an unknown index.

        Raises BitReaderError
        """

        index = r.bits(4)
        if index == 0xf:
            # escape value: the frequency follows explicitly
            return r.bits(24)
        try:
            return self._FREQS[index]
        except IndexError:
            return 0

    def __init__(self, fileobj, length):
        """Raises DescriptorError"""

        reader = BitReader(fileobj)
        try:
            self._parse(reader, length)
        except BitReaderError as err:
            raise DescriptorError(err)

    def _parse(self, r, length):
        """Fill in the attributes from the bit reader `r`.

        `length` is the payload size in bytes. Parsing stops silently as
        soon as an unsupported configuration is encountered, leaving the
        attributes read so far in place.

        Raises BitReaderError
        """

        def remaining():
            return length * 8 - r.get_position()

        self.audioObjectType = self._get_audio_object_type(r)
        self.samplingFrequency = self._get_sampling_freq(r)
        self.channelConfiguration = r.bits(4)

        self.sbrPresentFlag = -1
        self.psPresentFlag = -1

        if self.audioObjectType not in (5, 29):
            self.extensionAudioObjectType = 0
        else:
            # explicitly signalled extension; the real object type follows
            self.extensionAudioObjectType = 5
            self.sbrPresentFlag = 1
            if self.audioObjectType == 29:
                self.psPresentFlag = 1
            self.extensionSamplingFrequency = self._get_sampling_freq(r)
            self.audioObjectType = self._get_audio_object_type(r)
            if self.audioObjectType == 22:
                self.extensionChannelConfiguration = r.bits(4)

        if self.audioObjectType not in (
                1, 2, 3, 4, 6, 7, 17, 19, 20, 21, 22, 23):
            # unsupported
            return

        try:
            GASpecificConfig(r, self)
        except NotImplementedError:
            # unsupported, (warn?)
            return

        if self.audioObjectType in (
                17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 39):
            ep_config = r.bits(2)
            if ep_config in (2, 3):
                # unsupported
                return

        # Everything below is the optional sync extension at the end.
        if self.extensionAudioObjectType == 5 or remaining() < 16:
            return
        if r.bits(11) != 0x2b7:
            return

        ext_type = self._get_audio_object_type(r)
        self.extensionAudioObjectType = ext_type

        if ext_type == 5:
            self.sbrPresentFlag = r.bits(1)
            if self.sbrPresentFlag == 1:
                self.extensionSamplingFrequency = self._get_sampling_freq(r)
                if remaining() >= 12 and r.bits(11) == 0x548:
                    self.psPresentFlag = r.bits(1)
        elif ext_type == 22:
            self.sbrPresentFlag = r.bits(1)
            if self.sbrPresentFlag == 1:
                self.extensionSamplingFrequency = self._get_sampling_freq(r)
            self.extensionChannelConfiguration = r.bits(4)
def GASpecificConfig(r, info):
    """Consume a GASpecificConfig from the bit reader `r`.

    The structure carries no length, so it has to be walked field by field
    to reach whatever follows it. If `info.channelConfiguration` is zero a
    program_config_element is read as well and its channel count is stored
    in `info.pce_channels`.

    May raise BitReaderError on error or
    NotImplementedError if some reserved data was set.
    """

    assert isinstance(info, DecoderSpecificInfo)

    r.skip(1)  # frameLengthFlag
    if r.bits(1):  # dependsOnCoreCoder
        r.skip(14)
    has_extension = r.bits(1)  # extensionFlag

    if not info.channelConfiguration:
        info.pce_channels = ProgramConfigElement(r).channels

    object_type = info.audioObjectType
    if object_type in (6, 20):
        r.skip(3)

    if not has_extension:
        return

    if object_type == 22:
        r.skip(5 + 11)
    if object_type in (17, 19, 20, 23):
        r.skip(1 + 1 + 1)

    if r.bits(1) != 0:  # extensionFlag3
        raise NotImplementedError("extensionFlag3 set")

190
lib/mutagen/mp4/_atom.py Normal file
View file

@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
import struct
from mutagen._compat import PY2
# Names of the container atoms this module needs to look inside of.
# The list is intentionally not exhaustive.
_CONTAINERS = [
    b"moov", b"udta", b"trak", b"mdia", b"meta",
    b"ilst", b"stbl", b"minf", b"moof", b"traf",
]

# Number of bytes to skip between a container's header and its first
# child, keyed by atom name; containers not listed here skip nothing.
_SKIP_SIZE = {
    b"meta": 4,
}
class AtomError(Exception):
    """Raised when an atom header is truncated or has an invalid length."""
class Atom(object):
    """A single atom of an MP4 file.

    Attributes:
    children -- list of child atoms (or None for non-container atoms)
    length -- length of this atom, including length and name
    name -- four byte name of the atom
    offset -- location of this atom in the fileobj given to the constructor

    This structure should only be used internally by Mutagen.
    """

    children = None

    def __init__(self, fileobj, level=0):
        """Parse the atom starting at the current position of `fileobj`.

        `level` is the nesting depth, 0 for top-level atoms. Container
        atoms parse their children recursively; for all other atoms the
        file position is moved to the end of the atom.

        May raise AtomError
        """

        self.offset = fileobj.tell()
        header = fileobj.read(8)
        try:
            self.length, self.name = struct.unpack(">I4s", header)
        except struct.error:
            raise AtomError("truncated data")
        self._dataoffset = self.offset + 8

        if self.length == 0:
            if level != 0:
                raise AtomError(
                    "only a top-level atom can have zero length")
            # Only the last atom is supposed to have a zero-length, meaning it
            # extends to the end of file.
            fileobj.seek(0, 2)
            self.length = fileobj.tell() - self.offset
            fileobj.seek(self.offset + 8, 0)
        elif self.length == 1:
            # the real length follows the name as a 64 bit value
            extended = fileobj.read(8)
            try:
                self.length, = struct.unpack(">Q", extended)
            except struct.error:
                raise AtomError("truncated data")
            self._dataoffset += 8
            if self.length < 16:
                raise AtomError(
                    "64 bit atom length can only be 16 and higher")
        elif self.length < 8:
            raise AtomError(
                "atom length can only be 0, 1 or 8 and higher")

        end = self.offset + self.length
        if self.name not in _CONTAINERS:
            fileobj.seek(end, 0)
            return

        self.children = []
        fileobj.seek(_SKIP_SIZE.get(self.name, 0), 1)
        while fileobj.tell() < end:
            self.children.append(Atom(fileobj, level + 1))

    def read(self, fileobj):
        """Return if all data could be read and the atom payload"""

        fileobj.seek(self._dataoffset, 0)
        header_size = self._dataoffset - self.offset
        expected = self.length - header_size
        payload = fileobj.read(expected)
        return len(payload) == expected, payload

    @staticmethod
    def render(name, data):
        """Render raw atom data."""

        # this raises OverflowError if Py_ssize_t can't handle the atom data
        total = len(data) + 8
        if total > 0xFFFFFFFF:
            # too large for the 32 bit field: write the marker 1 and append
            # a 64 bit length, which makes the header 8 bytes longer
            return struct.pack(">I4sQ", 1, name, total + 8) + data
        return struct.pack(">I4s", total, name) + data

    def findall(self, name, recursive=False):
        """Yield all child atoms called `name`.

        Only direct children are considered unless `recursive` is true.
        """

        if self.children is None:
            return

        for child in self.children:
            if child.name == name:
                yield child
            if recursive:
                for match in child.findall(name, True):
                    yield match

    def __getitem__(self, remaining):
        """Look up a child atom, potentially recursively.

        e.g. atom['udta', 'meta'] => <Atom name='meta' ...>
        """

        if not remaining:
            return self
        if self.children is None:
            raise KeyError("%r is not a container" % self.name)

        wanted = remaining[0]
        for child in self.children:
            if child.name == wanted:
                return child[remaining[1:]]
        raise KeyError("%r not found" % wanted)

    def __repr__(self):
        cls = self.__class__.__name__
        if self.children is None:
            return "<%s name=%r length=%r offset=%r>" % (
                cls, self.name, self.length, self.offset)

        # nest the children below the parent, indented by one space
        indented = []
        for child in self.children:
            for line in repr(child).splitlines():
                indented.append(" " + line)
        return "<%s name=%r length=%r offset=%r\n%s>" % (
            cls, self.name, self.length, self.offset, "\n".join(indented))
class Atoms(object):
    """The top-level atoms of a file.

    Attributes:
    atoms -- a list of top-level atoms as Atom objects

    This structure should only be used internally by Mutagen.
    """

    def __init__(self, fileobj):
        """Parse top-level atoms from the start of `fileobj`.

        Parsing stops once fewer than 8 bytes, the size of an atom
        header, are left.
        """

        self.atoms = []

        fileobj.seek(0, 2)
        size = fileobj.tell()
        fileobj.seek(0)

        while size - fileobj.tell() >= 8:
            self.atoms.append(Atom(fileobj))

    def path(self, *names):
        """Look up and return the complete path of an atom.

        For example, atoms.path('moov', 'udta', 'meta') will return a
        list of three atoms, corresponding to the moov, udta, and meta
        atoms.
        """

        found = []
        node = self
        for name in names:
            node = node[name, ]
            found.append(node)
        return found

    def __contains__(self, names):
        try:
            self[names]
        except KeyError:
            return False
        else:
            return True

    def __getitem__(self, names):
        """Look up a child atom.

        'names' may be a list of atoms (['moov', 'udta']) or a string
        specifying the complete path ('moov.udta').
        """

        # basestring only exists on Python 2 and is only evaluated there
        path_type = basestring if PY2 else bytes
        if isinstance(names, path_type):
            names = names.split(b".")

        wanted = names[0]
        for child in self.atoms:
            if child.name == wanted:
                return child[names[1:]]
        raise KeyError("%s not found" % wanted)

    def __repr__(self):
        return "\n".join(map(repr, self.atoms))

21
lib/mutagen/mp4/_util.py Normal file
View file

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
from mutagen._util import cdata
def parse_full_atom(data):
    """Split the data of a versioned atom into (version, flags, payload).

    The first byte is the version and the next three bytes hold the
    flags; whatever follows is returned unchanged as the payload.

    Can raise ValueError.
    """

    if len(data) < 4:
        raise ValueError("not enough data")

    header, payload = data[:4], data[4:]
    version = ord(header[:1])
    # pad the three flag bytes to the four bytes uint_be is given
    flags = cdata.uint_be(b"\x00" + header[1:])
    return version, flags, payload