Update beets to 1.3.18:

Dependencies:
* PyYAML 3.11
* Unidecode 0.4.19
* beets 1.3.18
* colorama 0.3.7
* enum34 1.1.6
* jellyfish 0.5.4
* munkres 1.0.7
* musicbrainzngs 0.6
* mutagen 1.32
Labrys 2016-06-06 12:08:03 -04:00
parent f093fafd8d
commit 1111074dc3
165 changed files with 48385 additions and 7424 deletions

beets/util/pipeline.py

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# This file is part of beets.
# Copyright 2013, Adrian Sampson.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@@ -30,18 +31,19 @@ up a bottleneck stage by dividing its work among multiple threads.
To do so, pass an iterable of coroutines to the Pipeline constructor
in place of any single coroutine.
"""
from __future__ import print_function
from __future__ import division, absolute_import, print_function
import Queue
from threading import Thread, Lock
import sys
import types
BUBBLE = '__PIPELINE_BUBBLE__'
POISON = '__PIPELINE_POISON__'
DEFAULT_QUEUE_SIZE = 16
def _invalidate_queue(q, val=None, sync=True):
"""Breaks a Queue such that it never blocks, always has size 1,
and has no maximum size. get()ing from the queue returns `val`,
@@ -50,8 +52,10 @@ def _invalidate_queue(q, val=None, sync=True):
"""
def _qsize(len=len):
return 1
def _put(item):
pass
def _get():
return val
@@ -70,6 +74,7 @@ def _invalidate_queue(q, val=None, sync=True):
if sync:
q.mutex.release()
class CountedQueue(Queue.Queue):
"""A queue that keeps track of the number of threads that are
still feeding into it. The queue is poisoned when all threads are
@@ -104,6 +109,7 @@ class CountedQueue(Queue.Queue):
# Replacement _get invalidates when no items remain.
_old_get = self._get
def _get():
out = _old_get()
if not self.queue:
@@ -117,18 +123,67 @@ class CountedQueue(Queue.Queue):
# No items. Invalidate immediately.
_invalidate_queue(self, POISON, False)
class MultiMessage(object):
"""A message yielded by a pipeline stage encapsulating multiple
values to be sent to the next stage.
"""
def __init__(self, messages):
self.messages = messages
def multiple(messages):
"""Yield multiple([message, ..]) from a pipeline stage to send
multiple values to the next pipeline stage.
"""
return MultiMessage(messages)
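# Editor's sketch, not part of this commit: a stage that fans one incoming
# message out into several downstream messages by yielding multiple(...).
# The name fan_out is illustrative only.
def fan_out():
    task = None
    while True:
        num = yield task
        # Both values are delivered to the next stage as separate messages.
        task = multiple([num, num * num])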
def stage(func):
"""Decorate a function to become a simple stage.
>>> @stage
... def add(n, i):
... return i + n
>>> pipe = Pipeline([
... iter([1, 2, 3]),
... add(2),
... ])
>>> list(pipe.pull())
[3, 4, 5]
"""
def coro(*args):
task = None
while True:
task = yield task
task = func(*(args + (task,)))
return coro
def mutator_stage(func):
"""Decorate a function that manipulates items in a coroutine to
become a simple stage.
>>> @mutator_stage
... def setkey(key, item):
... item[key] = True
>>> pipe = Pipeline([
... iter([{'x': False}, {'a': False}]),
... setkey('x'),
... ])
>>> list(pipe.pull())
[{'x': True}, {'a': False, 'x': True}]
"""
def coro(*args):
task = None
while True:
task = yield task
func(*(args + (task,)))
return coro
def _allmsgs(obj):
"""Returns a list of all the messages encapsulated in obj. If obj
is a MultiMessage, returns its enclosed messages. If obj is BUBBLE,
@@ -141,6 +196,7 @@ def _allmsgs(obj):
else:
return [obj]
class PipelineThread(Thread):
"""Abstract base class for pipeline-stage threads."""
def __init__(self, all_threads):
@@ -169,6 +225,7 @@ class PipelineThread(Thread):
for thread in self.all_threads:
thread.abort()
class FirstPipelineThread(PipelineThread):
"""The thread running the first stage in a parallel pipeline setup.
The coroutine should just be a generator.
@@ -191,7 +248,7 @@ class FirstPipelineThread(PipelineThread):
# Get the value from the generator.
try:
msg = self.coro.next()
msg = next(self.coro)
except StopIteration:
break
@@ -209,6 +266,7 @@ class FirstPipelineThread(PipelineThread):
# Generator finished; shut down the pipeline.
self.out_queue.release()
class MiddlePipelineThread(PipelineThread):
"""A thread running any stage in the pipeline except the first or
last.
@@ -223,7 +281,7 @@ class MiddlePipelineThread(PipelineThread):
def run(self):
try:
# Prime the coroutine.
self.coro.next()
next(self.coro)
while True:
with self.abort_lock:
@@ -256,6 +314,7 @@ class MiddlePipelineThread(PipelineThread):
# Pipeline is shutting down normally.
self.out_queue.release()
class LastPipelineThread(PipelineThread):
"""A thread running the last stage in a pipeline. The coroutine
should yield nothing.
@@ -267,7 +326,7 @@ class LastPipelineThread(PipelineThread):
def run(self):
# Prime the coroutine.
self.coro.next()
next(self.coro)
try:
while True:
@@ -291,6 +350,7 @@ class LastPipelineThread(PipelineThread):
self.abort_all(sys.exc_info())
return
class Pipeline(object):
"""Represents a staged pattern of work. Each stage in the pipeline
is a coroutine that receives messages from the previous stage and
@@ -301,7 +361,7 @@ class Pipeline(object):
be at least two stages.
"""
if len(stages) < 2:
raise ValueError('pipeline must have at least two stages')
raise ValueError(u'pipeline must have at least two stages')
self.stages = []
for stage in stages:
if isinstance(stage, (list, tuple)):
@@ -322,7 +382,8 @@ class Pipeline(object):
messages between the stages are stored in queues of the given
size.
"""
queues = [CountedQueue(queue_size) for i in range(len(self.stages)-1)]
queue_count = len(self.stages) - 1
queues = [CountedQueue(queue_size) for i in range(queue_count)]
threads = []
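# Editor's note, not part of this commit: queue_size bounds each inter-stage
# buffer (DEFAULT_QUEUE_SIZE is 16 above); a smaller value keeps a fast first
# stage from running far ahead of slower ones, e.g. with the demo stages
# defined under __main__ below:
#     Pipeline([produce(), work(), consume()]).run_parallel(queue_size=4)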
# Set up first stage.
@@ -330,10 +391,10 @@ class Pipeline(object):
threads.append(FirstPipelineThread(coro, queues[0], threads))
# Middle stages.
for i in range(1, len(self.stages)-1):
for i in range(1, queue_count):
for coro in self.stages[i]:
threads.append(MiddlePipelineThread(
coro, queues[i-1], queues[i], threads
coro, queues[i - 1], queues[i], threads
))
# Last stage.
@@ -383,7 +444,7 @@ class Pipeline(object):
# "Prime" the coroutines.
for coro in coros[1:]:
coro.next()
next(coro)
# Begin the pipeline.
for out in coros[0]:
@@ -405,20 +466,23 @@ if __name__ == '__main__':
# in parallel.
def produce():
for i in range(5):
print('generating %i' % i)
print(u'generating %i' % i)
time.sleep(1)
yield i
def work():
num = yield
while True:
print('processing %i' % num)
print(u'processing %i' % num)
time.sleep(2)
num = yield num*2
num = yield num * 2
def consume():
while True:
num = yield
time.sleep(1)
print('received %i' % num)
print(u'received %i' % num)
ts_start = time.time()
Pipeline([produce(), work(), consume()]).run_sequential()
ts_seq = time.time()
@@ -426,29 +490,30 @@ if __name__ == '__main__':
ts_par = time.time()
Pipeline([produce(), (work(), work()), consume()]).run_parallel()
ts_end = time.time()
print('Sequential time:', ts_seq - ts_start)
print('Parallel time:', ts_par - ts_seq)
print('Multiply-parallel time:', ts_end - ts_par)
print(u'Sequential time:', ts_seq - ts_start)
print(u'Parallel time:', ts_par - ts_seq)
print(u'Multiply-parallel time:', ts_end - ts_par)
print()
# Test a pipeline that raises an exception.
def exc_produce():
for i in range(10):
print('generating %i' % i)
print(u'generating %i' % i)
time.sleep(1)
yield i
def exc_work():
num = yield
while True:
print('processing %i' % num)
print(u'processing %i' % num)
time.sleep(3)
if num == 3:
raise Exception()
num = yield num * 2
def exc_consume():
while True:
num = yield
#if num == 4:
# raise Exception()
print('received %i' % num)
print(u'received %i' % num)
Pipeline([exc_produce(), exc_work(), exc_consume()]).run_parallel(1)