Bump plexapi from 4.12.1 to 4.13.1 (#1888)

Bumps [plexapi](https://github.com/pkkid/python-plexapi) from 4.12.1 to 4.13.1.
- [Release notes](https://github.com/pkkid/python-plexapi/releases)
- [Commits](https://github.com/pkkid/python-plexapi/compare/4.12.1...4.13.1)

---
updated-dependencies:
- dependency-name: plexapi
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2022-11-12 17:29:35 -08:00 committed by GitHub
parent 3af08f0d07
commit e79da07973
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 1791 additions and 724 deletions

View file

@ -3,11 +3,12 @@ import copy
import html
import threading
import time
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from xml.etree import ElementTree
import requests
from plexapi import (BASE_HEADERS, CONFIG, TIMEOUT, X_PLEX_ENABLE_FAST_CONNECT,
X_PLEX_IDENTIFIER, log, logfilter, utils)
from plexapi import (BASE_HEADERS, CONFIG, TIMEOUT, X_PLEX_CONTAINER_SIZE,
X_PLEX_ENABLE_FAST_CONNECT, X_PLEX_IDENTIFIER, log, logfilter, utils)
from plexapi.base import PlexObject
from plexapi.client import PlexClient
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
@ -47,6 +48,7 @@ class MyPlexAccount(PlexObject):
locale (str): Your Plex locale
mailing_list_status (str): Your current mailing list status.
maxHomeSize (int): Unknown.
pin (str): The hashed Plex Home PIN.
queueEmail (str): Email address to add items to your `Watch Later` queue.
queueUid (str): Unknown.
restricted (bool): Unknown.
@ -65,16 +67,19 @@ class MyPlexAccount(PlexObject):
_session (obj): Requests session object used to access this client.
"""
FRIENDINVITE = 'https://plex.tv/api/servers/{machineId}/shared_servers' # post with data
HOMEUSERS = 'https://plex.tv/api/home/users'
HOMEUSERCREATE = 'https://plex.tv/api/home/users?title={title}' # post with data
EXISTINGUSER = 'https://plex.tv/api/home/users?invitedEmail={username}' # post with data
FRIENDSERVERS = 'https://plex.tv/api/servers/{machineId}/shared_servers/{serverId}' # put with data
PLEXSERVERS = 'https://plex.tv/api/servers/{machineId}' # get
FRIENDUPDATE = 'https://plex.tv/api/friends/{userId}' # put with args, delete
REMOVEHOMEUSER = 'https://plex.tv/api/home/users/{userId}' # delete
HOMEUSER = 'https://plex.tv/api/home/users/{userId}' # delete, put
MANAGEDHOMEUSER = 'https://plex.tv/api/v2/home/users/restricted/{userId}' # put
SIGNIN = 'https://plex.tv/users/sign_in.xml' # get with auth
WEBHOOKS = 'https://plex.tv/api/v2/user/webhooks' # get, post with data
OPTOUTS = 'https://plex.tv/api/v2/user/{userUUID}/settings/opt_outs' # get
LINK = 'https://plex.tv/api/v2/pins/link' # put
VIEWSTATESYNC = 'https://plex.tv/api/v2/user/view_state_sync' # put
# Hub sections
VOD = 'https://vod.provider.plex.tv' # get
MUSIC = 'https://music.provider.plex.tv' # get
@ -115,6 +120,7 @@ class MyPlexAccount(PlexObject):
self.locale = data.attrib.get('locale')
self.mailing_list_status = data.attrib.get('mailing_list_status')
self.maxHomeSize = utils.cast(int, data.attrib.get('maxHomeSize'))
self.pin = data.attrib.get('pin')
self.queueEmail = data.attrib.get('queueEmail')
self.queueUid = data.attrib.get('queueUid')
self.restricted = utils.cast(bool, data.attrib.get('restricted'))
@ -150,7 +156,7 @@ class MyPlexAccount(PlexObject):
for device in self.devices():
if (name and device.name.lower() == name.lower() or device.clientIdentifier == clientId):
return device
raise NotFound('Unable to find device %s' % name)
raise NotFound(f'Unable to find device {name}')
def devices(self):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexDevice` objects connected to the server. """
@ -174,7 +180,7 @@ class MyPlexAccount(PlexObject):
if response.status_code not in (200, 201, 204): # pragma: no cover
codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ')
message = '(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext)
message = f'({response.status_code}) {codename}; {response.url} {errtext}'
if response.status_code == 401:
raise Unauthorized(message)
elif response.status_code == 404:
@ -195,7 +201,7 @@ class MyPlexAccount(PlexObject):
for resource in self.resources():
if resource.name.lower() == name.lower():
return resource
raise NotFound('Unable to find resource %s' % name)
raise NotFound(f'Unable to find resource {name}')
def resources(self):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexResource` objects connected to the server. """
@ -366,7 +372,8 @@ class MyPlexAccount(PlexObject):
""" Remove the specified user from your friends.
Parameters:
user (str): :class:`~plexapi.myplex.MyPlexUser`, username, or email of the user to be removed.
user (:class:`~plexapi.myplex.MyPlexUser` or str): :class:`~plexapi.myplex.MyPlexUser`,
username, or email of the user to be removed.
"""
user = user if isinstance(user, MyPlexUser) else self.user(user)
url = self.FRIENDUPDATE.format(userId=user.id)
@ -376,17 +383,89 @@ class MyPlexAccount(PlexObject):
""" Remove the specified user from your home users.
Parameters:
user (str): :class:`~plexapi.myplex.MyPlexUser`, username, or email of the user to be removed.
user (:class:`~plexapi.myplex.MyPlexUser` or str): :class:`~plexapi.myplex.MyPlexUser`,
username, or email of the user to be removed.
"""
user = user if isinstance(user, MyPlexUser) else self.user(user)
url = self.REMOVEHOMEUSER.format(userId=user.id)
url = self.HOMEUSER.format(userId=user.id)
return self.query(url, self._session.delete)
def acceptInvite(self, user):
""" Accept a pending firend invite from the specified user.
def switchHomeUser(self, user):
""" Returns a new :class:`~plexapi.myplex.MyPlexAccount` object switched to the given home user.
Parameters:
user (str): :class:`~plexapi.myplex.MyPlexInvite`, username, or email of the friend invite to accept.
user (:class:`~plexapi.myplex.MyPlexUser` or str): :class:`~plexapi.myplex.MyPlexUser`,
username, or email of the home user to switch to.
Example:
.. code-block:: python
from plexapi.myplex import MyPlexAccount
# Login to a Plex Home account
account = MyPlexAccount('<USERNAME>', '<PASSWORD>')
# Switch to a different Plex Home user
userAccount = account.switchHomeUser('Username')
"""
user = user if isinstance(user, MyPlexUser) else self.user(user)
url = f'{self.HOMEUSERS}/{user.id}/switch'
data = self.query(url, self._session.post)
userToken = data.attrib.get('authenticationToken')
return MyPlexAccount(token=userToken)
def setPin(self, newPin, currentPin=None):
""" Set a new Plex Home PIN for the account.
Parameters:
newPin (str): New PIN to set for the account.
currentPin (str): Current PIN for the account (required to change the PIN).
"""
url = self.HOMEUSER.format(userId=self.id)
params = {'pin': newPin}
if currentPin:
params['currentPin'] = currentPin
return self.query(url, self._session.put, params=params)
def removePin(self, currentPin):
""" Remove the Plex Home PIN for the account.
Parameters:
currentPin (str): Current PIN for the account (required to remove the PIN).
"""
return self.setPin('', currentPin)
def setManagedUserPin(self, user, newPin):
""" Set a new Plex Home PIN for a managed home user. This must be done from the Plex Home admin account.
Parameters:
user (:class:`~plexapi.myplex.MyPlexUser` or str): :class:`~plexapi.myplex.MyPlexUser`
or username of the managed home user.
newPin (str): New PIN to set for the managed home user.
"""
user = user if isinstance(user, MyPlexUser) else self.user(user)
url = self.MANAGEDHOMEUSER.format(userId=user.id)
params = {'pin': newPin}
return self.query(url, self._session.post, params=params)
def removeManagedUserPin(self, user):
""" Remove the Plex Home PIN for a managed home user. This must be done from the Plex Home admin account.
Parameters:
user (:class:`~plexapi.myplex.MyPlexUser` or str): :class:`~plexapi.myplex.MyPlexUser`
or username of the managed home user.
"""
user = user if isinstance(user, MyPlexUser) else self.user(user)
url = self.MANAGEDHOMEUSER.format(userId=user.id)
params = {'removePin': 1}
return self.query(url, self._session.post, params=params)
def acceptInvite(self, user):
""" Accept a pending friend invite from the specified user.
Parameters:
user (:class:`~plexapi.myplex.MyPlexInvite` or str): :class:`~plexapi.myplex.MyPlexInvite`,
username, or email of the friend invite to accept.
"""
invite = user if isinstance(user, MyPlexInvite) else self.pendingInvite(user, includeSent=False)
params = {
@ -394,14 +473,15 @@ class MyPlexAccount(PlexObject):
'home': int(invite.home),
'server': int(invite.server)
}
url = MyPlexInvite.REQUESTS + '/%s' % invite.id + utils.joinArgs(params)
url = MyPlexInvite.REQUESTS + f'/{invite.id}' + utils.joinArgs(params)
return self.query(url, self._session.put)
def cancelInvite(self, user):
""" Cancel a pending firend invite for the specified user.
Parameters:
user (str): :class:`~plexapi.myplex.MyPlexInvite`, username, or email of the friend invite to cancel.
user (:class:`~plexapi.myplex.MyPlexInvite` or str): :class:`~plexapi.myplex.MyPlexInvite`,
username, or email of the friend invite to cancel.
"""
invite = user if isinstance(user, MyPlexInvite) else self.pendingInvite(user, includeReceived=False)
params = {
@ -409,7 +489,7 @@ class MyPlexAccount(PlexObject):
'home': int(invite.home),
'server': int(invite.server)
}
url = MyPlexInvite.REQUESTED + '/%s' % invite.id + utils.joinArgs(params)
url = MyPlexInvite.REQUESTED + f'/{invite.id}' + utils.joinArgs(params)
return self.query(url, self._session.delete)
def updateFriend(self, user, server, sections=None, removeSections=False, allowSync=None, allowCameraUpload=None,
@ -497,7 +577,7 @@ class MyPlexAccount(PlexObject):
(user.username.lower(), user.email.lower(), str(user.id))):
return user
raise NotFound('Unable to find user %s' % username)
raise NotFound(f'Unable to find user {username}')
def users(self):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexUser` objects connected to your account.
@ -520,7 +600,7 @@ class MyPlexAccount(PlexObject):
(invite.username.lower(), invite.email.lower(), str(invite.id))):
return invite
raise NotFound('Unable to find invite %s' % username)
raise NotFound(f'Unable to find invite {username}')
def pendingInvites(self, includeSent=True, includeReceived=True):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexInvite` objects connected to your account.
@ -545,7 +625,7 @@ class MyPlexAccount(PlexObject):
# Get a list of all section ids for looking up each section.
allSectionIds = {}
machineIdentifier = server.machineIdentifier if isinstance(server, PlexServer) else server
url = self.PLEXSERVERS.replace('{machineId}', machineIdentifier)
url = self.PLEXSERVERS.format(machineId=machineIdentifier)
data = self.query(url, self._session.get)
for elem in data[0]:
_id = utils.cast(int, elem.attrib.get('id'))
@ -567,8 +647,8 @@ class MyPlexAccount(PlexObject):
values = []
for key, vals in filterDict.items():
if key not in ('contentRating', 'label', 'contentRating!', 'label!'):
raise BadRequest('Unknown filter key: %s', key)
values.append('%s=%s' % (key, '%2C'.join(vals)))
raise BadRequest(f'Unknown filter key: {key}')
values.append(f"{key}={'%2C'.join(vals)}")
return '|'.join(values)
def addWebhook(self, url):
@ -579,12 +659,12 @@ class MyPlexAccount(PlexObject):
def deleteWebhook(self, url):
urls = copy.copy(self._webhooks)
if url not in urls:
raise BadRequest('Webhook does not exist: %s' % url)
raise BadRequest(f'Webhook does not exist: {url}')
urls.remove(url)
return self.setWebhooks(urls)
def setWebhooks(self, urls):
log.info('Setting webhooks: %s' % urls)
log.info('Setting webhooks: %s', urls)
data = {'urls[]': urls} if len(urls) else {'urls': ''}
data = self.query(self.WEBHOOKS, self._session.post, data=data)
self._webhooks = self.listAttrs(data, 'url', etag='webhook')
@ -655,7 +735,7 @@ class MyPlexAccount(PlexObject):
break
if not client:
raise BadRequest('Unable to find client by clientId=%s', clientId)
raise BadRequest(f'Unable to find client by clientId={clientId}')
if 'sync-target' not in client.provides:
raise BadRequest("Received client doesn't provides sync-target")
@ -694,7 +774,7 @@ class MyPlexAccount(PlexObject):
if response.status_code not in (200, 201, 204): # pragma: no cover
codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ')
raise BadRequest('(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext))
raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}')
return response.json()['token']
def history(self, maxresults=9999999, mindate=None):
@ -730,7 +810,7 @@ class MyPlexAccount(PlexObject):
data = self.query(f'{self.MUSIC}/hubs')
return self.findItems(data)
def watchlist(self, filter=None, sort=None, libtype=None, **kwargs):
def watchlist(self, filter=None, sort=None, libtype=None, maxresults=9999999, **kwargs):
""" Returns a list of :class:`~plexapi.video.Movie` and :class:`~plexapi.video.Show` items in the user's watchlist.
Note: The objects returned are from Plex's online metadata. To get the matching item on a Plex server,
search for the media using the guid.
@ -742,6 +822,7 @@ class MyPlexAccount(PlexObject):
``titleSort`` (Title), ``originallyAvailableAt`` (Release Date), or ``rating`` (Critic Rating).
``dir`` can be ``asc`` or ``desc``.
libtype (str, optional): 'movie' or 'show' to only return movies or shows, otherwise return all items.
maxresults (int, optional): Only return the specified number of results.
**kwargs (dict): Additional custom filters to apply to the search results.
@ -769,9 +850,18 @@ class MyPlexAccount(PlexObject):
if libtype:
params['type'] = utils.searchType(libtype)
params['X-Plex-Container-Start'] = 0
params['X-Plex-Container-Size'] = min(X_PLEX_CONTAINER_SIZE, maxresults)
params.update(kwargs)
data = self.query(f'{self.METADATA}/library/sections/watchlist/{filter}', params=params)
return self.findItems(data)
results, subresults = [], '_init'
while subresults and maxresults > len(results):
data = self.query(f'{self.METADATA}/library/sections/watchlist/{filter}', params=params)
subresults = self.findItems(data)
results += subresults[:maxresults - len(results)]
params['X-Plex-Container-Start'] += params['X-Plex-Container-Size']
return self._toOnlineMetadata(results)
def onWatchlist(self, item):
""" Returns True if the item is on the user's watchlist.
@ -780,9 +870,7 @@ class MyPlexAccount(PlexObject):
item (:class:`~plexapi.video.Movie` or :class:`~plexapi.video.Show`): Item to check
if it is on the user's watchlist.
"""
ratingKey = item.guid.rsplit('/', 1)[-1]
data = self.query(f"{self.METADATA}/library/metadata/{ratingKey}/userState")
return bool(data.find('UserState').attrib.get('watchlistedAt'))
return bool(self.userState(item).watchlistedAt)
def addToWatchlist(self, items):
""" Add media items to the user's watchlist
@ -800,9 +888,10 @@ class MyPlexAccount(PlexObject):
for item in items:
if self.onWatchlist(item):
raise BadRequest('"%s" is already on the watchlist' % item.title)
raise BadRequest(f'"{item.title}" is already on the watchlist')
ratingKey = item.guid.rsplit('/', 1)[-1]
self.query(f'{self.METADATA}/actions/addToWatchlist?ratingKey={ratingKey}', method=self._session.put)
return self
def removeFromWatchlist(self, items):
""" Remove media items from the user's watchlist
@ -820,33 +909,49 @@ class MyPlexAccount(PlexObject):
for item in items:
if not self.onWatchlist(item):
raise BadRequest('"%s" is not on the watchlist' % item.title)
raise BadRequest(f'"{item.title}" is not on the watchlist')
ratingKey = item.guid.rsplit('/', 1)[-1]
self.query(f'{self.METADATA}/actions/removeFromWatchlist?ratingKey={ratingKey}', method=self._session.put)
return self
def searchDiscover(self, query, limit=30):
def userState(self, item):
""" Returns a :class:`~plexapi.myplex.UserState` object for the specified item.
Parameters:
item (:class:`~plexapi.video.Movie` or :class:`~plexapi.video.Show`): Item to return the user state.
"""
ratingKey = item.guid.rsplit('/', 1)[-1]
data = self.query(f"{self.METADATA}/library/metadata/{ratingKey}/userState")
return self.findItem(data, cls=UserState)
def searchDiscover(self, query, limit=30, libtype=None):
""" Search for movies and TV shows in Discover.
Returns a list of :class:`~plexapi.video.Movie` and :class:`~plexapi.video.Show` objects.
Parameters:
query (str): Search query.
limit (int, optional): Limit to the specified number of results. Default 30.
libtype (str, optional): 'movie' or 'show' to only return movies or shows, otherwise return all items.
"""
libtypes = {'movie': 'movies', 'show': 'tv'}
libtype = libtypes.get(libtype, 'movies,tv')
headers = {
'Accept': 'application/json'
}
params = {
'query': query,
'limit ': limit,
'searchTypes': 'movies,tv',
'searchTypes': libtype,
'includeMetadata': 1
}
data = self.query(f'{self.METADATA}/library/search', headers=headers, params=params)
searchResults = data['MediaContainer'].get('SearchResult', [])
searchResults = data['MediaContainer'].get('SearchResults', [])
searchResult = next((s.get('SearchResult', []) for s in searchResults if s.get('id') == 'external'), [])
results = []
for result in searchResults:
for result in searchResult:
metadata = result['Metadata']
type = metadata['type']
if type == 'movie':
@ -859,7 +964,33 @@ class MyPlexAccount(PlexObject):
xml = f'<{tag} {attrs}/>'
results.append(self._manuallyLoadXML(xml))
return results
return self._toOnlineMetadata(results)
@property
def viewStateSync(self):
""" Returns True or False if syncing of watch state and ratings
is enabled or disabled, respectively, for the account.
"""
headers = {'Accept': 'application/json'}
data = self.query(self.VIEWSTATESYNC, headers=headers)
return data.get('consent')
def enableViewStateSync(self):
""" Enable syncing of watch state and ratings for the account. """
self._updateViewStateSync(True)
def disableViewStateSync(self):
""" Disable syncing of watch state and ratings for the account. """
self._updateViewStateSync(False)
def _updateViewStateSync(self, consent):
""" Enable or disable syncing of watch state and ratings for the account.
Parameters:
consent (bool): True to enable, False to disable.
"""
params = {'consent': consent}
self.query(self.VIEWSTATESYNC, method=self._session.put, params=params)
def link(self, pin):
""" Link a device to the account using a pin code.
@ -874,6 +1005,26 @@ class MyPlexAccount(PlexObject):
data = {'code': pin}
self.query(self.LINK, self._session.put, headers=headers, data=data)
def _toOnlineMetadata(self, objs):
""" Convert a list of media objects to online metadata objects. """
# TODO: Add proper support for metadata.provider.plex.tv
# Temporary workaround to allow reloading and browsing of online media objects
server = PlexServer(self.METADATA, self._token)
if not isinstance(objs, list):
objs = [objs]
for obj in objs:
obj._server = server
# Parse details key to modify query string
url = urlsplit(obj._details_key)
query = dict(parse_qsl(url.query))
query['includeUserState'] = 1
query.pop('includeFields', None)
obj._details_key = urlunsplit((url.scheme, url.netloc, url.path, urlencode(query), url.fragment))
return objs
class MyPlexUser(PlexObject):
""" This object represents non-signed in users such as friends and linked
@ -937,7 +1088,7 @@ class MyPlexUser(PlexObject):
if utils.cast(int, item.attrib.get('userID')) == self.id:
return item.attrib.get('accessToken')
except Exception:
log.exception('Failed to get access token for %s' % self.title)
log.exception('Failed to get access token for %s', self.title)
def server(self, name):
""" Returns the :class:`~plexapi.myplex.MyPlexServerShare` that matches the name specified.
@ -949,7 +1100,7 @@ class MyPlexUser(PlexObject):
if name.lower() == server.name.lower():
return server
raise NotFound('Unable to find server %s' % name)
raise NotFound(f'Unable to find server {name}')
def history(self, maxresults=9999999, mindate=None):
""" Get all Play History for a user in all shared servers.
@ -1077,7 +1228,7 @@ class MyPlexServerShare(PlexObject):
if name.lower() == section.title.lower():
return section
raise NotFound('Unable to find section %s' % name)
raise NotFound(f'Unable to find section {name}')
def sections(self):
""" Returns a list of all :class:`~plexapi.myplex.Section` objects shared with this user.
@ -1158,7 +1309,6 @@ class MyPlexResource(PlexObject):
def preferred_connections(
self,
ssl=None,
timeout=None,
locations=DEFAULT_LOCATION_ORDER,
schemes=DEFAULT_SCHEME_ORDER,
):
@ -1170,7 +1320,6 @@ class MyPlexResource(PlexObject):
ssl (bool, optional): Set True to only connect to HTTPS connections. Set False to
only connect to HTTP connections. Set None (default) to connect to any
HTTP or HTTPS connection.
timeout (int, optional): The timeout in seconds to attempt each connection.
"""
connections_dict = {location: {scheme: [] for scheme in schemes} for location in locations}
for connection in self.connections:
@ -1212,7 +1361,7 @@ class MyPlexResource(PlexObject):
Raises:
:exc:`~plexapi.exceptions.NotFound`: When unable to connect to any addresses for this resource.
"""
connections = self.preferred_connections(ssl, timeout, locations, schemes)
connections = self.preferred_connections(ssl, locations, schemes)
# Try connecting to all known resource connections in parallel, but
# only return the first server (in order) that provides a response.
cls = PlexServer if 'server' in self.provides else PlexClient
@ -1244,7 +1393,7 @@ class ResourceConnection(PlexObject):
self.port = utils.cast(int, data.attrib.get('port'))
self.uri = data.attrib.get('uri')
self.local = utils.cast(bool, data.attrib.get('local'))
self.httpuri = 'http://%s:%s' % (self.address, self.port)
self.httpuri = f'http://{self.address}:{self.port}'
self.relay = utils.cast(bool, data.attrib.get('relay'))
@ -1318,7 +1467,7 @@ class MyPlexDevice(PlexObject):
def delete(self):
""" Remove this device from your account. """
key = 'https://plex.tv/devices/%s.xml' % self.id
key = f'https://plex.tv/devices/{self.id}.xml'
self._server.query(key, self._server._session.delete)
def syncItems(self):
@ -1355,11 +1504,12 @@ class MyPlexPinLogin:
session (requests.Session, optional): Use your own session object if you want to
cache the http responses from PMS
requestTimeout (int): timeout in seconds on initial connect to plex.tv (default config.TIMEOUT).
headers (dict): A dict of X-Plex headers to send with requests.
oauth (bool): True to use Plex OAuth instead of PIN login.
Attributes:
PINS (str): 'https://plex.tv/api/v2/pins'
CHECKPINS (str): 'https://plex.tv/api/v2/pins/{pinid}'
LINK (str): 'https://plex.tv/api/v2/pins/link'
POLLINTERVAL (int): 1
finished (bool): Whether the pin login has finished or not.
expired (bool): Whether the pin login has expired or not.
@ -1370,12 +1520,13 @@ class MyPlexPinLogin:
CHECKPINS = 'https://plex.tv/api/v2/pins/{pinid}' # get
POLLINTERVAL = 1
def __init__(self, session=None, requestTimeout=None, headers=None):
def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False):
super(MyPlexPinLogin, self).__init__()
self._session = session or requests.Session()
self._requestTimeout = requestTimeout or TIMEOUT
self.headers = headers
self._oauth = oauth
self._loginTimeout = None
self._callback = None
self._thread = None
@ -1390,8 +1541,36 @@ class MyPlexPinLogin:
@property
def pin(self):
""" Return the 4 character PIN used for linking a device at https://plex.tv/link. """
if self._oauth:
raise BadRequest('Cannot use PIN for Plex OAuth login')
return self._code
def oauthUrl(self, forwardUrl=None):
""" Return the Plex OAuth url for login.
Parameters:
forwardUrl (str, optional): The url to redirect the client to after login.
"""
if not self._oauth:
raise BadRequest('Must use "MyPlexPinLogin(oauth=True)" for Plex OAuth login.')
headers = self._headers()
params = {
'clientID': headers['X-Plex-Client-Identifier'],
'context[device][product]': headers['X-Plex-Product'],
'context[device][version]': headers['X-Plex-Version'],
'context[device][platform]': headers['X-Plex-Platform'],
'context[device][platformVersion]': headers['X-Plex-Platform-Version'],
'context[device][device]': headers['X-Plex-Device'],
'context[device][deviceName]': headers['X-Plex-Device-Name'],
'code': self._code
}
if forwardUrl:
params['forwardUrl'] = forwardUrl
return f'https://app.plex.tv/auth/#!?{urlencode(params)}'
def run(self, callback=None, timeout=None):
""" Starts the thread which monitors the PIN login state.
Parameters:
@ -1455,7 +1634,13 @@ class MyPlexPinLogin:
def _getCode(self):
url = self.PINS
response = self._query(url, self._session.post)
if self._oauth:
params = {'strong': True}
else:
params = None
response = self._query(url, self._session.post, params=params)
if not response:
return None
@ -1520,7 +1705,7 @@ class MyPlexPinLogin:
if not response.ok: # pragma: no cover
codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ')
raise BadRequest('(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext))
raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}')
data = response.text.encode('utf8')
return ElementTree.fromstring(data) if data.strip() else None
@ -1564,7 +1749,7 @@ def _chooseConnection(ctype, name, results):
if results:
log.debug('Connecting to %s: %s?X-Plex-Token=%s', ctype, results[0]._baseurl, results[0]._token)
return results[0]
raise NotFound('Unable to connect to %s: %s' % (ctype.lower(), name))
raise NotFound(f'Unable to connect to {ctype.lower()}: {name}')
class AccountOptOut(PlexObject):
@ -1593,8 +1778,8 @@ class AccountOptOut(PlexObject):
:exc:`~plexapi.exceptions.NotFound`: ``option`` str not found in CHOICES.
"""
if option not in self.CHOICES:
raise NotFound('%s not found in available choices: %s' % (option, self.CHOICES))
url = self._server.OPTOUTS % {'userUUID': self._server.uuid}
raise NotFound(f'{option} not found in available choices: {self.CHOICES}')
url = self._server.OPTOUTS.format(userUUID=self._server.uuid)
params = {'key': self.key, 'value': option}
self._server.query(url, method=self._server._session.post, params=params)
self.value = option # assume query successful and set the value to option
@ -1614,5 +1799,35 @@ class AccountOptOut(PlexObject):
:exc:`~plexapi.exceptions.BadRequest`: When trying to opt out music.
"""
if self.key == 'tv.plex.provider.music':
raise BadRequest('%s does not have the option to opt out managed users.' % self.key)
raise BadRequest(f'{self.key} does not have the option to opt out managed users.')
self._updateOptOut('opt_out_managed')
class UserState(PlexObject):
""" Represents a single UserState
Attributes:
TAG (str): UserState
lastViewedAt (datetime): Datetime the item was last played.
ratingKey (str): Unique key identifying the item.
type (str): The media type of the item.
viewCount (int): Count of times the item was played.
viewedLeafCount (int): Number of items marked as played in the show/season.
viewOffset (int): Time offset in milliseconds from the start of the content
viewState (bool): True or False if the item has been played.
watchlistedAt (datetime): Datetime the item was added to the watchlist.
"""
TAG = 'UserState'
def __repr__(self):
return f'<{self.__class__.__name__}:{self.ratingKey}>'
def _loadData(self, data):
self.lastViewedAt = utils.toDatetime(data.attrib.get('lastViewedAt'))
self.ratingKey = data.attrib.get('ratingKey')
self.type = data.attrib.get('type')
self.viewCount = utils.cast(int, data.attrib.get('viewCount', 0))
self.viewedLeafCount = utils.cast(int, data.attrib.get('viewedLeafCount', 0))
self.viewOffset = utils.cast(int, data.attrib.get('viewOffset', 0))
self.viewState = data.attrib.get('viewState') == 'complete'
self.watchlistedAt = utils.toDatetime(data.attrib.get('watchlistedAt'))