mirror of
https://github.com/Ombi-app/Ombi.git
synced 2025-07-06 05:01:13 -07:00
feat(plex): Show watched status for movie and TV requests
This commit is contained in:
parent
763b64d49f
commit
cff25ca544
13 changed files with 500 additions and 159 deletions
|
@ -24,6 +24,7 @@ namespace Ombi.Api.Plex
|
|||
Task<PlexFriends> GetUsers(string authToken);
|
||||
Task<PlexAccount> GetAccount(string authToken);
|
||||
Task<PlexMetadata> GetRecentlyAdded(string authToken, string uri, string sectionId);
|
||||
Task<PlexMetadata> GetPlayed(string authToken, string uri, string sectionId, int maxNumberOfItems = 0);
|
||||
Task<OAuthContainer> GetPin(int pinId);
|
||||
Task<Uri> GetOAuthUrl(string code, string applicationUrl);
|
||||
Task<PlexAddWrapper> AddUser(string emailAddress, string serverId, string authToken, int[] libs);
|
||||
|
|
9
src/Ombi.Api.Plex/Models/PlexMediaFilterType.cs
Normal file
9
src/Ombi.Api.Plex/Models/PlexMediaFilterType.cs
Normal file
|
@ -0,0 +1,9 @@
|
|||
namespace Ombi.Api.Plex.Models
|
||||
{
|
||||
public enum PlexMediaFilterType
|
||||
{
|
||||
Movie = 1,
|
||||
Show = 2,
|
||||
Episode = 4,
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Threading;
|
||||
|
@ -212,6 +213,29 @@ namespace Ombi.Api.Plex
|
|||
return await Api.Request<PlexMetadata>(request);
|
||||
}
|
||||
|
||||
|
||||
public async Task<PlexMetadata> GetPlayed(string authToken, string uri, string sectionId, int maxNumberOfItems)
|
||||
{
|
||||
var request = new Request($"library/sections/{sectionId}/all", uri, HttpMethod.Get);
|
||||
await AddHeaders(request, authToken);
|
||||
request.AddQueryString("unwatched", "0");
|
||||
request.AddQueryString("sort", "lastViewedAt:desc");
|
||||
|
||||
// for some reason, we need to explicitely include episodes for them to be returned by the API (movies are fine)
|
||||
// also the order seems of importance: "4,1" doesn't work but "1,4" does work
|
||||
var types = new List<int> { (int) PlexMediaFilterType.Movie, (int) PlexMediaFilterType.Episode };
|
||||
var typeFilter = string.Join(",", types);
|
||||
request.AddQueryString("type", typeFilter);
|
||||
|
||||
if (maxNumberOfItems != 0)
|
||||
{
|
||||
AddLimitHeaders(request, 0, maxNumberOfItems);
|
||||
}
|
||||
|
||||
|
||||
return await Api.Request<PlexMetadata>(request);
|
||||
}
|
||||
|
||||
public async Task<OAuthContainer> GetPin(int pinId)
|
||||
{
|
||||
var request = new Request($"api/v2/pins/{pinId}", "https://plex.tv/", HttpMethod.Get);
|
||||
|
|
|
@ -30,7 +30,7 @@ namespace Ombi.Core.Tests.Authentication
|
|||
AuthenticationSettings.Setup(x => x.GetSettingsAsync())
|
||||
.ReturnsAsync(new AuthenticationSettings());
|
||||
_um = new OmbiUserManager(UserStore.Object, null, null, null, null, null, null, null, null,
|
||||
PlexApi.Object, null, null, null, null, AuthenticationSettings.Object);
|
||||
PlexApi.Object, null, null, null, null, AuthenticationSettings.Object, null);
|
||||
}
|
||||
|
||||
public OmbiUserManager _um { get; set; }
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Identity;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
@ -41,6 +42,7 @@ using Ombi.Core.Settings.Models.External;
|
|||
using Ombi.Helpers;
|
||||
using Ombi.Settings.Settings.Models;
|
||||
using Ombi.Store.Entities;
|
||||
using Ombi.Store.Repository;
|
||||
|
||||
namespace Ombi.Core.Authentication
|
||||
{
|
||||
|
@ -52,7 +54,7 @@ namespace Ombi.Core.Authentication
|
|||
IdentityErrorDescriber errors, IServiceProvider services, ILogger<UserManager<OmbiUser>> logger, IPlexApi plexApi,
|
||||
IEmbyApiFactory embyApi, ISettingsService<EmbySettings> embySettings,
|
||||
IJellyfinApiFactory jellyfinApi, ISettingsService<JellyfinSettings> jellyfinSettings,
|
||||
ISettingsService<AuthenticationSettings> auth)
|
||||
ISettingsService<AuthenticationSettings> auth, IRepository<PlexWatchlistUserError> userError)
|
||||
: base(store, optionsAccessor, passwordHasher, userValidators, passwordValidators, keyNormalizer, errors, services, logger)
|
||||
{
|
||||
_plexApi = plexApi;
|
||||
|
@ -61,6 +63,8 @@ namespace Ombi.Core.Authentication
|
|||
_embySettings = embySettings;
|
||||
_jellyfinSettings = jellyfinSettings;
|
||||
_authSettings = auth;
|
||||
_userError = userError;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
private readonly IPlexApi _plexApi;
|
||||
|
@ -69,6 +73,8 @@ namespace Ombi.Core.Authentication
|
|||
private readonly ISettingsService<EmbySettings> _embySettings;
|
||||
private readonly ISettingsService<JellyfinSettings> _jellyfinSettings;
|
||||
private readonly ISettingsService<AuthenticationSettings> _authSettings;
|
||||
private readonly IRepository<PlexWatchlistUserError> _userError;
|
||||
private readonly Microsoft.Extensions.Logging.ILogger _logger;
|
||||
private string _clientIpAddress;
|
||||
public string ClientIpAddress { get => _clientIpAddress; set => _clientIpAddress = value; }
|
||||
|
||||
|
@ -139,6 +145,38 @@ namespace Ombi.Core.Authentication
|
|||
|
||||
}
|
||||
|
||||
public async Task<ICollection<OmbiUser>> GetPlexUsersWithValidTokens()
|
||||
{
|
||||
var plexUsersWithTokens = Users.Where(x => x.UserType == UserType.PlexUser && x.MediaServerToken != null).ToList();
|
||||
_logger.LogDebug($"Found {plexUsersWithTokens.Count} users with tokens");
|
||||
var result = new List<OmbiUser>();
|
||||
|
||||
foreach (var user in plexUsersWithTokens)
|
||||
{
|
||||
// Check if the user has errors and the token is the same (not refreshed)
|
||||
var failedUser = await _userError.GetAll().Where(x => x.UserId == user.Id).FirstOrDefaultAsync();
|
||||
if (failedUser != null)
|
||||
{
|
||||
if (failedUser.MediaServerToken.Equals(user.MediaServerToken))
|
||||
{
|
||||
_logger.LogWarning($"Skipping user '{user.UserName}' as they failed previously and the token has not yet been refreshed. They need to re-authenticate with Ombi");
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
// remove that guy
|
||||
await _userError.Delete(failedUser);
|
||||
failedUser = null;
|
||||
}
|
||||
}
|
||||
if (failedUser == null)
|
||||
{
|
||||
result.Add(user);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sign the user into plex and make sure we can get the authentication token.
|
||||
|
|
|
@ -244,6 +244,7 @@ namespace Ombi.DependencyInjection
|
|||
services.AddSingleton<IJobFactory, IoCJobFactory>();
|
||||
|
||||
services.AddTransient<IPlexContentSync, PlexContentSync>();
|
||||
services.AddTransient<IPlexPlayedSync, PlexPlayedSync>();
|
||||
services.AddTransient<IPlexWatchlistImport, PlexWatchlistImport>();
|
||||
services.AddTransient<IEmbyContentSync, EmbyContentSync>();
|
||||
services.AddTransient<IEmbyPlayedSync, EmbyPlayedSync>();
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
using Quartz;
|
||||
|
||||
namespace Ombi.Schedule.Jobs
|
||||
{
|
||||
public interface IPlexPlayedSync : IJob
|
||||
{
|
||||
}
|
||||
}
|
|
@ -35,82 +35,56 @@ using Ombi.Api.Plex;
|
|||
using Ombi.Api.Plex.Models;
|
||||
using Ombi.Api.TheMovieDb;
|
||||
using Ombi.Api.TheMovieDb.Models;
|
||||
using Ombi.Core.Services;
|
||||
using Ombi.Core.Settings;
|
||||
using Ombi.Core.Settings.Models.External;
|
||||
using Ombi.Helpers;
|
||||
using Ombi.Hubs;
|
||||
using Ombi.Schedule.Jobs.Plex.Interfaces;
|
||||
using Ombi.Schedule.Jobs.Plex.Models;
|
||||
using Ombi.Settings.Settings.Models;
|
||||
using Ombi.Store.Entities;
|
||||
using Ombi.Store.Repository;
|
||||
using Quartz;
|
||||
|
||||
namespace Ombi.Schedule.Jobs.Plex
|
||||
{
|
||||
public class PlexContentSync : IPlexContentSync
|
||||
public class PlexContentSync : PlexLibrarySync, IPlexContentSync
|
||||
{
|
||||
private readonly IMovieDbApi _movieApi;
|
||||
private readonly IMediaCacheService _mediaCacheService;
|
||||
private readonly IFeatureService _feature;
|
||||
private ProcessedContent _processedContent;
|
||||
|
||||
public PlexContentSync(ISettingsService<PlexSettings> plex, IPlexApi plexApi, ILogger<PlexContentSync> logger, IPlexContentRepository repo,
|
||||
IPlexEpisodeSync epsiodeSync, INotificationHubService notificationHubService, IMovieDbApi movieDbApi, IMediaCacheService mediaCacheService)
|
||||
|
||||
public PlexContentSync(
|
||||
ISettingsService<PlexSettings> plex,
|
||||
IPlexApi plexApi, ILogger<PlexLibrarySync> logger,
|
||||
IPlexContentRepository repo,
|
||||
IPlexEpisodeSync epsiodeSync,
|
||||
INotificationHubService notificationHubService,
|
||||
IMovieDbApi movieDbApi,
|
||||
IMediaCacheService mediaCacheService,
|
||||
IFeatureService feature):
|
||||
base(plex, plexApi, logger, notificationHubService)
|
||||
{
|
||||
Plex = plex;
|
||||
PlexApi = plexApi;
|
||||
Logger = logger;
|
||||
Repo = repo;
|
||||
EpisodeSync = epsiodeSync;
|
||||
Notification = notificationHubService;
|
||||
_movieApi = movieDbApi;
|
||||
_mediaCacheService = mediaCacheService;
|
||||
_feature = feature;
|
||||
Plex.ClearCache();
|
||||
}
|
||||
|
||||
private ISettingsService<PlexSettings> Plex { get; }
|
||||
private IPlexApi PlexApi { get; }
|
||||
private ILogger<PlexContentSync> Logger { get; }
|
||||
private IPlexContentRepository Repo { get; }
|
||||
private IPlexEpisodeSync EpisodeSync { get; }
|
||||
private INotificationHubService Notification { get; set; }
|
||||
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
public async override Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
JobDataMap dataMap = context.JobDetail.JobDataMap;
|
||||
var recentlyAddedSearch = dataMap.GetBooleanValueFromString(JobDataKeys.RecentlyAddedSearch);
|
||||
|
||||
var plexSettings = await Plex.GetSettingsAsync();
|
||||
if (!plexSettings.Enable)
|
||||
{
|
||||
return;
|
||||
}
|
||||
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Started" : "Plex Content Sync Started");
|
||||
if (!ValidateSettings(plexSettings))
|
||||
{
|
||||
Logger.LogError("Plex Settings are not valid");
|
||||
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync, Settings Not Valid" : "Plex Content, Settings Not Valid");
|
||||
return;
|
||||
}
|
||||
var processedContent = new ProcessedContent();
|
||||
Logger.LogInformation(recentlyAddedSearch
|
||||
? "Starting Plex Content Cacher Recently Added Scan"
|
||||
: "Starting Plex Content Cacher");
|
||||
try
|
||||
{
|
||||
if (recentlyAddedSearch)
|
||||
{
|
||||
processedContent = await StartTheCache(plexSettings, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
await StartTheCache(plexSettings, false);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Errored" : "Plex Content Sync Errored");
|
||||
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content");
|
||||
}
|
||||
|
||||
_processedContent = new ProcessedContent();
|
||||
|
||||
await base.Execute(context);
|
||||
|
||||
if (!recentlyAddedSearch)
|
||||
{
|
||||
await NotifyClient("Plex Sync - Starting Episode Sync");
|
||||
|
@ -118,53 +92,32 @@ namespace Ombi.Schedule.Jobs.Plex
|
|||
await OmbiQuartz.TriggerJob(nameof(IPlexEpisodeSync), "Plex");
|
||||
}
|
||||
|
||||
if ((processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch)
|
||||
if ((_processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch)
|
||||
{
|
||||
await NotifyClient("Plex Sync - Checking if any requests are now available");
|
||||
Logger.LogInformation("Kicking off Plex Availability Checker");
|
||||
await OmbiQuartz.TriggerJob(nameof(IPlexAvailabilityChecker), "Plex");
|
||||
}
|
||||
var processedCont = processedContent?.Content?.Count() ?? 0;
|
||||
var processedEp = processedContent?.Episodes?.Count() ?? 0;
|
||||
var processedCont = _processedContent?.Content?.Count() ?? 0;
|
||||
var processedEp = _processedContent?.Episodes?.Count() ?? 0;
|
||||
Logger.LogInformation("Finished Plex Content Cacher, with processed content: {0}, episodes: {1}. Recently Added Scan: {2}", processedCont, processedEp, recentlyAddedSearch);
|
||||
|
||||
await NotifyClient(recentlyAddedSearch ? $"Plex Recently Added Sync Finished, We processed {processedCont}, and {processedEp} Episodes" : "Plex Content Sync Finished");
|
||||
|
||||
await _mediaCacheService.Purge();
|
||||
}
|
||||
|
||||
private async Task<ProcessedContent> StartTheCache(PlexSettings plexSettings, bool recentlyAddedSearch)
|
||||
{
|
||||
var processedContent = new ProcessedContent();
|
||||
foreach (var servers in plexSettings.Servers ?? new List<PlexServers>())
|
||||
// Played state
|
||||
var isPlayedSyncEnabled = await _feature.FeatureEnabled(FeatureNames.PlayedSync);
|
||||
if(isPlayedSyncEnabled)
|
||||
{
|
||||
try
|
||||
{
|
||||
Logger.LogInformation("Starting to cache the content on server {0}", servers.Name);
|
||||
|
||||
if (recentlyAddedSearch)
|
||||
{
|
||||
// If it's recently added search then we want the results to pass to the metadata job
|
||||
// This way the metadata job is smaller in size to process, it only need to look at newly added shit
|
||||
return await ProcessServer(servers, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
await ProcessServer(servers, false);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name);
|
||||
}
|
||||
await OmbiQuartz.Scheduler.TriggerJob(new JobKey(nameof(IPlexPlayedSync), "Plex"), new JobDataMap(new Dictionary<string, string> { { JobDataKeys.RecentlyAddedSearch, recentlyAddedSearch.ToString() } }));
|
||||
}
|
||||
|
||||
return processedContent;
|
||||
await _mediaCacheService.Purge();
|
||||
|
||||
}
|
||||
|
||||
private async Task<ProcessedContent> ProcessServer(PlexServers servers, bool recentlyAddedSearch)
|
||||
|
||||
protected override async Task ProcessServer(PlexServers servers)
|
||||
{
|
||||
var retVal = new ProcessedContent();
|
||||
var contentProcessed = new Dictionary<int, string>();
|
||||
var episodesProcessed = new List<int>();
|
||||
Logger.LogDebug("Getting all content from server {0}", servers.Name);
|
||||
|
@ -282,9 +235,8 @@ namespace Ombi.Schedule.Jobs.Plex
|
|||
}
|
||||
}
|
||||
|
||||
retVal.Content = contentProcessed.Values;
|
||||
retVal.Episodes = episodesProcessed;
|
||||
return retVal;
|
||||
_processedContent.Content = contentProcessed.Values;
|
||||
_processedContent.Episodes = episodesProcessed;
|
||||
}
|
||||
|
||||
public async Task MovieLoop(PlexServers servers, Mediacontainer content, HashSet<PlexServerContent> contentToAdd,
|
||||
|
@ -693,45 +645,27 @@ namespace Ombi.Schedule.Jobs.Plex
|
|||
/// <returns></returns>
|
||||
private async Task<List<Mediacontainer>> GetAllContent(PlexServers plexSettings, bool recentlyAddedSearch)
|
||||
{
|
||||
var sections = await PlexApi.GetLibrarySections(plexSettings.PlexAuthToken, plexSettings.FullUri);
|
||||
|
||||
var libs = new List<Mediacontainer>();
|
||||
if (sections != null)
|
||||
{
|
||||
foreach (var dir in sections.MediaContainer.Directory ?? new List<Directory>())
|
||||
{
|
||||
if (plexSettings.PlexSelectedLibraries.Any())
|
||||
{
|
||||
if (plexSettings.PlexSelectedLibraries.Any(x => x.Enabled))
|
||||
{
|
||||
// Only get the enabled libs
|
||||
var keys = plexSettings.PlexSelectedLibraries.Where(x => x.Enabled)
|
||||
.Select(x => x.Key.ToString()).ToList();
|
||||
if (!keys.Contains(dir.key))
|
||||
{
|
||||
Logger.LogDebug("Lib {0} is not monitored, so skipping", dir.key);
|
||||
// We are not monitoring this lib
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (recentlyAddedSearch)
|
||||
var directories = await GetEnabledLibraries(plexSettings);
|
||||
|
||||
foreach (var directory in directories)
|
||||
{
|
||||
if (recentlyAddedSearch)
|
||||
{
|
||||
var container = await PlexApi.GetRecentlyAdded(plexSettings.PlexAuthToken, plexSettings.FullUri,
|
||||
directory.key);
|
||||
if (container != null)
|
||||
{
|
||||
var container = await PlexApi.GetRecentlyAdded(plexSettings.PlexAuthToken, plexSettings.FullUri,
|
||||
dir.key);
|
||||
if (container != null)
|
||||
{
|
||||
libs.Add(container.MediaContainer);
|
||||
}
|
||||
libs.Add(container.MediaContainer);
|
||||
}
|
||||
else
|
||||
}
|
||||
else
|
||||
{
|
||||
var lib = await PlexApi.GetLibrary(plexSettings.PlexAuthToken, plexSettings.FullUri, directory.key);
|
||||
if (lib != null)
|
||||
{
|
||||
var lib = await PlexApi.GetLibrary(plexSettings.PlexAuthToken, plexSettings.FullUri, dir.key);
|
||||
if (lib != null)
|
||||
{
|
||||
libs.Add(lib.MediaContainer);
|
||||
}
|
||||
libs.Add(lib.MediaContainer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -739,25 +673,6 @@ namespace Ombi.Schedule.Jobs.Plex
|
|||
return libs;
|
||||
}
|
||||
|
||||
private async Task NotifyClient(string message)
|
||||
{
|
||||
await Notification.SendNotificationToAdmins($"Plex Sync - {message}");
|
||||
}
|
||||
|
||||
private static bool ValidateSettings(PlexSettings plex)
|
||||
{
|
||||
if (plex.Enable)
|
||||
{
|
||||
foreach (var server in plex.Servers ?? new List<PlexServers>())
|
||||
{
|
||||
if (string.IsNullOrEmpty(server?.Ip) || string.IsNullOrEmpty(server?.PlexAuthToken))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return plex.Enable;
|
||||
}
|
||||
|
||||
private bool _disposed;
|
||||
|
||||
|
|
163
src/Ombi.Schedule/Jobs/Plex/PlexLibrarySync.cs
Normal file
163
src/Ombi.Schedule/Jobs/Plex/PlexLibrarySync.cs
Normal file
|
@ -0,0 +1,163 @@
|
|||
#region Copyright
|
||||
// /************************************************************************
|
||||
// Copyright (c) 2017 Jamie Rees
|
||||
// File: PlexServerContentCacher.cs
|
||||
// Created By: Jamie Rees
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining
|
||||
// a copy of this software and associated documentation files (the
|
||||
// "Software"), to deal in the Software without restriction, including
|
||||
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
// distribute, sublicense, and/or sell copies of the Software, and to
|
||||
// permit persons to whom the Software is furnished to do so, subject to
|
||||
// the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be
|
||||
// included in all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
||||
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
// ************************************************************************/
|
||||
#endregion
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Ombi.Api.Plex;
|
||||
using Ombi.Api.Plex.Models;
|
||||
using Ombi.Core.Settings;
|
||||
using Ombi.Core.Settings.Models.External;
|
||||
using Ombi.Helpers;
|
||||
using Ombi.Hubs;
|
||||
using Quartz;
|
||||
|
||||
namespace Ombi.Schedule.Jobs.Plex
|
||||
{
|
||||
public abstract class PlexLibrarySync
|
||||
{
|
||||
|
||||
public PlexLibrarySync(
|
||||
ISettingsService<PlexSettings> plex,
|
||||
IPlexApi plexApi,
|
||||
ILogger<PlexLibrarySync> logger,
|
||||
INotificationHubService notificationHubService)
|
||||
{
|
||||
PlexApi = plexApi;
|
||||
Plex = plex;
|
||||
Logger = logger;
|
||||
Notification = notificationHubService;
|
||||
}
|
||||
protected ILogger<PlexLibrarySync> Logger { get; }
|
||||
protected IPlexApi PlexApi { get; }
|
||||
protected ISettingsService<PlexSettings> Plex { get; }
|
||||
private INotificationHubService Notification { get; set; }
|
||||
protected bool recentlyAddedSearch;
|
||||
public virtual async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
JobDataMap dataMap = context.MergedJobDataMap;
|
||||
recentlyAddedSearch = dataMap.GetBooleanValueFromString(JobDataKeys.RecentlyAddedSearch);
|
||||
|
||||
var plexSettings = await Plex.GetSettingsAsync();
|
||||
if (!plexSettings.Enable)
|
||||
{
|
||||
return;
|
||||
}
|
||||
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Started" : "Plex Content Sync Started");
|
||||
if (!ValidateSettings(plexSettings))
|
||||
{
|
||||
Logger.LogError("Plex Settings are not valid");
|
||||
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync, Settings Not Valid" : "Plex Content, Settings Not Valid");
|
||||
return;
|
||||
}
|
||||
Logger.LogInformation(recentlyAddedSearch
|
||||
? "Starting Plex Content Cacher Recently Added Scan"
|
||||
: "Starting Plex Content Cacher");
|
||||
try
|
||||
{
|
||||
await StartTheCache(plexSettings);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Errored" : "Plex Content Sync Errored");
|
||||
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task StartTheCache(PlexSettings plexSettings)
|
||||
{
|
||||
foreach (var servers in plexSettings.Servers ?? new List<PlexServers>())
|
||||
{
|
||||
try
|
||||
{
|
||||
Logger.LogInformation("Starting to cache the content on server {0}", servers.Name);
|
||||
await ProcessServer(servers);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected async Task<List<Directory>> GetEnabledLibraries(PlexServers plexSettings)
|
||||
{
|
||||
var result = new List<Directory>();
|
||||
var sections = await PlexApi.GetLibrarySections(plexSettings.PlexAuthToken, plexSettings.FullUri);
|
||||
|
||||
if (sections != null)
|
||||
{
|
||||
foreach (var dir in sections.MediaContainer.Directory ?? new List<Directory>())
|
||||
{
|
||||
if (plexSettings.PlexSelectedLibraries.Any())
|
||||
{
|
||||
if (plexSettings.PlexSelectedLibraries.Any(x => x.Enabled))
|
||||
{
|
||||
// Only get the enabled libs
|
||||
var keys = plexSettings.PlexSelectedLibraries.Where(x => x.Enabled)
|
||||
.Select(x => x.Key.ToString()).ToList();
|
||||
if (!keys.Contains(dir.key))
|
||||
{
|
||||
Logger.LogDebug("Lib {0} is not monitored, so skipping", dir.key);
|
||||
// We are not monitoring this lib
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
result.Add(dir);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
protected abstract Task ProcessServer(PlexServers servers);
|
||||
|
||||
protected async Task NotifyClient(string message)
|
||||
{
|
||||
await Notification.SendNotificationToAdmins($"Plex Sync - {message}");
|
||||
}
|
||||
private static bool ValidateSettings(PlexSettings plex)
|
||||
{
|
||||
if (plex.Enable)
|
||||
{
|
||||
foreach (var server in plex.Servers ?? new List<PlexServers>())
|
||||
{
|
||||
if (string.IsNullOrEmpty(server?.Ip) || string.IsNullOrEmpty(server?.PlexAuthToken))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return plex.Enable;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
199
src/Ombi.Schedule/Jobs/Plex/PlexPlayedSync.cs
Normal file
199
src/Ombi.Schedule/Jobs/Plex/PlexPlayedSync.cs
Normal file
|
@ -0,0 +1,199 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Ombi.Api.Plex;
|
||||
using Ombi.Api.Plex.Models;
|
||||
using Ombi.Core.Authentication;
|
||||
using Ombi.Core.Settings;
|
||||
using Ombi.Core.Settings.Models.External;
|
||||
using Ombi.Helpers;
|
||||
using Ombi.Hubs;
|
||||
using Ombi.Store.Entities;
|
||||
using Ombi.Store.Repository;
|
||||
|
||||
namespace Ombi.Schedule.Jobs.Plex
|
||||
{
|
||||
public class PlexPlayedSync : PlexLibrarySync, IPlexPlayedSync
|
||||
{
|
||||
public PlexPlayedSync(
|
||||
ISettingsService<PlexSettings> plex,
|
||||
IPlexApi plexApi,
|
||||
ILogger<PlexLibrarySync> logger,
|
||||
IPlexContentRepository contentRepo,
|
||||
INotificationHubService notificationHubService,
|
||||
OmbiUserManager user,
|
||||
IUserPlayedMovieRepository movieRepo,
|
||||
IUserPlayedEpisodeRepository episodeRepo):
|
||||
base(plex, plexApi, logger, notificationHubService)
|
||||
{
|
||||
_contentRepo = contentRepo;
|
||||
_userManager = user;
|
||||
_movieRepo = movieRepo;
|
||||
_episodeRepo = episodeRepo;
|
||||
Plex.ClearCache();
|
||||
}
|
||||
|
||||
private IPlexContentRepository _contentRepo { get; }
|
||||
private OmbiUserManager _userManager { get; }
|
||||
private readonly IUserPlayedMovieRepository _movieRepo;
|
||||
private readonly IUserPlayedEpisodeRepository _episodeRepo;
|
||||
|
||||
private const int recentlyAddedAmountToTake = 5;
|
||||
|
||||
protected override async Task ProcessServer(PlexServers servers)
|
||||
{
|
||||
var allUsers = await _userManager.GetPlexUsersWithValidTokens();
|
||||
foreach (var user in allUsers)
|
||||
{
|
||||
await ProcessUser(servers, recentlyAddedSearch, user);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessUser(PlexServers servers, bool recentlyAddedSearch, OmbiUser user)
|
||||
{
|
||||
|
||||
var contentProcessed = new Dictionary<int, string>();
|
||||
var episodesProcessed = new List<int>();
|
||||
Logger.LogDebug($"Getting all played content from server {servers.Name} for user {user.Alias}");
|
||||
var allContent = await GetAllContent(servers, recentlyAddedSearch, user);
|
||||
Logger.LogDebug("We found {0} items", allContent.Count);
|
||||
|
||||
|
||||
// Let's now process this.
|
||||
var episodesToAdd = new HashSet<UserPlayedEpisode>();
|
||||
var moviesToAdd = new HashSet<UserPlayedMovie>();
|
||||
|
||||
|
||||
foreach (var content in allContent.OrderByDescending(x => x.viewGroup))
|
||||
{
|
||||
Logger.LogDebug($"Got type '{content.viewGroup}' to process");
|
||||
if (content.viewGroup.Equals(PlexMediaType.Show.ToString(), StringComparison.InvariantCultureIgnoreCase))
|
||||
{
|
||||
foreach (var epInfo in content.Metadata ?? new Metadata[] { })
|
||||
{
|
||||
await ProcessEpisode(epInfo, user, episodesToAdd);
|
||||
}
|
||||
|
||||
}
|
||||
if (content.viewGroup.Equals(PlexMediaType.Movie.ToString(), StringComparison.InvariantCultureIgnoreCase))
|
||||
{
|
||||
Logger.LogDebug("Processing Movies");
|
||||
foreach (var movie in content?.Metadata ?? Array.Empty<Metadata>())
|
||||
{
|
||||
await ProcessMovie(movie, user, moviesToAdd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await _movieRepo.AddRange(moviesToAdd);
|
||||
await _episodeRepo.AddRange(episodesToAdd);
|
||||
|
||||
}
|
||||
|
||||
private async Task ProcessEpisode(Metadata epInfo, OmbiUser user, ICollection<UserPlayedEpisode> content)
|
||||
{
|
||||
var episode = await _contentRepo.GetEpisodeByKey(epInfo.ratingKey);
|
||||
if (episode == null || episode.Series == null)
|
||||
{
|
||||
Logger.LogInformation($"The episode {epInfo.title} does not relate to a series, so we cannot save this");
|
||||
return;
|
||||
}
|
||||
if (episode.Series.TheMovieDbId.IsNullOrEmpty())
|
||||
{
|
||||
Logger.LogWarning($"Episode {epInfo.title} is not linked to a TMDB series. Skipping.");
|
||||
return;
|
||||
}
|
||||
|
||||
await AddToContent(content, new UserPlayedEpisode()
|
||||
{
|
||||
TheMovieDbId = int.Parse(episode.Series.TheMovieDbId),
|
||||
SeasonNumber = episode.SeasonNumber,
|
||||
EpisodeNumber = episode.EpisodeNumber,
|
||||
UserId = user.Id
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private async Task AddToContent(ICollection<UserPlayedEpisode> content, UserPlayedEpisode episode)
|
||||
{
|
||||
|
||||
// Check if it exists
|
||||
var existingEpisode = await _episodeRepo.Get(episode.TheMovieDbId, episode.SeasonNumber, episode.EpisodeNumber, episode.UserId);
|
||||
var alreadyGoingToAdd = content.Any(x =>
|
||||
x.TheMovieDbId == episode.TheMovieDbId
|
||||
&& x.SeasonNumber == episode.SeasonNumber
|
||||
&& x.EpisodeNumber == episode.EpisodeNumber
|
||||
&& x.UserId == episode.UserId);
|
||||
if (existingEpisode == null && !alreadyGoingToAdd)
|
||||
{
|
||||
content.Add(episode);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task ProcessMovie(Metadata movie, OmbiUser user, ICollection<UserPlayedMovie> content)
|
||||
{
|
||||
var cachedMovie = await _contentRepo.GetByKey(movie.ratingKey);
|
||||
if (cachedMovie == null || cachedMovie.TheMovieDbId.IsNullOrEmpty() )
|
||||
{
|
||||
Logger.LogWarning($"Movie {movie.title} has no relevant metadata. Skipping.");
|
||||
return;
|
||||
}
|
||||
var userPlayedMovie = new UserPlayedMovie()
|
||||
{
|
||||
TheMovieDbId = int.Parse(cachedMovie.TheMovieDbId),
|
||||
UserId = user.Id
|
||||
};
|
||||
// Check if it exists
|
||||
var existingMovie = await _movieRepo.Get(userPlayedMovie.TheMovieDbId, userPlayedMovie.UserId);
|
||||
var alreadyGoingToAdd = content.Any(x => x.TheMovieDbId == userPlayedMovie.TheMovieDbId && x.UserId == userPlayedMovie.UserId);
|
||||
if (existingMovie == null && !alreadyGoingToAdd)
|
||||
{
|
||||
content.Add(userPlayedMovie);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async Task<List<Mediacontainer>> GetAllContent(PlexServers plexSettings, bool recentlyAddedSearch, OmbiUser user)
|
||||
{
|
||||
var libs = new List<Mediacontainer>();
|
||||
|
||||
var directories = await GetEnabledLibraries(plexSettings);
|
||||
|
||||
foreach (var directory in directories)
|
||||
{
|
||||
var maxNumberOfItems = 0;
|
||||
if (recentlyAddedSearch)
|
||||
{
|
||||
maxNumberOfItems = recentlyAddedAmountToTake;
|
||||
}
|
||||
var container = await PlexApi.GetPlayed(user.MediaServerToken, plexSettings.FullUri,
|
||||
directory.key, maxNumberOfItems);
|
||||
if (container != null)
|
||||
{
|
||||
libs.Add(container.MediaContainer);
|
||||
}
|
||||
}
|
||||
|
||||
return libs;
|
||||
}
|
||||
|
||||
|
||||
private bool _disposed;
|
||||
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
if (_disposed)
|
||||
return;
|
||||
|
||||
_disposed = true;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -64,30 +64,12 @@ namespace Ombi.Schedule.Jobs.Plex
|
|||
return;
|
||||
}
|
||||
|
||||
var plexUsersWithTokens = _ombiUserManager.Users.Where(x => x.UserType == UserType.PlexUser && x.MediaServerToken != null).ToList();
|
||||
_logger.LogInformation($"Found {plexUsersWithTokens.Count} users with tokens");
|
||||
await NotifyClient("Starting Watchlist Import");
|
||||
|
||||
foreach (var user in plexUsersWithTokens)
|
||||
var plexUsers = await _ombiUserManager.GetPlexUsersWithValidTokens();
|
||||
foreach (var user in plexUsers)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Check if the user has errors and the token is the same (not refreshed)
|
||||
var failedUser = await _userError.GetAll().Where(x => x.UserId == user.Id).FirstOrDefaultAsync();
|
||||
if (failedUser != null)
|
||||
{
|
||||
if (failedUser.MediaServerToken.Equals(user.MediaServerToken))
|
||||
{
|
||||
_logger.LogInformation($"Skipping Plex Watchlist Import for user '{user.UserName}' as they failed previously and the token has not yet been refreshed");
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
// remove that guy
|
||||
await _userError.Delete(failedUser);
|
||||
failedUser = null;
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogDebug($"Starting Watchlist Import for {user.UserName} with token {user.MediaServerToken}");
|
||||
var watchlist = await _plexApi.GetWatchlist(user.MediaServerToken, context?.CancellationToken ?? CancellationToken.None);
|
||||
|
|
|
@ -90,6 +90,7 @@ namespace Ombi.Schedule
|
|||
await OmbiQuartz.Instance.AddJob<IPlexContentSync>(nameof(IPlexContentSync) + "RecentlyAdded", "Plex", JobSettingsHelper.PlexRecentlyAdded(s), new Dictionary<string, string> { { JobDataKeys.RecentlyAddedSearch, "true" } });
|
||||
await OmbiQuartz.Instance.AddJob<IPlexUserImporter>(nameof(IPlexUserImporter), "Plex", JobSettingsHelper.UserImporter(s));
|
||||
await OmbiQuartz.Instance.AddJob<IPlexEpisodeSync>(nameof(IPlexEpisodeSync), "Plex", null);
|
||||
await OmbiQuartz.Instance.AddJob<IPlexPlayedSync>(nameof(IPlexPlayedSync), "Plex", null);
|
||||
await OmbiQuartz.Instance.AddJob<IPlexAvailabilityChecker>(nameof(IPlexAvailabilityChecker), "Plex", null);
|
||||
await OmbiQuartz.Instance.AddJob<IPlexWatchlistImport>(nameof(IPlexWatchlistImport), "Plex", JobSettingsHelper.PlexWatchlistImport(s));
|
||||
}
|
||||
|
|
|
@ -156,7 +156,7 @@ namespace Ombi.Store.Repository
|
|||
|
||||
public async Task<PlexEpisode> GetEpisodeByKey(string key)
|
||||
{
|
||||
return await Db.PlexEpisode.FirstOrDefaultAsync(x => x.Key == key);
|
||||
return await Db.PlexEpisode.Include(x => x.Series).FirstOrDefaultAsync(x => x.Key == key);
|
||||
}
|
||||
public override async Task AddRange(IEnumerable<IMediaServerEpisode> content)
|
||||
{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue