Skip to content
This repository has been archived by the owner on Feb 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #19 from imranmomin/develop
Browse files Browse the repository at this point in the history
bug fixes and optimization
  • Loading branch information
imranmomin authored Apr 6, 2018
2 parents 18955f1 + 77ba152 commit cb444cc
Show file tree
Hide file tree
Showing 15 changed files with 84 additions and 162 deletions.
26 changes: 0 additions & 26 deletions Hangfire.AzureDocumentDB.nuspec

This file was deleted.

17 changes: 8 additions & 9 deletions Hangfire.AzureDocumentDB/CountersAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ public void Execute(CancellationToken cancellationToken)
.AsEnumerable()
.ToList();

Dictionary<string, Tuple<int, DateTime?>> counters = rawCounters.GroupBy(c => c.Key)
.ToDictionary(k => k.Key, v => new Tuple<int, DateTime?>(v.Sum(c => c.Value), v.Max(c => c.ExpireOn)));
Dictionary<string, (int Sum, DateTime? ExpireOn)> counters = rawCounters.GroupBy(c => c.Key)
.ToDictionary(k => k.Key, v=> (Sum: v.Sum(c => c.Value), ExpireOn: v.Max(c => c.ExpireOn)));

Array.ForEach(counters.Keys.ToArray(), key =>
{
cancellationToken.ThrowIfCancellationRequested();

Tuple<int, DateTime?> data;
if (counters.TryGetValue(key, out data))
if (counters.TryGetValue(key, out var data))
{
Counter aggregated = storage.Client.CreateDocumentQuery<Counter>(storage.CollectionUri, queryOptions)
.Where(c => c.Key == key && c.Type == CounterTypes.Aggregrate && c.DocumentType == DocumentTypes.Counter)
Expand All @@ -65,14 +64,14 @@ public void Execute(CancellationToken cancellationToken)
{
Key = key,
Type = CounterTypes.Aggregrate,
Value = data.Item1,
ExpireOn = data.Item2
Value = data.Sum,
ExpireOn = data.ExpireOn
};
}
else
{
aggregated.Value += data.Item1;
aggregated.ExpireOn = data.Item2;
aggregated.Value += data.Sum;
aggregated.ExpireOn = data.ExpireOn;
}

Task<ResourceResponse<Document>> task = storage.Client.UpsertDocumentWithRetriesAsync(storage.CollectionUri, aggregated);
Expand All @@ -95,7 +94,7 @@ public void Execute(CancellationToken cancellationToken)
logger.Trace("Records from the 'Counter' table aggregated.");
cancellationToken.WaitHandle.WaitOne(checkInterval);
}

public override string ToString() => GetType().ToString();

}
Expand Down
11 changes: 5 additions & 6 deletions Hangfire.AzureDocumentDB/DocumentDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public override JobData GetJobData(string jobId)
if (jobId == null) throw new ArgumentNullException(nameof(jobId));

Documents.Job data = Storage.Client.CreateDocumentQuery<Documents.Job>(Storage.CollectionUri, queryOptions)
.Where(j => j.Id == jobId)
.Where(j => j.Id == jobId && j.DocumentType == DocumentTypes.Job)
.AsEnumerable()
.FirstOrDefault();

Expand Down Expand Up @@ -162,7 +162,7 @@ public override string GetJobParameter(string id, string name)
if (name == null) throw new ArgumentNullException(nameof(name));

