diff --git a/Hangfire.AzureDocumentDB.nuspec b/Hangfire.AzureDocumentDB.nuspec deleted file mode 100644 index 50c5888..0000000 --- a/Hangfire.AzureDocumentDB.nuspec +++ /dev/null @@ -1,26 +0,0 @@ - - - - Hangfire.AzureDocumentDB - 1.0.0 - Imran Momin - Imran Momin - https://github.com/imranmomin/Hangfire.AzureDocumentDB/blob/master/LICENSE - https://github.com/imranmomin/Hangfire.AzureDocumentDB - false - This repo will add Microsoft Azure DocumentDB storage provider for Hangfire - Copyright 2017 - Hangfire Azure DocumentDB - en-US - - - - - - - - - - - - diff --git a/Hangfire.AzureDocumentDB/CountersAggregator.cs b/Hangfire.AzureDocumentDB/CountersAggregator.cs index 97c093b..adb5a07 100644 --- a/Hangfire.AzureDocumentDB/CountersAggregator.cs +++ b/Hangfire.AzureDocumentDB/CountersAggregator.cs @@ -44,15 +44,14 @@ public void Execute(CancellationToken cancellationToken) .AsEnumerable() .ToList(); - Dictionary> counters = rawCounters.GroupBy(c => c.Key) - .ToDictionary(k => k.Key, v => new Tuple(v.Sum(c => c.Value), v.Max(c => c.ExpireOn))); + Dictionary counters = rawCounters.GroupBy(c => c.Key) + .ToDictionary(k => k.Key, v=> (Sum: v.Sum(c => c.Value), ExpireOn: v.Max(c => c.ExpireOn))); Array.ForEach(counters.Keys.ToArray(), key => { cancellationToken.ThrowIfCancellationRequested(); - Tuple data; - if (counters.TryGetValue(key, out data)) + if (counters.TryGetValue(key, out var data)) { Counter aggregated = storage.Client.CreateDocumentQuery(storage.CollectionUri, queryOptions) .Where(c => c.Key == key && c.Type == CounterTypes.Aggregrate && c.DocumentType == DocumentTypes.Counter) @@ -65,14 +64,14 @@ public void Execute(CancellationToken cancellationToken) { Key = key, Type = CounterTypes.Aggregrate, - Value = data.Item1, - ExpireOn = data.Item2 + Value = data.Sum, + ExpireOn = data.ExpireOn }; } else { - aggregated.Value += data.Item1; - aggregated.ExpireOn = data.Item2; + aggregated.Value += data.Sum; + aggregated.ExpireOn = data.ExpireOn; } Task> task = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, aggregated); @@ -95,7 +94,7 @@ public void Execute(CancellationToken cancellationToken) logger.Trace("Records from the 'Counter' table aggregated."); cancellationToken.WaitHandle.WaitOne(checkInterval); } - + public override string ToString() => GetType().ToString(); } diff --git a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs index b2f5c6b..4a96cac 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbConnection.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbConnection.cs @@ -88,7 +88,7 @@ public override JobData GetJobData(string jobId) if (jobId == null) throw new ArgumentNullException(nameof(jobId)); Documents.Job data = Storage.Client.CreateDocumentQuery(Storage.CollectionUri, queryOptions) - .Where(j => j.Id == jobId) + .Where(j => j.Id == jobId && j.DocumentType == DocumentTypes.Job) .AsEnumerable() .FirstOrDefault(); @@ -162,7 +162,7 @@ public override string GetJobParameter(string id, string name) if (name == null) throw new ArgumentNullException(nameof(name)); List parameters = Storage.Client.CreateDocumentQuery(Storage.CollectionUri, queryOptions) - .Where(j => j.Id == id) + .Where(j => j.Id == id && j.DocumentType == DocumentTypes.Job) .SelectMany(j => j.Parameters) .AsEnumerable() .ToList(); @@ -274,12 +274,11 @@ public override string GetFirstByLowestScoreFromSet(string key, double fromScore SqlQuerySpec sql = new SqlQuerySpec { - QueryText = "SELECT TOP 1 VALUE c['value'] FROM c WHERE c.key = @key AND c.type = @type AND (c.score BETWEEN @from AND @to) " + - "ORDER BY c.created_on DESC ", + QueryText = "SELECT TOP 1 VALUE c['value'] FROM c WHERE c.key = @key AND c.type = @type AND (c.score BETWEEN @from AND @to) ORDER BY c.score", Parameters = new SqlParameterCollection { new SqlParameter("@key", key), - new SqlParameter("@type", DocumentTypes.State), + new SqlParameter("@type", DocumentTypes.Set), new SqlParameter("@from", fromScore ), new SqlParameter("@to", toScore) } @@ -410,7 +409,7 @@ public override string GetValueFromHash(string key, string name) { new SqlParameter("@key", key), new SqlParameter("@field", name), - new SqlParameter("@type", DocumentTypes.State) + new SqlParameter("@type", DocumentTypes.Hash) } }; diff --git a/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs b/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs index 0041787..f0b9986 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs @@ -23,18 +23,24 @@ public IList Queues() { List queueJobs = new List(); - Array.ForEach(storage.Options.Queues, queue => + var tuples = storage.QueueProviders + .Select(x => x.GetJobQueueMonitoringApi()) + .SelectMany(x => x.GetQueues(), (monitoring, queue) => new { Monitoring = monitoring, Queue = queue }) + .OrderBy(x => x.Queue) + .ToArray(); + + foreach (var tuple in tuples) { - long enqueueCount = EnqueuedCount(queue); - JobList jobs = EnqueuedJobs(queue, 0, 1); + long enqueueCount = EnqueuedCount(tuple.Queue); + JobList jobs = EnqueuedJobs(tuple.Queue, 0, 5); queueJobs.Add(new QueueWithTopEnqueuedJobsDto { Length = enqueueCount, Fetched = 0, - Name = queue, + Name = tuple.Queue, FirstJobs = jobs }); - }); + } return queueJobs; } @@ -59,7 +65,7 @@ public JobDetailsDto JobDetails(string jobId) if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId)); Documents.Job job = storage.Client.CreateDocumentQuery(storage.CollectionUri, queryOptions) - .Where(j => j.Id == jobId) + .Where(j => j.Id == jobId && j.DocumentType == DocumentTypes.Job) .AsEnumerable() .FirstOrDefault(); @@ -69,7 +75,7 @@ public JobDetailsDto JobDetails(string jobId) invocationData.Arguments = job.Arguments; List states = storage.Client.CreateDocumentQuery(storage.CollectionUri, queryOptions) - .Where(s => s.JobId == jobId) + .Where(s => s.JobId == jobId && s.DocumentType == DocumentTypes.State) .AsEnumerable() .Select(s => new StateHistoryDto { @@ -149,7 +155,9 @@ public StatisticsDto GetStatistics() results.Add("recurring-jobs", count); long GetValueOrDefault(string key) => results.Where(r => r.Key == key).Select(r => r.Value).SingleOrDefault(); - return new StatisticsDto + + // ReSharper disable once UseObjectOrCollectionInitializer + StatisticsDto statistics = new StatisticsDto { Enqueued = GetValueOrDefault("Enqueued"), Failed = GetValueOrDefault("Failed"), @@ -159,8 +167,13 @@ public StatisticsDto GetStatistics() Deleted = GetValueOrDefault("stats:deleted"), Recurring = GetValueOrDefault("recurring-jobs"), Servers = GetValueOrDefault("Servers"), - Queues = storage.Options.Queues.LongLength }; + + statistics.Queues = storage.QueueProviders + .SelectMany(x => x.GetJobQueueMonitoringApi().GetQueues()) + .Count(); + + return statistics; } #region Job List diff --git a/Hangfire.AzureDocumentDB/DocumentDbStorage.cs b/Hangfire.AzureDocumentDB/DocumentDbStorage.cs index 460a5e3..760b2d2 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbStorage.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbStorage.cs @@ -37,30 +37,12 @@ public sealed class DocumentDbStorage : JobStorage /// The secret key for the DocumentDb Database /// The name of the database to connect with /// The name of the collection on the database - /// argument is null. - /// argument is null. - public DocumentDbStorage(string url, string authSecret, string database, string collection) : this(new DocumentDbStorageOptions { Endpoint = new Uri(url), AuthSecret = authSecret, DatabaseName = database, CollectionName = collection }) { } - - /// - /// Initializes the DocumentDbStorage form the url auth secret provide. - /// - /// The url string to DocumentDb Database - /// The secret key for the DocumentDb Database - /// The name of the database to connect with - /// The DocumentDbStorageOptions object to override any of the options - /// The name of the collection on the database - /// argument is null. - /// argument is null. - public DocumentDbStorage(string url, string authSecret, string database, string collection, DocumentDbStorageOptions options) : this(Transform(url, authSecret, database, collection, options)) { } - - /// - /// Initializes the DocumentDbStorage form the url auth secret provide. - /// /// The DocumentDbStorageOptions object to override any of the options - /// argument is null. - private DocumentDbStorage(DocumentDbStorageOptions options) + public DocumentDbStorage(string url, string authSecret, string database, string collection, DocumentDbStorageOptions options = null) { - Options = options ?? throw new ArgumentNullException(nameof(options)); + Options = options ?? new DocumentDbStorageOptions(); + Options.DatabaseName = database; + Options.CollectionName = collection; JsonSerializerSettings settings = new JsonSerializerSettings { @@ -70,8 +52,8 @@ private DocumentDbStorage(DocumentDbStorageOptions options) }; ConnectionPolicy connectionPolicy = ConnectionPolicy.Default; - connectionPolicy.RequestTimeout = options.RequestTimeout; - Client = new DocumentClient(options.Endpoint, options.AuthSecret, settings, connectionPolicy); + connectionPolicy.RequestTimeout = Options.RequestTimeout; + Client = new DocumentClient(new Uri(url), authSecret, settings, connectionPolicy); Task task = Client.OpenAsync(); Task continueTask = task.ContinueWith(t => Initialize(), TaskContinuationOptions.OnlyOnRanToCompletion); continueTask.Wait(); @@ -111,12 +93,11 @@ public override IEnumerable GetComponents() public override void WriteOptionsToLog(ILog logger) { logger.Info("Using the following options for Azure DocumentDB job storage:"); - logger.Info($" DocumentDB Url: {Options.Endpoint.AbsoluteUri}"); + logger.Info($" DocumentDB Url: {Client.ServiceEndpoint.AbsoluteUri}"); logger.Info($" Request Timeout: {Options.RequestTimeout}"); logger.Info($" Counter Agggerate Interval: {Options.CountersAggregateInterval.TotalSeconds} seconds"); logger.Info($" Queue Poll Interval: {Options.QueuePollInterval.TotalSeconds} seconds"); logger.Info($" Expiration Check Interval: {Options.ExpirationCheckInterval.TotalSeconds} seconds"); - logger.Info($" Queue: {string.Join(",", Options.Queues)}"); } /// @@ -173,17 +154,5 @@ private void Initialize() throw new ApplicationException("Unable to create the stored procedures", databaseTask.Exception); } } - - private static DocumentDbStorageOptions Transform(string url, string authSecret, string database, string collection, DocumentDbStorageOptions options) - { - if (options == null) options = new DocumentDbStorageOptions(); - - options.Endpoint = new Uri(url); - options.AuthSecret = authSecret; - options.DatabaseName = database; - options.CollectionName = collection; - - return options; - } } } diff --git a/Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs b/Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs index 22c0b0a..d174f79 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs @@ -9,26 +9,7 @@ namespace Hangfire /// public static class DocumentDbStorageExtensions { - /// - /// Enables to attach Azure DocumentDb to Hangfire - /// - /// The IGlobalConfiguration object - /// The url string to DocumentDb Database - /// The secret key for the DocumentDb Database - /// The name of the database to connect with - /// The name of the collection on the database - /// - public static IGlobalConfiguration UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database, string collection) - { - if (configuration == null) throw new ArgumentNullException(nameof(configuration)); - if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url)); - if (string.IsNullOrEmpty(authSecret)) throw new ArgumentNullException(nameof(authSecret)); - - DocumentDbStorage storage = new DocumentDbStorage(url, authSecret, database, collection); - return configuration.UseStorage(storage); - } - - /// + /// /// Enables to attach Azure DocumentDb to Hangfire /// /// The IGlobalConfiguration object @@ -38,13 +19,12 @@ public static IGlobalConfiguration UseAzureDocumentDbStorage( /// The name of the collection on the database /// The DocumentDbStorage object to override any of the options /// - public static IGlobalConfiguration UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database, string collection, DocumentDbStorageOptions options) + public static IGlobalConfiguration UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database, string collection, DocumentDbStorageOptions options = null) { if (configuration == null) throw new ArgumentNullException(nameof(configuration)); if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url)); if (string.IsNullOrEmpty(authSecret)) throw new ArgumentNullException(nameof(authSecret)); - if (options == null) throw new ArgumentNullException(nameof(options)); - + DocumentDbStorage storage = new DocumentDbStorage(url, authSecret, database, collection, options); return configuration.UseStorage(storage); } diff --git a/Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs b/Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs index e3d77da..bd0af41 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs @@ -9,8 +9,6 @@ namespace Hangfire.Azure /// public class DocumentDbStorageOptions { - internal Uri Endpoint { get; set; } - internal string AuthSecret { get; set; } internal string DatabaseName { get; set; } internal string CollectionName { get; set; } @@ -19,11 +17,6 @@ public class DocumentDbStorageOptions /// public TimeSpan RequestTimeout { get; set; } - /// - /// Get or set list of queues to process. Default values "default", "critical" - /// - public string[] Queues { get; set; } - /// /// Get or set the interval timespan to process expired enteries. Default value 15 minutes /// Expired items under "locks", "jobs", "lists", "sets", "hashs", "counters/aggregrated" will be checked @@ -46,7 +39,6 @@ public class DocumentDbStorageOptions public DocumentDbStorageOptions() { RequestTimeout = TimeSpan.FromSeconds(30); - Queues = new[] { "default", "critical" }; ExpirationCheckInterval = TimeSpan.FromMinutes(5); CountersAggregateInterval = TimeSpan.FromMinutes(1); QueuePollInterval = TimeSpan.FromSeconds(2); diff --git a/Hangfire.AzureDocumentDB/ExpirationManager.cs b/Hangfire.AzureDocumentDB/ExpirationManager.cs index 5bbc5c4..0de3451 100644 --- a/Hangfire.AzureDocumentDB/ExpirationManager.cs +++ b/Hangfire.AzureDocumentDB/ExpirationManager.cs @@ -16,7 +16,7 @@ internal class ExpirationManager : IServerComponent private static readonly ILog logger = LogProvider.For(); private const string DISTRIBUTED_LOCK_KEY = "expirationmanager"; private static readonly TimeSpan defaultLockTimeout = TimeSpan.FromMinutes(5); - private static readonly string[] documents = { "locks", "jobs", "lists", "sets", "hashes", "counters" }; + private static readonly DocumentTypes[] documents = { DocumentTypes.Lock, DocumentTypes.Job, DocumentTypes.List, DocumentTypes.Set, DocumentTypes.Hash, DocumentTypes.Counter }; private readonly TimeSpan checkInterval; private readonly DocumentDbStorage storage; private readonly Uri spDeleteExpiredDocumentsUri; @@ -30,15 +30,14 @@ public ExpirationManager(DocumentDbStorage storage) public void Execute(CancellationToken cancellationToken) { - foreach (string document in documents) + foreach (DocumentTypes type in documents) { - logger.Debug($"Removing outdated records from the '{document}' document."); - DocumentTypes type = document.ToDocumentType(); + logger.Debug($"Removing outdated records from the '{type}' document."); using (new DocumentDbDistributedLock(DISTRIBUTED_LOCK_KEY, defaultLockTimeout, storage)) { Task> procedureTask = storage.Client.ExecuteStoredProcedureAsync(spDeleteExpiredDocumentsUri, type); - Task task = procedureTask.ContinueWith(t => logger.Trace($"Outdated records removed {t.Result.Response} records from the '{document}' document."), TaskContinuationOptions.OnlyOnRanToCompletion); + Task task = procedureTask.ContinueWith(t => logger.Trace($"Outdated records removed {t.Result.Response} records from the '{type}' document."), TaskContinuationOptions.OnlyOnRanToCompletion); task.Wait(cancellationToken); } @@ -46,22 +45,4 @@ public void Execute(CancellationToken cancellationToken) } } } - - internal static class ExpirationManagerExtenison - { - internal static DocumentTypes ToDocumentType(this string document) - { - switch (document) - { - case "locks": return DocumentTypes.Lock; - case "jobs": return DocumentTypes.Job; - case "lists": return DocumentTypes.List; - case "sets": return DocumentTypes.Set; - case "hashes": return DocumentTypes.Hash; - case "counters": return DocumentTypes.Counter; - } - - throw new ArgumentException(@"invalid document type", nameof(document)); - } - } } \ No newline at end of file diff --git a/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj b/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj index a2849bb..5adfbae 100644 --- a/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj +++ b/Hangfire.AzureDocumentDB/Hangfire.AzureDocumentDB.csproj @@ -31,11 +31,7 @@ - - - - - + diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs index bdae499..b486617 100644 --- a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs +++ b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs @@ -1,4 +1,5 @@ -using System.Linq; +using System; +using System.Linq; using System.Collections.Generic; using Hangfire.Azure.Documents; @@ -9,15 +10,33 @@ namespace Hangfire.Azure.Queue internal class JobQueueMonitoringApi : IPersistentJobQueueMonitoringApi { private readonly DocumentDbStorage storage; - private readonly IEnumerable queues; + private readonly List queuesCache = new List(); + private DateTime cacheUpdated; + private readonly object cacheLock = new object(); + private static readonly TimeSpan queuesCacheTimeout = TimeSpan.FromSeconds(5); - public JobQueueMonitoringApi(DocumentDbStorage storage) + public JobQueueMonitoringApi(DocumentDbStorage storage) => this.storage = storage; + + public IEnumerable GetQueues() { - this.storage = storage; - queues = storage.Options.Queues; - } + lock (cacheLock) + { + if (queuesCache.Count == 0 || cacheUpdated.Add(queuesCacheTimeout) < DateTime.UtcNow) + { + IEnumerable result = storage.Client.CreateDocumentQuery(storage.CollectionUri) + .Where(q => q.DocumentType == DocumentTypes.Queue) + .Select(q => q.Name) + .AsEnumerable() + .Distinct(); - public IEnumerable GetQueues() => queues; + queuesCache.Clear(); + queuesCache.AddRange(result); + cacheUpdated = DateTime.UtcNow; + } + + return queuesCache.ToList(); + } + } public int GetEnqueuedCount(string queue) { diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/announceServer.js b/Hangfire.AzureDocumentDB/StoredProcedure/announceServer.js index 3725d5c..b04ec95 100644 --- a/Hangfire.AzureDocumentDB/StoredProcedure/announceServer.js +++ b/Hangfire.AzureDocumentDB/StoredProcedure/announceServer.js @@ -21,9 +21,9 @@ function announceServer(server) { data = server; } else { data = documents[0]; - data.last_heartbeat = set.last_heartbeat; - data.workers = set.workers; - data.queues = set.queues; + data.last_heartbeat = server.last_heartbeat; + data.workers = server.workers; + data.queues = server.queues; } collection.upsertDocument(collection.getSelfLink(), data); diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/deleteExpiredDocuments.js b/Hangfire.AzureDocumentDB/StoredProcedure/deleteExpiredDocuments.js index 1543276..361cb6c 100644 --- a/Hangfire.AzureDocumentDB/StoredProcedure/deleteExpiredDocuments.js +++ b/Hangfire.AzureDocumentDB/StoredProcedure/deleteExpiredDocuments.js @@ -16,8 +16,8 @@ function deleteExpiredDocuments(type) { response.setBody(0); if (err) throw err; - for (var i = 0; i < documents.length; i++) { - var self = documents[i]._self; + for (var index = 0; index < documents.length; index++) { + var self = documents[index]._self; collection.deleteDocument(self); } diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/removeServer.js b/Hangfire.AzureDocumentDB/StoredProcedure/removeServer.js index 3385d45..c0841ca 100644 --- a/Hangfire.AzureDocumentDB/StoredProcedure/removeServer.js +++ b/Hangfire.AzureDocumentDB/StoredProcedure/removeServer.js @@ -14,7 +14,8 @@ function removeServer(id) { response.setBody(false); if (err) throw err; - if (documents.length > 1) throw new ("Found more than one server for :" + id); + if (documents.length === 0) throw new ("No server found for id :" + id); + if (documents.length > 1) throw new ("Found more than one server for id :" + id); var self = documents[0]._self; collection.deleteDocument(self); diff --git a/Hangfire.AzureDocumentDB/StoredProcedure/removedTimedOutServer.js b/Hangfire.AzureDocumentDB/StoredProcedure/removedTimedOutServer.js index a91979a..e74c579 100644 --- a/Hangfire.AzureDocumentDB/StoredProcedure/removedTimedOutServer.js +++ b/Hangfire.AzureDocumentDB/StoredProcedure/removedTimedOutServer.js @@ -15,7 +15,7 @@ function removedTimedOutServer(lastHeartbeat) { if (err) throw err; for (var index = 0; index < documents.length; index++) { - var self = documents[0]._self; + var self = documents[index]._self; collection.deleteDocument(self); } diff --git a/README.md b/README.md index 506e8c7..7c5ed6c 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,6 @@ GlobalConfiguration.Configuration.UseStorage(storage); // customize any options Hangfire.Azure.DocumentDbStorageOptions options = new Hangfire.Azure.DocumentDbStorageOptions { - Queues = new[] { "default", "critical" }, RequestTimeout = TimeSpan.FromSeconds(30), ExpirationCheckInterval = TimeSpan.FromMinutes(15), CountersAggregateInterval = TimeSpan.FromMinutes(1),