diff --git a/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs b/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs index 61c7204..75ce9d7 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using Hangfire.Common; +using Hangfire.States; using Hangfire.Storage; using Microsoft.Azure.Documents; using Hangfire.Storage.Monitoring; @@ -185,28 +186,34 @@ public StatisticsDto GetStatistics() public JobList EnqueuedJobs(string queue, int from, int perPage) { string queryText = "SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND NOT is_defined(doc.fetched_at) ORDER BY doc.created_on"; - return GetJobsOnQueue(queryText, queue, from, perPage, (state, job) => new EnqueuedJobDto + return GetJobsOnQueue(queryText, queue, from, perPage, (state, job, fetchedAt) => new EnqueuedJobDto { Job = job, - State = state + State = state.Name, + InEnqueuedState = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), + EnqueuedAt = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase) + ? JobHelper.DeserializeNullableDateTime(state.Data["EnqueuedAt"]) + : null }); } public JobList FetchedJobs(string queue, int from, int perPage) { string queryText = "SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND is_defined(doc.fetched_at) ORDER BY doc.created_on"; - return GetJobsOnQueue(queryText, queue, from, perPage, (state, job) => new FetchedJobDto + return GetJobsOnQueue(queryText, queue, from, perPage, (state, job, fetchedAt) => new FetchedJobDto { Job = job, - State = state + State = state.Name, + FetchedAt = fetchedAt }); } public JobList ProcessingJobs(int from, int count) { - return GetJobsOnState(States.ProcessingState.StateName, from, count, (state, job) => new ProcessingJobDto + return GetJobsOnState(ProcessingState.StateName, from, count, (state, job) => new ProcessingJobDto { Job = job, + InProcessingState = ProcessingState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), ServerId = state.Data.ContainsKey("ServerId") ? state.Data["ServerId"] : state.Data["ServerName"], StartedAt = JobHelper.DeserializeDateTime(state.Data["StartedAt"]) }); @@ -214,9 +221,10 @@ public JobList ProcessingJobs(int from, int count) public JobList ScheduledJobs(int from, int count) { - return GetJobsOnState(States.ScheduledState.StateName, from, count, (state, job) => new ScheduledJobDto + return GetJobsOnState(ScheduledState.StateName, from, count, (state, job) => new ScheduledJobDto { Job = job, + InScheduledState = ScheduledState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), EnqueueAt = JobHelper.DeserializeDateTime(state.Data["EnqueueAt"]), ScheduledAt = JobHelper.DeserializeDateTime(state.Data["ScheduledAt"]) }); @@ -224,9 +232,10 @@ public JobList ScheduledJobs(int from, int count) public JobList SucceededJobs(int from, int count) { - return GetJobsOnState(States.SucceededState.StateName, from, count, (state, job) => new SucceededJobDto + return GetJobsOnState(SucceededState.StateName, from, count, (state, job) => new SucceededJobDto { Job = job, + InSucceededState = SucceededState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), Result = state.Data.ContainsKey("Result") ? state.Data["Result"] : null, TotalDuration = state.Data.ContainsKey("PerformanceDuration") && state.Data.ContainsKey("Latency") ? (long?)long.Parse(state.Data["PerformanceDuration"]) + long.Parse(state.Data["Latency"]) @@ -237,9 +246,10 @@ public JobList SucceededJobs(int from, int count) public JobList FailedJobs(int from, int count) { - return GetJobsOnState(States.FailedState.StateName, from, count, (state, job) => new FailedJobDto + return GetJobsOnState(FailedState.StateName, from, count, (state, job) => new FailedJobDto { Job = job, + InFailedState = FailedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), Reason = state.Reason, FailedAt = JobHelper.DeserializeNullableDateTime(state.Data["FailedAt"]), ExceptionDetails = state.Data["ExceptionDetails"], @@ -250,9 +260,10 @@ public JobList FailedJobs(int from, int count) public JobList DeletedJobs(int from, int count) { - return GetJobsOnState(States.DeletedState.StateName, from, count, (state, job) => new DeletedJobDto + return GetJobsOnState(DeletedState.StateName, from, count, (state, job) => new DeletedJobDto { Job = job, + InDeletedState = DeletedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase), DeletedAt = JobHelper.DeserializeNullableDateTime(state.Data["DeletedAt"]) }); } @@ -288,7 +299,7 @@ private JobList GetJobsOnState(string stateName, int from, int count, Func return new JobList(jobs); } - private JobList GetJobsOnQueue(string queryText, string queue, int from, int count, Func selector) + private JobList GetJobsOnQueue(string queryText, string queue, int from, int count, Func selector) { if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue)); List> jobs = new List>(); @@ -320,7 +331,10 @@ private JobList GetJobsOnQueue(string queryText, string queue, int from, i InvocationData invocationData = job.InvocationData; invocationData.Arguments = job.Arguments; - T data = selector(job.StateName, invocationData.Deserialize()); + uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, job.StateId); + Task> stateTask = storage.Client.ReadDocumentAsync(uri); + + T data = selector(stateTask.Result, invocationData.Deserialize(), queueItem.FetchedAt); jobs.Add(new KeyValuePair(job.Id, data)); } }); @@ -352,15 +366,15 @@ public long FetchedCount(string queue) return counters.FetchedCount ?? 0; } - public long ScheduledCount() => GetNumberOfJobsByStateName(States.ScheduledState.StateName); + public long ScheduledCount() => GetNumberOfJobsByStateName(ScheduledState.StateName); - public long FailedCount() => GetNumberOfJobsByStateName(States.FailedState.StateName); + public long FailedCount() => GetNumberOfJobsByStateName(FailedState.StateName); - public long ProcessingCount() => GetNumberOfJobsByStateName(States.ProcessingState.StateName); + public long ProcessingCount() => GetNumberOfJobsByStateName(ProcessingState.StateName); - public long SucceededListCount() => GetNumberOfJobsByStateName(States.SucceededState.StateName); + public long SucceededListCount() => GetNumberOfJobsByStateName(SucceededState.StateName); - public long DeletedListCount() => GetNumberOfJobsByStateName(States.DeletedState.StateName); + public long DeletedListCount() => GetNumberOfJobsByStateName(DeletedState.StateName); private long GetNumberOfJobsByStateName(string state) { diff --git a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs b/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs index e6d8da0..f9b699b 100644 --- a/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs +++ b/Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs @@ -223,7 +223,8 @@ public void AddToSet(string key, string value, double score) { Key = key, Value = value, - Score = score + Score = score, + CreatedOn = DateTime.UtcNow }; Uri spAddToSetUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "addToSet"); @@ -282,7 +283,8 @@ public void InsertToList(string key, string value) List data = new List { Key = key, - Value = value + Value = value, + CreatedOn = DateTime.UtcNow }; Task> task = connection.Storage.Client.CreateDocumentAsync(connection.Storage.CollectionUri, data); diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueue.cs b/Hangfire.AzureDocumentDB/Queue/JobQueue.cs index a147eec..44a7293 100644 --- a/Hangfire.AzureDocumentDB/Queue/JobQueue.cs +++ b/Hangfire.AzureDocumentDB/Queue/JobQueue.cs @@ -36,7 +36,8 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken) SqlQuerySpec sql = new SqlQuerySpec { QueryText = "SELECT TOP 1 * FROM doc WHERE doc.type = @type AND doc.name = @name AND " + - "((NOT is_defined(doc.fetched_at)) OR (is_defined(doc.fetched_at) AND doc.fetched_at < @timeout))", + "((NOT is_defined(doc.fetched_at)) OR (is_defined(doc.fetched_at) AND doc.fetched_at < @timeout)) " + + "ORDER BY doc.created_on", Parameters = new SqlParameterCollection { new SqlParameter("@type", Documents.DocumentTypes.Queue), @@ -49,7 +50,7 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken) .AsEnumerable() .FirstOrDefault(); - if (data != null && !string.IsNullOrEmpty(data.JobId)) + if (data != null) { // mark the document data.FetchedAt = DateTime.UtcNow; diff --git a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs index 5c3d047..9afa71d 100644 --- a/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs +++ b/Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs @@ -101,10 +101,10 @@ public IEnumerable GetFetchedJobIds(string queue, int from, int perPage) } }; - (int Fetched, int Enqueued) result = storage.Client.CreateDocumentQuery(storage.CollectionUri, sql) + (int EnqueuedCount, int FetchedCount) result = storage.Client.CreateDocumentQuery(storage.CollectionUri, sql) .AsEnumerable() .GroupBy(q => q.Name) - .Select(v => (Fetched: v.Sum(q => q.FetchedAt.HasValue ? 1 : 0), Enqueued: v.Sum(q => q.FetchedAt.HasValue ? 0 : 1))) + .Select(v => (EnqueuedCount: v.Sum(q => q.FetchedAt.HasValue ? 0 : 1), FetchedCount: v.Sum(q => q.FetchedAt.HasValue ? 1 : 0))) .FirstOrDefault(); return result;