Skip to content
This repository has been archived by the owner on Feb 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #3 from imranmomin/develop
Browse files Browse the repository at this point in the history
single document collection option
  • Loading branch information
imranmomin authored May 21, 2017
2 parents 99f7685 + f878afd commit 0d69553
Show file tree
Hide file tree
Showing 29 changed files with 475 additions and 347 deletions.
128 changes: 63 additions & 65 deletions Hangfire.AzureDocumentDB/AzureDocumentDbConnection.cs

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Hangfire.AzureDocumentDB/AzureDocumentDbDistributedLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;

using Microsoft.Azure.Documents;
using Hangfire.AzureDocumentDB.Helper;
using Microsoft.Azure.Documents.Client;
using Hangfire.AzureDocumentDB.Entities;

Expand All @@ -24,23 +25,22 @@ public AzureDocumentDbDistributedLock(string resource, TimeSpan timeout, AzureDo

private void Acquire(string name, TimeSpan timeout)
{
Uri documentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "locks");
FeedOptions queryOptions = new FeedOptions { MaxItemCount = 1 };

System.Diagnostics.Stopwatch acquireStart = new System.Diagnostics.Stopwatch();
acquireStart.Start();

while (true)
{
Lock @lock = storage.Client.CreateDocumentQuery<Lock>(documentCollectionUri, queryOptions)
.Where(l => l.Name == name)
.AsEnumerable()
.FirstOrDefault();
bool exists = storage.Client.CreateDocumentQuery<Lock>(storage.Collections.LockDocumentCollectionUri, queryOptions)
.Where(l => l.Name == name && l.DocumentType == DocumentTypes.Lock)
.Select(l => 1)
.AsEnumerable()
.Any();

if (@lock == null)
if (exists == false)
{
@lock = new Lock { Name = name, ExpireOn = DateTime.UtcNow.Add(timeout) };
ResourceResponse<Document> response = storage.Client.CreateDocumentAsync(documentCollectionUri, @lock).GetAwaiter().GetResult();
Lock @lock = new Lock { Name = name, ExpireOn = DateTime.UtcNow.Add(timeout) };
ResourceResponse<Document> response = storage.Client.CreateDocumentWithRetriesAsync(storage.Collections.LockDocumentCollectionUri, @lock).GetAwaiter().GetResult();
if (response.StatusCode == HttpStatusCode.Created)
{
selfLink = response.Resource.SelfLink;
Expand All @@ -63,7 +63,7 @@ private void Relase()
{
lock (syncLock)
{
storage.Client.DeleteDocumentAsync(selfLink).GetAwaiter().GetResult();
storage.Client.DeleteDocumentWithRetriesAsync(selfLink).GetAwaiter().GetResult();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Hangfire.AzureDocumentDB
public class AzureDocumentDbDistributedLockException : Exception
{
/// <summary>
/// Initializes a new instance of the FirebaseDistributedLockException class with serialized data.
/// Initializes a new instance of the AzureDocumentDbDistributedLockException class with serialized data.
/// </summary>
/// <param name="message">The message that describes the error.</param>
public AzureDocumentDbDistributedLockException(string message) : base(message)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace Hangfire.AzureDocumentDB
{
/// <summary>
/// Represents errors that occur while acquiring a distributed lock.
/// </summary>
[Serializable]
public class AzureDocumentDbDistributedRetryException : Exception
{
/// <summary>
/// Initializes a new instance of the AzureDocumentDbDistributedRetryException class with serialized data.
/// </summary>
/// <param name="message">The message that describes the error.</param>
public AzureDocumentDbDistributedRetryException(string message) : base(message)
{
}
}
}
84 changes: 38 additions & 46 deletions Hangfire.AzureDocumentDB/AzureDocumentDbMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,11 @@ namespace Hangfire.AzureDocumentDB
internal sealed class AzureDocumentDbMonitoringApi : IMonitoringApi
{
private readonly AzureDocumentDbStorage storage;

private readonly FeedOptions QueryOptions = new FeedOptions { MaxItemCount = -1 };
private readonly Uri JobDocumentCollectionUri;
private readonly Uri StateDocumentCollectionUri;
private readonly Uri SetDocumentCollectionUri;
private readonly Uri CounterDocumentCollectionUri;
private readonly Uri ServerDocumentCollectionUri;
private readonly Uri QueueDocumentCollectionUri;

public AzureDocumentDbMonitoringApi(AzureDocumentDbStorage storage)
{
this.storage = storage;

JobDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "jobs");
StateDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "states");
SetDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "sets");
CounterDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "counters");
ServerDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "servers");
QueueDocumentCollectionUri = UriFactory.CreateDocumentCollectionUri(storage.Options.DatabaseName, "queues");
}

public IList<QueueWithTopEnqueuedJobsDto> Queues()
Expand All @@ -58,25 +44,24 @@ public IList<QueueWithTopEnqueuedJobsDto> Queues()

public IList<ServerDto> Servers()
{
List<Entities.Server> servers = storage.Client.CreateDocumentQuery<Entities.Server>(ServerDocumentCollectionUri, QueryOptions)
return storage.Client.CreateDocumentQuery<Entities.Server>(storage.Collections.ServerDocumentCollectionUri, QueryOptions)
.Where(s => s.DocumentType == DocumentTypes.Server)
.AsEnumerable()
.ToList();

return servers.Select(server => new ServerDto
{
Name = server.ServerId,
Heartbeat = server.LastHeartbeat,
Queues = server.Queues,
StartedAt = server.CreatedOn,
WorkersCount = server.Workers
}).ToList();
.Select(server => new ServerDto
{
Name = server.ServerId,
Heartbeat = server.LastHeartbeat,
Queues = server.Queues,
StartedAt = server.CreatedOn,
WorkersCount = server.Workers
}).ToList();
}

public JobDetailsDto JobDetails(string jobId)
{
if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId));

Entities.Job job = storage.Client.CreateDocumentQuery<Entities.Job>(JobDocumentCollectionUri, QueryOptions)
Entities.Job job = storage.Client.CreateDocumentQuery<Entities.Job>(storage.Collections.JobDocumentCollectionUri, QueryOptions)
.Where(j => j.Id == jobId)
.AsEnumerable()
.FirstOrDefault();
Expand All @@ -86,7 +71,7 @@ public JobDetailsDto JobDetails(string jobId)
InvocationData invocationData = job.InvocationData;
invocationData.Arguments = job.Arguments;

List<StateHistoryDto> states = storage.Client.CreateDocumentQuery<State>(StateDocumentCollectionUri, QueryOptions)
List<StateHistoryDto> states = storage.Client.CreateDocumentQuery<State>(storage.Collections.StateDocumentCollectionUri, QueryOptions)
.Where(s => s.JobId == jobId)
.AsEnumerable()
.Select(s => new StateHistoryDto
Expand All @@ -112,10 +97,12 @@ public JobDetailsDto JobDetails(string jobId)

public StatisticsDto GetStatistics()
{
// TODO: move to stored procedure
Dictionary<string, long> results = new Dictionary<string, long>();

// get counts of jobs groupby on state
Dictionary<string, long> states = storage.Client.CreateDocumentQuery<Entities.Job>(JobDocumentCollectionUri, QueryOptions)
Dictionary<string, long> states = storage.Client.CreateDocumentQuery<Entities.Job>(storage.Collections.JobDocumentCollectionUri, QueryOptions)
.Where(j => j.DocumentType == DocumentTypes.Job)
.Select(j => j.StateName)
.AsEnumerable()
.Where(j => !string.IsNullOrEmpty(j))
Expand All @@ -125,25 +112,26 @@ public StatisticsDto GetStatistics()
results = results.Concat(states).ToDictionary(k => k.Key, v => v.Value);

// get counts of servers
long servers = storage.Client.CreateDocumentQuery<Entities.Server>(ServerDocumentCollectionUri, QueryOptions)
.Select(s => s.Id)
long servers = storage.Client.CreateDocumentQuery<Entities.Server>(storage.Collections.ServerDocumentCollectionUri, QueryOptions)
.Where(s => s.DocumentType == DocumentTypes.Server)
.Select(s => 1)
.AsEnumerable()
.LongCount();
results.Add("Servers", servers);

// get sum of stats:succeeded counters raw / aggregate
Dictionary<string, long> counters = storage.Client.CreateDocumentQuery<Counter>(CounterDocumentCollectionUri, QueryOptions)
.Where(c => c.Key == "stats:succeeded" || c.Key == "stats:deleted")
Dictionary<string, long> counters = storage.Client.CreateDocumentQuery<Counter>(storage.Collections.CounterDocumentCollectionUri, QueryOptions)
.Where(c => (c.Key == "stats:succeeded" || c.Key == "stats:deleted") && c.DocumentType == DocumentTypes.Counter)
.AsEnumerable()
.GroupBy(c => c.Key)
.ToDictionary(g => g.Key, g => (long)g.Sum(c => c.Value));

results = results.Concat(counters).ToDictionary(k => k.Key, v => v.Value);

long count = 0;
count += storage.Client.CreateDocumentQuery<Set>(SetDocumentCollectionUri, QueryOptions)
.Where(s => s.Key == "recurring-jobs")
.Select(s => s.Id)
count += storage.Client.CreateDocumentQuery<Set>(storage.Collections.SetDocumentCollectionUri, QueryOptions)
.Where(s => s.Key == "recurring-jobs" && s.DocumentType == DocumentTypes.Set)
.Select(s => 1)
.AsEnumerable()
.LongCount();

Expand Down Expand Up @@ -241,15 +229,17 @@ public JobList<DeletedJobDto> DeletedJobs(int from, int count)

private JobList<T> GetJobsOnState<T>(string stateName, int from, int count, Func<State, Common.Job, T> selector)
{
// TODO: move to stored procedure
List<KeyValuePair<string, T>> jobs = new List<KeyValuePair<string, T>>();

List<Entities.Job> filterJobs = storage.Client.CreateDocumentQuery<Entities.Job>(JobDocumentCollectionUri, QueryOptions)
.Where(j => j.StateName == stateName)
List<Entities.Job> filterJobs = storage.Client.CreateDocumentQuery<Entities.Job>(storage.Collections.JobDocumentCollectionUri, QueryOptions)
.Where(j => j.DocumentType == DocumentTypes.Job && j.StateName == stateName)
.AsEnumerable()
.Skip(from).Take(count)
.ToList();

List<State> states = storage.Client.CreateDocumentQuery<State>(StateDocumentCollectionUri, QueryOptions)
List<State> states = storage.Client.CreateDocumentQuery<State>(storage.Collections.StateDocumentCollectionUri, QueryOptions)
.Where(s => s.DocumentType == DocumentTypes.State)
.AsEnumerable()
.Where(s => filterJobs.Any(j => j.StateId == s.Id))
.ToList();
Expand All @@ -273,15 +263,17 @@ private JobList<T> GetJobsOnQueue<T>(string queue, int from, int count, Func<str
{
if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue));

// TODO: move to stored procedure
List<KeyValuePair<string, T>> jobs = new List<KeyValuePair<string, T>>();

List<Entities.Queue> queues = storage.Client.CreateDocumentQuery<Entities.Queue>(QueueDocumentCollectionUri, QueryOptions)
.Where(q => q.Name == queue)
List<Entities.Queue> queues = storage.Client.CreateDocumentQuery<Entities.Queue>(storage.Collections.QueueDocumentCollectionUri, QueryOptions)
.Where(q => q.Name == queue && q.DocumentType == DocumentTypes.Queue)
.AsEnumerable()
.Skip(from).Take(count)
.ToList();

List<Entities.Job> filterJobs = storage.Client.CreateDocumentQuery<Entities.Job>(JobDocumentCollectionUri, QueryOptions)
List<Entities.Job> filterJobs = storage.Client.CreateDocumentQuery<Entities.Job>(storage.Collections.JobDocumentCollectionUri, QueryOptions)
.Where(j => j.DocumentType == DocumentTypes.Job)
.AsEnumerable()
.Where(j => queues.Any(q => q.JobId == j.Id))
.ToList();
Expand Down Expand Up @@ -326,9 +318,9 @@ public long EnqueuedCount(string queue)

private long GetNumberOfJobsByStateName(string state)
{
return storage.Client.CreateDocumentQuery<Entities.Job>(JobDocumentCollectionUri, QueryOptions)
.Where(j => j.StateName == state)
.Select(s => s.Id)
return storage.Client.CreateDocumentQuery<Entities.Job>(storage.Collections.JobDocumentCollectionUri, QueryOptions)
.Where(j => j.DocumentType == DocumentTypes.Job && j.StateName == state)
.Select(s => 1)
.AsEnumerable()
.LongCount();
}
Expand Down Expand Up @@ -359,8 +351,8 @@ private Dictionary<DateTime, long> GetTimelineStats(Dictionary<string, DateTime>
{
Dictionary<DateTime, long> result = keys.ToDictionary(k => k.Value, v => default(long));

Dictionary<string, int> data = storage.Client.CreateDocumentQuery<Counter>(CounterDocumentCollectionUri, QueryOptions)
.Where(c => c.Type == CounterTypes.Aggregrate)
Dictionary<string, int> data = storage.Client.CreateDocumentQuery<Counter>(storage.Collections.CounterDocumentCollectionUri, QueryOptions)
.Where(c => c.Type == CounterTypes.Aggregrate && c.DocumentType == DocumentTypes.Counter)
.AsEnumerable()
.Where(c => keys.ContainsKey(c.Key))
.ToDictionary(k => k.Key, k => k.Value);
Expand Down
Loading

0 comments on commit 0d69553

Please sign in to comment.