Finished #633 (First part of the queuing)

This commit is contained in:
Jamie.Rees 2016-11-08 14:15:33 +00:00
parent 1c7fb2e93e
commit 2bd7ece9d0
10 changed files with 460 additions and 155 deletions

View file

@ -26,7 +26,9 @@
#endregion
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using PlexRequests.Helpers;
using PlexRequests.Store;
using PlexRequests.Store.Models;
@ -34,7 +36,7 @@ using PlexRequests.Store.Repository;
namespace PlexRequests.Core.Queue
{
public class TransientFaultQueue
public class TransientFaultQueue : ITransientFaultQueue
{
public TransientFaultQueue(IRepository<RequestQueue> queue)
{
@ -44,44 +46,84 @@ namespace PlexRequests.Core.Queue
private IRepository<RequestQueue> RequestQueue { get; }
public void QueueItem(RequestedModel request, RequestType type)
public void QueueItem(RequestedModel request, RequestType type, FaultType faultType)
{
//Ensure there is not a duplicate queued item
var existingItem = RequestQueue.Custom(
connection =>
{
connection.Open();
var result = connection.Query<RequestQueue>("select * from RequestQueue where PrimaryIdentifier = @ProviderId", new { ProviderId = request.ProviderId });
return result;
}).FirstOrDefault();
if (existingItem != null)
{
// It's already in the queue
return;
}
var queue = new RequestQueue
{
Type = type,
Content = ByteConverterHelper.ReturnBytes(request),
PrimaryIdentifier = request.ProviderId
PrimaryIdentifier = request.ProviderId,
FaultType = faultType
};
RequestQueue.Insert(queue);
}
public async Task QueueItemAsync(RequestedModel request, RequestType type)
public async Task QueueItemAsync(RequestedModel request, RequestType type, FaultType faultType)
{
//Ensure there is not a duplicate queued item
var existingItem = await RequestQueue.CustomAsync(async connection =>
{
connection.Open();
var result = await connection.QueryAsync<RequestQueue>("select * from RequestQueue where PrimaryIdentifier = @ProviderId", new { ProviderId = request.ProviderId });
return result;
});
if (existingItem.FirstOrDefault() != null)
{
// It's already in the queue
return;
}
var queue = new RequestQueue
{
Type = type,
Content = ByteConverterHelper.ReturnBytes(request),
PrimaryIdentifier = request.ProviderId
PrimaryIdentifier = request.ProviderId,
FaultType = faultType
};
await RequestQueue.InsertAsync(queue);
}
public IEnumerable<RequestQueue> Dequeue()
public IEnumerable<RequestQueue> GetQueue()
{
var items = RequestQueue.GetAll();
RequestQueue.DeleteAll("RequestQueue");
return items;
}
public async Task<IEnumerable<RequestQueue>> DequeueAsync()
public async Task<IEnumerable<RequestQueue>> GetQueueAsync()
{
var items = RequestQueue.GetAllAsync();
await RequestQueue.DeleteAllAsync("RequestQueue");
return await items;
}
public void Dequeue()
{
RequestQueue.DeleteAll("RequestQueue");
}
public async Task DequeueAsync()
{
await RequestQueue.DeleteAllAsync("RequestQueue");
}
}
}