List<Parameter> parameters = Storage.Client.CreateDocumentQuery<Documents.Job>(Storage.CollectionUri, queryOptions)
.Where(j => j.Id == id)
.Where(j => j.Id == id && j.DocumentType == DocumentTypes.Job)
.SelectMany(j => j.Parameters)
.AsEnumerable()
.ToList();
Expand Down Expand Up @@ -274,12 +274,11 @@ public override string GetFirstByLowestScoreFromSet(string key, double fromScore

SqlQuerySpec sql = new SqlQuerySpec
{
QueryText = "SELECT TOP 1 VALUE c['value'] FROM c WHERE c.key = @key AND c.type = @type AND (c.score BETWEEN @from AND @to) " +
"ORDER BY c.created_on DESC ",
QueryText = "SELECT TOP 1 VALUE c['value'] FROM c WHERE c.key = @key AND c.type = @type AND (c.score BETWEEN @from AND @to) ORDER BY c.score",
Parameters = new SqlParameterCollection
{
new SqlParameter("@key", key),
new SqlParameter("@type", DocumentTypes.State),
new SqlParameter("@type", DocumentTypes.Set),
new SqlParameter("@from", fromScore ),
new SqlParameter("@to", toScore)
}
Expand Down Expand Up @@ -410,7 +409,7 @@ public override string GetValueFromHash(string key, string name)
{
new SqlParameter("@key", key),
new SqlParameter("@field", name),
new SqlParameter("@type", DocumentTypes.State)
new SqlParameter("@type", DocumentTypes.Hash)
}
};

Expand Down
31 changes: 22 additions & 9 deletions Hangfire.AzureDocumentDB/DocumentDbMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@ public IList<QueueWithTopEnqueuedJobsDto> Queues()
{
List<QueueWithTopEnqueuedJobsDto> queueJobs = new List<QueueWithTopEnqueuedJobsDto>();

Array.ForEach(storage.Options.Queues, queue =>
var tuples = storage.QueueProviders
.Select(x => x.GetJobQueueMonitoringApi())
.SelectMany(x => x.GetQueues(), (monitoring, queue) => new { Monitoring = monitoring, Queue = queue })
.OrderBy(x => x.Queue)
.ToArray();

foreach (var tuple in tuples)
{
long enqueueCount = EnqueuedCount(queue);
JobList<EnqueuedJobDto> jobs = EnqueuedJobs(queue, 0, 1);
long enqueueCount = EnqueuedCount(tuple.Queue);
JobList<EnqueuedJobDto> jobs = EnqueuedJobs(tuple.Queue, 0, 5);
queueJobs.Add(new QueueWithTopEnqueuedJobsDto
{
Length = enqueueCount,
Fetched = 0,
Name = queue,
Name = tuple.Queue,
FirstJobs = jobs
});
});
}

return queueJobs;
}
Expand All @@ -59,7 +65,7 @@ public JobDetailsDto JobDetails(string jobId)
if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId));

Documents.Job job = storage.Client.CreateDocumentQuery<Documents.Job>(storage.CollectionUri, queryOptions)
.Where(j => j.Id == jobId)
.Where(j => j.Id == jobId && j.DocumentType == DocumentTypes.Job)
.AsEnumerable()
.FirstOrDefault();

Expand All @@ -69,7 +75,7 @@ public JobDetailsDto JobDetails(string jobId)
invocationData.Arguments = job.Arguments;

