From bbf9c4ba514da9d02e820a3cb9d33a02bf60db7f Mon Sep 17 00:00:00 2001 From: Jamie Rees Date: Thu, 21 May 2020 23:51:18 +0100 Subject: [PATCH] Fixed the exception being thown when reading from multiple IQuerables --- src/Ombi.Helpers/OmbiQuartz.cs | 19 +++++---- .../Jobs/Emby/EmbyEpisodeSync.cs | 3 +- .../Jobs/Ombi/RefreshMetadata.cs | 10 +++-- .../Jobs/Plex/PlexAvailabilityChecker.cs | 41 ++++++++++++------- .../Jobs/Plex/PlexContentSync.cs | 4 +- .../Jobs/Plex/PlexEpisodeSync.cs | 11 +---- src/Ombi.Store/Repository/BaseRepository.cs | 21 +--------- 7 files changed, 52 insertions(+), 57 deletions(-) diff --git a/src/Ombi.Helpers/OmbiQuartz.cs b/src/Ombi.Helpers/OmbiQuartz.cs index e5be66a6c..e876296d5 100644 --- a/src/Ombi.Helpers/OmbiQuartz.cs +++ b/src/Ombi.Helpers/OmbiQuartz.cs @@ -11,7 +11,7 @@ namespace Ombi.Helpers public class OmbiQuartz { protected IScheduler _scheduler { get; set; } - + public static IScheduler Scheduler => Instance._scheduler; // Singleton @@ -31,14 +31,14 @@ namespace Ombi.Helpers { _scheduler = await new StdSchedulerFactory().GetScheduler(); } - + public IScheduler UseJobFactory(IJobFactory jobFactory) { Scheduler.JobFactory = jobFactory; return Scheduler; } - public static async Task IsJobRunnung(string jobName) + public static async Task IsJobRunning(string jobName) { var running = await Scheduler.GetCurrentlyExecutingJobs(); return running.Any(x => x.JobDetail.Key.Name.Equals(jobName, StringComparison.InvariantCultureIgnoreCase)); @@ -57,7 +57,7 @@ namespace Ombi.Helpers } } - if(!cronExpression.HasValue()) + if (!cronExpression.HasValue()) { jobBuilder.StoreDurably(true); } @@ -67,23 +67,26 @@ namespace Ombi.Helpers { ITrigger jobTrigger = TriggerBuilder.Create() .WithIdentity(name + "Trigger", group) - .WithCronSchedule(cronExpression, + .WithCronSchedule(cronExpression, x => x.WithMisfireHandlingInstructionFireAndProceed()) .ForJob(name, group) .StartNow() .Build(); await Scheduler.ScheduleJob(job, jobTrigger); - } + } else { await Scheduler.AddJob(job, true); } - + } public static async Task TriggerJob(string jobName, string group) { - await Scheduler.TriggerJob(new JobKey(jobName, group)); + if (!(await IsJobRunning(jobName))) + { + await Scheduler.TriggerJob(new JobKey(jobName, group)); + } } public static async Task TriggerJob(string jobName, string group, IDictionary data) diff --git a/src/Ombi.Schedule/Jobs/Emby/EmbyEpisodeSync.cs b/src/Ombi.Schedule/Jobs/Emby/EmbyEpisodeSync.cs index 6dda89979..491ff3363 100644 --- a/src/Ombi.Schedule/Jobs/Emby/EmbyEpisodeSync.cs +++ b/src/Ombi.Schedule/Jobs/Emby/EmbyEpisodeSync.cs @@ -39,6 +39,7 @@ using Ombi.Helpers; using Ombi.Store.Entities; using Ombi.Store.Repository; using Quartz; +using Ombi.Schedule.Jobs.Ombi; namespace Ombi.Schedule.Jobs.Emby { @@ -77,7 +78,7 @@ namespace Ombi.Schedule.Jobs.Emby await _notification.Clients.Clients(NotificationHub.AdminConnectionIds) .SendAsync(NotificationHub.NotificationEvent, "Emby Episode Sync Finished"); - await OmbiQuartz.TriggerJob(nameof(IEmbyAvaliabilityChecker), "Emby"); + await OmbiQuartz.TriggerJob(nameof(IRefreshMetadata), "System"); } private async Task CacheEpisodes(EmbyServers server) diff --git a/src/Ombi.Schedule/Jobs/Ombi/RefreshMetadata.cs b/src/Ombi.Schedule/Jobs/Ombi/RefreshMetadata.cs index c5e86cdb7..aa233ab59 100644 --- a/src/Ombi.Schedule/Jobs/Ombi/RefreshMetadata.cs +++ b/src/Ombi.Schedule/Jobs/Ombi/RefreshMetadata.cs @@ -61,12 +61,16 @@ namespace Ombi.Schedule.Jobs.Ombi if (settings.Enable) { await StartPlex(); + + await OmbiQuartz.TriggerJob(nameof(IPlexAvailabilityChecker), "Plex"); } var embySettings = await _embySettings.GetSettingsAsync(); if (embySettings.Enable) { await StartEmby(embySettings); + + await OmbiQuartz.TriggerJob(nameof(IEmbyAvaliabilityChecker), "Emby"); } } catch (Exception e) @@ -85,8 +89,8 @@ namespace Ombi.Schedule.Jobs.Ombi private async Task StartPlex() { // Ensure we check that we have not linked this item to a request - var allMovies = await _plexRepo.GetAll().Where(x => - x.Type == PlexMediaTypeEntity.Movie && x.RequestId == null && (x.TheMovieDbId == null || x.ImdbId == null)).ToListAsync(); + var allMovies = await _plexRepo.GetAll().Where(x => + x.Type == PlexMediaTypeEntity.Movie && x.RequestId == null && (x.TheMovieDbId == null || x.ImdbId == null)).ToListAsync(); await StartPlexMovies(allMovies); // Now Tv @@ -105,7 +109,7 @@ namespace Ombi.Schedule.Jobs.Ombi private async Task StartPlexTv(List allTv) { foreach (var show in allTv) - { + { // Just double check there is no associated request id if (show.RequestId.HasValue) { diff --git a/src/Ombi.Schedule/Jobs/Plex/PlexAvailabilityChecker.cs b/src/Ombi.Schedule/Jobs/Plex/PlexAvailabilityChecker.cs index de2abafb4..067d3803c 100644 --- a/src/Ombi.Schedule/Jobs/Plex/PlexAvailabilityChecker.cs +++ b/src/Ombi.Schedule/Jobs/Plex/PlexAvailabilityChecker.cs @@ -60,15 +60,15 @@ namespace Ombi.Schedule.Jobs.Plex .SendAsync(NotificationHub.NotificationEvent, "Plex Availability Check Finished"); } - private Task ProcessTv() + private async Task ProcessTv() { - var tv = _tvRepo.GetChild().Where(x => !x.Available).AsNoTracking(); - return ProcessTv(tv); + var tv = await _tvRepo.GetChild().Where(x => !x.Available).ToListAsync(); + await ProcessTv(tv); } - private async Task ProcessTv(IQueryable tv) + private async Task ProcessTv(List tv) { - var plexEpisodes = _repo.GetAllEpisodes().Include(x => x.Series).AsNoTracking(); + var plexEpisodes = _repo.GetAllEpisodes().Include(x => x.Series); foreach (var child in tv) { @@ -119,11 +119,11 @@ namespace Ombi.Schedule.Jobs.Plex { continue; } - var foundEp = await seriesEpisodes.FirstOrDefaultAsync( + var foundEp = await seriesEpisodes.AnyAsync( x => x.EpisodeNumber == episode.EpisodeNumber && x.SeasonNumber == episode.Season.SeasonNumber); - if (foundEp != null) + if (foundEp) { availableEpisode.Add(new AvailabilityModel { @@ -135,18 +135,24 @@ namespace Ombi.Schedule.Jobs.Plex } //TODO Partial avilability notifications here - foreach(var c in availableEpisode) + if (availableEpisode.Any()) { - await _tvRepo.MarkEpisodeAsAvailable(c.Id); + await _tvRepo.Save(); } - + //foreach(var c in availableEpisode) + //{ + // await _tvRepo.MarkEpisodeAsAvailable(c.Id); + //} + // Check to see if all of the episodes in all seasons are available for this request var allAvailable = child.SeasonRequests.All(x => x.Episodes.All(c => c.Available)); if (allAvailable) { + child.Available = true; + child.MarkedAsAvailable = DateTime.UtcNow; _log.LogInformation("[PAC] - Child request {0} is now available, sending notification", $"{child.Title} - {child.Id}"); // We have ful-fulled this request! - await _tvRepo.MarkChildAsAvailable(child.Id); + await _tvRepo.Save(); await _notificationService.Notify(new NotificationOptions { DateTime = DateTime.Now, @@ -164,7 +170,7 @@ namespace Ombi.Schedule.Jobs.Plex private async Task ProcessMovies() { // Get all non available - var movies = _movieRepo.GetAll().Include(x => x.RequestedUser).Where(x => !x.Available).AsNoTracking(); + var movies = _movieRepo.GetAll().Include(x => x.RequestedUser).Where(x => !x.Available); var itemsForAvailbility = new List(); foreach (var movie in movies) @@ -191,8 +197,10 @@ namespace Ombi.Schedule.Jobs.Plex // We don't yet have this continue; } - + _log.LogInformation("[PAC] - Movie request {0} is now available, sending notification", $"{movie.Title} - {movie.Id}"); + movie.Available = true; + movie.MarkedAsAvailable = DateTime.UtcNow; itemsForAvailbility.Add(new AvailabilityModel { Id = movie.Id, @@ -200,9 +208,12 @@ namespace Ombi.Schedule.Jobs.Plex }); } + if (itemsForAvailbility.Any()) + { + await _movieRepo.SaveChangesAsync(); + } foreach (var i in itemsForAvailbility) { - await _movieRepo.MarkAsAvailable(i.Id); await _notificationService.Notify(new NotificationOptions { @@ -214,7 +225,7 @@ namespace Ombi.Schedule.Jobs.Plex }); } - await _repo.SaveChangesAsync(); + //await _repo.SaveChangesAsync(); } private bool _disposed; diff --git a/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs b/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs index 27e6e3daa..bed0e7eb7 100644 --- a/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs +++ b/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs @@ -116,7 +116,7 @@ namespace Ombi.Schedule.Jobs.Plex if ((processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch) { // Ensure it's not already running - if (await OmbiQuartz.IsJobRunnung(nameof(IPlexAvailabilityChecker))) + if (await OmbiQuartz.IsJobRunning(nameof(IPlexAvailabilityChecker))) { Logger.LogInformation("Availability checker already running"); } @@ -131,7 +131,7 @@ namespace Ombi.Schedule.Jobs.Plex if ((processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch) { // Ensure it's not already running - if (await OmbiQuartz.IsJobRunnung(nameof(IPlexAvailabilityChecker))) + if (await OmbiQuartz.IsJobRunning(nameof(IPlexAvailabilityChecker))) { Logger.LogInformation("Availability checker already running"); } diff --git a/src/Ombi.Schedule/Jobs/Plex/PlexEpisodeSync.cs b/src/Ombi.Schedule/Jobs/Plex/PlexEpisodeSync.cs index 5b4e3fa5d..8d92ef238 100644 --- a/src/Ombi.Schedule/Jobs/Plex/PlexEpisodeSync.cs +++ b/src/Ombi.Schedule/Jobs/Plex/PlexEpisodeSync.cs @@ -63,16 +63,9 @@ namespace Ombi.Schedule.Jobs.Plex _log.LogError(LoggingEvents.Cacher, e, "Caching Episodes Failed"); } + await OmbiQuartz.TriggerJob(nameof(IRefreshMetadata), "System"); - // Ensure it's not already running - if (await OmbiQuartz.IsJobRunnung(nameof(IPlexAvailabilityChecker))) - { - _log.LogInformation("Availability checker already running"); - } - else - { - await OmbiQuartz.TriggerJob(nameof(IPlexAvailabilityChecker), "Plex"); - } + await _notification.Clients.Clients(NotificationHub.AdminConnectionIds) .SendAsync(NotificationHub.NotificationEvent, "Plex Episode Sync Finished"); } diff --git a/src/Ombi.Store/Repository/BaseRepository.cs b/src/Ombi.Store/Repository/BaseRepository.cs index 175f0a84b..2908309ce 100644 --- a/src/Ombi.Store/Repository/BaseRepository.cs +++ b/src/Ombi.Store/Repository/BaseRepository.cs @@ -85,25 +85,8 @@ namespace Ombi.Store.Repository protected async Task InternalSaveChanges() { - var policy = Policy - .Handle() - .WaitAndRetryAsync(new[] - { - TimeSpan.FromSeconds(1), - TimeSpan.FromSeconds(5), - TimeSpan.FromSeconds(10) - }); - - var result = await policy.ExecuteAndCaptureAsync(async () => - { - //using (var tran = await _ctx.Database.BeginTransactionAsync()) - { - var r = await _ctx.SaveChangesAsync(); - //tran.Commit(); - return r; - } - }); - return result.Result; + var r = await _ctx.SaveChangesAsync(); + return r; }