Skip to content
This repository has been archived by the owner on Feb 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #23 from imranmomin/develop
Browse files Browse the repository at this point in the history
2.1.1
  • Loading branch information
imranmomin authored Apr 28, 2018
2 parents 8e53dad + 4f76a04 commit 1e4c7d5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 22 deletions.
46 changes: 30 additions & 16 deletions Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Generic;

using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
using Microsoft.Azure.Documents;
using Hangfire.Storage.Monitoring;
Expand Down Expand Up @@ -185,48 +186,56 @@ public StatisticsDto GetStatistics()
public JobList<EnqueuedJobDto> EnqueuedJobs(string queue, int from, int perPage)
{
string queryText = "SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND NOT is_defined(doc.fetched_at) ORDER BY doc.created_on";
return GetJobsOnQueue(queryText, queue, from, perPage, (state, job) => new EnqueuedJobDto
return GetJobsOnQueue(queryText, queue, from, perPage, (state, job, fetchedAt) => new EnqueuedJobDto
{
Job = job,
State = state
State = state.Name,
InEnqueuedState = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase),
EnqueuedAt = EnqueuedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase)
? JobHelper.DeserializeNullableDateTime(state.Data["EnqueuedAt"])
: null
});
}

public JobList<FetchedJobDto> FetchedJobs(string queue, int from, int perPage)
{
string queryText = "SELECT * FROM doc WHERE doc.type = @type AND doc.name = @name AND is_defined(doc.fetched_at) ORDER BY doc.created_on";
return GetJobsOnQueue(queryText, queue, from, perPage, (state, job) => new FetchedJobDto
return GetJobsOnQueue(queryText, queue, from, perPage, (state, job, fetchedAt) => new FetchedJobDto
{
Job = job,
State = state
State = state.Name,
FetchedAt = fetchedAt
});
}

public JobList<ProcessingJobDto> ProcessingJobs(int from, int count)
{
return GetJobsOnState(States.ProcessingState.StateName, from, count, (state, job) => new ProcessingJobDto
return GetJobsOnState(ProcessingState.StateName, from, count, (state, job) => new ProcessingJobDto
{
Job = job,
InProcessingState = ProcessingState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase),
ServerId = state.Data.ContainsKey("ServerId") ? state.Data["ServerId"] : state.Data["ServerName"],
StartedAt = JobHelper.DeserializeDateTime(state.Data["StartedAt"])
});
}

public JobList<ScheduledJobDto> ScheduledJobs(int from, int count)
{
return GetJobsOnState(States.ScheduledState.StateName, from, count, (state, job) => new ScheduledJobDto
return GetJobsOnState(ScheduledState.StateName, from, count, (state, job) => new ScheduledJobDto
{
Job = job,
InScheduledState = ScheduledState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase),
EnqueueAt = JobHelper.DeserializeDateTime(state.Data["EnqueueAt"]),
ScheduledAt = JobHelper.DeserializeDateTime(state.Data["ScheduledAt"])
});
}

public JobList<SucceededJobDto> SucceededJobs(int from, int count)
{
return GetJobsOnState(States.SucceededState.StateName, from, count, (state, job) => new SucceededJobDto
return GetJobsOnState(SucceededState.StateName, from, count, (state, job) => new SucceededJobDto
{
Job = job,
InSucceededState = SucceededState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase),
Result = state.Data.ContainsKey("Result") ? state.Data["Result"] : null,
TotalDuration = state.Data.ContainsKey("PerformanceDuration") && state.Data.ContainsKey("Latency")
? (long?)long.Parse(state.Data["PerformanceDuration"]) + long.Parse(state.Data["Latency"])
Expand All @@ -237,9 +246,10 @@ public JobList<SucceededJobDto> SucceededJobs(int from, int count)

public JobList<FailedJobDto> FailedJobs(int from, int count)
{
return GetJobsOnState(States.FailedState.StateName, from, count, (state, job) => new FailedJobDto
return GetJobsOnState(FailedState.StateName, from, count, (state, job) => new FailedJobDto
{
Job = job,
InFailedState = FailedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase),
Reason = state.Reason,
FailedAt = JobHelper.DeserializeNullableDateTime(state.Data["FailedAt"]),
ExceptionDetails = state.Data["ExceptionDetails"],
Expand All @@ -250,9 +260,10 @@ public JobList<FailedJobDto> FailedJobs(int from, int count)

public JobList<DeletedJobDto> DeletedJobs(int from, int count)
{
return GetJobsOnState(States.DeletedState.StateName, from, count, (state, job) => new DeletedJobDto
return GetJobsOnState(DeletedState.StateName, from, count, (state, job) => new DeletedJobDto
{
Job = job,
InDeletedState = DeletedState.StateName.Equals(state.Name, StringComparison.OrdinalIgnoreCase),
DeletedAt = JobHelper.DeserializeNullableDateTime(state.Data["DeletedAt"])
});
}
Expand Down Expand Up @@ -288,7 +299,7 @@ private JobList<T> GetJobsOnState<T>(string stateName, int from, int count, Func
return new JobList<T>(jobs);
}

private JobList<T> GetJobsOnQueue<T>(string queryText, string queue, int from, int count, Func<string, Common.Job, T> selector)
private JobList<T> GetJobsOnQueue<T>(string queryText, string queue, int from, int count, Func<State, Common.Job, DateTime?, T> selector)
{
if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue));
List<KeyValuePair<string, T>> jobs = new List<KeyValuePair<string, T>>();
Expand Down Expand Up @@ -320,7 +331,10 @@ private JobList<T> GetJobsOnQueue<T>(string queryText, string queue, int from, i
InvocationData invocationData = job.InvocationData;
invocationData.Arguments = job.Arguments;

T data = selector(job.StateName, invocationData.Deserialize());
uri = UriFactory.CreateDocumentUri(storage.Options.DatabaseName, storage.Options.CollectionName, job.StateId);
Task<DocumentResponse<State>> stateTask = storage.Client.ReadDocumentAsync<State>(uri);

T data = selector(stateTask.Result, invocationData.Deserialize(), queueItem.FetchedAt);
jobs.Add(new KeyValuePair<string, T>(job.Id, data));
}
});
Expand Down Expand Up @@ -352,15 +366,15 @@ public long FetchedCount(string queue)
return counters.FetchedCount ?? 0;
}

public long ScheduledCount() => GetNumberOfJobsByStateName(States.ScheduledState.StateName);
public long ScheduledCount() => GetNumberOfJobsByStateName(ScheduledState.StateName);

public long FailedCount() => GetNumberOfJobsByStateName(States.FailedState.StateName);
public long FailedCount() => GetNumberOfJobsByStateName(FailedState.StateName);

public long ProcessingCount() => GetNumberOfJobsByStateName(States.ProcessingState.StateName);
public long ProcessingCount() => GetNumberOfJobsByStateName(ProcessingState.StateName);

public long SucceededListCount() => GetNumberOfJobsByStateName(States.SucceededState.StateName);
public long SucceededListCount() => GetNumberOfJobsByStateName(SucceededState.StateName);

public long DeletedListCount() => GetNumberOfJobsByStateName(States.DeletedState.StateName);
public long DeletedListCount() => GetNumberOfJobsByStateName(DeletedState.StateName);

private long GetNumberOfJobsByStateName(string state)
{
Expand Down
6 changes: 4 additions & 2 deletions Hangfire.AzureDocumentDB/DocumentDbWriteOnlyTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ public void AddToSet(string key, string value, double score)
{
Key = key,
Value = value,
Score = score
Score = score,
CreatedOn = DateTime.UtcNow
};

Uri spAddToSetUri = UriFactory.CreateStoredProcedureUri(connection.Storage.Options.DatabaseName, connection.Storage.Options.CollectionName, "addToSet");
Expand Down Expand Up @@ -282,7 +283,8 @@ public void InsertToList(string key, string value)
List data = new List
{
Key = key,
Value = value
Value = value,
CreatedOn = DateTime.UtcNow
};

Task<ResourceResponse<Document>> task = connection.Storage.Client.CreateDocumentAsync(connection.Storage.CollectionUri, data);
Expand Down
5 changes: 3 additions & 2 deletions Hangfire.AzureDocumentDB/Queue/JobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
SqlQuerySpec sql = new SqlQuerySpec
{
QueryText = "SELECT TOP 1 * FROM doc WHERE doc.type = @type AND doc.name = @name AND " +
"((NOT is_defined(doc.fetched_at)) OR (is_defined(doc.fetched_at) AND doc.fetched_at < @timeout))",
"((NOT is_defined(doc.fetched_at)) OR (is_defined(doc.fetched_at) AND doc.fetched_at < @timeout)) " +
"ORDER BY doc.created_on",
Parameters = new SqlParameterCollection
{
new SqlParameter("@type", Documents.DocumentTypes.Queue),
Expand All @@ -49,7 +50,7 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
.AsEnumerable()
.FirstOrDefault();

if (data != null && !string.IsNullOrEmpty(data.JobId))
if (data != null)
{
// mark the document
data.FetchedAt = DateTime.UtcNow;
Expand Down
4 changes: 2 additions & 2 deletions Hangfire.AzureDocumentDB/Queue/JobQueueMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ public IEnumerable<string> GetFetchedJobIds(string queue, int from, int perPage)
}
};

(int Fetched, int Enqueued) result = storage.Client.CreateDocumentQuery<Documents.Queue>(storage.CollectionUri, sql)
(int EnqueuedCount, int FetchedCount) result = storage.Client.CreateDocumentQuery<Documents.Queue>(storage.CollectionUri, sql)
.AsEnumerable()
.GroupBy(q => q.Name)
.Select(v => (Fetched: v.Sum(q => q.FetchedAt.HasValue ? 1 : 0), Enqueued: v.Sum(q => q.FetchedAt.HasValue ? 0 : 1)))
.Select(v => (EnqueuedCount: v.Sum(q => q.FetchedAt.HasValue ? 0 : 1), FetchedCount: v.Sum(q => q.FetchedAt.HasValue ? 1 : 0)))
.FirstOrDefault();

return result;
Expand Down

0 comments on commit 1e4c7d5

Please sign in to comment.