List<StateHistoryDto> states = storage.Client.CreateDocumentQuery<State>(storage.CollectionUri, queryOptions)
.Where(s => s.JobId == jobId)
.Where(s => s.JobId == jobId && s.DocumentType == DocumentTypes.State)
.AsEnumerable()
.Select(s => new StateHistoryDto
{
Expand Down Expand Up @@ -149,7 +155,9 @@ public StatisticsDto GetStatistics()
results.Add("recurring-jobs", count);

long GetValueOrDefault(string key) => results.Where(r => r.Key == key).Select(r => r.Value).SingleOrDefault();
return new StatisticsDto

// ReSharper disable once UseObjectOrCollectionInitializer
StatisticsDto statistics = new StatisticsDto
{
Enqueued = GetValueOrDefault("Enqueued"),
Failed = GetValueOrDefault("Failed"),
Expand All @@ -159,8 +167,13 @@ public StatisticsDto GetStatistics()
Deleted = GetValueOrDefault("stats:deleted"),
Recurring = GetValueOrDefault("recurring-jobs"),
Servers = GetValueOrDefault("Servers"),
Queues = storage.Options.Queues.LongLength
};

statistics.Queues = storage.QueueProviders
.SelectMany(x => x.GetJobQueueMonitoringApi().GetQueues())
.Count();

return statistics;
}

#region Job List
Expand Down
45 changes: 7 additions & 38 deletions Hangfire.AzureDocumentDB/DocumentDbStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,12 @@ public sealed class DocumentDbStorage : JobStorage
/// <param name="authSecret">The secret key for the DocumentDb Database</param>
/// <param name="database">The name of the database to connect with</param>
/// <param name="collection">The name of the collection on the database</param>
/// <exception cref="ArgumentNullException"><paramref name="url"/> argument is null.</exception>
/// <exception cref="ArgumentNullException"><paramref name="authSecret"/> argument is null.</exception>
public DocumentDbStorage(string url, string authSecret, string database, string collection) : this(new DocumentDbStorageOptions { Endpoint = new Uri(url), AuthSecret = authSecret, DatabaseName = database, CollectionName = collection }) { }

/// <summary>
/// Initializes the DocumentDbStorage form the url auth secret provide.
/// </summary>
/// <param name="url">The url string to DocumentDb Database</param>
/// <param name="authSecret">The secret key for the DocumentDb Database</param>
/// <param name="database">The name of the database to connect with</param>
/// <param name="options">The DocumentDbStorageOptions object to override any of the options</param>
/// <param name="collection">The name of the collection on the database</param>
/// <exception cref="ArgumentNullException"><paramref name="url"/> argument is null.</exception>
/// <exception cref="ArgumentNullException"><paramref name="authSecret"/> argument is null.</exception>
public DocumentDbStorage(string url, string authSecret, string database, string collection, DocumentDbStorageOptions options) : this(Transform(url, authSecret, database, collection, options)) { }

/// <summary>
/// Initializes the DocumentDbStorage form the url auth secret provide.
/// </summary>
/// <param name="options">The DocumentDbStorageOptions object to override any of the options</param>
/// <exception cref="ArgumentNullException"><paramref name="options"/> argument is null.</exception>
private DocumentDbStorage(DocumentDbStorageOptions options)
public DocumentDbStorage(string url, string authSecret, string database, string collection, DocumentDbStorageOptions options = null)
{
Options = options ?? throw new ArgumentNullException(nameof(options));
Options = options ?? new DocumentDbStorageOptions();
Options.DatabaseName = database;
Options.CollectionName = collection;

JsonSerializerSettings settings = new JsonSerializerSettings
{
Expand All @@ -70,8 +52,8 @@ private DocumentDbStorage(DocumentDbStorageOptions options)
};

ConnectionPolicy connectionPolicy = ConnectionPolicy.Default;
connectionPolicy.RequestTimeout = options.RequestTimeout;
Client = new DocumentClient(options.Endpoint, options.AuthSecret, settings, connectionPolicy);
connectionPolicy.RequestTimeout = Options.RequestTimeout;
Client = new DocumentClient(new Uri(url), authSecret, settings, connectionPolicy);
Task task = Client.OpenAsync();
Task continueTask = task.ContinueWith(t => Initialize(), TaskContinuationOptions.OnlyOnRanToCompletion);
continueTask.Wait();
Expand Down Expand Up @@ -111,12 +93,11 @@ public override IEnumerable<IServerComponent> GetComponents()
public override void WriteOptionsToLog(ILog logger)
{
logger.Info("Using the following options for Azure DocumentDB job storage:");
logger.Info($" DocumentDB Url: {Options.Endpoint.AbsoluteUri}");
logger.Info($" DocumentDB Url: {Client.ServiceEndpoint.AbsoluteUri}");
logger.Info($" Request Timeout: {Options.RequestTimeout}");
logger.Info($" Counter Agggerate Interval: {Options.CountersAggregateInterval.TotalSeconds} seconds");
logger.Info($" Queue Poll Interval: {Options.QueuePollInterval.TotalSeconds} seconds");
logger.Info($" Expiration Check Interval: {Options.ExpirationCheckInterval.TotalSeconds} seconds");
logger.Info($" Queue: {string.Join(",", Options.Queues)}");
}

/// <summary>
Expand Down Expand Up @@ -173,17 +154,5 @@ private void Initialize()
throw new ApplicationException("Unable to create the stored procedures", databaseTask.Exception);
}
}

