diff --git a/Hangfire.AzureDocumentDB.nuspec b/Hangfire.AzureDocumentDB.nuspec new file mode 100644 index 0000000..c5760c0 --- /dev/null +++ b/Hangfire.AzureDocumentDB.nuspec @@ -0,0 +1,26 @@ + + + + Hangfire.Firebase + 1.0.0-beta + Imran Momin + Imran Momin + https://github.com/imranmomin/Hangfire.AzureDocumentDB/blob/master/LICENSE + https://github.com/imranmomin/Hangfire.AzureDocumentDB + false + This repo will add Microsoft Azure DocumentDB storage provider for Hangfire + Copyright 2017 + Hangfire Azure DocumentDB + en-US + + + + + + + + + + + + \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB.sln b/Hangfire.AzureDocumentDB.sln new file mode 100644 index 0000000..0e69caf --- /dev/null +++ b/Hangfire.AzureDocumentDB.sln @@ -0,0 +1,22 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 14 +VisualStudioVersion = 14.0.25123.0 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Hangfire.AzureDocumentDB", "Hangfire.AzureDocumentDB\Hangfire.AzureDocumentDB.csproj", "{E0AD3801-5504-49A7-80C6-8B4373DDE0E5}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {E0AD3801-5504-49A7-80C6-8B4373DDE0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E0AD3801-5504-49A7-80C6-8B4373DDE0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E0AD3801-5504-49A7-80C6-8B4373DDE0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E0AD3801-5504-49A7-80C6-8B4373DDE0E5}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/Hangfire.AzureDocumentDB/App.config b/Hangfire.AzureDocumentDB/App.config new file mode 100644 index 0000000..302a320 --- /dev/null +++ b/Hangfire.AzureDocumentDB/App.config @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbConnection.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbConnection.cs new file mode 100644 index 0000000..503c95b --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbConnection.cs @@ -0,0 +1,512 @@ +using System; +using System.Net; +using System.Linq; +using System.Threading; +using System.Globalization; +using System.Collections.Generic; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; + +using Hangfire.Common; +using Hangfire.Server; +using Hangfire.Storage; +using Hangfire.AzureDocumentDB.Queue; +using Hangfire.AzureDocumentDB.Entities; + +namespace Hangfire.AzureDocumentDB +{ + internal sealed class AzureDocumentDbConnection : JobStorageConnection + { + public AzureDocumentDbStorage Storage { get; } + public PersistentJobQueueProviderCollection QueueProviders { get; } + + private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = -1 }; + private readonly Uri JobDocumentCollectionUri; + private readonly Uri StateDocumentCollectionUri; + private readonly Uri SetDocumentCollectionUri; + private readonly Uri CounterDocumentCollectionUri; + private readonly Uri ServerDocumentCollectionUri; + private readonly Uri HashDocumentCollectionUri; + private readonly Uri ListDocumentCollectionUri; + + public AzureDocumentDbConnection(AzureDocumentDbStorage storage) + { + Storage = storage; + QueueProviders = storage.QueueProviders; + + JobDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(Storage.Options.DatabaseName, "jobs"); + StateDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(Storage.Options.DatabaseName, "states"); + SetDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(Storage.Options.DatabaseName, "sets"); + CounterDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(Storage.Options.DatabaseName, "counters"); + ServerDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(Storage.Options.DatabaseName, "servers"); + HashDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(Storage.Options.DatabaseName, "hashes"); + ListDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(Storage.Options.DatabaseName, "lists"); + } + + public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout) => new AzureDocumentDbDistributedLock(resource, timeout, Storage); + public override IWriteOnlyTransaction CreateWriteTransaction() => new AzureDocumentDbWriteOnlyTransaction(this); + + #region Job + + public override string CreateExpiredJob(Common.Job job, IDictionary parameters, DateTime createdAt, TimeSpan expireIn) + { + if (job == null) throw new ArgumentNullException(nameof(job)); + if (parameters == null) throw new ArgumentNullException(nameof(parameters)); + + InvocationData invocationData = InvocationData.Serialize(job); + Entities.Job entityJob = new Entities.Job + { + InvocationData = invocationData, + Arguments = invocationData.Arguments, + CreatedOn = createdAt, + ExpireOn = createdAt.Add(expireIn), + + Parameters = parameters.Select(p => new Parameter + { + Name = p.Key, + Value = p.Value + }).ToArray() + }; + + ResourceResponse response = Storage.Client.CreateDocumentAsync(JobDocumentCollectionUri, entityJob).GetAwaiter().GetResult(); + if (response.StatusCode == HttpStatusCode.Created || response.StatusCode == HttpStatusCode.OK) + { + return entityJob.Id; + } + + return string.Empty; + } + + public override IFetchedJob FetchNextJob(string[] queues, CancellationToken cancellationToken) + { + if (queues == null || queues.Length == 0) throw new ArgumentNullException(nameof(queues)); + + IPersistentJobQueueProvider[] providers = queues.Select(q => QueueProviders.GetProvider(q)) + .Distinct() + .ToArray(); + + if (providers.Length != 1) + { + throw new InvalidOperationException($"Multiple provider instances registered for queues: {string.Join(", ", queues)}. You should choose only one type of persistent queues per server instance."); + } + + IPersistentJobQueue persistentQueue = providers.Single().GetJobQueue(); + IFetchedJob queue = persistentQueue.Dequeue(queues, cancellationToken); + return queue; + } + + public override JobData GetJobData(string jobId) + { + if (jobId == null) throw new ArgumentNullException(nameof(jobId)); + + Entities.Job data = Storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == jobId) + .AsEnumerable() + .FirstOrDefault(); + + if (data != null) + { + InvocationData invocationData = data.InvocationData; + invocationData.Arguments = data.Arguments; + + Common.Job job = null; + JobLoadException loadException = null; + + try + { + job = invocationData.Deserialize(); + } + catch (JobLoadException ex) + { + loadException = ex; + } + + return new JobData + { + Job = job, + State = data.StateName, + CreatedAt = data.CreatedOn, + LoadException = loadException + }; + } + + return null; + } + + public override StateData GetStateData(string jobId) + { + if (jobId == null) throw new ArgumentNullException(nameof(jobId)); + + string stateId = Storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == jobId) + .Select(j => j.StateId) + .AsEnumerable() + .FirstOrDefault(); + + if (!string.IsNullOrEmpty(stateId)) + { + State state = Storage.Client.CreateDocumentQuery(StateDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == stateId) + .AsEnumerable() + .FirstOrDefault(); + + if (state != null) + { + return new StateData + { + Name = state.Name, + Reason = state.Reason, + Data = state.Data + }; + } + } + + return null; + } + + #endregion + + #region Parameter + + public override string GetJobParameter(string id, string name) + { + if (id == null) throw new ArgumentNullException(nameof(id)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + List parameters = Storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == id) + .AsEnumerable() + .SelectMany(j => j.Parameters) + .ToList(); + + return parameters.Where(p => p.Name == name).Select(p => p.Value).FirstOrDefault(); + } + + public override void SetJobParameter(string id, string name, string value) + { + if (id == null) throw new ArgumentNullException(nameof(id)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + Entities.Job job = Storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == id) + .AsEnumerable() + .FirstOrDefault(); + + if (job != null) + { + List parameters = job.Parameters.ToList(); + + Parameter parameter = parameters.Find(p => p.Name == name); + if (parameter != null) parameter.Value = value; + else + { + parameter = new Parameter + { + Name = name, + Value = value + }; + parameters.Add(parameter); + } + + job.Parameters = parameters.ToArray(); + Storage.Client.ReplaceDocumentAsync(job.SelfLink, job).GetAwaiter().GetResult(); + } + } + + #endregion + + #region Set + + public override TimeSpan GetSetTtl(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + DateTime? expireOn = Storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == key) + .Min(s => s.ExpireOn); + + return expireOn.HasValue ? expireOn.Value - DateTime.UtcNow : TimeSpan.FromSeconds(-1); + } + + public override List GetRangeFromSet(string key, int startingFrom, int endingAt) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == key) + .AsEnumerable() + .Skip(startingFrom).Take(endingAt) + .Select(c => c.Value) + .ToList(); + } + + public override long GetCounter(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(CounterDocumentCollectionUri, QueryOptions) + .Where(c => c.Key == key) + .Sum(c => c.Value); + } + + public override long GetSetCount(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == key) + .Select(s => s.Id) + .AsEnumerable() + .LongCount(); + } + + public override HashSet GetAllItemsFromSet(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + List sets = Storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == key) + .Select(s => s.Value) + .AsEnumerable() + .ToList(); + + return new HashSet(sets); + } + + public override string GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + if (toScore < fromScore) throw new ArgumentException("The `toScore` value must be higher or equal to the `fromScore` value."); + + return Storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == key) + .OrderBy(s => s.Score) + .Where(s => s.Score >= fromScore && s.Score <= toScore) + .Select(s => s.Value) + .AsEnumerable() + .FirstOrDefault(); + } + + #endregion + + #region Server + + public override void AnnounceServer(string serverId, ServerContext context) + { + if (serverId == null) throw new ArgumentNullException(nameof(serverId)); + if (context == null) throw new ArgumentNullException(nameof(context)); + + Entities.Server server = Storage.Client.CreateDocumentQuery(ServerDocumentCollectionUri, QueryOptions) + .Where(s => s.ServerId == serverId) + .AsEnumerable() + .FirstOrDefault(); + + if (server != null) + { + server.LastHeartbeat = DateTime.UtcNow; + server.Workers = context.WorkerCount; + server.Queues = context.Queues; + } + else + { + server = new Entities.Server + { + ServerId = serverId, + Workers = context.WorkerCount, + Queues = context.Queues, + CreatedOn = DateTime.UtcNow, + LastHeartbeat = DateTime.UtcNow + }; + } + + Storage.Client.UpsertDocumentAsync(ServerDocumentCollectionUri, server).GetAwaiter().GetResult(); + } + + public override void Heartbeat(string serverId) + { + if (serverId == null) throw new ArgumentNullException(nameof(serverId)); + + Entities.Server server = Storage.Client.CreateDocumentQuery(ServerDocumentCollectionUri, QueryOptions) + .Where(s => s.ServerId == serverId) + .AsEnumerable() + .FirstOrDefault(); + + if (server != null) + { + server.LastHeartbeat = DateTime.UtcNow; + Storage.Client.ReplaceDocumentAsync(server.SelfLink, server).GetAwaiter().GetResult(); + } + } + + public override void RemoveServer(string serverId) + { + if (serverId == null) throw new ArgumentNullException(nameof(serverId)); + + Entities.Server server = Storage.Client.CreateDocumentQuery(ServerDocumentCollectionUri, QueryOptions) + .Where(s => s.ServerId == serverId) + .AsEnumerable() + .FirstOrDefault(); + + if (server != null) + { + Storage.Client.DeleteDocumentAsync(server.SelfLink).GetAwaiter().GetResult(); + } + } + + public override int RemoveTimedOutServers(TimeSpan timeOut) + { + if (timeOut.Duration() != timeOut) + { + throw new ArgumentException(@"The `timeOut` value must be positive.", nameof(timeOut)); + } + + DateTime lastHeartbeat = DateTime.UtcNow.Add(timeOut.Negate()); + string[] selfLinks = Storage.Client.CreateDocumentQuery(ServerDocumentCollectionUri, QueryOptions) + .AsEnumerable() + .Where(s => s.LastHeartbeat < lastHeartbeat) + .Select(s => s.SelfLink) + .ToArray(); + + Array.ForEach(selfLinks, selfLink => Storage.Client.DeleteDocumentAsync(selfLink).GetAwaiter().GetResult()); + return selfLinks.Length; + } + + #endregion + + #region Hash + + public override Dictionary GetAllEntriesFromHash(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(HashDocumentCollectionUri, QueryOptions) + .Where(h => h.Key == key) + .AsEnumerable() + .ToDictionary(h => h.Field, h => h.Value); + } + + public override void SetRangeInHash(string key, IEnumerable> keyValuePairs) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + if (keyValuePairs == null) throw new ArgumentNullException(nameof(keyValuePairs)); + + Func epoch = s => + { + DateTime date; + if (DateTime.TryParse(s, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out date)) + { + if (date.Equals(DateTime.MinValue)) return int.MinValue.ToString(); + DateTime epochDateTime = new DateTime(1970, 1, 1); + TimeSpan epochTimeSpan = date - epochDateTime; + return ((int)epochTimeSpan.TotalSeconds).ToString(CultureInfo.InvariantCulture); + } + return s; + }; + + List sources = keyValuePairs.Select(k => new Hash + { + Key = key, + Field = k.Key, + Value = epoch(k.Value) + }).ToList(); + + + List hashes = Storage.Client.CreateDocumentQuery(HashDocumentCollectionUri, QueryOptions) + .Where(h => h.Key == key) + .AsEnumerable() + .ToList(); + + sources.ForEach(source => + { + Hash hash = hashes.FirstOrDefault(h => h.Key == source.Key && h.Field == source.Field); + if (hash != null) source.Id = hash.Id; + }); + + sources.ForEach(hash => Storage.Client.UpsertDocumentAsync(HashDocumentCollectionUri, hash).GetAwaiter().GetResult()); + } + + public override long GetHashCount(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(HashDocumentCollectionUri, QueryOptions) + .Where(h => h.Key == key) + .LongCount(); + } + + public override string GetValueFromHash(string key, string name) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + return Storage.Client.CreateDocumentQuery(HashDocumentCollectionUri, QueryOptions) + .Where(h => h.Key == key && h.Field == name) + .Select(h => h.Value) + .AsEnumerable() + .FirstOrDefault(); + } + + public override TimeSpan GetHashTtl(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + DateTime? expireOn = Storage.Client.CreateDocumentQuery(HashDocumentCollectionUri, QueryOptions) + .Where(h => h.Key == key) + .Min(h => h.ExpireOn); + + return expireOn.HasValue ? expireOn.Value - DateTime.UtcNow : TimeSpan.FromSeconds(-1); + } + + #endregion + + #region List + + public override List GetAllItemsFromList(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(ListDocumentCollectionUri, QueryOptions) + .Where(l => l.Key == key) + .Select(l => l.Value) + .AsEnumerable() + .ToList(); + } + + public override List GetRangeFromList(string key, int startingFrom, int endingAt) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(ListDocumentCollectionUri, QueryOptions) + .Where(l => l.Key == key) + .AsEnumerable() + .OrderBy(l => l.ExpireOn) + .Skip(startingFrom).Take(endingAt) + .Select(l => l.Value) + .ToList(); + } + + public override TimeSpan GetListTtl(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + DateTime? expireOn = Storage.Client.CreateDocumentQuery(ListDocumentCollectionUri, QueryOptions) + .Where(l => l.Key == key) + .Min(l => l.ExpireOn); + + return expireOn.HasValue ? expireOn.Value - DateTime.UtcNow : TimeSpan.FromSeconds(-1); + } + + public override long GetListCount(string key) + { + if (key == null) throw new ArgumentNullException(nameof(key)); + + return Storage.Client.CreateDocumentQuery(ListDocumentCollectionUri, QueryOptions) + .Where(l => l.Key == key) + .LongCount(); + } + + #endregion + + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbDistributedLock.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbDistributedLock.cs new file mode 100644 index 0000000..8cba7ec --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbDistributedLock.cs @@ -0,0 +1,70 @@ +using System; +using System.Net; +using System.Linq; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Hangfire.AzureDocumentDB.Entities; + +namespace Hangfire.AzureDocumentDB +{ + internal class AzureDocumentDbDistributedLock : IDisposable + { + private readonly AzureDocumentDbStorage storage; + private string selfLink; + private readonly object syncLock = new object(); + + public AzureDocumentDbDistributedLock(string resource, TimeSpan timeout, AzureDocumentDbStorage storage) + { + this.storage = storage; + Acquire(resource, timeout); + } + + public void Dispose() => Relase(); + + private void Acquire(string name, TimeSpan timeout) + { + Uri documentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "locks"); + FeedOptions queryOptions = new FeedOptions { MaxItemCount = 1 }; + + System.Diagnostics.Stopwatch acquireStart = new System.Diagnostics.Stopwatch(); + acquireStart.Start(); + + while (true) + { + Lock @lock = storage.Client.CreateDocumentQuery(documentCollectionUri, queryOptions) + .Where(l => l.Name == name) + .AsEnumerable() + .FirstOrDefault(); + + if (@lock == null) + { + @lock = new Lock { Name = name, ExpireOn = DateTime.UtcNow.Add(timeout) }; + ResourceResponse response = storage.Client.CreateDocumentAsync(documentCollectionUri, @lock).GetAwaiter().GetResult(); + if (response.StatusCode == HttpStatusCode.Created) + { + selfLink = response.Resource.SelfLink; + break; + } + } + + // check the timeout + if (acquireStart.ElapsedMilliseconds > timeout.TotalMilliseconds) + { + throw new AzureDocumentDbDistributedLockException($"Could not place a lock on the resource '{name}': Lock timeout."); + } + + // sleep for 500 millisecond + System.Threading.Thread.Sleep(500); + } + } + + private void Relase() + { + lock (syncLock) + { + storage.Client.DeleteDocumentAsync(selfLink).GetAwaiter().GetResult(); + } + } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbDistributedLockException.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbDistributedLockException.cs new file mode 100644 index 0000000..db1ac61 --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbDistributedLockException.cs @@ -0,0 +1,19 @@ +using System; + +namespace Hangfire.AzureDocumentDB +{ + /// + /// Represents errors that occur while acquiring a distributed lock. + /// + [Serializable] + public class AzureDocumentDbDistributedLockException : Exception + { + /// + /// Initializes a new instance of the FirebaseDistributedLockException class with serialized data. + /// + /// The message that describes the error. + public AzureDocumentDbDistributedLockException(string message) : base(message) + { + } + } +} diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbMonitoringApi.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbMonitoringApi.cs new file mode 100644 index 0000000..7f0be63 --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbMonitoringApi.cs @@ -0,0 +1,379 @@ +using System; +using System.Linq; +using System.Collections.Generic; + +using Microsoft.Azure.Documents.Client; + +using Hangfire.Common; +using Hangfire.Storage; +using Hangfire.Storage.Monitoring; +using Hangfire.AzureDocumentDB.Queue; +using Hangfire.AzureDocumentDB.Entities; + +namespace Hangfire.AzureDocumentDB +{ + internal sealed class AzureDocumentDbMonitoringApi : IMonitoringApi + { + private readonly AzureDocumentDbStorage storage; + + private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = -1 }; + private readonly Uri JobDocumentCollectionUri; + private readonly Uri StateDocumentCollectionUri; + private readonly Uri SetDocumentCollectionUri; + private readonly Uri CounterDocumentCollectionUri; + private readonly Uri ServerDocumentCollectionUri; + private readonly Uri QueueDocumentCollectionUri; + + public AzureDocumentDbMonitoringApi(AzureDocumentDbStorage storage) + { + this.storage = storage; + + JobDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "jobs"); + StateDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "states"); + SetDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "sets"); + CounterDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "counters"); + ServerDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "servers"); + QueueDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "queues"); + } + + public IList Queues() + { + List queueJobs = new List(); + + Array.ForEach(storage.Options.Queues, queue => + { + long enqueueCount = EnqueuedCount(queue); + JobList jobs = EnqueuedJobs(queue, 0, 1); + queueJobs.Add(new QueueWithTopEnqueuedJobsDto + { + Length = enqueueCount, + Fetched = 0, + Name = queue, + FirstJobs = jobs + }); + }); + + return queueJobs; + } + + public IList Servers() + { + List servers = storage.Client.CreateDocumentQuery(ServerDocumentCollectionUri, QueryOptions) + .AsEnumerable() + .ToList(); + + return servers.Select(server => new ServerDto + { + Name = server.ServerId, + Heartbeat = server.LastHeartbeat, + Queues = server.Queues, + StartedAt = server.CreatedOn, + WorkersCount = server.Workers + }).ToList(); + } + + public JobDetailsDto JobDetails(string jobId) + { + if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); + + Entities.Job job = storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == jobId) + .AsEnumerable() + .FirstOrDefault(); + + if (job != null) + { + InvocationData invocationData = job.InvocationData; + invocationData.Arguments = job.Arguments; + + List states = storage.Client.CreateDocumentQuery(StateDocumentCollectionUri, QueryOptions) + .Where(s => s.JobId == jobId) + .AsEnumerable() + .Select(s => new StateHistoryDto + { + Data = s.Data, + CreatedAt = s.CreatedOn, + Reason = s.Reason, + StateName = s.Name + }).ToList(); + + return new JobDetailsDto + { + Job = invocationData.Deserialize(), + CreatedAt = job.CreatedOn, + ExpireAt = job.ExpireOn, + Properties = job.Parameters.ToDictionary(p => p.Name, p => p.Value), + History = states + }; + } + + return null; + } + + public StatisticsDto GetStatistics() + { + Dictionary results = new Dictionary(); + + // get counts of jobs groupby on state + Dictionary states = storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Select(j => j.StateName) + .AsEnumerable() + .Where(j => !string.IsNullOrEmpty(j)) + .GroupBy(j => j) + .ToDictionary(g => g.Key, g => g.LongCount()); + + results = results.Concat(states).ToDictionary(k => k.Key, v => v.Value); + + // get counts of servers + long servers = storage.Client.CreateDocumentQuery(ServerDocumentCollectionUri, QueryOptions) + .Select(s => s.Id) + .AsEnumerable() + .LongCount(); + results.Add("Servers", servers); + + // get sum of stats:succeeded counters raw / aggregate + Dictionary counters = storage.Client.CreateDocumentQuery(CounterDocumentCollectionUri, QueryOptions) + .Where(c => c.Key == "stats:succeeded" || c.Key == "stats:deleted") + .AsEnumerable() + .GroupBy(c => c.Key) + .ToDictionary(g => g.Key, g => (long)g.Sum(c => c.Value)); + + results = results.Concat(counters).ToDictionary(k => k.Key, v => v.Value); + + long count = 0; + count += storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == "recurring-jobs") + .Select(s => s.Id) + .AsEnumerable() + .LongCount(); + + results.Add("recurring-jobs", count); + + Func getValueOrDefault = (key) => results.Where(r => r.Key == key).Select(r => r.Value).SingleOrDefault(); + return new StatisticsDto + { + Enqueued = getValueOrDefault("Enqueued"), + Failed = getValueOrDefault("Failed"), + Processing = getValueOrDefault("Processing"), + Scheduled = getValueOrDefault("Scheduled"), + Succeeded = getValueOrDefault("stats:succeeded"), + Deleted = getValueOrDefault("stats:deleted"), + Recurring = getValueOrDefault("recurring-jobs"), + Servers = getValueOrDefault("Servers"), + Queues = storage.Options.Queues.LongLength + }; + } + + #region Job List + + public JobList EnqueuedJobs(string queue, int from, int perPage) + { + return GetJobsOnQueue(queue, from, perPage, (state, job) => new EnqueuedJobDto + { + Job = job, + State = state + }); + } + + public JobList FetchedJobs(string queue, int from, int perPage) + { + return GetJobsOnQueue(queue, from, perPage, (state, job) => new FetchedJobDto + { + Job = job, + State = state + }); + } + + public JobList ProcessingJobs(int from, int count) + { + return GetJobsOnState(States.ProcessingState.StateName, from, count, (state, job) => new ProcessingJobDto + { + Job = job, + ServerId = state.Data.ContainsKey("ServerId") ? state.Data["ServerId"] : state.Data["ServerName"], + StartedAt = JobHelper.DeserializeDateTime(state.Data["StartedAt"]) + }); + } + + public JobList ScheduledJobs(int from, int count) + { + return GetJobsOnState(States.ScheduledState.StateName, from, count, (state, job) => new ScheduledJobDto + { + Job = job, + EnqueueAt = JobHelper.DeserializeDateTime(state.Data["EnqueueAt"]), + ScheduledAt = JobHelper.DeserializeDateTime(state.Data["ScheduledAt"]) + }); + } + + public JobList SucceededJobs(int from, int count) + { + return GetJobsOnState(States.SucceededState.StateName, from, count, (state, job) => new SucceededJobDto + { + Job = job, + Result = state.Data.ContainsKey("Result") ? state.Data["Result"] : null, + TotalDuration = state.Data.ContainsKey("PerformanceDuration") && state.Data.ContainsKey("Latency") + ? (long?)long.Parse(state.Data["PerformanceDuration"]) + long.Parse(state.Data["Latency"]) + : null, + SucceededAt = JobHelper.DeserializeNullableDateTime(state.Data["SucceededAt"]) + }); + } + + public JobList FailedJobs(int from, int count) + { + return GetJobsOnState(States.FailedState.StateName, from, count, (state, job) => new FailedJobDto + { + Job = job, + Reason = state.Reason, + FailedAt = JobHelper.DeserializeNullableDateTime(state.Data["FailedAt"]), + ExceptionDetails = state.Data["ExceptionDetails"], + ExceptionMessage = state.Data["ExceptionMessage"], + ExceptionType = state.Data["ExceptionType"], + }); + } + + public JobList DeletedJobs(int from, int count) + { + return GetJobsOnState(States.DeletedState.StateName, from, count, (state, job) => new DeletedJobDto + { + Job = job, + DeletedAt = JobHelper.DeserializeNullableDateTime(state.Data["DeletedAt"]) + }); + } + + private JobList GetJobsOnState(string stateName, int from, int count, Func selector) + { + List> jobs = new List>(); + + List filterJobs = storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.StateName == stateName) + .AsEnumerable() + .Skip(from).Take(count) + .ToList(); + + List states = storage.Client.CreateDocumentQuery(StateDocumentCollectionUri, QueryOptions) + .AsEnumerable() + .Where(s => filterJobs.Any(j => j.StateId == s.Id)) + .ToList(); + + filterJobs.ForEach(job => + { + State state = states.Single(s => s.Id == job.StateId); + state.Data = state.Data; + + InvocationData invocationData = job.InvocationData; + invocationData.Arguments = job.Arguments; + + T data = selector(state, invocationData.Deserialize()); + jobs.Add(new KeyValuePair(job.Id, data)); + }); + + return new JobList(jobs); + } + + private JobList GetJobsOnQueue(string queue, int from, int count, Func selector) + { + if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); + + List> jobs = new List>(); + + List queues = storage.Client.CreateDocumentQuery(QueueDocumentCollectionUri, QueryOptions) + .Where(q => q.Name == queue) + .AsEnumerable() + .Skip(from).Take(count) + .ToList(); + + List filterJobs = storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .AsEnumerable() + .Where(j => queues.Any(q => q.JobId == j.Id)) + .ToList(); + + queues.ForEach(queueItem => + { + Entities.Job job = filterJobs.Single(j => j.Id == queueItem.JobId); + InvocationData invocationData = job.InvocationData; + invocationData.Arguments = job.Arguments; + + T data = selector(job.StateName, invocationData.Deserialize()); + jobs.Add(new KeyValuePair(job.Id, data)); + }); + + return new JobList(jobs); + } + + #endregion + + #region Counts + + public long EnqueuedCount(string queue) + { + if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); + + IPersistentJobQueueProvider provider = storage.QueueProviders.GetProvider(queue); + IPersistentJobQueueMonitoringApi monitoringApi = provider.GetJobQueueMonitoringApi(); + return monitoringApi.GetEnqueuedCount(queue); + } + + public long FetchedCount(string queue) => EnqueuedCount(queue); + + public long ScheduledCount() => GetNumberOfJobsByStateName(States.ScheduledState.StateName); + + public long FailedCount() => GetNumberOfJobsByStateName(States.FailedState.StateName); + + public long ProcessingCount() => GetNumberOfJobsByStateName(States.ProcessingState.StateName); + + public long SucceededListCount() => GetNumberOfJobsByStateName(States.SucceededState.StateName); + + public long DeletedListCount() => GetNumberOfJobsByStateName(States.DeletedState.StateName); + + private long GetNumberOfJobsByStateName(string state) + { + return storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.StateName == state) + .Select(s => s.Id) + .AsEnumerable() + .LongCount(); + } + + public IDictionary SucceededByDatesCount() => GetDatesTimelineStats("succeeded"); + + public IDictionary FailedByDatesCount() => GetDatesTimelineStats("failed"); + + public IDictionary HourlySucceededJobs() => GetHourlyTimelineStats("succeeded"); + + public IDictionary HourlyFailedJobs() => GetHourlyTimelineStats("failed"); + + private Dictionary GetHourlyTimelineStats(string type) + { + List dates = Enumerable.Range(0, 24).Select(x => DateTime.UtcNow.AddHours(-x)).ToList(); + Dictionary keys = dates.ToDictionary(x => $"stats:{type}:{x:yyyy-MM-dd-HH}", x => x); + return GetTimelineStats(keys); + } + + private Dictionary GetDatesTimelineStats(string type) + { + List dates = Enumerable.Range(0, 7).Select(x => DateTime.UtcNow.AddDays(-x)).ToList(); + Dictionary keys = dates.ToDictionary(x => $"stats:{type}:{x:yyyy-MM-dd}", x => x); + return GetTimelineStats(keys); + } + + private Dictionary GetTimelineStats(Dictionary keys) + { + Dictionary result = keys.ToDictionary(k => k.Value, v => default(long)); + + Dictionary data = storage.Client.CreateDocumentQuery(CounterDocumentCollectionUri, QueryOptions) + .Where(c => c.Type == CounterTypes.Aggregrate) + .AsEnumerable() + .Where(c => keys.ContainsKey(c.Key)) + .ToDictionary(k => k.Key, k => k.Value); + + foreach (string key in keys.Keys) + { + DateTime date = keys.Where(k => k.Key == key).Select(k => k.Value).First(); + result[date] = data.ContainsKey(key) ? data[key] : 0; + } + + return result; + } + + #endregion + } +} diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbStorage.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbStorage.cs new file mode 100644 index 0000000..26cc6c5 --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbStorage.cs @@ -0,0 +1,161 @@ +using System; +using System.Collections.Generic; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; + +using Hangfire.Server; +using Hangfire.Storage; +using Hangfire.Logging; +using Hangfire.AzureDocumentDB.Queue; + +namespace Hangfire.AzureDocumentDB +{ + /// + /// FirebaseStorage extend the storage option for Hangfire. + /// + public sealed class AzureDocumentDbStorage : JobStorage + { + internal AzureDocumentDbStorageOptions Options { get; } + + internal PersistentJobQueueProviderCollection QueueProviders { get; } + + internal DocumentClient Client { get; } + + /// + /// Initializes the FirebaseStorage form the url auth secret provide. + /// + /// The url string to Firebase Database + /// The secret key for the Firebase Database + /// The name of the database to connect with + /// argument is null. + /// argument is null. + public AzureDocumentDbStorage(string url, string authSecret, string database) : this(new AzureDocumentDbStorageOptions { Endpoint = new Uri(url), AuthSecret = authSecret, DatabaseName = database }) { } + + /// + /// Initializes the FirebaseStorage form the url auth secret provide. + /// + /// The url string to Firebase Database + /// The secret key for the Firebase Database + /// The name of the database to connect with + /// The FirebaseStorage object to override any of the options + /// argument is null. + /// argument is null. + public AzureDocumentDbStorage(string url, string authSecret, string database, AzureDocumentDbStorageOptions options) : this(Transform(url, authSecret, database, options)) { } + + /// + /// Initializes the FirebaseStorage form the url auth secret provide. + /// + /// The FirebaseStorage object to override any of the options + /// argument is null. + private AzureDocumentDbStorage(AzureDocumentDbStorageOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + Options = options; + + ConnectionPolicy connectionPolicy = ConnectionPolicy.Default; + connectionPolicy.RequestTimeout = options.RequestTimeout; + Client = new DocumentClient(options.Endpoint, options.AuthSecret, connectionPolicy); + Client.OpenAsync().GetAwaiter().GetResult(); + + // create the database all the collections + Initialize(); + + Newtonsoft.Json.JsonConvert.DefaultSettings = () => new Newtonsoft.Json.JsonSerializerSettings + { + NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore, + DefaultValueHandling = Newtonsoft.Json.DefaultValueHandling.Ignore, + DateTimeZoneHandling = Newtonsoft.Json.DateTimeZoneHandling.Utc, + TypeNameHandling = Newtonsoft.Json.TypeNameHandling.All + }; + + JobQueueProvider provider = new JobQueueProvider(this); + QueueProviders = new PersistentJobQueueProviderCollection(provider); + } + + /// + /// + /// + /// + public override IStorageConnection GetConnection() => new AzureDocumentDbConnection(this); + + /// + /// + /// + /// + public override IMonitoringApi GetMonitoringApi() => new AzureDocumentDbMonitoringApi(this); + +#pragma warning disable 618 + /// + /// + /// + /// + public override IEnumerable GetComponents() +#pragma warning restore 618 + { + yield return new ExpirationManager(this); + yield return new CountersAggregator(this); + } + + /// + /// Prints out the storage options + /// + /// + public override void WriteOptionsToLog(ILog logger) + { + logger.Info("Using the following options for Firebase job storage:"); + logger.Info($" Firebase Url: {Options.Endpoint.AbsoluteUri}"); + logger.Info($" Request Timeout: {Options.RequestTimeout}"); + logger.Info($" Counter Agggerate Interval: {Options.CountersAggregateInterval.TotalSeconds} seconds"); + logger.Info($" Queue Poll Interval: {Options.QueuePollInterval.TotalSeconds} seconds"); + logger.Info($" Expiration Check Interval: {Options.ExpirationCheckInterval.TotalSeconds} seconds"); + logger.Info($" Queue: {string.Join(",", Options.Queues)}"); + } + + /// + /// Return the name of the database + /// + /// + public override string ToString() => $"DoucmentDb Database : {Options.DatabaseName}"; + + private void Initialize() + { + ILog logger = LogProvider.For(); + Uri databaseUri = UriFactory.CreateDatabaseUri(Options.DatabaseName); + + // create database + logger.Info($"Creating database : {Options.DatabaseName}"); + Client.CreateDatabaseIfNotExistsAsync(new Database { Id = Options.DatabaseName }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : servers"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "servers" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : queues"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "queues" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : hashes"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "hashes" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : lists"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "lists" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : counters"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "counters" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : jobs"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "jobs" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : states"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "states" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : sets"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "sets" }).GetAwaiter().GetResult(); + logger.Info("Creating document collection : locks"); + Client.CreateDocumentCollectionIfNotExistsAsync(databaseUri, new DocumentCollection { Id = "locks" }).GetAwaiter().GetResult(); + } + + private static AzureDocumentDbStorageOptions Transform(string url, string authSecret, string database, AzureDocumentDbStorageOptions options) + { + if (options == null) options = new AzureDocumentDbStorageOptions(); + + options.Endpoint = new Uri(url); + options.AuthSecret = authSecret; + options.DatabaseName = database; + + return options; + } + + } +} diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbStorageExtensions.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbStorageExtensions.cs new file mode 100644 index 0000000..4b47eef --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbStorageExtensions.cs @@ -0,0 +1,49 @@ +using System; +using Hangfire.AzureDocumentDB; + +namespace Hangfire +{ + /// + /// Extension methods to user AzureDocumentDb Storage. + /// + public static class AzureDocumentDbStorageExtensions + { + /// + /// Enables to attache AzureDocumentDb to Hangfire + /// + /// The IGlobalConfiguration object + /// The url string to AzureDocumentDb Database + /// The secret key for the AzureDocumentDb Database + /// The name of the database to connect with + /// + public static IGlobalConfiguration UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database) + { + if (configuration == null) throw new ArgumentNullException(nameof(configuration)); + if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url)); + if (string.IsNullOrEmpty(authSecret)) throw new ArgumentNullException(nameof(authSecret)); + + AzureDocumentDbStorage storage = new AzureDocumentDbStorage(url, authSecret, database); + return configuration.UseStorage(storage); + } + + /// + /// Enables to attache AzureDocumentDb to Hangfire + /// + /// The IGlobalConfiguration object + /// The url string to AzureDocumentDb Database + /// The secret key for the AzureDocumentDb Database + /// The name of the database to connect with + /// The AzureDocumentDbStorage object to override any of the options + /// + public static IGlobalConfiguration UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database, AzureDocumentDbStorageOptions options) + { + if (configuration == null) throw new ArgumentNullException(nameof(configuration)); + if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url)); + if (string.IsNullOrEmpty(authSecret)) throw new ArgumentNullException(nameof(authSecret)); + if (options == null) throw new ArgumentNullException(nameof(options)); + + AzureDocumentDbStorage storage = new AzureDocumentDbStorage(url, authSecret, database, options); + return configuration.UseStorage(storage); + } + } +} diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbStorageOptions.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbStorageOptions.cs new file mode 100644 index 0000000..5659feb --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbStorageOptions.cs @@ -0,0 +1,59 @@ +using System; +// ReSharper disable MemberCanBePrivate.Global +// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global + +namespace Hangfire.AzureDocumentDB +{ + /// + /// Options for FirebaseStorage + /// + public class AzureDocumentDbStorageOptions + { + internal Uri Endpoint { get; set; } + + internal string AuthSecret { get; set; } + + /// + /// Get or sets the name of the database to connect. + /// + internal string DatabaseName { get; set; } + + /// + /// Get or sets the request timemout for IFirebaseConfig. Default value set to 30 seconds + /// + public TimeSpan RequestTimeout { get; set; } + + /// + /// Get or set list of queues to process. Default values "default", "critical" + /// + public string[] Queues { get; set; } + + /// + /// Get or set the interval timespan to process expired enteries. Default value 15 minutes + /// Expired items under "locks", "jobs", "lists", "sets", "hashs", "counters/aggregrated" will be checked + /// + public TimeSpan ExpirationCheckInterval { get; set; } + + /// + /// Get or sets the interval timespan to aggreated the counters. Default value 1 minute + /// + public TimeSpan CountersAggregateInterval { get; set; } + + /// + /// Gets or sets the interval timespan to poll the queue for processing any new jobs. Default value 2 minutes + /// + public TimeSpan QueuePollInterval { get; set; } + + /// + /// Create an instance of AzureDocumentDB Storage option with default values + /// + public AzureDocumentDbStorageOptions() + { + RequestTimeout = TimeSpan.FromSeconds(30); + Queues = new[] { "default", "critical" }; + ExpirationCheckInterval = TimeSpan.FromMinutes(5); + CountersAggregateInterval = TimeSpan.FromMinutes(1); + QueuePollInterval = TimeSpan.FromSeconds(2); + } + } +} diff --git a/Hangfire.AzureDocumentDB/AzureDocumentDbWriteOnlyTransaction.cs b/Hangfire.AzureDocumentDB/AzureDocumentDbWriteOnlyTransaction.cs new file mode 100644 index 0000000..663ba56 --- /dev/null +++ b/Hangfire.AzureDocumentDB/AzureDocumentDbWriteOnlyTransaction.cs @@ -0,0 +1,411 @@ +using System; +using System.Linq; +using System.Globalization; +using System.Collections.Generic; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; + +using Hangfire.States; +using Hangfire.Storage; +using Hangfire.AzureDocumentDB.Queue; +using Hangfire.AzureDocumentDB.Entities; + +namespace Hangfire.AzureDocumentDB +{ + internal class AzureDocumentDbWriteOnlyTransaction : IWriteOnlyTransaction + { + private readonly AzureDocumentDbConnection connection; + private readonly List commands = new List(); + + private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = 100 }; + private readonly Uri JobDocumentCollectionUri; + private readonly Uri SetDocumentCollectionUri; + private readonly Uri StateDocumentCollectionUri; + private readonly Uri CounterDocumentCollectionUri; + private readonly Uri HashDocumentCollectionUri; + private readonly Uri ListDocumentCollectionUri; + + public AzureDocumentDbWriteOnlyTransaction(AzureDocumentDbConnection connection) + { + this.connection = connection; + + AzureDocumentDbStorage storage = connection.Storage; + JobDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "jobs"); + SetDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "sets"); + StateDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "states"); + CounterDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "counters"); + HashDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "hashes"); + ListDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "lists"); + } + + private void QueueCommand(Action command) => commands.Add(command); + public void Commit() => commands.ForEach(command => command()); + public void Dispose() { } + + #region Queue + + public void AddToQueue(string queue, string jobId) + { + if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); + if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); + + IPersistentJobQueueProvider provider = connection.QueueProviders.GetProvider(queue); + IPersistentJobQueue persistentQueue = provider.GetJobQueue(); + QueueCommand(() => persistentQueue.Enqueue(queue, jobId)); + } + + #endregion + + #region Counter + + public void DecrementCounter(string key) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + + QueueCommand(() => + { + Counter data = new Counter + { + Key = key, + Type = CounterTypes.Raw, + Value = -1 + }; + + connection.Storage.Client.CreateDocumentAsync(CounterDocumentCollectionUri, data).GetAwaiter().GetResult(); + }); + } + + public void DecrementCounter(string key, TimeSpan expireIn) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + if (expireIn.Duration() != expireIn) throw new ArgumentException(@"The `expireIn` value must be positive.", nameof(expireIn)); + + QueueCommand(() => + { + Counter data = new Counter + { + Key = key, + Type = CounterTypes.Raw, + Value = -1, + ExpireOn = DateTime.UtcNow.Add(expireIn) + }; + + connection.Storage.Client.CreateDocumentAsync(CounterDocumentCollectionUri, data).GetAwaiter().GetResult(); + }); + } + + public void IncrementCounter(string key) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + + QueueCommand(() => + { + Counter data = new Counter + { + Key = key, + Type = CounterTypes.Raw, + Value = 1 + }; + + connection.Storage.Client.CreateDocumentAsync(CounterDocumentCollectionUri, data).GetAwaiter().GetResult(); + }); + } + + public void IncrementCounter(string key, TimeSpan expireIn) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + if (expireIn.Duration() != expireIn) throw new ArgumentException(@"The `expireIn` value must be positive.", nameof(expireIn)); + + QueueCommand(() => + { + Counter data = new Counter + { + Key = key, + Type = CounterTypes.Raw, + Value = 1, + ExpireOn = DateTime.UtcNow.Add(expireIn) + }; + + connection.Storage.Client.CreateDocumentAsync(CounterDocumentCollectionUri, data).GetAwaiter().GetResult(); + }); + } + + #endregion + + #region Job + + public void ExpireJob(string jobId, TimeSpan expireIn) + { + if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); + if (expireIn.Duration() != expireIn) throw new ArgumentException(@"The `expireIn` value must be positive.", nameof(expireIn)); + + QueueCommand(() => + { + Job job = connection.Storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == jobId) + .AsEnumerable() + .FirstOrDefault(); + + if (job != null) + { + job.ExpireOn = DateTime.UtcNow.Add(expireIn); + connection.Storage.Client.ReplaceDocumentAsync(job.SelfLink, job).GetAwaiter().GetResult(); + } + }); + } + + public void PersistJob(string jobId) + { + if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); + + QueueCommand(() => + { + Job job = connection.Storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == jobId) + .AsEnumerable() + .FirstOrDefault(); + + if (job != null && job.ExpireOn.HasValue) + { + job.ExpireOn = null; + connection.Storage.Client.ReplaceDocumentAsync(job.SelfLink, job).GetAwaiter().GetResult(); + } + }); + } + + #endregion + + #region State + + public void SetJobState(string jobId, IState state) + { + if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); + if (state == null) throw new ArgumentNullException(nameof(state)); + + QueueCommand(() => + { + Job job = connection.Storage.Client.CreateDocumentQuery(JobDocumentCollectionUri, QueryOptions) + .Where(j => j.Id == jobId) + .AsEnumerable() + .FirstOrDefault(); + + if (job != null) + { + State data = new State + { + JobId = jobId, + Name = state.Name, + Reason = state.Reason, + CreatedOn = DateTime.UtcNow, + Data = state.SerializeData() + }; + + ResourceResponse response = connection.Storage.Client.CreateDocumentAsync(StateDocumentCollectionUri, data).GetAwaiter().GetResult(); + + job.StateId = response.Resource.Id; + job.StateName = state.Name; + + connection.Storage.Client.ReplaceDocumentAsync(job.SelfLink, job).GetAwaiter().GetResult(); + } + }); + } + + public void AddJobState(string jobId, IState state) + { + if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); + if (state == null) throw new ArgumentNullException(nameof(state)); + + QueueCommand(() => + { + State data = new State + { + JobId = jobId, + Name = state.Name, + Reason = state.Reason, + CreatedOn = DateTime.UtcNow, + Data = state.SerializeData() + }; + + connection.Storage.Client.CreateDocumentAsync(StateDocumentCollectionUri, data).GetAwaiter().GetResult(); + }); + } + + #endregion + + #region Set + + public void RemoveFromSet(string key, string value) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + if (string.IsNullOrEmpty(value)) throw new ArgumentNullException(nameof(value)); + + QueueCommand(() => + { + Set set = connection.Storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == key && s.Value == value) + .AsEnumerable() + .FirstOrDefault(); + + if (set != null) + { + connection.Storage.Client.DeleteDocumentAsync(set.SelfLink).GetAwaiter().GetResult(); + } + }); + } + + public void AddToSet(string key, string value) => AddToSet(key, value, 0.0); + + public void AddToSet(string key, string value, double score) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + if (string.IsNullOrEmpty(value)) throw new ArgumentNullException(nameof(value)); + + QueueCommand(() => + { + Set set = connection.Storage.Client.CreateDocumentQuery(SetDocumentCollectionUri, QueryOptions) + .Where(s => s.Key == key && s.Value == value) + .AsEnumerable() + .FirstOrDefault(); + + if (set != null) + { + set.Key = key; + set.Value = value; + set.Score = score; + } + else + { + set = new Set + { + Key = key, + Value = value, + Score = score + }; + } + + connection.Storage.Client.UpsertDocumentAsync(SetDocumentCollectionUri, set).GetAwaiter().GetResult(); + }); + } + + #endregion + + #region Hash + + public void RemoveHash(string key) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + + QueueCommand(() => + { + List hashes = connection.Storage.Client.CreateDocumentQuery(HashDocumentCollectionUri, QueryOptions) + .Where(h => h.Key == key) + .AsEnumerable() + .ToList(); + + hashes.ForEach(hash => connection.Storage.Client.DeleteDocumentAsync(hash.SelfLink).GetAwaiter().GetResult()); + }); + } + + public void SetRangeInHash(string key, IEnumerable> keyValuePairs) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + if (keyValuePairs == null) throw new ArgumentNullException(nameof(keyValuePairs)); + + QueueCommand(() => + { + Func epoch = s => + { + DateTime date; + if (DateTime.TryParse(s, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out date)) + { + if (date.Equals(DateTime.MinValue)) return int.MinValue.ToString(); + DateTime epochDateTime = new DateTime(1970, 1, 1); + TimeSpan epochTimeSpan = date - epochDateTime; + return ((int)epochTimeSpan.TotalSeconds).ToString(CultureInfo.InvariantCulture); + } + return s; + }; + + List sources = keyValuePairs.Select(k => new Hash + { + Key = key, + Field = k.Key, + Value = epoch(k.Value) + }).ToList(); + + List hashes = connection.Storage.Client.CreateDocumentQuery(HashDocumentCollectionUri, QueryOptions) + .Where(h => h.Key == key) + .AsEnumerable() + .ToList(); + + sources.ForEach(source => + { + Hash hash = hashes.FirstOrDefault(h => h.Key == source.Key && h.Field == source.Field); + if (hash != null) source.Id = hash.Id; + }); + + sources.ForEach(hash => connection.Storage.Client.UpsertDocumentAsync(HashDocumentCollectionUri, hash).GetAwaiter().GetResult()); + }); + } + + #endregion + + #region List + + public void InsertToList(string key, string value) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + if (string.IsNullOrEmpty(value)) throw new ArgumentNullException(nameof(value)); + + QueueCommand(() => + { + List data = new List + { + Key = key, + Value = value + }; + + connection.Storage.Client.CreateDocumentAsync(ListDocumentCollectionUri, data).GetAwaiter().GetResult(); + }); + } + + public void RemoveFromList(string key, string value) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + if (string.IsNullOrEmpty(value)) throw new ArgumentNullException(nameof(value)); + + QueueCommand(() => + { + List data = connection.Storage.Client.CreateDocumentQuery(ListDocumentCollectionUri, QueryOptions) + .Where(l => l.Key == key && l.Value == value) + .AsEnumerable() + .FirstOrDefault(); + + if (data != null) + { + connection.Storage.Client.DeleteDocumentAsync(data.SelfLink).GetAwaiter().GetResult(); + } + }); + } + + public void TrimList(string key, int keepStartingFrom, int keepEndingAt) + { + if (string.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key)); + + QueueCommand(() => + { + List lists = connection.Storage.Client.CreateDocumentQuery(ListDocumentCollectionUri, QueryOptions) + .Where(l => l.Key == key) + .AsEnumerable() + .Skip(keepStartingFrom).Take(keepEndingAt) + .ToList(); + + lists.ForEach(list => connection.Storage.Client.DeleteDocumentAsync(list.SelfLink).GetAwaiter().GetResult()); + }); + } + + #endregion + + } +} diff --git a/Hangfire.AzureDocumentDB/CountersAggregator.cs b/Hangfire.AzureDocumentDB/CountersAggregator.cs new file mode 100644 index 0000000..872f429 --- /dev/null +++ b/Hangfire.AzureDocumentDB/CountersAggregator.cs @@ -0,0 +1,98 @@ +using System; +using System.Net; +using System.Linq; +using System.Threading; +using System.Collections.Generic; + +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; + +using Hangfire.Server; +using Hangfire.Logging; +using Hangfire.AzureDocumentDB.Entities; + +namespace Hangfire.AzureDocumentDB +{ +#pragma warning disable 618 + internal class CountersAggregator : IServerComponent +#pragma warning restore 618 + { + private static readonly ILog Logger = LogProvider.For(); + private const string distributedLockKey = "countersaggragator"; + private static readonly TimeSpan defaultLockTimeout = TimeSpan.FromMinutes(5); + private readonly TimeSpan checkInterval; + + private readonly AzureDocumentDbStorage storage; + + private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = 1000 }; + private readonly Uri CounterDocumentCollectionUri; + + public CountersAggregator(AzureDocumentDbStorage storage) + { + if (storage == null) throw new ArgumentNullException(nameof(storage)); + + this.storage = storage; + checkInterval = storage.Options.CountersAggregateInterval; + CounterDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "counters"); + } + + public void Execute(CancellationToken cancellationToken) + { + Logger.Debug("Aggregating records in 'Counter' table."); + + using (new AzureDocumentDbDistributedLock(distributedLockKey, defaultLockTimeout, storage)) + { + List rawCounters = storage.Client.CreateDocumentQuery(CounterDocumentCollectionUri, QueryOptions) + .Where(c => c.Type == CounterTypes.Raw) + .AsEnumerable() + .ToList(); + + Dictionary> counters = rawCounters.GroupBy(c => c.Key) + .ToDictionary(k => k.Key, v => new Tuple(v.Sum(c => c.Value), v.Max(c => c.ExpireOn))); + + Array.ForEach(counters.Keys.ToArray(), key => + { + cancellationToken.ThrowIfCancellationRequested(); + + Tuple data; + if (counters.TryGetValue(key, out data)) + { + Counter aggregated = storage.Client.CreateDocumentQuery(CounterDocumentCollectionUri, QueryOptions) + .Where(c => c.Key == key && c.Type == CounterTypes.Aggregrate) + .AsEnumerable() + .FirstOrDefault(); + + if (aggregated == null) + { + aggregated = new Counter + { + Key = key, + Type = CounterTypes.Aggregrate, + Value = data.Item1, + ExpireOn = data.Item2 + }; + } + else + { + aggregated.Value += data.Item1; + aggregated.ExpireOn = data.Item2; + } + + ResourceResponse response = storage.Client.UpsertDocumentAsync(CounterDocumentCollectionUri, aggregated).GetAwaiter().GetResult(); + if (response.StatusCode == HttpStatusCode.Created || response.StatusCode == HttpStatusCode.OK) + { + List deleteCountersr = rawCounters.Where(c => c.Key == key).ToList(); + deleteCountersr.ForEach(counter => storage.Client.DeleteDocumentAsync(counter.SelfLink).GetAwaiter().GetResult()); + } + } + }); + } + + Logger.Trace("Records from the 'Counter' table aggregated."); + cancellationToken.WaitHandle.WaitOne(checkInterval); + } + + public override string ToString() => GetType().ToString(); + + } +} diff --git a/Hangfire.AzureDocumentDB/Entities/Counter.cs b/Hangfire.AzureDocumentDB/Entities/Counter.cs new file mode 100644 index 0000000..7163c15 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Counter.cs @@ -0,0 +1,22 @@ +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class Counter : DocumentEntity + { + [JsonProperty("key")] + public string Key { get; set; } + + [JsonProperty("value")] + public int Value { get; set; } + + [JsonProperty("country_type")] + public CounterTypes Type { get; set; } + } + + internal enum CounterTypes + { + Raw = 1, + Aggregrate = 2 + } +} diff --git a/Hangfire.AzureDocumentDB/Entities/DocumentEntity.cs b/Hangfire.AzureDocumentDB/Entities/DocumentEntity.cs new file mode 100644 index 0000000..f5402d4 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/DocumentEntity.cs @@ -0,0 +1,19 @@ +using System; +using Microsoft.Azure.Documents; +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal abstract class DocumentEntity + { + [JsonProperty("id")] + public string Id { get; set; } = Guid.NewGuid().ToString(); + + [JsonProperty("_self")] + public string SelfLink { get; set; } + + [JsonProperty("expire_on")] + [JsonConverter(typeof(UnixDateTimeConverter))] + public DateTime? ExpireOn { get; set; } + } +} diff --git a/Hangfire.AzureDocumentDB/Entities/Hash.cs b/Hangfire.AzureDocumentDB/Entities/Hash.cs new file mode 100644 index 0000000..8dd1698 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Hash.cs @@ -0,0 +1,16 @@ +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class Hash : DocumentEntity + { + [JsonProperty("key")] + public string Key { get; set; } + + [JsonProperty("field")] + public string Field { get; set; } + + [JsonProperty("value")] + public string Value { get; set; } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Entities/Job.cs b/Hangfire.AzureDocumentDB/Entities/Job.cs new file mode 100644 index 0000000..eaec256 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Job.cs @@ -0,0 +1,29 @@ +using System; +using Hangfire.Storage; +using Microsoft.Azure.Documents; +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class Job : DocumentEntity + { + [JsonProperty("data")] + public InvocationData InvocationData { get; set; } + + [JsonProperty("arguments")] + public string Arguments { get; set; } + + [JsonProperty("state_id")] + public string StateId { get; set; } + + [JsonProperty("state_name")] + public string StateName { get; set; } + + [JsonProperty("parameters")] + public Parameter[] Parameters { get; set; } + + [JsonProperty("created_on")] + [JsonConverter(typeof(UnixDateTimeConverter))] + public DateTime CreatedOn { get; set; } + } +} diff --git a/Hangfire.AzureDocumentDB/Entities/List.cs b/Hangfire.AzureDocumentDB/Entities/List.cs new file mode 100644 index 0000000..e1a4860 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/List.cs @@ -0,0 +1,13 @@ +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class List : DocumentEntity + { + [JsonProperty("key")] + public string Key { get; set; } + + [JsonProperty("value")] + public string Value { get; set; } + } +} diff --git a/Hangfire.AzureDocumentDB/Entities/Lock.cs b/Hangfire.AzureDocumentDB/Entities/Lock.cs new file mode 100644 index 0000000..4c116a0 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Lock.cs @@ -0,0 +1,10 @@ +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class Lock : DocumentEntity + { + [JsonProperty("name")] + public string Name { get; set; } + } +} diff --git a/Hangfire.AzureDocumentDB/Entities/Parameter.cs b/Hangfire.AzureDocumentDB/Entities/Parameter.cs new file mode 100644 index 0000000..22404d3 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Parameter.cs @@ -0,0 +1,13 @@ +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class Parameter : DocumentEntity + { + [JsonProperty("name")] + public string Name { get; set; } + + [JsonProperty("value")] + public string Value { get; set; } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Entities/Queue.cs b/Hangfire.AzureDocumentDB/Entities/Queue.cs new file mode 100644 index 0000000..91525e4 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Queue.cs @@ -0,0 +1,13 @@ +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + class Queue : DocumentEntity + { + [JsonProperty("name")] + public string Name { get; set; } + + [JsonProperty("job_id")] + public string JobId { get; set; } + } +} diff --git a/Hangfire.AzureDocumentDB/Entities/Server.cs b/Hangfire.AzureDocumentDB/Entities/Server.cs new file mode 100644 index 0000000..117bbdb --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Server.cs @@ -0,0 +1,26 @@ +using System; +using Microsoft.Azure.Documents; +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class Server : DocumentEntity + { + [JsonProperty("server_id")] + public string ServerId { get; set; } + + [JsonProperty("workers")] + public int Workers { get; set; } + + [JsonProperty("queues")] + public string[] Queues { get; set; } + + [JsonProperty("created_on")] + [JsonConverter(typeof(UnixDateTimeConverter))] + public DateTime CreatedOn { get; set; } + + [JsonProperty("last_heartbeat")] + [JsonConverter(typeof(UnixDateTimeConverter))] + public DateTime LastHeartbeat { get; set; } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Entities/Set.cs b/Hangfire.AzureDocumentDB/Entities/Set.cs new file mode 100644 index 0000000..d9ba1b2 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/Set.cs @@ -0,0 +1,16 @@ +using Newtonsoft.Json; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class Set : DocumentEntity + { + [JsonProperty("key")] + public string Key { get; set; } + + [JsonProperty("value")] + public string Value { get; set; } + + [JsonProperty("score")] + public double? Score { get; set; } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Entities/State.cs b/Hangfire.AzureDocumentDB/Entities/State.cs new file mode 100644 index 0000000..ca9f5a5 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Entities/State.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; + +using Newtonsoft.Json; +using Microsoft.Azure.Documents; + +namespace Hangfire.AzureDocumentDB.Entities +{ + internal class State : DocumentEntity + { + [JsonProperty("job_id")] + public string JobId { get; set; } + + [JsonProperty("name")] + public string Name { get; set; } + + [JsonProperty("reason")] + public string Reason { get; set; } + + [JsonProperty("created_on")] + [JsonConverter(typeof(UnixDateTimeConverter))] + public DateTime CreatedOn { get; set; } + + [JsonProperty("data")] + public Dictionary Data { get; set; } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/ExpirationManager.cs b/Hangfire.AzureDocumentDB/ExpirationManager.cs new file mode 100644 index 0000000..2376109 --- /dev/null +++ b/Hangfire.AzureDocumentDB/ExpirationManager.cs @@ -0,0 +1,77 @@ +using System; +using System.Linq; +using System.Threading; +using System.Collections.Generic; + +using Microsoft.Azure.Documents.Linq; +using Microsoft.Azure.Documents.Client; + +using Hangfire.Server; +using Hangfire.Logging; +using Hangfire.AzureDocumentDB.Entities; + +namespace Hangfire.AzureDocumentDB +{ +#pragma warning disable 618 + internal class ExpirationManager : IServerComponent +#pragma warning restore 618 + { + private static readonly ILog Logger = LogProvider.For(); + private const string distributedLockKey = "expirationmanager"; + private static readonly TimeSpan defaultLockTimeout = TimeSpan.FromMinutes(5); + private static readonly string[] documents = { "locks", "jobs", "lists", "sets", "hashes", "counters" }; + private readonly TimeSpan checkInterval; + + private readonly AzureDocumentDbStorage storage; + + public ExpirationManager(AzureDocumentDbStorage storage) + { + if (storage == null) throw new ArgumentNullException(nameof(storage)); + + this.storage = storage; + checkInterval = storage.Options.ExpirationCheckInterval; + } + + public void Execute(CancellationToken cancellationToken) + { + foreach (string document in documents) + { + Logger.Debug($"Removing outdated records from the '{document}' document."); + + using (new AzureDocumentDbDistributedLock(distributedLockKey, defaultLockTimeout, storage)) + { + string responseContinuation = null; + Uri collectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, document); + + do + { + FeedOptions QueryOptions = new FeedOptions { MaxItemCount = 50, RequestContinuation = responseContinuation }; + IDocumentQuery query = storage.Client.CreateDocumentQuery(collectionUri, QueryOptions) + .AsDocumentQuery(); + + if (query.HasMoreResults) + { + FeedResponse response = query.ExecuteNextAsync(cancellationToken).GetAwaiter().GetResult(); + responseContinuation = response.ResponseContinuation; + + List entities = response + .Where(c => c.ExpireOn < DateTime.UtcNow) + .Where(entity => document != "counters" || !(entity is Counter) || ((Counter)entity).Type != CounterTypes.Raw).ToList(); + + foreach (DocumentEntity entity in entities) + { + cancellationToken.ThrowIfCancellationRequested(); + storage.Client.DeleteDocumentAsync(entity.SelfLink).GetAwaiter().GetResult(); + } + } + + } while (!string.IsNullOrEmpty(responseContinuation)); + + } + + Logger.Trace($"Outdated records removed from the '{document}' document."); + cancellationToken.WaitHandle.WaitOne(checkInterval); + } + } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj b/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj new file mode 100644 index 0000000..326e9af --- /dev/null +++ b/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj @@ -0,0 +1,145 @@ + + + + + Debug + AnyCPU + {E0AD3801-5504-49A7-80C6-8B4373DDE0E5} + Library + Properties + Hangfire.AzureDocumentDB + Hangfire.AzureDocumentDB + v4.5.2 + 512 + true + + + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + bin\Debug\Hangfire.AzureDocumentDB.XML + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + bin\Release\Hangfire.AzureDocumentDB.XML + + + + + + + ..\packages\Hangfire.Core.1.6.12\lib\net45\Hangfire.Core.dll + True + + + ..\packages\Microsoft.Azure.DocumentDB.1.13.2\lib\net45\Microsoft.Azure.Documents.Client.dll + True + + + ..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll + True + + + ..\packages\Owin.1.0\lib\net40\Owin.dll + True + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ResXFileCodeGenerator + Resources.Designer.cs + Designer + + + True + Resources.resx + + + + SettingsSingleFileGenerator + Settings.Designer.cs + + + True + Settings.settings + True + + + + + Designer + + + + + + This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + + + + \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Json/LowerCaseDelimitedPropertyNamesContractResovler.cs b/Hangfire.AzureDocumentDB/Json/LowerCaseDelimitedPropertyNamesContractResovler.cs new file mode 100644 index 0000000..74b4092 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Json/LowerCaseDelimitedPropertyNamesContractResovler.cs @@ -0,0 +1,62 @@ +using System.Linq; +using System.Collections; +using System.Globalization; + +using Newtonsoft.Json.Serialization; + +namespace Hangfire.AzureDocumentDB.Json +{ + internal class LowerCaseDelimitedPropertyNamesContractResovler : DefaultContractResolver + { + private readonly char _delimiter; + + public LowerCaseDelimitedPropertyNamesContractResovler() : this('_') { } + + private LowerCaseDelimitedPropertyNamesContractResovler(char delimiter) + { + _delimiter = delimiter; + } + + protected override string ResolvePropertyName(string propertyName) + { + string camelCaseString = ToCamelCase(propertyName); + return new string(InsertDelimiterBeforeCaps(camelCaseString, _delimiter).OfType().ToArray()); + } + + private static IEnumerable InsertDelimiterBeforeCaps(IEnumerable input, char delimiter) + { + bool lastCharWasUppper = false; + foreach (char c in input) + { + if (char.IsUpper(c)) + { + if (!lastCharWasUppper) + { + yield return delimiter; + lastCharWasUppper = true; + } + yield return char.ToLower(c); + continue; + } + yield return c; + lastCharWasUppper = false; + } + } + + private static string ToCamelCase(string s) + { + if (string.IsNullOrEmpty(s)) + return s; + + if (!char.IsUpper(s[0])) + return s; + + string camelCase = char.ToLower(s[0], CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture); + if (s.Length > 1) + camelCase += s.Substring(1); + + return camelCase; + } + + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Properties/AssemblyInfo.cs b/Hangfire.AzureDocumentDB/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..e210d5b --- /dev/null +++ b/Hangfire.AzureDocumentDB/Properties/AssemblyInfo.cs @@ -0,0 +1,35 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Hangfire.AzureDocumentDB")] +[assembly: AssemblyDescription("Azure DocumentDB provider for Hangfire")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Hangfire.AzureDocumentDB")] +[assembly: AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("e0ad3801-5504-49a7-80c6-8b4373dde0e5")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Hangfire.AzureDocumentDB/Properties/Resources.Designer.cs b/Hangfire.AzureDocumentDB/Properties/Resources.Designer.cs new file mode 100644 index 0000000..d35cd41 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Properties/Resources.Designer.cs @@ -0,0 +1,71 @@ +//------------------------------------------------------------------------------ +// +// This code was generated by a tool. +// Runtime Version:4.0.30319.42000 +// +// Changes to this file may cause incorrect behavior and will be lost if +// the code is regenerated. +// +//------------------------------------------------------------------------------ + +namespace Hangfire.AzureDocumentDB.Properties +{ + + + /// + /// A strongly-typed resource class, for looking up localized strings, etc. + /// + // This class was auto-generated by the StronglyTypedResourceBuilder + // class via a tool like ResGen or Visual Studio. + // To add or remove a member, edit your .ResX file then rerun ResGen + // with the /str option, or rebuild your VS project. + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "4.0.0.0")] + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()] + internal class Resources + { + + private static global::System.Resources.ResourceManager resourceMan; + + private static global::System.Globalization.CultureInfo resourceCulture; + + [global::System.Diagnostics.CodeAnalysis.SuppressMessageAttribute("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")] + internal Resources() + { + } + + /// + /// Returns the cached ResourceManager instance used by this class. + /// + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Resources.ResourceManager ResourceManager + { + get + { + if ((resourceMan == null)) + { + global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("Hangfire.AzureDocumentDB.Properties.Resources", typeof(Resources).Assembly); + resourceMan = temp; + } + return resourceMan; + } + } + + /// + /// Overrides the current thread's CurrentUICulture property for all + /// resource lookups using this strongly typed resource class. + /// + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Globalization.CultureInfo Culture + { + get + { + return resourceCulture; + } + set + { + resourceCulture = value; + } + } + } +} diff --git a/Hangfire.AzureDocumentDB/Properties/Resources.resx b/Hangfire.AzureDocumentDB/Properties/Resources.resx new file mode 100644 index 0000000..af7dbeb --- /dev/null +++ b/Hangfire.AzureDocumentDB/Properties/Resources.resx @@ -0,0 +1,117 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + text/microsoft-resx + + + 2.0 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Properties/Settings.Designer.cs b/Hangfire.AzureDocumentDB/Properties/Settings.Designer.cs new file mode 100644 index 0000000..b6f4bdb --- /dev/null +++ b/Hangfire.AzureDocumentDB/Properties/Settings.Designer.cs @@ -0,0 +1,30 @@ +//------------------------------------------------------------------------------ +// +// This code was generated by a tool. +// Runtime Version:4.0.30319.42000 +// +// Changes to this file may cause incorrect behavior and will be lost if +// the code is regenerated. +// +//------------------------------------------------------------------------------ + +namespace Hangfire.AzureDocumentDB.Properties +{ + + + [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()] + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("Microsoft.VisualStudio.Editors.SettingsDesigner.SettingsSingleFileGenerator", "11.0.0.0")] + internal sealed partial class Settings : global::System.Configuration.ApplicationSettingsBase + { + + private static Settings defaultInstance = ((Settings)(global::System.Configuration.ApplicationSettingsBase.Synchronized(new Settings()))); + + public static Settings Default + { + get + { + return defaultInstance; + } + } + } +} diff --git a/Hangfire.AzureDocumentDB/Properties/Settings.settings b/Hangfire.AzureDocumentDB/Properties/Settings.settings new file mode 100644 index 0000000..3964565 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Properties/Settings.settings @@ -0,0 +1,7 @@ + + + + + + + diff --git a/Hangfire.AzureDocumentDB/Queue/Entities/FetchedJob.cs b/Hangfire.AzureDocumentDB/Queue/Entities/FetchedJob.cs new file mode 100644 index 0000000..6fb54d6 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/Entities/FetchedJob.cs @@ -0,0 +1,59 @@ +using System; +using System.Linq; + +using Hangfire.Storage; + +using Microsoft.Azure.Documents.Client; + +namespace Hangfire.AzureDocumentDB.Queue +{ + internal class FetchedJob : IFetchedJob + { + private readonly AzureDocumentDbStorage storage; + private readonly Uri QueueDocumentCollectionUri; + private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = 1 }; + + public FetchedJob(AzureDocumentDbStorage storage, Entities.Queue data) + { + this.storage = storage; + Id = data.Id; + JobId = data.JobId; + Queue = data.Name; + SelfLink = data.SelfLink; + QueueDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "queues"); + } + + private string Id { get; } + + private string SelfLink { get; } + + public string JobId { get; } + + private string Queue { get; } + + public void Dispose() + { + } + + public void RemoveFromQueue() + { + bool exists = storage.Client.CreateDocumentQuery(QueueDocumentCollectionUri, QueryOptions) + .Where(d => d.Id == Id) + .AsEnumerable() + .Any(); + + if (exists) storage.Client.DeleteDocumentAsync(SelfLink).GetAwaiter().GetResult(); + } + + public void Requeue() + { + Entities.Queue data = new Entities.Queue + { + Id = Id, + Name = Queue, + JobId = JobId + }; + storage.Client.UpsertDocumentAsync(QueueDocumentCollectionUri, data).GetAwaiter().GetResult(); + } + } +} diff --git a/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueue.cs b/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueue.cs new file mode 100644 index 0000000..fe92484 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueue.cs @@ -0,0 +1,11 @@ +using Hangfire.Storage; +using System.Threading; + +namespace Hangfire.AzureDocumentDB.Queue +{ + internal interface IPersistentJobQueue + { + IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken); + void Enqueue(string queue, string jobId); + } +} diff --git a/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueMonitoringApi.cs b/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueMonitoringApi.cs new file mode 100644 index 0000000..8e27e70 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueMonitoringApi.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; + +namespace Hangfire.AzureDocumentDB.Queue +{ + internal interface IPersistentJobQueueMonitoringApi + { + IEnumerable GetQueues(); + IEnumerable GetEnqueuedJobIds(string queue, int from, int perPage); + IEnumerable GetFetchedJobIds(string queue, int from, int perPage); + int GetEnqueuedCount(string queue); + } +} diff --git a/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueProvider.cs b/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueProvider.cs new file mode 100644 index 0000000..e509418 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/Interface/IPersistentJobQueueProvider.cs @@ -0,0 +1,8 @@ +namespace Hangfire.AzureDocumentDB.Queue +{ + internal interface IPersistentJobQueueProvider + { + IPersistentJobQueue GetJobQueue(); + IPersistentJobQueueMonitoringApi GetJobQueueMonitoringApi(); + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueue.cs b/Hangfire.AzureDocumentDB/Queue/JobQueue.cs new file mode 100644 index 0000000..0477312 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/JobQueue.cs @@ -0,0 +1,69 @@ +using System; +using System.Linq; +using System.Threading; + +using Microsoft.Azure.Documents.Client; + +using Hangfire.Storage; + +namespace Hangfire.AzureDocumentDB.Queue +{ + internal class JobQueue : IPersistentJobQueue + { + private readonly AzureDocumentDbStorage storage; + private readonly string dequeueLockKey = "locks:job:dequeue"; + private readonly TimeSpan defaultLockTimeout = TimeSpan.FromMinutes(1); + private readonly TimeSpan checkInterval; + private readonly object syncLock = new object(); + + private readonly Uri QueueDocumentCollectionUri; + private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = 1 }; + + public JobQueue(AzureDocumentDbStorage storage) + { + this.storage = storage; + checkInterval = storage.Options.QueuePollInterval; + QueueDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "queues"); + } + + public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken) + { + int index = 0; + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + lock (syncLock) + { + using (new AzureDocumentDbDistributedLock(dequeueLockKey, defaultLockTimeout, storage)) + { + string queue = queues.ElementAt(index); + + Entities.Queue data = storage.Client.CreateDocumentQuery(QueueDocumentCollectionUri, QueryOptions) + .Where(q => q.Name == queue) + .AsEnumerable() + .FirstOrDefault(); + + if (data != null) + { + storage.Client.DeleteDocumentAsync(data.SelfLink).GetAwaiter().GetResult(); + return new FetchedJob(storage, data); + } + } + } + + Thread.Sleep(checkInterval); + index = (index + 1) % queues.Length; + } + } + + public void Enqueue(string queue, string jobId) + { + Entities.Queue data = new Entities.Queue + { + Name = queue, + JobId = jobId + }; + storage.Client.CreateDocumentAsync(QueueDocumentCollectionUri, data).GetAwaiter().GetResult(); + } + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs new file mode 100644 index 0000000..73e17da --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs @@ -0,0 +1,46 @@ +using System; +using System.Linq; +using System.Collections.Generic; + +using Microsoft.Azure.Documents.Client; + +namespace Hangfire.AzureDocumentDB.Queue +{ + internal class JobQueueMonitoringApi : IPersistentJobQueueMonitoringApi + { + private readonly AzureDocumentDbStorage storage; + private readonly IEnumerable queues; + private readonly Uri QueueDocumentCollectionUri; + private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = 100 }; + + public JobQueueMonitoringApi(AzureDocumentDbStorage storage) + { + this.storage = storage; + queues = storage.Options.Queues; + QueueDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "queues"); + } + + public IEnumerable GetQueues() => queues; + + public int GetEnqueuedCount(string queue) + { + return storage.Client.CreateDocumentQuery(QueueDocumentCollectionUri, QueryOptions) + .Where(q => q.Name == queue) + .AsEnumerable() + .Count(); + } + + public IEnumerable GetEnqueuedJobIds(string queue, int from, int perPage) + { + return storage.Client.CreateDocumentQuery(QueueDocumentCollectionUri, QueryOptions) + .Where(q => q.Name == queue) + .AsEnumerable() + .Skip(from).Take(perPage) + .Select(c => c.JobId) + .ToList(); + } + + public IEnumerable GetFetchedJobIds(string queue, int from, int perPage) => GetEnqueuedJobIds(queue, from, perPage); + + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueueProvider.cs b/Hangfire.AzureDocumentDB/Queue/JobQueueProvider.cs new file mode 100644 index 0000000..8017c19 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/JobQueueProvider.cs @@ -0,0 +1,17 @@ +namespace Hangfire.AzureDocumentDB.Queue +{ + internal class JobQueueProvider : IPersistentJobQueueProvider + { + private readonly JobQueue queue; + private readonly JobQueueMonitoringApi monitoringQueue; + + public JobQueueProvider(AzureDocumentDbStorage storage) + { + queue = new JobQueue(storage); + monitoringQueue = new JobQueueMonitoringApi(storage); + } + + public IPersistentJobQueue GetJobQueue() => queue; + public IPersistentJobQueueMonitoringApi GetJobQueueMonitoringApi() => monitoringQueue; + } +} diff --git a/Hangfire.AzureDocumentDB/Queue/PersistentJobQueueProviderCollection.cs b/Hangfire.AzureDocumentDB/Queue/PersistentJobQueueProviderCollection.cs new file mode 100644 index 0000000..c65fbc3 --- /dev/null +++ b/Hangfire.AzureDocumentDB/Queue/PersistentJobQueueProviderCollection.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections; +using System.Collections.Generic; + +namespace Hangfire.AzureDocumentDB.Queue +{ + internal sealed class PersistentJobQueueProviderCollection : IEnumerable + { + private readonly IPersistentJobQueueProvider provider; + private readonly List providers = new List(); + private readonly Dictionary providersByQueue = new Dictionary(StringComparer.OrdinalIgnoreCase); + + public PersistentJobQueueProviderCollection(IPersistentJobQueueProvider provider) + { + if (provider == null) throw new ArgumentNullException(nameof(provider)); + this.provider = provider; + providers.Add(this.provider); + } + + public void Add(IPersistentJobQueueProvider queueProvider, IEnumerable queues) + { + if (queueProvider == null) throw new ArgumentNullException(nameof(queueProvider)); + if (queues == null) throw new ArgumentNullException(nameof(queues)); + + providers.Add(queueProvider); + foreach (string queue in queues) + { + providersByQueue.Add(queue, queueProvider); + } + } + + public IPersistentJobQueueProvider GetProvider(string queue) => providersByQueue.ContainsKey(queue) ? providersByQueue[queue] : provider; + public IEnumerator GetEnumerator() => providers.GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + } +} \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/packages.config b/Hangfire.AzureDocumentDB/packages.config new file mode 100644 index 0000000..7d954f9 --- /dev/null +++ b/Hangfire.AzureDocumentDB/packages.config @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index fe3d226..702f5fa 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,74 @@ # Hangfire.AzureDocumentDB -Azure DocumentDB storage provider for Hangfire + +[![Official Site](https://img.shields.io/badge/site-hangfire.io-blue.svg)](http://hangfire.io) +[![Latest version](https://img.shields.io/nuget/vpre/Hangfire.AzureDocumentDB.svg)](https://www.nuget.org/packages/Hangfire.AzureDocumentDB) +[![Build status](https://ci.appveyor.com/api/projects/status/uvxh94dhxcokga47?svg=true)](https://ci.appveyor.com/project/imranmomin/hangfire-azuredocumentdb) + +This repo will add a [Microsoft Azure DocumentDB](https://azure.microsoft.com/en-ca/services/documentdb) storage support to [Hangfire](http://hangfire.io) - fire-and-forget, delayed and recurring tasks runner for .NET. Scalable and reliable background job runner. Supports multiple servers, CPU and I/O intensive, long-running and short-running jobs. + +Installation + +------------- + +[Hangfire.AzureDocumentDB](https://www.nuget.org/packages/Hangfire.AzureDocumentDB) is available as a NuGet package. Install it using the NuGet Package Console window: + +```powershell + +PM> Install-Package Hangfire.AzureDocumentDB +``` + +Usage + +------------- + +Use one the following ways to initialize `AzureDocumentDbStorage` + +```csharp +GlobalConfiguration.Configuration.UseAzureDocumentDbStorage("", "", ""); + +Hangfire.AzureDocumentDB.AzureDocumentDbStorage azureDocumentDBStorage = new Hangfire.AzureDocumentDB.AzureDocumentDbStorage("", "", ""); +GlobalConfiguration.Configuration.UseStorage(azureDocumentDBStorage); +``` + +```csharp +// customize any options +Hangfire.AzureDocumentDB.AzureDocumentDbStorageOptions azureDocumentDBStorageOptions = new Hangfire.AzureDocumentDB.AzureDocumentDbStorageOptions +{ + Queues = new[] { "default", "critical" }, + RequestTimeout = TimeSpan.FromSeconds(30), + ExpirationCheckInterval = TimeSpan.FromMinutes(15), + CountersAggregateInterval = TimeSpan.FromMinutes(1), + QueuePollInterval = TimeSpan.FromSeconds(2) +}; + +GlobalConfiguration.Configuration.UseAzureDocumentDbStorage("", "", "", azureDocumentDBStorageOptions); + +Hangfire.AzureDocumentDB.AzureDocumentDbStorage azureDocumentDBStorage = new Hangfire.AzureDocumentDB.AzureDocumentDbStorage("", "", "", azureDocumentDBStorageOptions); +GlobalConfiguration.Configuration.UseStorage(azureDocumentDBStorage); +``` + +Limitations + +------------- + +Currently, the storage will create individual collections. In future will try to get an option to use the specified collections. + +* Servers +* Queues +* Jobs +* Hashes +* Sets +* Lists +* Counters +* States +* Locks + +Questions? Problems? + +------------- + +Open-source project are developing more smoothly, when all discussions are held in public. + +If you have any questions or problems related to Hangfire.AzureDocumentDB itself or this storage implementation or want to discuss new features, please create under [issues](https://github.com/imranmomin/Hangfire.AzureDocumentDB/issues/new) and assign the correct label for discussion. + +If you've discovered a bug, please report it to the [GitHub Issues](https://github.com/imranmomin/Hangfire.AzureDocumentDB/pulls). Detailed reports with stack traces, actual and expected behavours are welcome. \ No newline at end of file