From 84505548b1826eca3c3b0d21326e7bf4ffd489e1 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 24 Jan 2022 09:53:30 +0100 Subject: [PATCH 01/32] v2.1/bug/68 SentDate is not present Add SentDate on ConsumerMessage at CosumerEventHandler --- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index fb35ecd..133e1c2 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -228,6 +228,9 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List() } }; From 5c4e20eb9908b5d2c9ba52e39d1e5ba1317ae567 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 7 Feb 2022 09:53:48 +0100 Subject: [PATCH 02/32] feature/71 Cache Messages to ConsumerPointer Update ConsumerEventHandler to enable caching before start consuming --- src/Storage.App/Program.cs | 6 +++-- .../XNodes/Handlers/ConsumerEventHandler.cs | 26 +++++++++++++++++-- src/Storage.IO/Services/ConsumerIOService.cs | 2 +- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/Storage.App/Program.cs b/src/Storage.App/Program.cs index 83fa45e..1182eda 100644 --- a/src/Storage.App/Program.cs +++ b/src/Storage.App/Program.cs @@ -25,9 +25,11 @@ public static void Main(string[] args) if (Environment.GetEnvironmentVariable("ANDYX_CERTIFICATE_DEFAULT_PATH") != null) Environment.SetEnvironmentVariable("ASPNETCORE_Kestrel__Certificates__Default__Path", Environment.GetEnvironmentVariable("ANDYX_CERTIFICATE_DEFAULT_PATH")); - if (Environment.GetEnvironmentVariable("ASPNETCORE_URLS") == "") - Environment.SetEnvironmentVariable("ASPNETCORE_URLS", "https://+:443;http://+:80"); + if (Environment.GetEnvironmentVariable("ANDYX_URLS") != null) + Environment.SetEnvironmentVariable("ASPNETCORE_URLS", Environment.GetEnvironmentVariable("ANDYX_URLS")); + else + Environment.SetEnvironmentVariable("ASPNETCORE_URLS", "https://+:443;http://+:80"); try { diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index fb35ecd..07aa390 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -207,15 +207,17 @@ private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List rows.Any(u => u.MessageId == r.MessageId)); } } - private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List rows) + private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List rows, DateTime partitionDate) { + CachePointers(obj, rows, partitionDate); + foreach (var row in rows) { var consumerMessage = new ConsumerMessage() @@ -240,6 +242,26 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List rows, DateTime partitionDate) + { + // Unacknowledge message, add to the pointer, and send + _logger.LogInformation($"Pointers are caching for {obj.ConsumerName}"); + for (int i = 0; i < rows.Count; i++) + { + _consumerIOService.WriteMessageAcknowledged(new Model.Events.Messages.MessageAcknowledgedArgs() + { + Tenant = obj.Tenant, + Product = obj.Product, + Component = obj.Component, + Topic = obj.Topic, + Consumer = obj.ConsumerName, + IsAcknowledged = false, + MessageId = rows[i].MessageId, + }, + partitionDate.ToString("yyyy-MM-dd")); + } + } + private List GetPartitionFiles(string tenant, string product, string component, string topic) { List messages = new List(); diff --git a/src/Storage.IO/Services/ConsumerIOService.cs b/src/Storage.IO/Services/ConsumerIOService.cs index 061969d..3c8e840 100644 --- a/src/Storage.IO/Services/ConsumerIOService.cs +++ b/src/Storage.IO/Services/ConsumerIOService.cs @@ -184,7 +184,7 @@ private string AddConsumerConnectorGetKey(string tenant, string product, string } // Acknowledgement of messages - public void WriteMessageAcknowledged(MessageAcknowledgedArgs message, string partitionFile = "none") + public void WriteMessageAcknowledged(MessageAcknowledgedArgs message, string partitionFile = "no-index") { string consumerKey = AddConsumerConnectorGetKey(message.Tenant, message.Product, message.Component, message.Topic, message.Consumer); From b9357fece7fecd1fcbfda73391bf8cd40ad56f35 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 7 Feb 2022 12:49:56 +0100 Subject: [PATCH 03/32] bugFix for unkacked messages --- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index c6d9664..1fa9d26 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -207,16 +207,17 @@ private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List rows.Any(u => u.MessageId == r.MessageId)); } } - private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List rows, DateTime partitionDate) + private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List rows, DateTime partitionDate, bool isNewConsumer = false) { - CachePointers(obj, rows, partitionDate); + if (isNewConsumer == true) + CachePointers(obj, rows, partitionDate); foreach (var row in rows) { @@ -230,7 +231,7 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List() From b8ca3eba8ef7850579383436dd55b58d62e7380f Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 7 Feb 2022 22:11:01 +0100 Subject: [PATCH 04/32] v2.1/feature/73 TenantUpdatedDetails Add TenantUpdatedArgs Model; Update TenantEventHandler and TenantIOService --- .../Service/XNodes/Handlers/TenantEventHandler.cs | 3 ++- src/Storage.IO/Readers/TenantReader.cs | 7 +++++++ src/Storage.IO/Services/TenantIOService.cs | 12 +++++++++--- .../Events/Tenants/TenantUpdatedArgs.cs | 12 ++++++++++-- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs index 3f96f29..ef26f48 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs @@ -43,7 +43,8 @@ private void XNodeEventService_TenantCreated(Model.Events.Tenants.TenantCreatedA private void XNodeEventService_TenantUpdated(Model.Events.Tenants.TenantUpdatedArgs obj) { - // SKIPED for next release v2.1 + tenantIOService.TryUpdateTenantDirectory(obj.Name, new Model.App.Tenants.Tenant() { Id = obj.Id, Name = obj.Name, Settings = obj.Settings }); + logger.LogInformation($"Tenant '{obj.Name}' settings updated"); } private void XNodeEventService_ProductCreated(Model.Events.Products.ProductCreatedArgs obj) diff --git a/src/Storage.IO/Readers/TenantReader.cs b/src/Storage.IO/Readers/TenantReader.cs index 088a64b..5c79c2c 100644 --- a/src/Storage.IO/Readers/TenantReader.cs +++ b/src/Storage.IO/Readers/TenantReader.cs @@ -1,5 +1,6 @@ using Buildersoft.Andy.X.Storage.IO.Locations; using Buildersoft.Andy.X.Storage.Model.App.Consumers; +using Buildersoft.Andy.X.Storage.Model.App.Tenants; using Buildersoft.Andy.X.Storage.Model.App.Topics; using Buildersoft.Andy.X.Storage.Utility.Extensions.Json; using System.Collections.Generic; @@ -9,6 +10,12 @@ namespace Buildersoft.Andy.X.Storage.IO.Readers { public static class TenantReader { + public static Tenant ReadTenantConfigFile(string tenantName) + { + return File + .ReadAllText(TenantLocations.GetTenantConfigFile(tenantName)) + .JsonToObjectAndDecrypt(); + } public static Topic ReadTopicConfigFile(string tenant, string product, string component, string topic) { return File diff --git a/src/Storage.IO/Services/TenantIOService.cs b/src/Storage.IO/Services/TenantIOService.cs index bb65bb6..2deaa93 100644 --- a/src/Storage.IO/Services/TenantIOService.cs +++ b/src/Storage.IO/Services/TenantIOService.cs @@ -10,7 +10,7 @@ using System; using System.Collections.Concurrent; using System.IO; -using System.Threading; +using System.Threading.Tasks; namespace Buildersoft.Andy.X.Storage.IO.Services { @@ -36,7 +36,7 @@ private void InitializeTenantConfigFileProcessor() if (IsTenantConfigFilesWorking != true) { IsTenantConfigFilesWorking = true; - new Thread(() => TenantConfigiFileProcessor()).Start(); + new Task(() => TenantConfigiFileProcessor()).Start(); } } private void InitializeTenantLoggingProcessor() @@ -44,7 +44,7 @@ private void InitializeTenantLoggingProcessor() if (IsTenantLoggingWorking != true) { IsTenantLoggingWorking = true; - new Thread(() => TenantLoggingProcessor()).Start(); + new Task(() => TenantLoggingProcessor()).Start(); } } @@ -111,6 +111,12 @@ public bool TryCreateTenantDirectory(string tenantName, Tenant tenantDetails) } } + public bool TryUpdateTenantDirectory(string tenantName, Tenant tenantDetails) + { + Tenant TenantIOReader + return true; + } + public bool TryCreateProductDirectory(string tenant, Product product) { try diff --git a/src/Storage.Model/Events/Tenants/TenantUpdatedArgs.cs b/src/Storage.Model/Events/Tenants/TenantUpdatedArgs.cs index 40556b6..fec16cb 100644 --- a/src/Storage.Model/Events/Tenants/TenantUpdatedArgs.cs +++ b/src/Storage.Model/Events/Tenants/TenantUpdatedArgs.cs @@ -5,7 +5,15 @@ namespace Buildersoft.Andy.X.Storage.Model.Events.Tenants { public class TenantUpdatedArgs { - public Guid TenantId { get; set; } - public string TenantName { get; set; } + public Guid Id { get; set; } + public string Name { get; set; } + public string DigitalSignature { get; set; } + + public TenantSettings Settings { get; set; } + + public TenantUpdatedArgs() + { + Settings = new TenantSettings(); + } } } From f58189271f0a09d857f307264a7288f85876fa86 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Wed, 9 Feb 2022 12:24:56 +0100 Subject: [PATCH 05/32] v2.1/feature/41 Add Header to Records/Message Add Headers into Message Entity; Implement Headers across the storage. --- .../XNodes/Handlers/ConsumerEventHandler.cs | 4 ++-- .../XNodes/Handlers/MessageEventHandler.cs | 1 + src/Storage.IO/Services/MessageIOService.cs | 1 + src/Storage.IO/Services/TenantIOService.cs | 7 ++++++- src/Storage.Model/App/Messages/Message.cs | 3 +++ src/Storage.Model/App/Messages/MessageRow.cs | 19 ------------------- src/Storage.Model/Entities/Message.cs | 2 ++ .../Events/Messages/MessageStoredArgs.cs | 2 ++ 8 files changed, 17 insertions(+), 22 deletions(-) delete mode 100644 src/Storage.Model/App/Messages/MessageRow.cs diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 1fa9d26..2c58288 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -1,6 +1,5 @@ using Buildersoft.Andy.X.Storage.Core.Service.System; using Buildersoft.Andy.X.Storage.IO.Locations; -using Buildersoft.Andy.X.Storage.IO.Readers; using Buildersoft.Andy.X.Storage.IO.Services; using Buildersoft.Andy.X.Storage.Model.App.Consumers; using Buildersoft.Andy.X.Storage.Model.App.Messages; @@ -234,7 +233,8 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List() + MessageRaw = row.Payload.JsonToObject(), + Headers = row.Headers.JsonToObject>() } }; diff --git a/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs index e267064..464b2c2 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs @@ -37,6 +37,7 @@ private async void XNodeEventService_MessageStored(MessageStoredArgs obj) Id = obj.Id, Component = obj.Component, MessageRaw = obj.MessageRaw, + Headers = obj.Headers, Product = obj.Product, Topic = obj.Topic, SentDate = obj.SentDate diff --git a/src/Storage.IO/Services/MessageIOService.cs b/src/Storage.IO/Services/MessageIOService.cs index 4e6ff6a..81eb52a 100644 --- a/src/Storage.IO/Services/MessageIOService.cs +++ b/src/Storage.IO/Services/MessageIOService.cs @@ -66,6 +66,7 @@ public void StoreMessage(Message message) { MessageId = message.Id, Payload = message.MessageRaw.ToJsonAndEncrypt(), + Headers = message.Headers.ToJsonAndEncrypt(), SentDate = message.SentDate, StoredDate = DateTime.Now }); diff --git a/src/Storage.IO/Services/TenantIOService.cs b/src/Storage.IO/Services/TenantIOService.cs index 2deaa93..0e2c395 100644 --- a/src/Storage.IO/Services/TenantIOService.cs +++ b/src/Storage.IO/Services/TenantIOService.cs @@ -113,7 +113,12 @@ public bool TryCreateTenantDirectory(string tenantName, Tenant tenantDetails) public bool TryUpdateTenantDirectory(string tenantName, Tenant tenantDetails) { - Tenant TenantIOReader + if (Directory.Exists(TenantLocations.GetTenantDirectory(tenantName)) == true) + { + // Update the file from the Server + tenantConfigFilesQueue.Enqueue(tenantDetails); + InitializeTenantConfigFileProcessor(); + } return true; } diff --git a/src/Storage.Model/App/Messages/Message.cs b/src/Storage.Model/App/Messages/Message.cs index 266584b..0622617 100644 --- a/src/Storage.Model/App/Messages/Message.cs +++ b/src/Storage.Model/App/Messages/Message.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; namespace Buildersoft.Andy.X.Storage.Model.App.Messages { @@ -11,6 +12,8 @@ public class Message public Guid Id { get; set; } public object MessageRaw { get; set; } + public Dictionary Headers { get; set; } + public DateTime SentDate { get; set; } } } diff --git a/src/Storage.Model/App/Messages/MessageRow.cs b/src/Storage.Model/App/Messages/MessageRow.cs deleted file mode 100644 index ded8ca3..0000000 --- a/src/Storage.Model/App/Messages/MessageRow.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Buildersoft.Andy.X.Storage.Model.App.Messages -{ - public class MessageRow - { - public Guid Id { get; set; } - public object MessageRaw { get; set; } - public DateTime StoredDate { get; set; } - public MessageRow() - { - StoredDate = DateTime.Now; - } - } -} diff --git a/src/Storage.Model/Entities/Message.cs b/src/Storage.Model/Entities/Message.cs index ce920c3..b86652f 100644 --- a/src/Storage.Model/Entities/Message.cs +++ b/src/Storage.Model/Entities/Message.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.ComponentModel.DataAnnotations; namespace Buildersoft.Andy.X.Storage.Model.Entities @@ -8,6 +9,7 @@ public class Message [Key] public Guid MessageId { get; set; } public string Payload { get; set; } + public string Headers { get; set; } public DateTime StoredDate { get; set; } public DateTime SentDate { get; set; } } diff --git a/src/Storage.Model/Events/Messages/MessageStoredArgs.cs b/src/Storage.Model/Events/Messages/MessageStoredArgs.cs index 02933aa..2224371 100644 --- a/src/Storage.Model/Events/Messages/MessageStoredArgs.cs +++ b/src/Storage.Model/Events/Messages/MessageStoredArgs.cs @@ -13,6 +13,8 @@ public class MessageStoredArgs public Guid Id { get; set; } public object MessageRaw { get; set; } + public Dictionary Headers { get; set; } + public DateTime SentDate { get; set; } } } From 5c0e089ce085cb268dbba8b4d019e6f658ba29be Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 11 Feb 2022 10:39:32 +0100 Subject: [PATCH 06/32] v2.1/feature/69 Add Implement Retention Period for the messages Add ComponentRetention --- .../App/Components/ComponentRetention.cs | 14 ++++++++++++++ .../App/Components/ComponentSettings.cs | 2 ++ 2 files changed, 16 insertions(+) create mode 100644 src/Storage.Model/App/Components/ComponentRetention.cs diff --git a/src/Storage.Model/App/Components/ComponentRetention.cs b/src/Storage.Model/App/Components/ComponentRetention.cs new file mode 100644 index 0000000..1613cf2 --- /dev/null +++ b/src/Storage.Model/App/Components/ComponentRetention.cs @@ -0,0 +1,14 @@ +namespace Buildersoft.Andy.X.Storage.Model.App.Components +{ + public class ComponentRetention + { + public string Name { get; set; } + public long RetentionTimeInMinutes { get; set; } + + public ComponentRetention() + { + Name = "default"; + RetentionTimeInMinutes = -1; + } + } +} diff --git a/src/Storage.Model/App/Components/ComponentSettings.cs b/src/Storage.Model/App/Components/ComponentSettings.cs index 3aa3130..b9157f0 100644 --- a/src/Storage.Model/App/Components/ComponentSettings.cs +++ b/src/Storage.Model/App/Components/ComponentSettings.cs @@ -9,6 +9,7 @@ public class ComponentSettings public bool EnableAuthorization { get; set; } public List Tokens { get; set; } + public ComponentRetention RetentionPolicy { get; set; } public ComponentSettings() @@ -18,6 +19,7 @@ public ComponentSettings() EnableAuthorization = false; Tokens = new List(); + RetentionPolicy = new ComponentRetention(); } } } From 223951985001ec79372906929c0c4c7fd7662dcc Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 11 Feb 2022 17:22:04 +0100 Subject: [PATCH 07/32] v2.1/feature/77 Synchronize Tokens between Nodes in Andy X Cluster Add Events for ComponentToken and TenantToken; Implement TenantEventHandler and XNodeEventService; --- .../XNodes/Handlers/TenantEventHandler.cs | 67 +++++++++++++++++++ .../Service/XNodes/XNodeEventService.cs | 40 ++++++++++- .../Components/ComponentTokenCreatedArgs.cs | 22 ++++++ .../Components/ComponentTokenRevokedArgs.cs | 19 ++++++ .../Events/Tenants/TenantTokenCreatedArgs.cs | 19 ++++++ .../Events/Tenants/TenantTokenRevokedArgs.cs | 18 +++++ 6 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 src/Storage.Model/Events/Components/ComponentTokenCreatedArgs.cs create mode 100644 src/Storage.Model/Events/Components/ComponentTokenRevokedArgs.cs create mode 100644 src/Storage.Model/Events/Tenants/TenantTokenCreatedArgs.cs create mode 100644 src/Storage.Model/Events/Tenants/TenantTokenRevokedArgs.cs diff --git a/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs index ef26f48..fb32aef 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs @@ -1,6 +1,8 @@ using Buildersoft.Andy.X.Storage.Core.Service.System; using Buildersoft.Andy.X.Storage.IO.Services; +using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.Logging; +using System.Linq; namespace Buildersoft.Andy.X.Storage.Core.Service.XNodes.Handlers { @@ -24,12 +26,18 @@ private void InitializeEvents() xNodeEventService.TenantCreated += XNodeEventService_TenantCreated; xNodeEventService.TenantUpdated += XNodeEventService_TenantUpdated; + xNodeEventService.TenantTokenCreated += XNodeEventService_TenantTokenCreated; + xNodeEventService.TenantTokenRevoked += XNodeEventService_TenantTokenRevoked; + xNodeEventService.ProductCreated += XNodeEventService_ProductCreated; xNodeEventService.ProductUpdated += XNodeEventService_ProductUpdated; xNodeEventService.ComponentCreated += XNodeEventService_ComponentCreated; xNodeEventService.ComponentUpdated += XNodeEventService_ComponentUpdated; + xNodeEventService.ComponentTokenCreated += XNodeEventService_ComponentTokenCreated; + xNodeEventService.ComponentTokenRevoked += XNodeEventService_ComponentTokenRevoked; + xNodeEventService.TopicCreated += XNodeEventService_TopicCreated; xNodeEventService.TopicUpdated += XNodeEventService_TopicUpdated; @@ -47,6 +55,36 @@ private void XNodeEventService_TenantUpdated(Model.Events.Tenants.TenantUpdatedA logger.LogInformation($"Tenant '{obj.Name}' settings updated"); } + private async void XNodeEventService_TenantTokenCreated(Model.Events.Tenants.TenantTokenCreatedArgs obj) + { + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + // This node should be ignored because, it already produces CreateTenantToken. + if (xNode.Key != xNodeEventService.GetCurrentXNodeServiceUrl()) + { + // Transmit the CreateTenantToken to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("CreateTenantToken", obj); + } + } + + logger.LogInformation($"TenantToken created for '{obj.Tenant}', settings updated"); + } + + private async void XNodeEventService_TenantTokenRevoked(Model.Events.Tenants.TenantTokenRevokedArgs obj) + { + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + // This node should be ignored because, it already produces RevokeTenantToken. + if (xNode.Key != xNodeEventService.GetCurrentXNodeServiceUrl()) + { + // Transmit the RevokeTenantToken to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("RevokeTenantToken", obj); + } + } + + logger.LogInformation($"TenantToken revoked for '{obj.Tenant}', settings updated"); + } + private void XNodeEventService_ProductCreated(Model.Events.Products.ProductCreatedArgs obj) { tenantIOService.TryCreateProductDirectory(obj.Tenant, new Model.App.Products.Product() { Id = obj.Id, Name = obj.Name }); @@ -69,6 +107,35 @@ private void XNodeEventService_ComponentUpdated(Model.Events.Components.Componen tenantIOService.TryCreateComponentDirectory(obj.Tenant, obj.Product, new Model.App.Components.Component() { Id = obj.Id, Name = obj.Name, Settings = obj.Settings }); } + private async void XNodeEventService_ComponentTokenCreated(Model.Events.Components.ComponentTokenCreatedArgs obj) + { + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + // This node should be ignored because, it already produces CreateComponentToken. + if (xNode.Key != xNodeEventService.GetCurrentXNodeServiceUrl()) + { + // Transmit the CreateComponentToken to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("CreateComponentToken", obj); + } + } + + logger.LogInformation($"ComponentToken created for '{obj.Tenant}/{obj.Product}/{obj.Component}', settings updated"); + } + private async void XNodeEventService_ComponentTokenRevoked(Model.Events.Components.ComponentTokenRevokedArgs obj) + { + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + // This node should be ignored because, it already produces RevokeComponentToken + if (xNode.Key != xNodeEventService.GetCurrentXNodeServiceUrl()) + { + // Transmit the RevokeComponentToken to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("RevokeComponentToken", obj); + } + } + + logger.LogInformation($"ComponentToken revoked for '{obj.Tenant}/{obj.Product}/{obj.Component}', settings updated"); + } + private void XNodeEventService_TopicCreated(Model.Events.Topics.TopicCreatedArgs obj) { tenantIOService.TryCreateTopicDirectory(obj.Tenant, obj.Product, obj.Component, new Model.App.Topics.Topic() { Id = obj.Id, Name = obj.Name, Settings = obj.Settings }); diff --git a/src/Storage.Core/Service/XNodes/XNodeEventService.cs b/src/Storage.Core/Service/XNodes/XNodeEventService.cs index 8bc01b0..90e3e27 100644 --- a/src/Storage.Core/Service/XNodes/XNodeEventService.cs +++ b/src/Storage.Core/Service/XNodes/XNodeEventService.cs @@ -16,6 +16,7 @@ using Microsoft.Extensions.Logging; using System; using System.Threading; +using System.Threading.Tasks; namespace Buildersoft.Andy.X.Storage.Core.Service.XNodes { @@ -35,12 +36,18 @@ public class XNodeEventService public event Action TenantCreated; public event Action TenantUpdated; + public event Action TenantTokenCreated; + public event Action TenantTokenRevoked; + public event Action ProductCreated; public event Action ProductUpdated; public event Action ComponentCreated; public event Action ComponentUpdated; + public event Action ComponentTokenCreated; + public event Action ComponentTokenRevoked; + public event Action TopicCreated; public event Action TopicUpdated; @@ -87,18 +94,28 @@ public XNodeEventService(ILogger logger, var provider = new XNodeConnectionProvider(nodeConfig, dataStorageConfig, agentConfiguration, agentId); _connection = provider.GetHubConnection(); + _connection.Closed += _connection_Closed; + _connection.Reconnected += _connection_Reconnected; + _connection.Reconnecting += _connection_Reconnecting; + _connection.On("StorageConnected", connectedArgs => StorageConnected?.Invoke(connectedArgs)); _connection.On("StorageDisconnected", disconnectedArgs => StorageDisconnected?.Invoke(disconnectedArgs)); _connection.On("TenantCreated", tenantCreated => TenantCreated?.Invoke(tenantCreated)); _connection.On("TenantUpdated", tenantUpdated => TenantUpdated?.Invoke(tenantUpdated)); + _connection.On("TenantTokenCreated", tenantTokenCreated => TenantTokenCreated?.Invoke(tenantTokenCreated)); + _connection.On("TenantTokenRevoked", tenantTokenRevoked => TenantTokenRevoked?.Invoke(tenantTokenRevoked)); + _connection.On("ProductCreated", productCreated => ProductCreated?.Invoke(productCreated)); _connection.On("ProductUpdated", productUpdated => ProductUpdated?.Invoke(productUpdated)); _connection.On("ComponentCreated", componentCreated => ComponentCreated?.Invoke(componentCreated)); _connection.On("ComponentUpdated", componentUpdated => ComponentUpdated?.Invoke(componentUpdated)); + _connection.On("ComponentTokenCreated", componentTokenCreated => ComponentTokenCreated?.Invoke(componentTokenCreated)); + _connection.On("ComponentTokenRevoked", componentTokenRevoked => ComponentTokenRevoked?.Invoke(componentTokenRevoked)); + _connection.On("TopicCreated", topicCreated => TopicCreated?.Invoke(topicCreated)); _connection.On("TopicUpdated", topicUpdated => TopicUpdated?.Invoke(topicUpdated)); @@ -119,6 +136,24 @@ public XNodeEventService(ILogger logger, xNodeConnectionRepository.AddService(nodeConfig.ServiceUrl, agentId, this); } + private Task _connection_Closed(Exception arg) + { + logger.LogError($"Agent connection is closed, details {arg.Message}"); + return Task.CompletedTask; + } + + private Task _connection_Reconnected(string arg) + { + logger.LogInformation($"Agent with id {Guid.NewGuid()} is connected"); + return Task.CompletedTask; + } + + private Task _connection_Reconnecting(Exception arg) + { + logger.LogWarning($"Agent connection is lost, agent is reconnecting to node, details {arg.Message}"); + return Task.CompletedTask; + } + private void InitializeEventHandlers() { agentEventHandler = new AgentEventHandler(logger, this, tenantIOService); @@ -148,7 +183,10 @@ await _connection.StartAsync().ContinueWith(task => { if (task.Exception != null) { - logger.LogError($"Error occurred during connection. Details: {task.Exception.Message}, {string.Join(",", task.Exception.InnerExceptions)}"); + // Details is not show for now... + //logger.LogError($"Error occurred during connection. Details: {task.Exception.Message}, {string.Join(",", task.Exception.InnerExceptions)}"); + + logger.LogError($"Error occurred during connection. Details: {task.Exception.Message}"); // retry connection Thread.Sleep(3000); diff --git a/src/Storage.Model/Events/Components/ComponentTokenCreatedArgs.cs b/src/Storage.Model/Events/Components/ComponentTokenCreatedArgs.cs new file mode 100644 index 0000000..a74a67d --- /dev/null +++ b/src/Storage.Model/Events/Components/ComponentTokenCreatedArgs.cs @@ -0,0 +1,22 @@ +using Buildersoft.Andy.X.Storage.Model.App.Components; +using System.Collections.Generic; + +namespace Buildersoft.Andy.X.Storage.Model.Events.Components +{ + public class ComponentTokenCreatedArgs + { + public string Tenant { get; set; } + public string Product { get; set; } + public string Component { get; set; } + + public ComponentToken Token { get; set; } + public List StoragesAlreadySent { get; set; } + + + public ComponentTokenCreatedArgs() + { + Token = new ComponentToken(); + StoragesAlreadySent = new List(); + } + } +} diff --git a/src/Storage.Model/Events/Components/ComponentTokenRevokedArgs.cs b/src/Storage.Model/Events/Components/ComponentTokenRevokedArgs.cs new file mode 100644 index 0000000..b27d201 --- /dev/null +++ b/src/Storage.Model/Events/Components/ComponentTokenRevokedArgs.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; + +namespace Buildersoft.Andy.X.Storage.Model.Events.Components +{ + public class ComponentTokenRevokedArgs + { + public string Tenant { get; set; } + public string Product { get; set; } + public string Component { get; set; } + + public string Token { get; set; } + public List StoragesAlreadySent { get; set; } + + public ComponentTokenRevokedArgs() + { + StoragesAlreadySent = new List(); + } + } +} diff --git a/src/Storage.Model/Events/Tenants/TenantTokenCreatedArgs.cs b/src/Storage.Model/Events/Tenants/TenantTokenCreatedArgs.cs new file mode 100644 index 0000000..04d4ffb --- /dev/null +++ b/src/Storage.Model/Events/Tenants/TenantTokenCreatedArgs.cs @@ -0,0 +1,19 @@ +using Buildersoft.Andy.X.Storage.Model.App.Tenants; +using System.Collections.Generic; + +namespace Buildersoft.Andy.X.Storage.Model.Events.Tenants +{ + public class TenantTokenCreatedArgs + { + public string Tenant { get; set; } + public TenantToken Token { get; set; } + + public List StoragesAlreadySent { get; set; } + + public TenantTokenCreatedArgs() + { + Token = new TenantToken(); + StoragesAlreadySent = new List(); + } + } +} diff --git a/src/Storage.Model/Events/Tenants/TenantTokenRevokedArgs.cs b/src/Storage.Model/Events/Tenants/TenantTokenRevokedArgs.cs new file mode 100644 index 0000000..79eefa5 --- /dev/null +++ b/src/Storage.Model/Events/Tenants/TenantTokenRevokedArgs.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; + +namespace Buildersoft.Andy.X.Storage.Model.Events.Tenants +{ + public class TenantTokenRevokedArgs + { + public string Tenant { get; set; } + public string Token { get; set; } + + public List StoragesAlreadySent { get; set; } + + + public TenantTokenRevokedArgs() + { + StoragesAlreadySent = new List(); + } + } +} From 6278fd18c1dc116726e8789e92c2e89e6aa3dd2b Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 11 Feb 2022 17:25:10 +0100 Subject: [PATCH 08/32] Update version to preview --- src/Storage.IO/Services/SystemIOService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storage.IO/Services/SystemIOService.cs b/src/Storage.IO/Services/SystemIOService.cs index 3fbd2f2..0956ddb 100644 --- a/src/Storage.IO/Services/SystemIOService.cs +++ b/src/Storage.IO/Services/SystemIOService.cs @@ -17,7 +17,7 @@ public SystemIOService(ILogger logger) Console.Write(" ###"); Console.ForegroundColor = generalColor; Console.WriteLine(" ###"); Console.ForegroundColor = ConsoleColor.Red; Console.Write(" ###"); Console.ForegroundColor = generalColor; Console.Write(" ###"); - Console.WriteLine(" Andy X Storage 2.0.0. Copyright (C) 2022 Buildersoft LLC"); + Console.WriteLine(" Andy X Storage 2.1.0-preview. Copyright (C) 2022 Buildersoft LLC"); Console.ForegroundColor = ConsoleColor.Red; Console.Write(" #### "); Console.ForegroundColor = generalColor; Console.WriteLine("Licensed under the Apache License 2.0. See https://bit.ly/3DqVQbx"); Console.ForegroundColor = ConsoleColor.Red; From cc5caeae3cf0a5d7d76eb4f0e981b5889dc511c6 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sat, 12 Feb 2022 00:21:03 +0100 Subject: [PATCH 09/32] Remove ANDYX_URLS and ASPNETCORE_URLS --- src/Storage.App/Program.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Storage.App/Program.cs b/src/Storage.App/Program.cs index 1182eda..8d7978d 100644 --- a/src/Storage.App/Program.cs +++ b/src/Storage.App/Program.cs @@ -25,11 +25,11 @@ public static void Main(string[] args) if (Environment.GetEnvironmentVariable("ANDYX_CERTIFICATE_DEFAULT_PATH") != null) Environment.SetEnvironmentVariable("ASPNETCORE_Kestrel__Certificates__Default__Path", Environment.GetEnvironmentVariable("ANDYX_CERTIFICATE_DEFAULT_PATH")); - - if (Environment.GetEnvironmentVariable("ANDYX_URLS") != null) - Environment.SetEnvironmentVariable("ASPNETCORE_URLS", Environment.GetEnvironmentVariable("ANDYX_URLS")); - else - Environment.SetEnvironmentVariable("ASPNETCORE_URLS", "https://+:443;http://+:80"); + // Commented this for know; found BUG, can not run on docker, is asking for certificate. + //if (Environment.GetEnvironmentVariable("ANDYX_URLS") != null) + // Environment.SetEnvironmentVariable("ASPNETCORE_URLS", Environment.GetEnvironmentVariable("ANDYX_URLS")); + //else + // Environment.SetEnvironmentVariable("ASPNETCORE_URLS", "https://+:443;http://+:80"); try { From 287a347d33af974d8cb2513efd393dd2049e652a Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sat, 12 Feb 2022 20:59:39 +0100 Subject: [PATCH 10/32] v2.1.feature/no_ticket Add create tenant General bug fixes --- src/Storage.App/Dockerfile | 4 +- .../XNodes/Handlers/TenantEventHandler.cs | 43 +++++++++++---- src/Storage.IO/Locations/TenantLocations.cs | 10 ++++ src/Storage.IO/Readers/TenantReader.cs | 4 ++ src/Storage.IO/Services/TenantIOService.cs | 52 +++++++++++++++++++ src/Storage.IO/Writers/TenantWriter.cs | 34 ++++++++++++ .../Events/Tenants/TenantCreatedArgs.cs | 5 +- 7 files changed, 140 insertions(+), 12 deletions(-) diff --git a/src/Storage.App/Dockerfile b/src/Storage.App/Dockerfile index d203700..89ab760 100644 --- a/src/Storage.App/Dockerfile +++ b/src/Storage.App/Dockerfile @@ -2,8 +2,8 @@ FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base WORKDIR /app -EXPOSE 80 -EXPOSE 443 +#EXPOSE 80 +#EXPOSE 443 FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build WORKDIR /src diff --git a/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs index fb32aef..19f0117 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/TenantEventHandler.cs @@ -43,10 +43,22 @@ private void InitializeEvents() } - private void XNodeEventService_TenantCreated(Model.Events.Tenants.TenantCreatedArgs obj) + private async void XNodeEventService_TenantCreated(Model.Events.Tenants.TenantCreatedArgs obj) { tenantIOService.TryCreateTenantDirectory(obj.Name, new Model.App.Tenants.Tenant() { Id = obj.Id, Name = obj.Name, Settings = obj.Settings }); logger.LogInformation($"Tenant '{obj.Name}' properties created"); + + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + // This node should be ignored because, it already produces CreateTenant. + if (xNode.Key != xNodeEventService.GetCurrentXNodeServiceUrl()) + { + // Transmit the CreateComponentToken to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("CreateTenant", obj); + } + } + logger.LogInformation($"Informing other nodes for tenant '{obj.Name}' creation"); + } private void XNodeEventService_TenantUpdated(Model.Events.Tenants.TenantUpdatedArgs obj) @@ -57,6 +69,10 @@ private void XNodeEventService_TenantUpdated(Model.Events.Tenants.TenantUpdatedA private async void XNodeEventService_TenantTokenCreated(Model.Events.Tenants.TenantTokenCreatedArgs obj) { + // store tenantToken data + tenantIOService.CreateTenantTokenFile(obj.Tenant, obj.Token); + logger.LogInformation($"TenantToken created for '{obj.Tenant}', settings updated"); + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) { // This node should be ignored because, it already produces CreateTenantToken. @@ -66,12 +82,15 @@ private async void XNodeEventService_TenantTokenCreated(Model.Events.Tenants.Ten await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("CreateTenantToken", obj); } } - - logger.LogInformation($"TenantToken created for '{obj.Tenant}', settings updated"); + logger.LogInformation($"Informing other nodes for TenantToken creation"); } private async void XNodeEventService_TenantTokenRevoked(Model.Events.Tenants.TenantTokenRevokedArgs obj) { + // remove from the storage + tenantIOService.DeleteTenantTokenFile(obj.Tenant, obj.Token); + logger.LogInformation($"TenantToken revoked for '{obj.Tenant}', settings updated"); + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) { // This node should be ignored because, it already produces RevokeTenantToken. @@ -81,8 +100,7 @@ private async void XNodeEventService_TenantTokenRevoked(Model.Events.Tenants.Ten await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("RevokeTenantToken", obj); } } - - logger.LogInformation($"TenantToken revoked for '{obj.Tenant}', settings updated"); + logger.LogInformation($"Informing other nodes for TenantToken revocation"); } private void XNodeEventService_ProductCreated(Model.Events.Products.ProductCreatedArgs obj) @@ -109,6 +127,10 @@ private void XNodeEventService_ComponentUpdated(Model.Events.Components.Componen private async void XNodeEventService_ComponentTokenCreated(Model.Events.Components.ComponentTokenCreatedArgs obj) { + // store componentToken data + tenantIOService.CreateComponentTokenFile(obj.Tenant, obj.Product, obj.Component, obj.Token); + logger.LogInformation($"ComponentToken created for '{obj.Tenant}/{obj.Product}/{obj.Component}', settings updated"); + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) { // This node should be ignored because, it already produces CreateComponentToken. @@ -118,11 +140,15 @@ private async void XNodeEventService_ComponentTokenCreated(Model.Events.Componen await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("CreateComponentToken", obj); } } - - logger.LogInformation($"ComponentToken created for '{obj.Tenant}/{obj.Product}/{obj.Component}', settings updated"); + logger.LogInformation($"Informing other nodes for ComponentToken creation"); } + private async void XNodeEventService_ComponentTokenRevoked(Model.Events.Components.ComponentTokenRevokedArgs obj) { + // remove from the storage + tenantIOService.DeleteComponentTokenFile(obj.Tenant, obj.Product, obj.Component, obj.Token); + logger.LogInformation($"ComponentToken revoked for '{obj.Tenant}/{obj.Product}/{obj.Component}', settings updated"); + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) { // This node should be ignored because, it already produces RevokeComponentToken @@ -132,8 +158,7 @@ private async void XNodeEventService_ComponentTokenRevoked(Model.Events.Componen await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("RevokeComponentToken", obj); } } - - logger.LogInformation($"ComponentToken revoked for '{obj.Tenant}/{obj.Product}/{obj.Component}', settings updated"); + logger.LogInformation($"Informing other nodes for ComponentToken revocation"); } private void XNodeEventService_TopicCreated(Model.Events.Topics.TopicCreatedArgs obj) diff --git a/src/Storage.IO/Locations/TenantLocations.cs b/src/Storage.IO/Locations/TenantLocations.cs index da0ab1e..87c42a5 100644 --- a/src/Storage.IO/Locations/TenantLocations.cs +++ b/src/Storage.IO/Locations/TenantLocations.cs @@ -15,6 +15,11 @@ public static string GetTenantLogsRootDirectory(string tenantName) return Path.Combine(GetTenantDirectory(tenantName), "logs"); } + public static string GetTenantTokensDirectory(string tenantName) + { + return Path.Combine(GetTenantDirectory(tenantName), "tokens"); + } + public static string GetProductRootDirectory(string tenantName) { return Path.Combine(GetTenantDirectory(tenantName)); @@ -35,6 +40,11 @@ public static string GetComponentDirectory(string tenantName, string productName return Path.Combine(GetComponentRootDirectory(tenantName, productName), componentName); } + public static string GetComponentTokenDirectory(string tenantName, string productName, string componentName) + { + return Path.Combine(GetComponentDirectory(tenantName, productName, componentName), "tokens"); + } + public static string GetTopicRootDirectory(string tenantName, string productName, string componentName) { return Path.Combine(GetComponentDirectory(tenantName, productName, componentName)); diff --git a/src/Storage.IO/Readers/TenantReader.cs b/src/Storage.IO/Readers/TenantReader.cs index 5c79c2c..a1166bc 100644 --- a/src/Storage.IO/Readers/TenantReader.cs +++ b/src/Storage.IO/Readers/TenantReader.cs @@ -55,9 +55,13 @@ public static List ReadAllConsumers() string component = Path.GetFileName(componentLocation); string[] topics = Directory.GetDirectories(TenantLocations.GetTopicRootDirectory(tenant, product, component)); + foreach (var topicLocation in topics) { string topic = Path.GetFileName(topicLocation); + if (topic == "tokens") + continue; + string[] consumers = Directory.GetDirectories(TenantLocations.GetConsumerRootDirectory(tenant, product, component, topic)); foreach (var consumerLocation in consumers) { diff --git a/src/Storage.IO/Services/TenantIOService.cs b/src/Storage.IO/Services/TenantIOService.cs index 0e2c395..61f0330 100644 --- a/src/Storage.IO/Services/TenantIOService.cs +++ b/src/Storage.IO/Services/TenantIOService.cs @@ -6,6 +6,7 @@ using Buildersoft.Andy.X.Storage.Model.App.Tenants; using Buildersoft.Andy.X.Storage.Model.App.Topics; using Buildersoft.Andy.X.Storage.Model.Logs; +using Buildersoft.Andy.X.Storage.Utility.Extensions.Json; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; @@ -99,6 +100,7 @@ public bool TryCreateTenantDirectory(string tenantName, Tenant tenantDetails) Directory.CreateDirectory(TenantLocations.GetTenantDirectory(tenantName)); Directory.CreateDirectory(TenantLocations.GetProductRootDirectory(tenantName)); Directory.CreateDirectory(TenantLocations.GetTenantLogsRootDirectory(tenantName)); + Directory.CreateDirectory(TenantLocations.GetTenantTokensDirectory(tenantName)); tenantConfigFilesQueue.Enqueue(tenantDetails); InitializeTenantConfigFileProcessor(); @@ -150,6 +152,7 @@ public bool TryCreateComponentDirectory(string tenant, string product, Component { Directory.CreateDirectory(TenantLocations.GetComponentDirectory(tenant, product, component.Name)); Directory.CreateDirectory(TenantLocations.GetTopicRootDirectory(tenant, product, component.Name)); + Directory.CreateDirectory(TenantLocations.GetComponentTokenDirectory(tenant, product, component.Name)); // Because this call is triggered by XNode in only in an agent in storage, it doesn't need to go thru a queue. TenantWriter.WriteComponentConfigFile(tenant, product, component); @@ -236,5 +239,54 @@ public void WriteAgentStateInTenantLog(string tenantName, string agentId, string InitializeTenantLoggingProcessor(); } + + + public bool CreateTenantTokenFile(string tenant, TenantToken tenantToken) + { + string fileName = $"tkn_{Guid.NewGuid()}.xandy"; + return TenantWriter.WriteTenantTokenFile(fileName, tenant, tenantToken); + } + + public bool DeleteTenantTokenFile(string tenant, string token) + { + var tokenFiles = Directory.GetFiles(TenantLocations.GetTenantTokensDirectory(tenant)); + foreach (var tokenFile in tokenFiles) + { + var tokenDetails = File.ReadAllText(tokenFile).JsonToObject(); + if (tokenDetails != null) + { + if (tokenDetails.Token == token) + { + File.Delete(tokenFile); + break; + } + } + } + return true; + } + + public bool CreateComponentTokenFile(string tenant, string product, string component, ComponentToken componentToken) + { + string fileName = $"tkn_{Guid.NewGuid()}.xandy"; + return TenantWriter.WriteComponentTokenFile(tenant, product, component, fileName, componentToken); + } + + public bool DeleteComponentTokenFile(string tenant, string product, string component,string token) + { + var tokenFiles = Directory.GetFiles(TenantLocations.GetComponentTokenDirectory(tenant, product, component)); + foreach (var tokenFile in tokenFiles) + { + var tokenDetails = File.ReadAllText(tokenFile).JsonToObject(); + if (tokenDetails != null) + { + if (tokenDetails.Token == token) + { + File.Delete(tokenFile); + break; + } + } + } + return true; + } } } diff --git a/src/Storage.IO/Writers/TenantWriter.cs b/src/Storage.IO/Writers/TenantWriter.cs index 941ce68..6b41b85 100644 --- a/src/Storage.IO/Writers/TenantWriter.cs +++ b/src/Storage.IO/Writers/TenantWriter.cs @@ -86,5 +86,39 @@ public async static void WriteInTenantLog(string tenantName, string rowLog) // TODO: handle this exception } } + + public static bool WriteTenantTokenFile(string fileName, string tenant, TenantToken tenantToken) + { + try + { + + string tenantTokenLocation = Path.Combine(TenantLocations.GetTenantTokensDirectory(tenant), fileName); + + File.WriteAllText(tenantTokenLocation, tenantToken.ToPrettyJson()); + + return true; + } + catch (Exception) + { + return false; + } + } + + public static bool WriteComponentTokenFile(string tenant, string product, string component, string fileName, ComponentToken componentToken) + { + try + { + + string componentTokenLocation = Path.Combine(TenantLocations.GetComponentTokenDirectory(tenant, product, component), fileName); + + File.WriteAllText(componentTokenLocation, componentToken.ToPrettyJson()); + + return true; + } + catch (Exception) + { + return false; + } + } } } diff --git a/src/Storage.Model/Events/Tenants/TenantCreatedArgs.cs b/src/Storage.Model/Events/Tenants/TenantCreatedArgs.cs index 0532303..32996f6 100644 --- a/src/Storage.Model/Events/Tenants/TenantCreatedArgs.cs +++ b/src/Storage.Model/Events/Tenants/TenantCreatedArgs.cs @@ -1,5 +1,6 @@ using Buildersoft.Andy.X.Storage.Model.App.Tenants; using System; +using System.Collections.Generic; namespace Buildersoft.Andy.X.Storage.Model.Events.Tenants { @@ -7,13 +8,15 @@ public class TenantCreatedArgs { public Guid Id { get; set; } public string Name { get; set; } - public string DigitalSignature { get; set; } public TenantSettings Settings { get; set; } + public List StoragesAlreadySent { get; set; } + public TenantCreatedArgs() { Settings = new TenantSettings(); + StoragesAlreadySent = new List(); } } } From ea61f78b715381b10f7c806cc1e356040f4b81c9 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sat, 12 Feb 2022 22:16:08 +0100 Subject: [PATCH 11/32] bugfix: error on preventing reading messges --- src/Storage.Model/App/Consumers/ConsumerMessage.cs | 3 ++- src/Storage.Model/App/Messages/Message.cs | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Storage.Model/App/Consumers/ConsumerMessage.cs b/src/Storage.Model/App/Consumers/ConsumerMessage.cs index f1ac7b4..c142cbc 100644 --- a/src/Storage.Model/App/Consumers/ConsumerMessage.cs +++ b/src/Storage.Model/App/Consumers/ConsumerMessage.cs @@ -4,8 +4,9 @@ namespace Buildersoft.Andy.X.Storage.Model.App.Consumers { public class ConsumerMessage { - public Message Message { get; set; } public string Consumer { get; set; } + + public Message Message { get; set; } public ConsumerMessage() { Message = new Message(); diff --git a/src/Storage.Model/App/Messages/Message.cs b/src/Storage.Model/App/Messages/Message.cs index 0622617..98e4bbd 100644 --- a/src/Storage.Model/App/Messages/Message.cs +++ b/src/Storage.Model/App/Messages/Message.cs @@ -15,5 +15,10 @@ public class Message public Dictionary Headers { get; set; } public DateTime SentDate { get; set; } + + public Message() + { + Headers = new Dictionary(); + } } } From d535b2570ea2ed51c8ed20d002cce921ddfbfbc5 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Tue, 15 Feb 2022 00:01:48 +0100 Subject: [PATCH 12/32] v2.1/bug/67 Add SubscriptionType for ConsumerDisconnectArgs --- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 6 ++++-- .../Events/Consumers/ConsumerDisconnectedArgs.cs | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 2c58288..e2ff64b 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -76,11 +76,13 @@ private void XNodeEventService_ConsumerDisconnected(ConsumerDisconnectedArgs obj Product = obj.Product, Component = obj.Component, Topic = obj.Topic, - CreatedDate = DateTime.Now + CreatedDate = DateTime.Now, + SubscriptionType = obj.SubscriptionType, }); string consumerKey = GenerateConsumerKey(obj.Tenant, obj.Product, obj.Component, obj.Topic, obj.ConsumerName); - ReleaseUnacknoledgedMessageTasks(consumerKey); + if (obj.SubscriptionType != SubscriptionType.Shared) + ReleaseUnacknoledgedMessageTasks(consumerKey); } private void XNodeEventService_MessageAcknowledged(Model.Events.Messages.MessageAcknowledgedArgs obj) diff --git a/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs b/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs index b528a30..c3c6370 100644 --- a/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs +++ b/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs @@ -11,5 +11,6 @@ public class ConsumerDisconnectedArgs public Guid Id { get; set; } public string ConsumerName { get; set; } + public SubscriptionType SubscriptionType { get; set; } } } From c8eb76935494c5a9678eea5e02d51123f61752aa Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Thu, 24 Feb 2022 00:41:22 +0100 Subject: [PATCH 13/32] v2.1/bug/no_ticket: check if totalcount of pointers are more than ackedPonterMessages --- .../Background/Services/ConsumerArchiveBackgroundService.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs b/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs index 5a20965..21dd520 100644 --- a/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs +++ b/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs @@ -71,8 +71,10 @@ private void BackgroundTaskTimer_Elapsed(object sender, ElapsedEventArgs e) StopService(); + int totalCount = _consumerPointerContext.ConsumerMessages.Count(); var ackedPointerMessages = _consumerPointerContext.ConsumerMessages.Where(x => x.IsAcknowledged == true).OrderBy(x => x.SentDate).Take(_partitionConfiguration.SizeInMemory); - _consumerPointerContext.BulkDelete(ackedPointerMessages.ToList()); + if (totalCount > ackedPointerMessages.Count()) + _consumerPointerContext.BulkDelete(ackedPointerMessages.ToList()); StartService(); } From c0d5d16e2073f98b28483875e4adcb14d0a5a0ac Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Thu, 24 Feb 2022 00:45:42 +0100 Subject: [PATCH 14/32] bugfix --- src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs b/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs index c3c6370..017b7de 100644 --- a/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs +++ b/src/Storage.Model/Events/Consumers/ConsumerDisconnectedArgs.cs @@ -1,4 +1,5 @@ -using System; +using Buildersoft.Andy.X.Storage.Model.App.Consumers; +using System; namespace Buildersoft.Andy.X.Storage.Model.Events.Consumers { From 6e9697d31c63659c7e05c0fde8b8c72aca7f9795 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 28 Feb 2022 10:43:22 +0100 Subject: [PATCH 15/32] v2.1/feature/82 Add SkipCertificate into XNode Configuration for Production Update XNodeConfiguration; Update XNodeEventService to enable reconnection when Connect is closed; Implement SkipCertificate on XNodeConnectionProvider --- src/Storage.App/Program.cs | 5 ----- src/Storage.App/appsettings.json | 3 ++- src/Storage.Core/Provider/XNodeConnectionProvider.cs | 7 +++++++ src/Storage.Core/Service/XNodes/XNodeEventService.cs | 5 ++++- src/Storage.Model/Configuration/XNodeConfiguration.cs | 6 ++++++ 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/Storage.App/Program.cs b/src/Storage.App/Program.cs index 8d7978d..97b9a5c 100644 --- a/src/Storage.App/Program.cs +++ b/src/Storage.App/Program.cs @@ -25,11 +25,6 @@ public static void Main(string[] args) if (Environment.GetEnvironmentVariable("ANDYX_CERTIFICATE_DEFAULT_PATH") != null) Environment.SetEnvironmentVariable("ASPNETCORE_Kestrel__Certificates__Default__Path", Environment.GetEnvironmentVariable("ANDYX_CERTIFICATE_DEFAULT_PATH")); - // Commented this for know; found BUG, can not run on docker, is asking for certificate. - //if (Environment.GetEnvironmentVariable("ANDYX_URLS") != null) - // Environment.SetEnvironmentVariable("ASPNETCORE_URLS", Environment.GetEnvironmentVariable("ANDYX_URLS")); - //else - // Environment.SetEnvironmentVariable("ASPNETCORE_URLS", "https://+:443;http://+:80"); try { diff --git a/src/Storage.App/appsettings.json b/src/Storage.App/appsettings.json index 240c8a1..d0e3b0b 100644 --- a/src/Storage.App/appsettings.json +++ b/src/Storage.App/appsettings.json @@ -24,7 +24,8 @@ "Subscription": 1, "JwtToken": "na", "Username": "admin", - "Password": "admin" + "Password": "admin", + "SkipCertificate": false //"CertificateFile": "CERT_FILE", //"CertificatePassword": "CERT_PASSWORD" } diff --git a/src/Storage.Core/Provider/XNodeConnectionProvider.cs b/src/Storage.Core/Provider/XNodeConnectionProvider.cs index 3d8cd67..ce1f8e8 100644 --- a/src/Storage.Core/Provider/XNodeConnectionProvider.cs +++ b/src/Storage.Core/Provider/XNodeConnectionProvider.cs @@ -66,6 +66,13 @@ private void BuildConnectionWithAgent(XNodeConfiguration nodeConfig) { if (message is HttpClientHandler httpClientHandler) { + if (nodeConfig.CertificateFile == "") + { + httpClientHandler.ServerCertificateCustomValidationCallback += + (sender, certificate, chain, sslPolicyErrors) => { return true; }; + return message; + } + httpClientHandler.ClientCertificateOptions = ClientCertificateOption.Manual; httpClientHandler.SslProtocols = SslProtocols.Tls12; var certLocation = Path.Combine(SystemLocations.GetConfigCertificateDirectory(), nodeConfig.CertificateFile); diff --git a/src/Storage.Core/Service/XNodes/XNodeEventService.cs b/src/Storage.Core/Service/XNodes/XNodeEventService.cs index 90e3e27..d764865 100644 --- a/src/Storage.Core/Service/XNodes/XNodeEventService.cs +++ b/src/Storage.Core/Service/XNodes/XNodeEventService.cs @@ -130,7 +130,6 @@ public XNodeEventService(ILogger logger, _connection.On("MessageStored", msgStored => MessageStored?.Invoke(msgStored)); InitializeEventHandlers(); - ConnectAsync(); xNodeConnectionRepository.AddService(nodeConfig.ServiceUrl, agentId, this); @@ -139,6 +138,10 @@ public XNodeEventService(ILogger logger, private Task _connection_Closed(Exception arg) { logger.LogError($"Agent connection is closed, details {arg.Message}"); + + // try to reconnect + ConnectAsync(); + return Task.CompletedTask; } diff --git a/src/Storage.Model/Configuration/XNodeConfiguration.cs b/src/Storage.Model/Configuration/XNodeConfiguration.cs index 170d525..e6deb73 100644 --- a/src/Storage.Model/Configuration/XNodeConfiguration.cs +++ b/src/Storage.Model/Configuration/XNodeConfiguration.cs @@ -8,8 +8,14 @@ public class XNodeConfiguration public string Username { get; set; } public string Password { get; set; } + public bool SkipCertificate { get; set; } public string CertificateFile { get; set; } public string CertificatePassword { get; set; } + + public XNodeConfiguration() + { + SkipCertificate = true; + } } public enum Subscription From 6e9e294f1d771436f524295a7d0df17a12ff3d9b Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 28 Feb 2022 10:47:42 +0100 Subject: [PATCH 16/32] Update links on README.md file --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 75e4916..bfefa3b 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Andy X Data Storage is an open-source standalone service that is used to store m ## Get Started -Follow the [Getting Started](https://andyx.azurewebsites.net/) instructions how to run Andy X. +Follow the [Getting Started](https://buildersoftdev.azurewebsites.net/andyx) instructions how to run Andy X. For local development and testing, you can run Andy X within a Docker container, for more info click [here](https://hub.docker.com/u/buildersoftdev) @@ -23,8 +23,8 @@ Security issues and bugs should be reported privately, via email, en.buildersoft These are some other repos for related projects: -* [Andy X Dashboard](https://github.com/buildersoftdev/andyxdashboard) - Dashboard for Andy X Node -* [Andy X Terminal](https://github.com/buildersoftdev/andyxterminal) - Manage all resources of Andy X +* [Andy X](https://github.com/buildersoftdev/andyx) - Andy X Node, consume and produce messages +* [Andy X Cli](https://github.com/buildersoftdev/andyx-cli) - Manage all resources of Andy X ## Deploying Andy X Data Storage with docker-compose From 8a3a05d7ad45d43f1e31c36816e28693c058a486 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 28 Feb 2022 11:27:51 +0100 Subject: [PATCH 17/32] hot_fix: SkipCertificate was not involved --- .../Provider/XNodeConnectionProvider.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Storage.Core/Provider/XNodeConnectionProvider.cs b/src/Storage.Core/Provider/XNodeConnectionProvider.cs index ce1f8e8..71baec2 100644 --- a/src/Storage.Core/Provider/XNodeConnectionProvider.cs +++ b/src/Storage.Core/Provider/XNodeConnectionProvider.cs @@ -66,19 +66,19 @@ private void BuildConnectionWithAgent(XNodeConfiguration nodeConfig) { if (message is HttpClientHandler httpClientHandler) { - if (nodeConfig.CertificateFile == "") + if (nodeConfig.SkipCertificate == true) { httpClientHandler.ServerCertificateCustomValidationCallback += (sender, certificate, chain, sslPolicyErrors) => { return true; }; - return message; } - - httpClientHandler.ClientCertificateOptions = ClientCertificateOption.Manual; - httpClientHandler.SslProtocols = SslProtocols.Tls12; - var certLocation = Path.Combine(SystemLocations.GetConfigCertificateDirectory(), nodeConfig.CertificateFile); - httpClientHandler.ClientCertificates.Add(new X509Certificate2(certLocation, nodeConfig.CertificatePassword)); + else + { + httpClientHandler.ClientCertificateOptions = ClientCertificateOption.Manual; + httpClientHandler.SslProtocols = SslProtocols.Tls12; + var certLocation = Path.Combine(SystemLocations.GetConfigCertificateDirectory(), nodeConfig.CertificateFile); + httpClientHandler.ClientCertificates.Add(new X509Certificate2(certLocation, nodeConfig.CertificatePassword)); + } } - return message; }; } From edea87a4942208f655287e1baec8211718981d53 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Thu, 3 Mar 2022 16:18:04 +0100 Subject: [PATCH 18/32] v2.1/feature/83 Create SyncService Consumers and Producers connected between Nodes in a Cluster Add NotifyConsumerConnection, Add NotifyProducerConnection, Implement into connected and disconnected events on Producer and Consumer Event Handler --- .../Provider/XNodeConnectionProvider.cs | 1 + .../XNodes/Handlers/ConsumerEventHandler.cs | 67 ++++++++++++++++++- .../XNodes/Handlers/ProducerEventHandler.cs | 55 ++++++++++++++- .../ConsumerArchiveBackgroundService.cs | 5 +- .../Connectors/ConsumerConnector.cs | 4 ++ .../Connectors/MessageStorageConnector.cs | 4 ++ .../Consumer/NotifyConsumerConnection.cs | 26 +++++++ .../Producer/NotifyProducerConnection.cs | 19 ++++++ 8 files changed, 174 insertions(+), 7 deletions(-) create mode 100644 src/Storage.Model/Commands/Consumer/NotifyConsumerConnection.cs create mode 100644 src/Storage.Model/Commands/Producer/NotifyProducerConnection.cs diff --git a/src/Storage.Core/Provider/XNodeConnectionProvider.cs b/src/Storage.Core/Provider/XNodeConnectionProvider.cs index 71baec2..7718187 100644 --- a/src/Storage.Core/Provider/XNodeConnectionProvider.cs +++ b/src/Storage.Core/Provider/XNodeConnectionProvider.cs @@ -29,6 +29,7 @@ public XNodeConnectionProvider(XNodeConfiguration nodeConfig, this.dataStorageConfig = dataStorageConfig; this.agentConfiguration = agentConfiguration; this.agentId = agentId; + ConnectToXNode(); } diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index e2ff64b..0aed17d 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -3,6 +3,7 @@ using Buildersoft.Andy.X.Storage.IO.Services; using Buildersoft.Andy.X.Storage.Model.App.Consumers; using Buildersoft.Andy.X.Storage.Model.App.Messages; +using Buildersoft.Andy.X.Storage.Model.Commands.Consumer; using Buildersoft.Andy.X.Storage.Model.Contexts; using Buildersoft.Andy.X.Storage.Model.Events.Consumers; using Buildersoft.Andy.X.Storage.Model.Files; @@ -50,7 +51,7 @@ private void InitializeEvents() _xNodeEventService.MessageAcknowledged += XNodeEventService_MessageAcknowledged; } - private void XNodeEventService_ConsumerConnected(ConsumerConnectedArgs obj) + private async void XNodeEventService_ConsumerConnected(ConsumerConnectedArgs obj) { _consumerIOService.TryCreateConsumerDirectory(obj.Tenant, obj.Product, obj.Component, obj.Topic, new Consumer() { @@ -64,9 +65,25 @@ private void XNodeEventService_ConsumerConnected(ConsumerConnectedArgs obj) ConsumerSettings = new ConsumerSettings() { InitialPosition = obj.InitialPosition }, CreatedDate = DateTime.Now }); + + // notify other nodes in cluster that a consumer has been disconnected + await NotifyNodesForConsumerConnection(new NotifyConsumerConnection() + { + ConnectionType = ConnectionType.Connected, + + Id = obj.Id, + SubscriptionType = obj.SubscriptionType, + Component = obj.Component, + ConsumerName = obj.ConsumerName, + InitialPosition = InitialPosition.Latest, + Product = obj.Product, + Tenant = obj.Tenant, + Topic = obj.Topic, + }); + } - private void XNodeEventService_ConsumerDisconnected(ConsumerDisconnectedArgs obj) + private async void XNodeEventService_ConsumerDisconnected(ConsumerDisconnectedArgs obj) { _consumerIOService.WriteDisconnectedConsumerLog(obj.Tenant, obj.Product, obj.Component, obj.Topic, new Consumer() { @@ -83,6 +100,22 @@ private void XNodeEventService_ConsumerDisconnected(ConsumerDisconnectedArgs obj if (obj.SubscriptionType != SubscriptionType.Shared) ReleaseUnacknoledgedMessageTasks(consumerKey); + + // notify other nodes in cluster that a consumer has been disconnected + await NotifyNodesForConsumerConnection(new NotifyConsumerConnection() + { + ConnectionType = ConnectionType.Disconnected, + + Id = obj.Id, + SubscriptionType = obj.SubscriptionType, + Component = obj.Component, + ConsumerName = obj.ConsumerName, + InitialPosition = InitialPosition.Latest, + Product = obj.Product, + Tenant = obj.Tenant, + Topic = obj.Topic, + }); + } private void XNodeEventService_MessageAcknowledged(Model.Events.Messages.MessageAcknowledgedArgs obj) @@ -90,6 +123,8 @@ private void XNodeEventService_MessageAcknowledged(Model.Events.Messages.Message _consumerIOService.WriteMessageAcknowledged(obj); } + + #region Unacknowledge Messages private void XNodeEventService_ConsumerUnacknowledgedMessagesRequested(ConsumerConnectedArgs obj) { string consumerKey = GenerateConsumerKey(obj.Tenant, obj.Product, obj.Component, obj.Topic, obj.ConsumerName); @@ -167,6 +202,10 @@ private void ReleaseUnacknoledgedMessageTasks(string consumerKey) _unacknowledgedMessageProcesses[consumerKey].Dispose(); _unacknowledgedMessageProcesses.TryRemove(consumerKey, out _); + + // Cleanup memory. + GC.Collect(); + GC.WaitForPendingFinalizers(); } private void CheckPointerDbConnection(ConsumerPointerContext tenantContext, string consumerKey) @@ -288,6 +327,30 @@ private List GetPartitionFiles(string tenant, string product, strin return sorted; } + #endregion + + + private async Task NotifyNodesForConsumerConnection(NotifyConsumerConnection obj) + { + if (obj.SubscriptionType != SubscriptionType.Shared) + { + _logger.LogInformation($"Notify other nodes for Consumer '{obj.ConsumerName}' connection status"); + // Transmit the message to other connected XNODES. + if (_xNodeEventService.GetXNodeConnectionRepository().GetAllServices().Count > 1) + { + foreach (var xNode in _xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + // this node should be ignored because, it already produces the messages to consumers connected. + if (xNode.Key != _xNodeEventService.GetCurrentXNodeServiceUrl()) + { + //Transmit the message to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("NotifyNodesForConsumerConnection", obj); + } + } + } + } + } + private string GenerateConsumerKey(string tenant, string product, string component, string topic, string consumer) { diff --git a/src/Storage.Core/Service/XNodes/Handlers/ProducerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ProducerEventHandler.cs index 5c79b40..94acc6e 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ProducerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ProducerEventHandler.cs @@ -1,7 +1,11 @@ using Buildersoft.Andy.X.Storage.Core.Service.System; using Buildersoft.Andy.X.Storage.IO.Services; +using Buildersoft.Andy.X.Storage.Model.Commands.Producer; +using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.Logging; using System; +using System.Linq; +using System.Threading.Tasks; namespace Buildersoft.Andy.X.Storage.Core.Service.XNodes.Handlers { @@ -25,7 +29,7 @@ private void InitializeEvents() xNodeEventService.ProducerConnected += XNodeEventService_ProducerConnected; xNodeEventService.ProducerDisconnected += XNodeEventService_ProducerDisconnected; } - private void XNodeEventService_ProducerConnected(Model.Events.Producers.ProducerConnectedArgs obj) + private async void XNodeEventService_ProducerConnected(Model.Events.Producers.ProducerConnectedArgs obj) { producerIOService.TryCreateProducerDirectory(obj.Tenant, obj.Product, obj.Component, obj.Topic, new Model.App.Producers.Producer() { @@ -33,9 +37,22 @@ private void XNodeEventService_ProducerConnected(Model.Events.Producers.Producer Name = obj.ProducerName, CreatedDate = DateTime.Now }); + + // connect + await NotifyNodesForProducerConnection(new NotifyProducerConnection() + { + ConnectionType = Model.Commands.Consumer.ConnectionType.Connected, + + Id = obj.Id, + ProducerName = obj.ProducerName, + Component = obj.Component, + Topic = obj.Topic, + Product = obj.Product, + Tenant = obj.Tenant + }); } - private void XNodeEventService_ProducerDisconnected(Model.Events.Producers.ProducerDisconnectedArgs obj) + private async void XNodeEventService_ProducerDisconnected(Model.Events.Producers.ProducerDisconnectedArgs obj) { producerIOService.WriteDisconnectedProducerLog(obj.Tenant, obj.Product, obj.Component, obj.Topic, new Model.App.Producers.Producer() { @@ -43,6 +60,40 @@ private void XNodeEventService_ProducerDisconnected(Model.Events.Producers.Produ Name = obj.ProducerName, CreatedDate = DateTime.Now }); + + // disconnect + await NotifyNodesForProducerConnection(new NotifyProducerConnection() + { + ConnectionType = Model.Commands.Consumer.ConnectionType.Disconnected, + + Id = obj.Id, + ProducerName = obj.ProducerName, + Component = obj.Component, + Topic = obj.Topic, + Product = obj.Product, + Tenant = obj.Tenant + }); + } + + + private async Task NotifyNodesForProducerConnection(NotifyProducerConnection obj) + { + + logger.LogInformation($"Notify other nodes for Producer '{obj.ProducerName}' connection status"); + // Transmit the message to other connected XNODES. + if (xNodeEventService.GetXNodeConnectionRepository().GetAllServices().Count > 1) + { + foreach (var xNode in xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + // this node should be ignored because, it already produces the messages to consumers connected. + if (xNode.Key != xNodeEventService.GetCurrentXNodeServiceUrl()) + { + //Transmit the message to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("NotifyNodesForProducerConnection", obj); + } + } + } + } } } diff --git a/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs b/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs index 21dd520..3dfe75a 100644 --- a/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs +++ b/src/Storage.IO/Background/Services/ConsumerArchiveBackgroundService.cs @@ -50,7 +50,8 @@ private void InitializeBackgroundTask() backgroundTaskTimer.Elapsed += BackgroundTaskTimer_Elapsed; backgroundTaskTimer.AutoReset = true; - _logger.LogInformation($"Consumer pointer archivation service for '{_tenant}/{_product}/{_component}/{_topic}/{_consumer}' is initialized"); + + _logger.LogInformation($"Pointer Archivation Service for '{_tenant}/{_product}/{_component}/{_topic}/{_consumer}' is active"); } public void StopService() @@ -67,8 +68,6 @@ public void StartService() private void BackgroundTaskTimer_Elapsed(object sender, ElapsedEventArgs e) { - _logger.LogInformation($"Consumer pointer archivation service for '{_tenant}/{_product}/{_component}/{_topic}/{_consumer}' is triggered"); - StopService(); int totalCount = _consumerPointerContext.ConsumerMessages.Count(); diff --git a/src/Storage.IO/Connectors/ConsumerConnector.cs b/src/Storage.IO/Connectors/ConsumerConnector.cs index f52e49a..e16a6f1 100644 --- a/src/Storage.IO/Connectors/ConsumerConnector.cs +++ b/src/Storage.IO/Connectors/ConsumerConnector.cs @@ -187,6 +187,10 @@ public void StopAutoFlushPointer() _flushPointerTimer.Stop(); _consumerArchiveBackgroundService.StopService(); + + // Cleanup memory. + GC.Collect(); + GC.WaitForPendingFinalizers(); } } } diff --git a/src/Storage.IO/Connectors/MessageStorageConnector.cs b/src/Storage.IO/Connectors/MessageStorageConnector.cs index d1e8068..5392a5a 100644 --- a/src/Storage.IO/Connectors/MessageStorageConnector.cs +++ b/src/Storage.IO/Connectors/MessageStorageConnector.cs @@ -101,6 +101,10 @@ public void DisposeAutoFlushPointer() { _flushPointerTimer.Elapsed -= FlushPointerTimer_Elapsed; _flushPointerTimer.Stop(); + + // Cleanup memory. + GC.Collect(); + GC.WaitForPendingFinalizers(); } } diff --git a/src/Storage.Model/Commands/Consumer/NotifyConsumerConnection.cs b/src/Storage.Model/Commands/Consumer/NotifyConsumerConnection.cs new file mode 100644 index 0000000..7aa7c84 --- /dev/null +++ b/src/Storage.Model/Commands/Consumer/NotifyConsumerConnection.cs @@ -0,0 +1,26 @@ +using Buildersoft.Andy.X.Storage.Model.App.Consumers; +using System; + +namespace Buildersoft.Andy.X.Storage.Model.Commands.Consumer +{ + public class NotifyConsumerConnection + { + public string Tenant { get; set; } + public string Product { get; set; } + public string Component { get; set; } + public string Topic { get; set; } + + public ConnectionType ConnectionType { get; set; } + + public Guid Id { get; set; } + public string ConsumerName { get; set; } + public SubscriptionType SubscriptionType { get; set; } + public InitialPosition InitialPosition { get; set; } + } + + public enum ConnectionType + { + Connected, + Disconnected + } +} diff --git a/src/Storage.Model/Commands/Producer/NotifyProducerConnection.cs b/src/Storage.Model/Commands/Producer/NotifyProducerConnection.cs new file mode 100644 index 0000000..3874db8 --- /dev/null +++ b/src/Storage.Model/Commands/Producer/NotifyProducerConnection.cs @@ -0,0 +1,19 @@ +using Buildersoft.Andy.X.Storage.Model.Commands.Consumer; +using System; + +namespace Buildersoft.Andy.X.Storage.Model.Commands.Producer +{ + public class NotifyProducerConnection + { + public string Tenant { get; set; } + public string Product { get; set; } + public string Component { get; set; } + public string Topic { get; set; } + + public Guid Id { get; set; } + public string ProducerName { get; set; } + + public ConnectionType ConnectionType { get; set; } + + } +} From 65c4192e6d30322ae36683b2494b142aaf7c925a Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sun, 6 Mar 2022 16:30:00 +0100 Subject: [PATCH 19/32] v2.1/bugfix/87 Memory is not being released Update PartitionConfiguration; Create Message Partition Files by Hours; Release Memory when there is no processing required --- src/Storage.App/Storage.App.csproj | 3 +- src/Storage.App/appsettings.json | 1 + .../Service/System/SystemService.cs | 9 +-- .../XNodes/Handlers/ConsumerEventHandler.cs | 60 +++++++++++++------ .../Service/XNodes/XNodeEventService.cs | 6 +- src/Storage.Core/Storage.Core.csproj | 2 +- .../Connectors/ConsumerConnector.cs | 35 ++++++++--- .../Connectors/MessageStorageConnector.cs | 39 ++++++++++-- src/Storage.IO/Locations/MessageLocations.cs | 2 +- src/Storage.IO/Services/MessageIOService.cs | 4 +- src/Storage.IO/Storage.IO.csproj | 2 +- .../Configuration/PartitionConfiguration.cs | 1 + src/Storage.Model/Storage.Model.csproj | 2 +- src/Storage.Utility/Storage.Utility.csproj | 2 +- 14 files changed, 124 insertions(+), 44 deletions(-) diff --git a/src/Storage.App/Storage.App.csproj b/src/Storage.App/Storage.App.csproj index f035ab4..ae73ea8 100644 --- a/src/Storage.App/Storage.App.csproj +++ b/src/Storage.App/Storage.App.csproj @@ -5,13 +5,14 @@ 7e4fa6b7-add2-44ec-a908-b95747757b49 Linux Buildersoft.Andy.X.Storage.App - 2.0.0 + 2.1.0 Buildersoft Buildersoft Andy Buildersoft Buildersoft Andy X is a distributed messaging system. This system will empower developers to move into Event Driven Systems. Andy X is a multi-tenant system. Copyright © Buildersoft 2022 ..\.. + True diff --git a/src/Storage.App/appsettings.json b/src/Storage.App/appsettings.json index d0e3b0b..2a19420 100644 --- a/src/Storage.App/appsettings.json +++ b/src/Storage.App/appsettings.json @@ -44,6 +44,7 @@ "Partition": { "SizeInMemory": 3000, + "BatchSize": 3000, "FlushInterval": 5000, "PointerAcknowledgedMessageArchivationInterval": 3600000 //"PointerAcknowledgedMessageArchivationInterval": 180000 diff --git a/src/Storage.Core/Service/System/SystemService.cs b/src/Storage.Core/Service/System/SystemService.cs index 7d338a6..19ac4a8 100644 --- a/src/Storage.Core/Service/System/SystemService.cs +++ b/src/Storage.Core/Service/System/SystemService.cs @@ -22,7 +22,7 @@ public class SystemService private readonly TenantIOService _tenantIOService; private readonly ProducerIOService _producerIOService; private readonly ConsumerIOService _consumerIOService; - private readonly MessageIOService _messageIOService2; + private readonly MessageIOService _messageIOService; private readonly List nodes; private readonly DataStorageConfiguration dataStorage; private readonly AgentConfiguration agent; @@ -38,7 +38,7 @@ public SystemService( TenantIOService tenantIOService, ProducerIOService producerIOService, ConsumerIOService consumerIOService, - MessageIOService messageIOService2) + MessageIOService messageIOService) { _logger = logger; _serviceProvider = serviceProvider; @@ -48,7 +48,7 @@ public SystemService( _tenantIOService = tenantIOService; _producerIOService = producerIOService; _consumerIOService = consumerIOService; - _messageIOService2 = messageIOService2; + _messageIOService = messageIOService; nodes = _serviceProvider.GetService(typeof(List)) as List; dataStorage = _serviceProvider.GetService(typeof(DataStorageConfiguration)) as DataStorageConfiguration; agent = _serviceProvider.GetService(typeof(AgentConfiguration)) as AgentConfiguration; @@ -136,12 +136,13 @@ private void InitializeServices() agentId, xnode, dataStorage, + partition, agent, _xNodeConnectionRepository, _tenantIOService, _producerIOService, _consumerIOService, - _messageIOService2); + _messageIOService); } } else diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 0aed17d..6c66823 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -4,6 +4,7 @@ using Buildersoft.Andy.X.Storage.Model.App.Consumers; using Buildersoft.Andy.X.Storage.Model.App.Messages; using Buildersoft.Andy.X.Storage.Model.Commands.Consumer; +using Buildersoft.Andy.X.Storage.Model.Configuration; using Buildersoft.Andy.X.Storage.Model.Contexts; using Buildersoft.Andy.X.Storage.Model.Events.Consumers; using Buildersoft.Andy.X.Storage.Model.Files; @@ -13,6 +14,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Globalization; using System.IO; using System.Linq; using System.Threading; @@ -26,18 +28,22 @@ public class ConsumerEventHandler private readonly XNodeEventService _xNodeEventService; private readonly ConsumerIOService _consumerIOService; private readonly MessageIOService _messageIOService; + private readonly PartitionConfiguration _partitionConfiguration; private readonly ConcurrentDictionary _unacknowledgedMessageProcesses; public ConsumerEventHandler( ILogger logger, XNodeEventService xNodeEventService, ConsumerIOService consumerIOService, - MessageIOService messageIOService) + MessageIOService messageIOService, + PartitionConfiguration partitionConfiguration) { _logger = logger; _xNodeEventService = xNodeEventService; _consumerIOService = consumerIOService; _messageIOService = messageIOService; + _partitionConfiguration = partitionConfiguration; + _unacknowledgedMessageProcesses = new ConcurrentDictionary(); InitializeEvents(); @@ -106,16 +112,15 @@ await NotifyNodesForConsumerConnection(new NotifyConsumerConnection() { ConnectionType = ConnectionType.Disconnected, - Id = obj.Id, - SubscriptionType = obj.SubscriptionType, + Tenant = obj.Tenant, + Product = obj.Product, + Topic = obj.Topic, Component = obj.Component, + Id = obj.Id, ConsumerName = obj.ConsumerName, + SubscriptionType = obj.SubscriptionType, InitialPosition = InitialPosition.Latest, - Product = obj.Product, - Tenant = obj.Tenant, - Topic = obj.Topic, }); - } private void XNodeEventService_MessageAcknowledged(Model.Events.Messages.MessageAcknowledgedArgs obj) @@ -202,10 +207,6 @@ private void ReleaseUnacknoledgedMessageTasks(string consumerKey) _unacknowledgedMessageProcesses[consumerKey].Dispose(); _unacknowledgedMessageProcesses.TryRemove(consumerKey, out _); - - // Cleanup memory. - GC.Collect(); - GC.WaitForPendingFinalizers(); } private void CheckPointerDbConnection(ConsumerPointerContext tenantContext, string consumerKey) @@ -259,9 +260,10 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List(); foreach (var row in rows) { - var consumerMessage = new ConsumerMessage() + consumerMessages.Add(new ConsumerMessage() { Consumer = obj.ConsumerName, Message = new Message() @@ -277,12 +279,35 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List(), Headers = row.Headers.JsonToObject>() } - }; + }); + await SendToNodes(consumerMessages); + } + await SendToNodes(consumerMessages, true); + } - foreach (var xNode in _xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + private async Task SendToNodes(List consumerMessages, bool sendTheRest = false) + { + if (sendTheRest == false) + { + if (consumerMessages.Count == _partitionConfiguration.BatchSize) + { + foreach (var xNode in _xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + //Transmit messages to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("TransmitMessagesToConsumer", consumerMessages); + } + consumerMessages.Clear(); + } + } + else + { + if (consumerMessages.Count > 0) { - //Transmit the message to the other nodes. - await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("TransmitMessagesToConsumer", consumerMessage); + foreach (var xNode in _xNodeEventService.GetXNodeConnectionRepository().GetAllServices()) + { + //Transmit messages to the other nodes. + await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("TransmitMessagesToConsumer", consumerMessages); + } } } } @@ -318,7 +343,8 @@ private List GetPartitionFiles(string tenant, string product, strin string fileName = Path.GetFileNameWithoutExtension(partition); string[] partitionNameSplited = fileName.Split("_"); - var partitionDate = DateTime.Parse($"{partitionNameSplited[2]}-{partitionNameSplited[3]}-{partitionNameSplited[4]}"); + var partitionDate = DateTime.ParseExact($"{partitionNameSplited[2]}-{partitionNameSplited[3]}-{partitionNameSplited[4]} {partitionNameSplited[5]}:00:00", "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture); + messages.Add(new MessageFile() { Path = partition, PartitionDate = partitionDate }); }); diff --git a/src/Storage.Core/Service/XNodes/XNodeEventService.cs b/src/Storage.Core/Service/XNodes/XNodeEventService.cs index d764865..1a1dc55 100644 --- a/src/Storage.Core/Service/XNodes/XNodeEventService.cs +++ b/src/Storage.Core/Service/XNodes/XNodeEventService.cs @@ -70,11 +70,13 @@ public class XNodeEventService private string agentId; private readonly XNodeConfiguration nodeConfig; + private readonly PartitionConfiguration partitionConfiguration; public XNodeEventService(ILogger logger, string agentId, XNodeConfiguration nodeConfig, DataStorageConfiguration dataStorageConfig, + PartitionConfiguration partitionConfiguration, AgentConfiguration agentConfiguration, IXNodeConnectionRepository xNodeConnectionRepository, TenantIOService tenantIOService, @@ -90,7 +92,7 @@ public XNodeEventService(ILogger logger, this.messageIOService = messageIOService; this.agentId = agentId; this.nodeConfig = nodeConfig; - + this.partitionConfiguration = partitionConfiguration; var provider = new XNodeConnectionProvider(nodeConfig, dataStorageConfig, agentConfiguration, agentId); _connection = provider.GetHubConnection(); @@ -162,7 +164,7 @@ private void InitializeEventHandlers() agentEventHandler = new AgentEventHandler(logger, this, tenantIOService); tenantEventHandler = new TenantEventHandler(logger, this, tenantIOService); producerEventHandler = new ProducerEventHandler(logger, this, producerIOService); - consumerEventHandler = new ConsumerEventHandler(logger, this, consumerIOService, messageIOService); + consumerEventHandler = new ConsumerEventHandler(logger, this, consumerIOService, messageIOService, partitionConfiguration); messageEventHandler = new MessageEventHandler(logger, this, messageIOService); } diff --git a/src/Storage.Core/Storage.Core.csproj b/src/Storage.Core/Storage.Core.csproj index 3401c9d..7378497 100644 --- a/src/Storage.Core/Storage.Core.csproj +++ b/src/Storage.Core/Storage.Core.csproj @@ -2,7 +2,7 @@ net6.0 - 2.0.0 + 2.1.0 Buildersoft Buildersoft Andy Buildersoft diff --git a/src/Storage.IO/Connectors/ConsumerConnector.cs b/src/Storage.IO/Connectors/ConsumerConnector.cs index e16a6f1..b70eda3 100644 --- a/src/Storage.IO/Connectors/ConsumerConnector.cs +++ b/src/Storage.IO/Connectors/ConsumerConnector.cs @@ -31,6 +31,7 @@ public class ConsumerConnector public Model.Threading.ThreadPool ThreadingPool { get; set; } public ConcurrentQueue MessagesBuffer { get; set; } + private bool isMemoryReleased; public int Count { get; set; } public ConcurrentDictionary BatchAcknowledgedConsumerMessagesToMerge { get; set; } @@ -65,14 +66,14 @@ public ConsumerConnector(ILogger logger, BatchUnacknowledgedConsumerMessagesToMerge = new ConcurrentDictionary(); ConsumerPointerContext = consumerPointer; Count = 0; - + isMemoryReleased = true; try { consumerPointer.ChangeTracker.AutoDetectChangesEnabled = false; consumerPointer.Database.EnsureCreated(); - // database exists - // create new instance of Backend ConsumerArchiveBackgroundService + // Database exists + // Create new instance of Backend ConsumerArchiveBackgroundService _consumerArchiveBackgroundService = new ConsumerArchiveBackgroundService(logger, tenant, product, component, topic, consumer, partitionConfiguration, consumerPointer); _consumerArchiveBackgroundService.StartService(); } @@ -95,9 +96,33 @@ private void FlushPointerTimer_Elapsed(object sender, ElapsedEventArgs e) AutoFlushAcknowledgedBatchPointers(); AutoFlushUnacknowledgedBatchPointers(); + ReleaseMemory(); + _flushPointerTimer.Start(); } + private void ReleaseMemory() + { + if (isMemoryReleased == false) + { + if (MessagesBuffer.Count == 0 && BatchAcknowledgedConsumerMessagesToMerge.Count == 0 && BatchUnacknowledgedConsumerMessagesToMerge.Count == 0) + { + // ConsumerPointerContext.Dispose(); + GC.Collect(); + GC.SuppressFinalize(this); + GC.SuppressFinalize(ConsumerPointerContext); + GC.SuppressFinalize(MessagesBuffer); + GC.SuppressFinalize(BatchAcknowledgedConsumerMessagesToMerge); + GC.SuppressFinalize(BatchUnacknowledgedConsumerMessagesToMerge); + + //GC.Collect(); + //GC.WaitForPendingFinalizers(); + + isMemoryReleased = true; + } + } + } + private void AutoFlushAcknowledgedBatchPointers() { lock (BatchAcknowledgedConsumerMessagesToMerge) @@ -187,10 +212,6 @@ public void StopAutoFlushPointer() _flushPointerTimer.Stop(); _consumerArchiveBackgroundService.StopService(); - - // Cleanup memory. - GC.Collect(); - GC.WaitForPendingFinalizers(); } } } diff --git a/src/Storage.IO/Connectors/MessageStorageConnector.cs b/src/Storage.IO/Connectors/MessageStorageConnector.cs index 5392a5a..2793e6b 100644 --- a/src/Storage.IO/Connectors/MessageStorageConnector.cs +++ b/src/Storage.IO/Connectors/MessageStorageConnector.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Linq; +using System.Diagnostics; using System.Timers; namespace Buildersoft.Andy.X.Storage.IO.Connectors @@ -21,6 +21,8 @@ public class MessageStorageConnector public ConcurrentQueue MessagesBuffer { get; set; } public ConcurrentDictionary BatchMessagesToInsert { get; set; } + public bool isMemoryReleased { get; set; } + public MessageStorageConnector(PartitionConfiguration partitionConfiguration, int agentCount) { _partitionConfiguration = partitionConfiguration; @@ -36,6 +38,8 @@ public MessageStorageConnector(PartitionConfiguration partitionConfiguration, in _flushPointerTimer.Elapsed += FlushPointerTimer_Elapsed; _flushPointerTimer.AutoReset = true; _flushPointerTimer.Start(); + + isMemoryReleased = true; } private void FlushPointerTimer_Elapsed(object sender, ElapsedEventArgs e) @@ -44,13 +48,38 @@ private void FlushPointerTimer_Elapsed(object sender, ElapsedEventArgs e) FlushBatchToDisk(); + ReleaseMemory(); + _flushPointerTimer.Start(); } + private void ReleaseMemory() + { + if (isMemoryReleased == false) + { + if (MessagesBuffer.Count == 0 && BatchMessagesToInsert.Count == 0) + { + // MessageContext.Dispose(); + GC.Collect(); + GC.SuppressFinalize(this); + GC.SuppressFinalize(MessageContext); + GC.SuppressFinalize(MessagesBuffer); + GC.SuppressFinalize(BatchMessagesToInsert); + + //GC.Collect(); + //GC.WaitForPendingFinalizers(); + + isMemoryReleased = true; + } + } + } + private void FlushBatchToDisk() { try { + Stopwatch stopwatch = new Stopwatch(); + stopwatch.Start(); if (ThreadingPool.AreThreadsRunning == true) { if (BatchMessagesToInsert.Count >= _partitionConfiguration.SizeInMemory) @@ -90,6 +119,9 @@ public void CreateMessageFile() { MessageContext.ChangeTracker.AutoDetectChangesEnabled = false; MessageContext.Database.EnsureCreated(); + + // Enable Memory Release + isMemoryReleased = false; } catch (Exception) { @@ -101,11 +133,6 @@ public void DisposeAutoFlushPointer() { _flushPointerTimer.Elapsed -= FlushPointerTimer_Elapsed; _flushPointerTimer.Stop(); - - // Cleanup memory. - GC.Collect(); - GC.WaitForPendingFinalizers(); } } - } diff --git a/src/Storage.IO/Locations/MessageLocations.cs b/src/Storage.IO/Locations/MessageLocations.cs index 14c14de..602abf5 100644 --- a/src/Storage.IO/Locations/MessageLocations.cs +++ b/src/Storage.IO/Locations/MessageLocations.cs @@ -7,7 +7,7 @@ public static class MessageLocations { public static string GetMessagePartitionFile(string tenantName, string productName, string componentName, string topicName, DateTime date) { - return Path.Combine(TenantLocations.GetMessageRootDirectory(tenantName, productName, componentName, topicName), $"msg_part_{date:yyyy_MM_dd}.xandy"); + return Path.Combine(TenantLocations.GetMessageRootDirectory(tenantName, productName, componentName, topicName), $"msg_part_{date:yyyy_MM_dd_HH}.xandy"); } } } diff --git a/src/Storage.IO/Services/MessageIOService.cs b/src/Storage.IO/Services/MessageIOService.cs index 81eb52a..d6f6b26 100644 --- a/src/Storage.IO/Services/MessageIOService.cs +++ b/src/Storage.IO/Services/MessageIOService.cs @@ -37,7 +37,7 @@ public MessageIOService( public bool InitializeMessageFileConnector(string tenant, string product, string component, string topic, DateTime date) { - string topicKey = $"{tenant}~{product}~{component}~{topic}~{date:yyyy_MM_dd}"; + string topicKey = $"{tenant}~{product}~{component}~{topic}~{date:yyyy_MM_dd_HH}"; lock (connectors) { try @@ -146,7 +146,7 @@ private void MessagingProcessor(string topicKey, Guid threadId) public string AddMessageFileConnectorGetKey(string tenant, string product, string component, string topic, DateTime date) { - string topicKey = $"{tenant}~{product}~{component}~{topic}~{date:yyyy_MM_dd}"; + string topicKey = $"{tenant}~{product}~{component}~{topic}~{date:yyyy_MM_dd_HH}"; InitializeMessageFileConnector(tenant, product, component, topic, date); diff --git a/src/Storage.IO/Storage.IO.csproj b/src/Storage.IO/Storage.IO.csproj index 003f79a..c4ee443 100644 --- a/src/Storage.IO/Storage.IO.csproj +++ b/src/Storage.IO/Storage.IO.csproj @@ -3,7 +3,7 @@ net6.0 Buildersoft.Andy.X.Storage.IO - 2.0.0 + 2.1.0 Buildersoft Buildersoft Andy Buildersoft diff --git a/src/Storage.Model/Configuration/PartitionConfiguration.cs b/src/Storage.Model/Configuration/PartitionConfiguration.cs index e3d3b81..659e7d3 100644 --- a/src/Storage.Model/Configuration/PartitionConfiguration.cs +++ b/src/Storage.Model/Configuration/PartitionConfiguration.cs @@ -3,6 +3,7 @@ public class PartitionConfiguration { public int SizeInMemory { get; set; } + public int BatchSize { get; set; } public int FlushInterval { get; set; } // its in miliseconds public double PointerAcknowledgedMessageArchivationInterval { get; set; } // its in miliseconds } diff --git a/src/Storage.Model/Storage.Model.csproj b/src/Storage.Model/Storage.Model.csproj index 3b2e316..a7168b3 100644 --- a/src/Storage.Model/Storage.Model.csproj +++ b/src/Storage.Model/Storage.Model.csproj @@ -3,7 +3,7 @@ net6.0 Buildersoft.Andy.X.Storage.Model - 2.0.0 + 2.1.0 Buildersoft Buildersoft Andy Buildersoft diff --git a/src/Storage.Utility/Storage.Utility.csproj b/src/Storage.Utility/Storage.Utility.csproj index fb1d6b6..d403946 100644 --- a/src/Storage.Utility/Storage.Utility.csproj +++ b/src/Storage.Utility/Storage.Utility.csproj @@ -2,7 +2,7 @@ net6.0 - 2.0.0 + 2.1.0 Buildersoft Buildersoft Andy Buildersoft From cab49d0c7efb8456c46918931c692e137779bc7f Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sun, 6 Mar 2022 17:48:27 +0100 Subject: [PATCH 20/32] v2.1/feature/86 Store Messages as Batch in Storage Add MessagesStored Event --- .../XNodes/Handlers/MessageEventHandler.cs | 22 +++++++++++++++++++ .../Service/XNodes/XNodeEventService.cs | 3 +++ 2 files changed, 25 insertions(+) diff --git a/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs index 464b2c2..447d040 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/MessageEventHandler.cs @@ -3,6 +3,7 @@ using Buildersoft.Andy.X.Storage.Model.Events.Messages; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.Logging; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -27,6 +28,27 @@ public MessageEventHandler(ILogger logger, private void InitializeEvents() { xNodeEventService.MessageStored += XNodeEventService_MessageStored; + xNodeEventService.MessagesStored += XNodeEventService_MessagesStored; + } + + private async void XNodeEventService_MessagesStored(List obj) + { + foreach (var message in obj) + { + messageIOService.StoreMessage(new Model.App.Messages.Message() + { + Tenant = message.Tenant, + Id = message.Id, + Component = message.Component, + MessageRaw = message.MessageRaw, + Headers = message.Headers, + Product = message.Product, + Topic = message.Topic, + SentDate = message.SentDate + }); + + await RetransmitMessageToOtherNodes(message); + } } private async void XNodeEventService_MessageStored(MessageStoredArgs obj) diff --git a/src/Storage.Core/Service/XNodes/XNodeEventService.cs b/src/Storage.Core/Service/XNodes/XNodeEventService.cs index 1a1dc55..05bce23 100644 --- a/src/Storage.Core/Service/XNodes/XNodeEventService.cs +++ b/src/Storage.Core/Service/XNodes/XNodeEventService.cs @@ -15,6 +15,7 @@ using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.Logging; using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -52,6 +53,7 @@ public class XNodeEventService public event Action TopicUpdated; public event Action MessageStored; + public event Action> MessagesStored; public event Action MessageAcknowledged; public event Action ProducerConnected; @@ -130,6 +132,7 @@ public XNodeEventService(ILogger logger, _connection.On("MessageAcknowledged", messageAcked => MessageAcknowledged?.Invoke(messageAcked)); _connection.On("MessageStored", msgStored => MessageStored?.Invoke(msgStored)); + _connection.On>("MessagesStored", msgStored => MessagesStored?.Invoke(msgStored)); InitializeEventHandlers(); ConnectAsync(); From 15e8274e49140c77f3087150ff0a350c049762f6 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Mon, 7 Mar 2022 09:52:30 +0100 Subject: [PATCH 21/32] fix Enable release check when message storing resumes --- src/Storage.IO/Connectors/ConsumerConnector.cs | 6 ++++++ src/Storage.IO/Connectors/MessageStorageConnector.cs | 8 +++++++- src/Storage.IO/Services/ConsumerIOService.cs | 1 + src/Storage.IO/Services/MessageIOService.cs | 1 + 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/Storage.IO/Connectors/ConsumerConnector.cs b/src/Storage.IO/Connectors/ConsumerConnector.cs index b70eda3..c024372 100644 --- a/src/Storage.IO/Connectors/ConsumerConnector.cs +++ b/src/Storage.IO/Connectors/ConsumerConnector.cs @@ -123,6 +123,12 @@ private void ReleaseMemory() } } + public void EnableReleaseMemoryFlag() + { + if (isMemoryReleased == true) + isMemoryReleased = false; + } + private void AutoFlushAcknowledgedBatchPointers() { lock (BatchAcknowledgedConsumerMessagesToMerge) diff --git a/src/Storage.IO/Connectors/MessageStorageConnector.cs b/src/Storage.IO/Connectors/MessageStorageConnector.cs index 2793e6b..bb9762c 100644 --- a/src/Storage.IO/Connectors/MessageStorageConnector.cs +++ b/src/Storage.IO/Connectors/MessageStorageConnector.cs @@ -21,7 +21,7 @@ public class MessageStorageConnector public ConcurrentQueue MessagesBuffer { get; set; } public ConcurrentDictionary BatchMessagesToInsert { get; set; } - public bool isMemoryReleased { get; set; } + private bool isMemoryReleased { get; set; } public MessageStorageConnector(PartitionConfiguration partitionConfiguration, int agentCount) { @@ -74,6 +74,12 @@ private void ReleaseMemory() } } + public void EnableReleaseMemoryFlag() + { + if (isMemoryReleased == true) + isMemoryReleased = false; + } + private void FlushBatchToDisk() { try diff --git a/src/Storage.IO/Services/ConsumerIOService.cs b/src/Storage.IO/Services/ConsumerIOService.cs index 3c8e840..9a3c133 100644 --- a/src/Storage.IO/Services/ConsumerIOService.cs +++ b/src/Storage.IO/Services/ConsumerIOService.cs @@ -208,6 +208,7 @@ public void WriteMessageAcknowledged(MessageAcknowledgedArgs message, string par PartitionIndex = 0 }); + connectors[consumerKey].EnableReleaseMemoryFlag(); InitializeMessagingProcessor(consumerKey); } diff --git a/src/Storage.IO/Services/MessageIOService.cs b/src/Storage.IO/Services/MessageIOService.cs index d6f6b26..598a1e7 100644 --- a/src/Storage.IO/Services/MessageIOService.cs +++ b/src/Storage.IO/Services/MessageIOService.cs @@ -70,6 +70,7 @@ public void StoreMessage(Message message) SentDate = message.SentDate, StoredDate = DateTime.Now }); + connectors[topicKey].EnableReleaseMemoryFlag(); InitializeMessagingProcessor(topicKey, message.SentDate); } From f8fc1c0bb4b69a5c08c4a11c839e19b9ba2e5afa Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Thu, 10 Mar 2022 00:09:01 +0100 Subject: [PATCH 22/32] bug/no_ticket : Fix memory consumption Update ConsumerEventHandler, ask GC to remove un needed data from memory --- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 6c66823..62da069 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -252,6 +252,12 @@ private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List rows.Any(u => u.MessageId == r.MessageId)); + + // Remove from memory all not used data from here... + rows = null; + GC.Collect(); + GC.SuppressFinalize(this); + GC.SuppressFinalize(rows); } } From 638abc444a378c73edfae2a627ad667c7ffbf313 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Thu, 10 Mar 2022 16:42:01 +0100 Subject: [PATCH 23/32] bug fix --- src/Storage.App/appsettings.json | 3 ++- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 1 - src/Storage.IO/Services/ConsumerIOService.cs | 3 --- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/Storage.App/appsettings.json b/src/Storage.App/appsettings.json index 2a19420..1a5a464 100644 --- a/src/Storage.App/appsettings.json +++ b/src/Storage.App/appsettings.json @@ -20,7 +20,8 @@ "XNodes": [ { - "ServiceUrl": "https://localhost:6541", + //"ServiceUrl": "https://localhost:6541", + "ServiceUrl": "http://localhost:9001", "Subscription": 1, "JwtToken": "na", "Username": "admin", diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 62da069..fc14682 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -254,7 +254,6 @@ private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List rows.Any(u => u.MessageId == r.MessageId)); // Remove from memory all not used data from here... - rows = null; GC.Collect(); GC.SuppressFinalize(this); GC.SuppressFinalize(rows); diff --git a/src/Storage.IO/Services/ConsumerIOService.cs b/src/Storage.IO/Services/ConsumerIOService.cs index 9a3c133..87c473f 100644 --- a/src/Storage.IO/Services/ConsumerIOService.cs +++ b/src/Storage.IO/Services/ConsumerIOService.cs @@ -291,9 +291,6 @@ private void InitializeMessagingProcessor(string consumerKey) _logger.LogWarning($"Pointer controller for '{consumerKey}' stopped working, trying to start {timeOutCounter} of 10"); if (timeOutCounter == 10) { - // recreate connection - var consumerKeySplitted = consumerKey.Split('-'); - connectors[consumerKey].StopAutoFlushPointer(); connectors.TryRemove(consumerKey, out _); _logger.LogWarning($"Pointer controller for '{consumerKey}' couldn't start. Pointer controller is restarted"); From 293d8f1a20c047ccb09b00f13eb692bf17a87db8 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sat, 12 Mar 2022 15:45:10 +0100 Subject: [PATCH 24/32] v2.1/bugfix/91 Remove unackedlist removal from unacked --- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index fc14682..4566695 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -250,8 +250,10 @@ private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List rows.Any(u => u.MessageId == r.MessageId)); + // here is a bug #91, is removing all items form the list + // for now we are removing this condition + //if (isNewConsumer != true) + // unackedMessages.ToList().RemoveAll(r => rows.Any(u => u.MessageId == r.MessageId)); // Remove from memory all not used data from here... GC.Collect(); From 55a0f6302e5b02c361b0cea87dd7bc85064840a8 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sun, 13 Mar 2022 12:42:06 +0100 Subject: [PATCH 25/32] v2.1/feature/93 Add Consumer Repository Add IConsumerConnectionRepository, Impelemnt ConsumerConnectionRepository, Update ConsumerEventHandler to support this repo, Fix Unacknowledged Process Release --- ...ns.cs => RepositoryInjectionExtensions.cs} | 9 +- .../IConsumerConnectionRepository.cs | 13 +++ .../Consumers/ConsumerConnectionRepository.cs | 53 +++++++++++ .../Service/System/SystemService.cs | 11 ++- .../XNodes/Handlers/ConsumerEventHandler.cs | 91 +++++++++++++------ .../Service/XNodes/XNodeEventService.cs | 11 ++- src/Storage.Model/App/Consumers/Consumer.cs | 25 ++++- 7 files changed, 179 insertions(+), 34 deletions(-) rename src/Storage.App/Extensions/DependencyInjection/{XNodeDependencyInjectionExtensions.cs => RepositoryInjectionExtensions.cs} (54%) create mode 100644 src/Storage.Core/Abstraction/Repository/Consumers/IConsumerConnectionRepository.cs create mode 100644 src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs diff --git a/src/Storage.App/Extensions/DependencyInjection/XNodeDependencyInjectionExtensions.cs b/src/Storage.App/Extensions/DependencyInjection/RepositoryInjectionExtensions.cs similarity index 54% rename from src/Storage.App/Extensions/DependencyInjection/XNodeDependencyInjectionExtensions.cs rename to src/Storage.App/Extensions/DependencyInjection/RepositoryInjectionExtensions.cs index 77edf9f..941230e 100644 --- a/src/Storage.App/Extensions/DependencyInjection/XNodeDependencyInjectionExtensions.cs +++ b/src/Storage.App/Extensions/DependencyInjection/RepositoryInjectionExtensions.cs @@ -1,14 +1,21 @@ using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Connection; +using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Consumers; using Buildersoft.Andy.X.Storage.Core.Repository.Connection; +using Buildersoft.Andy.X.Storage.Core.Repository.Consumers; using Microsoft.Extensions.DependencyInjection; namespace Buildersoft.Andy.X.Storage.App.Extensions.DependencyInjection { - public static class XNodeDependencyInjectionExtensions + public static class RepositoryInjectionExtensions { public static void AddNodeServiceRepository(this IServiceCollection services) { services.AddSingleton(); } + + public static void AddConsumerConnectionRepository(this IServiceCollection services) + { + services.AddSingleton(); + } } } diff --git a/src/Storage.Core/Abstraction/Repository/Consumers/IConsumerConnectionRepository.cs b/src/Storage.Core/Abstraction/Repository/Consumers/IConsumerConnectionRepository.cs new file mode 100644 index 0000000..a7f86a3 --- /dev/null +++ b/src/Storage.Core/Abstraction/Repository/Consumers/IConsumerConnectionRepository.cs @@ -0,0 +1,13 @@ +using Buildersoft.Andy.X.Storage.Model.App.Consumers; + +namespace Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Consumers +{ + public interface IConsumerConnectionRepository + { + Consumer GetConsumerById(string id); + void AddConsumer(string id, Consumer consumer); + void AddConsumerConnection(string id); + void RemoveConsumer(string id); + void RemoveConsumerConnection(string id); + } +} diff --git a/src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs b/src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs new file mode 100644 index 0000000..ab3abe7 --- /dev/null +++ b/src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Concurrent; +using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Consumers; +using Buildersoft.Andy.X.Storage.Model.App.Consumers; + +namespace Buildersoft.Andy.X.Storage.Core.Repository.Consumers +{ + public class ConsumerConnectionRepository : IConsumerConnectionRepository + { + private readonly ConcurrentDictionary _consumersConnected; + + public ConsumerConnectionRepository() + { + _consumersConnected = new ConcurrentDictionary(); + } + + public Consumer GetConsumerById(string id) + { + return _consumersConnected.ContainsKey(id) + ? _consumersConnected[id] + : null; + } + + public void AddConsumer(string id, Consumer consumer) + { + _consumersConnected.TryAdd(id, consumer); + } + + public void AddConsumerConnection(string id) + { + if (_consumersConnected.ContainsKey(id)) + _consumersConnected[id].Connections.Add(Guid.NewGuid()); + } + + public void RemoveConsumer(string id) + { + if (!_consumersConnected.ContainsKey(id)) + return; + + if (_consumersConnected[id].Connections.Count == 0) + _consumersConnected.TryRemove(id, out _); + } + + public void RemoveConsumerConnection(string id) + { + if (!_consumersConnected.ContainsKey(id)) + return; + + if (_consumersConnected[id].Connections.Count != 0) + _consumersConnected[id].Connections.RemoveAt(0); + } + } +} \ No newline at end of file diff --git a/src/Storage.Core/Service/System/SystemService.cs b/src/Storage.Core/Service/System/SystemService.cs index 19ac4a8..ba98b42 100644 --- a/src/Storage.Core/Service/System/SystemService.cs +++ b/src/Storage.Core/Service/System/SystemService.cs @@ -1,4 +1,5 @@ using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Connection; +using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Consumers; using Buildersoft.Andy.X.Storage.Core.Service.XNodes; using Buildersoft.Andy.X.Storage.IO.Locations; using Buildersoft.Andy.X.Storage.IO.Readers; @@ -23,6 +24,8 @@ public class SystemService private readonly ProducerIOService _producerIOService; private readonly ConsumerIOService _consumerIOService; private readonly MessageIOService _messageIOService; + private readonly IConsumerConnectionRepository _consumerConnectionRepository; + private readonly List nodes; private readonly DataStorageConfiguration dataStorage; private readonly AgentConfiguration agent; @@ -38,7 +41,8 @@ public SystemService( TenantIOService tenantIOService, ProducerIOService producerIOService, ConsumerIOService consumerIOService, - MessageIOService messageIOService) + MessageIOService messageIOService, + IConsumerConnectionRepository consumerConnectionRepository) { _logger = logger; _serviceProvider = serviceProvider; @@ -49,6 +53,8 @@ public SystemService( _producerIOService = producerIOService; _consumerIOService = consumerIOService; _messageIOService = messageIOService; + _consumerConnectionRepository = consumerConnectionRepository; + nodes = _serviceProvider.GetService(typeof(List)) as List; dataStorage = _serviceProvider.GetService(typeof(DataStorageConfiguration)) as DataStorageConfiguration; agent = _serviceProvider.GetService(typeof(AgentConfiguration)) as AgentConfiguration; @@ -142,7 +148,8 @@ private void InitializeServices() _tenantIOService, _producerIOService, _consumerIOService, - _messageIOService); + _messageIOService, + _consumerConnectionRepository); } } else diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 4566695..94f8778 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -1,4 +1,5 @@ -using Buildersoft.Andy.X.Storage.Core.Service.System; +using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Consumers; +using Buildersoft.Andy.X.Storage.Core.Service.System; using Buildersoft.Andy.X.Storage.IO.Locations; using Buildersoft.Andy.X.Storage.IO.Services; using Buildersoft.Andy.X.Storage.Model.App.Consumers; @@ -29,6 +30,8 @@ public class ConsumerEventHandler private readonly ConsumerIOService _consumerIOService; private readonly MessageIOService _messageIOService; private readonly PartitionConfiguration _partitionConfiguration; + private readonly IConsumerConnectionRepository _consumerConnectionRepository; + private readonly ConcurrentDictionary _unacknowledgedMessageProcesses; public ConsumerEventHandler( @@ -36,13 +39,15 @@ public ConsumerEventHandler( XNodeEventService xNodeEventService, ConsumerIOService consumerIOService, MessageIOService messageIOService, - PartitionConfiguration partitionConfiguration) + PartitionConfiguration partitionConfiguration, + IConsumerConnectionRepository consumerConnectionRepository) { _logger = logger; _xNodeEventService = xNodeEventService; _consumerIOService = consumerIOService; _messageIOService = messageIOService; _partitionConfiguration = partitionConfiguration; + _consumerConnectionRepository = consumerConnectionRepository; _unacknowledgedMessageProcesses = new ConcurrentDictionary(); @@ -59,7 +64,8 @@ private void InitializeEvents() private async void XNodeEventService_ConsumerConnected(ConsumerConnectedArgs obj) { - _consumerIOService.TryCreateConsumerDirectory(obj.Tenant, obj.Product, obj.Component, obj.Topic, new Consumer() + string consumerKey = GenerateConsumerKey(obj.Tenant, obj.Product, obj.Component, obj.Topic, obj.ConsumerName); + var consumer = new Consumer() { Id = obj.Id, Name = obj.ConsumerName, @@ -70,28 +76,33 @@ private async void XNodeEventService_ConsumerConnected(ConsumerConnectedArgs obj SubscriptionType = obj.SubscriptionType, ConsumerSettings = new ConsumerSettings() { InitialPosition = obj.InitialPosition }, CreatedDate = DateTime.Now - }); + }; + _consumerIOService.TryCreateConsumerDirectory(obj.Tenant, obj.Product, obj.Component, obj.Topic, consumer); + + _consumerConnectionRepository.AddConsumer(consumerKey, consumer); + _consumerConnectionRepository.AddConsumerConnection(consumerKey); // notify other nodes in cluster that a consumer has been disconnected await NotifyNodesForConsumerConnection(new NotifyConsumerConnection() { ConnectionType = ConnectionType.Connected, - Id = obj.Id, - SubscriptionType = obj.SubscriptionType, + Tenant = obj.Tenant, + Product = obj.Product, Component = obj.Component, + Topic = obj.Topic, + Id = obj.Id, ConsumerName = obj.ConsumerName, + SubscriptionType = obj.SubscriptionType, InitialPosition = InitialPosition.Latest, - Product = obj.Product, - Tenant = obj.Tenant, - Topic = obj.Topic, }); } private async void XNodeEventService_ConsumerDisconnected(ConsumerDisconnectedArgs obj) { - _consumerIOService.WriteDisconnectedConsumerLog(obj.Tenant, obj.Product, obj.Component, obj.Topic, new Consumer() + string consumerKey = GenerateConsumerKey(obj.Tenant, obj.Product, obj.Component, obj.Topic, obj.ConsumerName); + var consumer = new Consumer() { Id = obj.Id, Name = obj.ConsumerName, @@ -99,13 +110,21 @@ private async void XNodeEventService_ConsumerDisconnected(ConsumerDisconnectedAr Product = obj.Product, Component = obj.Component, Topic = obj.Topic, - CreatedDate = DateTime.Now, SubscriptionType = obj.SubscriptionType, - }); - string consumerKey = GenerateConsumerKey(obj.Tenant, obj.Product, obj.Component, obj.Topic, obj.ConsumerName); + CreatedDate = DateTime.Now + }; + _consumerIOService.WriteDisconnectedConsumerLog(obj.Tenant, obj.Product, obj.Component, obj.Topic, consumer); - if (obj.SubscriptionType != SubscriptionType.Shared) - ReleaseUnacknoledgedMessageTasks(consumerKey); + var consumerState = _consumerConnectionRepository.GetConsumerById(consumerKey); + _consumerConnectionRepository.RemoveConsumerConnection(consumerKey); + + // We will not remove the consumer, only when the sending of msgs is done + // _consumerConnectionRepository.RemoveConsumer(consumerKey); + + if (obj.SubscriptionType != SubscriptionType.Shared && consumerState.StorageStateProperty.IsNewConsumer == false) + { + ReleaseUnacknoledgedMessageTasks(consumerKey, true); + } // notify other nodes in cluster that a consumer has been disconnected await NotifyNodesForConsumerConnection(new NotifyConsumerConnection() @@ -137,7 +156,8 @@ private void XNodeEventService_ConsumerUnacknowledgedMessagesRequested(ConsumerC return; // We are adding the task to the Dictionary, when the task is done - _unacknowledgedMessageProcesses.TryAdd(consumerKey, Task.Run(async () => await TransmitUnacknowledgedMessages(obj))); + if (!_unacknowledgedMessageProcesses.ContainsKey(consumerKey)) + _unacknowledgedMessageProcesses.TryAdd(consumerKey, Task.Run(async () => await TransmitUnacknowledgedMessages(obj))); } private async Task TransmitUnacknowledgedMessages(ConsumerConnectedArgs obj) @@ -188,25 +208,30 @@ private async Task TransmitUnacknowledgedMessages(ConsumerConnectedArgs obj) } } - await AnalysePartitionFiles(obj, partitionFiles, isNewConsumer, unackedMessages); + _consumerConnectionRepository.GetConsumerById(consumerKey).StorageStateProperty.IsNewConsumer = isNewConsumer; + await AnalysePartitionFiles(obj, partitionFiles, unackedMessages); } catch (Exception ex) { _logger.LogError($"Couldn't sent unacknoledge messages to consumer '{obj.ConsumerName}' at {consumerKey.Replace("~", "/")}; errorDetails = {ex.Message}"); } - ReleaseUnacknoledgedMessageTasks(consumerKey); + ReleaseUnacknoledgedMessageTasks(consumerKey, true); } - private void ReleaseUnacknoledgedMessageTasks(string consumerKey) + private void ReleaseUnacknoledgedMessageTasks(string consumerKey, bool forceRelease = true) { if (_unacknowledgedMessageProcesses.ContainsKey(consumerKey) != true) return; - _logger.LogWarning($"Unacknowledged message transmitter for '{consumerKey.Replace("~", "/")}' has been released"); + if (forceRelease == true) + { + _logger.LogWarning($"Unacknowledged message transmitter for '{consumerKey.Replace("~", "/")}' has been released"); - _unacknowledgedMessageProcesses[consumerKey].Dispose(); - _unacknowledgedMessageProcesses.TryRemove(consumerKey, out _); + _consumerConnectionRepository.RemoveConsumer(consumerKey); + _unacknowledgedMessageProcesses[consumerKey].Dispose(); + _unacknowledgedMessageProcesses.TryRemove(consumerKey, out _); + } } private void CheckPointerDbConnection(ConsumerPointerContext tenantContext, string consumerKey) @@ -225,8 +250,10 @@ private void CheckPointerDbConnection(ConsumerPointerContext tenantContext, stri } } - private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List partitionFiles, bool isNewConsumer, IEnumerable unackedMessages) + private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List partitionFiles, IEnumerable unackedMessages) { + string consumerKey = GenerateConsumerKey(obj.Tenant, obj.Product, obj.Component, obj.Topic, obj.ConsumerName); + var isNewConsumer = _consumerConnectionRepository.GetConsumerById(consumerKey).StorageStateProperty.IsNewConsumer; foreach (var paritionFile in partitionFiles) { // here we do check partitions db messages.... @@ -248,7 +275,7 @@ private async Task AnalysePartitionFiles(ConsumerConnectedArgs obj, List rows, DateTime partitionDate, bool isNewConsumer = false) + private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List rows, DateTime partitionDate) { + string consumerKey = GenerateConsumerKey(obj.Tenant, obj.Product, obj.Component, obj.Topic, obj.ConsumerName); + var isNewConsumer = _consumerConnectionRepository.GetConsumerById(consumerKey).StorageStateProperty.IsNewConsumer; + if (isNewConsumer == true) CachePointers(obj, rows, partitionDate); @@ -287,9 +317,14 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List>() } }); - await SendToNodes(consumerMessages); + + if (_consumerConnectionRepository.GetConsumerById(consumerKey) != null) + await SendToNodes(consumerMessages); } - await SendToNodes(consumerMessages, true); + if (_consumerConnectionRepository.GetConsumerById(consumerKey) != null) + await SendToNodes(consumerMessages, true); + + consumerMessages.Clear(); } private async Task SendToNodes(List consumerMessages, bool sendTheRest = false) @@ -322,7 +357,7 @@ private async Task SendToNodes(List consumerMessages, bool send private void CachePointers(ConsumerConnectedArgs obj, List rows, DateTime partitionDate) { // Unacknowledge message, add to the pointer, and send - _logger.LogInformation($"Pointers are caching for {obj.ConsumerName}"); + _logger.LogInformation($"Pointers are caching for {obj.ConsumerName} at {obj.Tenant}/{obj.Product}/{obj.Component}/{obj.Topic}"); for (int i = 0; i < rows.Count; i++) { _consumerIOService.WriteMessageAcknowledged(new Model.Events.Messages.MessageAcknowledgedArgs() diff --git a/src/Storage.Core/Service/XNodes/XNodeEventService.cs b/src/Storage.Core/Service/XNodes/XNodeEventService.cs index 05bce23..4e5257b 100644 --- a/src/Storage.Core/Service/XNodes/XNodeEventService.cs +++ b/src/Storage.Core/Service/XNodes/XNodeEventService.cs @@ -1,4 +1,5 @@ using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Connection; +using Buildersoft.Andy.X.Storage.Core.Abstraction.Repository.Consumers; using Buildersoft.Andy.X.Storage.Core.Provider; using Buildersoft.Andy.X.Storage.Core.Service.System; using Buildersoft.Andy.X.Storage.Core.Service.XNodes.Handlers; @@ -29,6 +30,8 @@ public class XNodeEventService private readonly ProducerIOService producerIOService; private readonly ConsumerIOService consumerIOService; private readonly MessageIOService messageIOService; + private readonly IConsumerConnectionRepository consumerConnectionRepository; + private HubConnection _connection; public event Action StorageConnected; @@ -84,7 +87,8 @@ public XNodeEventService(ILogger logger, TenantIOService tenantIOService, ProducerIOService producerIOService, ConsumerIOService consumerIOService, - MessageIOService messageIOService) + MessageIOService messageIOService, + IConsumerConnectionRepository consumerConnectionRepository) { this.logger = logger; this.xNodeConnectionRepository = xNodeConnectionRepository; @@ -92,9 +96,12 @@ public XNodeEventService(ILogger logger, this.producerIOService = producerIOService; this.consumerIOService = consumerIOService; this.messageIOService = messageIOService; + this.consumerConnectionRepository = consumerConnectionRepository; this.agentId = agentId; this.nodeConfig = nodeConfig; this.partitionConfiguration = partitionConfiguration; + + var provider = new XNodeConnectionProvider(nodeConfig, dataStorageConfig, agentConfiguration, agentId); _connection = provider.GetHubConnection(); @@ -167,7 +174,7 @@ private void InitializeEventHandlers() agentEventHandler = new AgentEventHandler(logger, this, tenantIOService); tenantEventHandler = new TenantEventHandler(logger, this, tenantIOService); producerEventHandler = new ProducerEventHandler(logger, this, producerIOService); - consumerEventHandler = new ConsumerEventHandler(logger, this, consumerIOService, messageIOService, partitionConfiguration); + consumerEventHandler = new ConsumerEventHandler(logger, this, consumerIOService, messageIOService, partitionConfiguration, consumerConnectionRepository); messageEventHandler = new MessageEventHandler(logger, this, messageIOService); } diff --git a/src/Storage.Model/App/Consumers/Consumer.cs b/src/Storage.Model/App/Consumers/Consumer.cs index 9032e75..6bcb6b7 100644 --- a/src/Storage.Model/App/Consumers/Consumer.cs +++ b/src/Storage.Model/App/Consumers/Consumer.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; namespace Buildersoft.Andy.X.Storage.Model.App.Consumers { @@ -13,18 +14,38 @@ public class Consumer public string Name { get; set; } public SubscriptionType SubscriptionType { get; set; } + // we are adding a list of consumer connection, is needed for failover and shared subscriptions + public List Connections { get; set; } + public DateTime CreatedDate { get; set; } public ConsumerSettings ConsumerSettings { get; set; } + + public StorageState StorageStateProperty { get; set; } + public Consumer() { ConsumerSettings = new ConsumerSettings(); + Connections = new List(); + + + StorageStateProperty = new StorageState(); } + } + public class StorageState + { + public bool IsNewConsumer { get; set; } + + public StorageState() + { + IsNewConsumer = false; + } } public class ConsumerSettings { public InitialPosition InitialPosition { get; set; } + public ConsumerSettings() { InitialPosition = InitialPosition.Latest; @@ -37,10 +58,12 @@ public enum SubscriptionType /// Only one reader /// Exclusive, + /// /// One reader with one backup /// Failover, + /// /// Shared to more than one reader. /// @@ -52,4 +75,4 @@ public enum InitialPosition Earliest, Latest } -} +} \ No newline at end of file From 85b6f70c19c95556e0ae1fe610c7d8b1a061c0cb Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sun, 13 Mar 2022 20:38:32 +0100 Subject: [PATCH 26/32] bugfix: add consumer conenction to di --- src/Storage.App/Startup.cs | 1 + .../Repository/Consumers/ConsumerConnectionRepository.cs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storage.App/Startup.cs b/src/Storage.App/Startup.cs index f082ea0..c3a87b9 100644 --- a/src/Storage.App/Startup.cs +++ b/src/Storage.App/Startup.cs @@ -27,6 +27,7 @@ public void ConfigureServices(IServiceCollection services) services.AddConfigurations(Configuration); services.AddSerilogLoggingConfiguration(Configuration); services.AddNodeServiceRepository(); + services.AddConsumerConnectionRepository(); services.AddIOServices(); services.AddStartService(); diff --git a/src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs b/src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs index ab3abe7..91bff0b 100644 --- a/src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs +++ b/src/Storage.Core/Repository/Consumers/ConsumerConnectionRepository.cs @@ -50,4 +50,4 @@ public void RemoveConsumerConnection(string id) _consumersConnected[id].Connections.RemoveAt(0); } } -} \ No newline at end of file +} From 7aed6dce79bd671d9a707f24c64f7d04c3b95a30 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sun, 13 Mar 2022 21:00:00 +0100 Subject: [PATCH 27/32] fix: Send Messages to node if there are already connections for consumer connected --- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 94f8778..5dee24b 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -318,10 +318,10 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List Date: Fri, 22 Apr 2022 22:39:56 +0200 Subject: [PATCH 28/32] v2.1/bug/98 Messages are not caching in the consumers when messages are stored Update AutoFlushUnacknowledgedBatchPointers at ConsumerConnector --- src/Storage.App/appsettings.json | 3 +-- src/Storage.IO/Connectors/ConsumerConnector.cs | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Storage.App/appsettings.json b/src/Storage.App/appsettings.json index 1a5a464..2a19420 100644 --- a/src/Storage.App/appsettings.json +++ b/src/Storage.App/appsettings.json @@ -20,8 +20,7 @@ "XNodes": [ { - //"ServiceUrl": "https://localhost:6541", - "ServiceUrl": "http://localhost:9001", + "ServiceUrl": "https://localhost:6541", "Subscription": 1, "JwtToken": "na", "Username": "admin", diff --git a/src/Storage.IO/Connectors/ConsumerConnector.cs b/src/Storage.IO/Connectors/ConsumerConnector.cs index c024372..95b4e59 100644 --- a/src/Storage.IO/Connectors/ConsumerConnector.cs +++ b/src/Storage.IO/Connectors/ConsumerConnector.cs @@ -162,12 +162,12 @@ private void RemoveRegisteredFromDictionary(ConcurrentDictionary= _partitionConfiguration.SizeInMemory) @@ -175,7 +175,6 @@ private void AutoFlushUnacknowledgedBatchPointers(bool flushAnyway = false) var batchToInsert = new List(BatchUnacknowledgedConsumerMessagesToMerge.Values); ConsumerPointerContext.BulkInsertOrUpdate(BatchUnacknowledgedConsumerMessagesToMerge.Values.ToList()); RemoveRegisteredFromDictionary(BatchUnacknowledgedConsumerMessagesToMerge, batchToInsert); - } } else From fea13991c4c59464fffa4a8733da1da718fdd715 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Sat, 23 Apr 2022 01:10:07 +0200 Subject: [PATCH 29/32] v2.1/feature/100 Send messages to consumer after partition is cached Add condition on ConsumerEventHandler to check if consumer is connected --- .../XNodes/Handlers/ConsumerEventHandler.cs | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 5dee24b..509f495 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -297,34 +297,37 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List(); - foreach (var row in rows) + // Check if the consumer is connected, if yes send messages. + if (IsConsumerConnected(consumerKey)) { - consumerMessages.Add(new ConsumerMessage() + var consumerMessages = new List(); + foreach (var row in rows) { - Consumer = obj.ConsumerName, - Message = new Message() + consumerMessages.Add(new ConsumerMessage() { - Tenant = obj.Tenant, - Product = obj.Product, - Component = obj.Component, - Topic = obj.Topic, - Id = row.MessageId, + Consumer = obj.ConsumerName, + Message = new Message() + { + Tenant = obj.Tenant, + Product = obj.Product, + Component = obj.Component, + Topic = obj.Topic, + Id = row.MessageId, - SentDate = row.SentDate, + SentDate = row.SentDate, - MessageRaw = row.Payload.JsonToObject(), - Headers = row.Headers.JsonToObject>() - } - }); + MessageRaw = row.Payload.JsonToObject(), + Headers = row.Headers.JsonToObject>() + } + }); - if (_consumerConnectionRepository.GetConsumerById(consumerKey).Connections.Count != 0) - await SendToNodes(consumerMessages); + if (IsConsumerConnected(consumerKey)) + await SendToNodes(consumerMessages); + } + if (IsConsumerConnected(consumerKey)) + await SendToNodes(consumerMessages, true); + consumerMessages.Clear(); } - if (_consumerConnectionRepository.GetConsumerById(consumerKey).Connections.Count != 0) - await SendToNodes(consumerMessages, true); - - consumerMessages.Clear(); } private async Task SendToNodes(List consumerMessages, bool sendTheRest = false) @@ -419,10 +422,16 @@ private async Task NotifyNodesForConsumerConnection(NotifyConsumerConnection obj } } - private string GenerateConsumerKey(string tenant, string product, string component, string topic, string consumer) { return $"{tenant}~{product}~{component}~{topic}~{consumer}"; } + + private bool IsConsumerConnected(string consumerKey) + { + if (_consumerConnectionRepository.GetConsumerById(consumerKey).Connections.Count != 0) + return true; + return false; + } } } From e4abecef42b77726f69810896e8b147afc08e775 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Tue, 3 May 2022 00:22:14 +0200 Subject: [PATCH 30/32] v2.1/feature/102 Change Payload and Headers to JSON type --- src/Storage.IO/Services/SystemIOService.cs | 2 +- src/Storage.Model/Entities/Message.cs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Storage.IO/Services/SystemIOService.cs b/src/Storage.IO/Services/SystemIOService.cs index 0956ddb..2e3a4e0 100644 --- a/src/Storage.IO/Services/SystemIOService.cs +++ b/src/Storage.IO/Services/SystemIOService.cs @@ -17,7 +17,7 @@ public SystemIOService(ILogger logger) Console.Write(" ###"); Console.ForegroundColor = generalColor; Console.WriteLine(" ###"); Console.ForegroundColor = ConsoleColor.Red; Console.Write(" ###"); Console.ForegroundColor = generalColor; Console.Write(" ###"); - Console.WriteLine(" Andy X Storage 2.1.0-preview. Copyright (C) 2022 Buildersoft LLC"); + Console.WriteLine(" Andy X Storage 2.1.0. Copyright (C) 2022 Buildersoft LLC"); Console.ForegroundColor = ConsoleColor.Red; Console.Write(" #### "); Console.ForegroundColor = generalColor; Console.WriteLine("Licensed under the Apache License 2.0. See https://bit.ly/3DqVQbx"); Console.ForegroundColor = ConsoleColor.Red; diff --git a/src/Storage.Model/Entities/Message.cs b/src/Storage.Model/Entities/Message.cs index b86652f..c398662 100644 --- a/src/Storage.Model/Entities/Message.cs +++ b/src/Storage.Model/Entities/Message.cs @@ -1,6 +1,6 @@ using System; -using System.Collections.Generic; using System.ComponentModel.DataAnnotations; +using System.ComponentModel.DataAnnotations.Schema; namespace Buildersoft.Andy.X.Storage.Model.Entities { @@ -8,8 +8,13 @@ public class Message { [Key] public Guid MessageId { get; set; } + + [Column(TypeName = "json")] public string Payload { get; set; } + + [Column(TypeName = "json")] public string Headers { get; set; } + public DateTime StoredDate { get; set; } public DateTime SentDate { get; set; } } From ee5945ea6774a4dfab15f57ca3f394bceade2f5e Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Tue, 3 May 2022 00:25:50 +0200 Subject: [PATCH 31/32] prepare for release --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index bfefa3b..8ac5e7c 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ -What is Andy X Data Storage? +What is Andy X Storage? ============ -Andy X Data Storage is an open-source standalone service that is used to store messages for Andy X. The Data Storage service is offers support for Multitenancy storage. X Data Storage hosts all messages and makes sure that all of them are readable for the client. +Andy X Storage is an open-source standalone service that is used to store messages for Andy X. The Data Storage service is offers support for Multitenancy storage. X Data Storage hosts all messages and makes sure that all of them are readable for the client. ## Get Started From 653e1ef09cf5734321793d8c932070b5050080ec Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 6 May 2022 19:15:14 +0200 Subject: [PATCH 32/32] Preparing for release --- src/Storage.App/appsettings.json | 2 +- .../Service/XNodes/Handlers/ConsumerEventHandler.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storage.App/appsettings.json b/src/Storage.App/appsettings.json index 2a19420..8ee487c 100644 --- a/src/Storage.App/appsettings.json +++ b/src/Storage.App/appsettings.json @@ -20,7 +20,7 @@ "XNodes": [ { - "ServiceUrl": "https://localhost:6541", + "ServiceUrl": "http://localhost:6540", "Subscription": 1, "JwtToken": "na", "Username": "admin", diff --git a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs index 509f495..1278470 100644 --- a/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs +++ b/src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs @@ -82,7 +82,7 @@ private async void XNodeEventService_ConsumerConnected(ConsumerConnectedArgs obj _consumerConnectionRepository.AddConsumer(consumerKey, consumer); _consumerConnectionRepository.AddConsumerConnection(consumerKey); - // notify other nodes in cluster that a consumer has been disconnected + // notify other nodes in cluster that a consumer has been connected await NotifyNodesForConsumerConnection(new NotifyConsumerConnection() { ConnectionType = ConnectionType.Connected,