private static DocumentDbStorageOptions Transform(string url, string authSecret, string database, string collection, DocumentDbStorageOptions options)
{
if (options == null) options = new DocumentDbStorageOptions();

options.Endpoint = new Uri(url);
options.AuthSecret = authSecret;
options.DatabaseName = database;
options.CollectionName = collection;

return options;
}
}
}
26 changes: 3 additions & 23 deletions Hangfire.AzureDocumentDB/DocumentDbStorageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,7 @@ namespace Hangfire
/// </summary>
public static class DocumentDbStorageExtensions
{
/// <summary>
/// Enables to attach Azure DocumentDb to Hangfire
/// </summary>
/// <param name="configuration">The IGlobalConfiguration object</param>
/// <param name="url">The url string to DocumentDb Database</param>
/// <param name="authSecret">The secret key for the DocumentDb Database</param>
/// <param name="database">The name of the database to connect with</param>
/// <param name="collection">The name of the collection on the database</param>
/// <returns></returns>
public static IGlobalConfiguration<DocumentDbStorage> UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database, string collection)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url));
if (string.IsNullOrEmpty(authSecret)) throw new ArgumentNullException(nameof(authSecret));

DocumentDbStorage storage = new DocumentDbStorage(url, authSecret, database, collection);
return configuration.UseStorage(storage);
}

/// <summary>
/// <summary>
/// Enables to attach Azure DocumentDb to Hangfire
/// </summary>
/// <param name="configuration">The IGlobalConfiguration object</param>
Expand All @@ -38,13 +19,12 @@ public static IGlobalConfiguration<DocumentDbStorage> UseAzureDocumentDbStorage(
/// <param name="collection">The name of the collection on the database</param>
/// <param name="options">The DocumentDbStorage object to override any of the options</param>
/// <returns></returns>
public static IGlobalConfiguration<DocumentDbStorage> UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database, string collection, DocumentDbStorageOptions options)
public static IGlobalConfiguration<DocumentDbStorage> UseAzureDocumentDbStorage(this IGlobalConfiguration configuration, string url, string authSecret, string database, string collection, DocumentDbStorageOptions options = null)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url));
if (string.IsNullOrEmpty(authSecret)) throw new ArgumentNullException(nameof(authSecret));
if (options == null) throw new ArgumentNullException(nameof(options));


DocumentDbStorage storage = new DocumentDbStorage(url, authSecret, database, collection, options);
return configuration.UseStorage(storage);
}
Expand Down
8 changes: 0 additions & 8 deletions Hangfire.AzureDocumentDB/DocumentDbStorageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ namespace Hangfire.Azure
/// </summary>
public class DocumentDbStorageOptions
{
internal Uri Endpoint { get; set; }
internal string AuthSecret { get; set; }
internal string DatabaseName { get; set; }
internal string CollectionName { get; set; }

Expand All @@ -19,11 +17,6 @@ public class DocumentDbStorageOptions
/// </summary>
public TimeSpan RequestTimeout { get; set; }

/// <summary>
/// Get or set list of queues to process. Default values "default", "critical"
/// </summary>
public string[] Queues { get; set; }

/// <summary>
/// Get or set the interval timespan to process expired enteries. Default value 15 minutes
/// Expired items under "locks", "jobs", "lists", "sets", "hashs", "counters/aggregrated" will be checked
Expand All @@ -46,7 +39,6 @@ public class DocumentDbStorageOptions
public DocumentDbStorageOptions()
{
RequestTimeout = TimeSpan.FromSeconds(30);
Queues = new[] { "default", "critical" };
ExpirationCheckInterval = TimeSpan.FromMinutes(5);
CountersAggregateInterval = TimeSpan.FromMinutes(1);
QueuePollInterval = TimeSpan.FromSeconds(2);
Expand Down
Loading

0 comments on commit cb444cc

Please sign in to comment.