Skip to content

Commit

Permalink
fix: issues with caches (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxirmx authored Sep 15, 2024
1 parent a57ab67 commit 5a3fe02
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 106 deletions.
3 changes: 2 additions & 1 deletion dkgNode/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft.Hosting.Lifetime": "Warning"
"Microsoft.Hosting.Lifetime": "Warning",
"Microsoft.EntityFrameworkCore": "Warning"
}
}
}
12 changes: 9 additions & 3 deletions dkgNodeLibrary/Services/DkgNodeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ public async Task RunDkg(HttpClient httpClient, string[] encodedPublicKeys, Canc
Name, PublicKeys.Length, Round);
Status = WaitingStepTwo;

if (dieOnStep2) Thread.Sleep(4000);
if (dieOnStep2)
{
Thread.Sleep(40000);
}
statusResponse = await ReportStatus(httpClient, encodedDeals);
if (!ShallContinue([WaitingStepTwo, RunningStepTwo], stoppingToken)) return;

Expand All @@ -282,7 +285,7 @@ public async Task RunDkg(HttpClient httpClient, string[] encodedPublicKeys, Canc
Name, PublicKeys.Length, Round);
Status = WaitingStepThree;

if (dieOnStep2) Thread.Sleep(4000);
if (dieOnStep2) Thread.Sleep(40000);
statusResponse = await ReportStatus(httpClient, encodedResponses);
if (!ShallContinue([WaitingStepThree, RunningStepThree], stoppingToken)) return;

Expand All @@ -296,7 +299,10 @@ public async Task RunDkg(HttpClient httpClient, string[] encodedPublicKeys, Canc
if (!await ReportStatusAndCheck(httpClient, [RunningStepThree], stoppingToken)) return;
RunDkgStepThree(statusResponse.Data);


if (dieOnStep3)
{
Thread.Sleep(4000);
}
DistributedPublicKey = null;
string[] encodedResult = [];

Expand Down
37 changes: 23 additions & 14 deletions dkgServiceNode/Controllers/OpsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

using Solnet.Wallet;
using System.Diagnostics;
using Microsoft.Extensions.Logging;

namespace dkgServiceNode.Controllers
{
Expand Down Expand Up @@ -315,21 +316,28 @@ internal async Task<ObjectResult> AcceptFinished(Round? round, Node node, NodesR
return _500UndefinedRound();
}

runner.SetResultWaitingTime(round);

if (stReport.Data.Length != 0)
if (node.Status == NStatus.Finished)
{
runner.SetResult(round, node, stReport.Data);
await UpdateNodeState(dkgContext, node, (short)stReport.Status, round.Id);
await UpdateRoundState(round);
return Accepted(CreateStatusResponse(round, lastRoundHistory, stReport.Status, node.Random));
return Ok(CreateStatusResponse(round, lastRoundHistory, NStatus.Finished, node.Random));
}
else
{
runner.SetNoResult(round, node);
await UpdateNodeState(dkgContext, node, (short)stReport.Status, round.Id);
await UpdateRoundState(round);
return _400NoResult(round.Id, node.Name, node.PublicKey);
runner.SetResultWaitingTime(round);

if (stReport.Data.Length != 0)
{
runner.SetResult(round, node, stReport.Data);
await UpdateNodeState(dkgContext, node, (short)stReport.Status, round.Id);
await UpdateRoundState(round);
return Accepted(CreateStatusResponse(round, lastRoundHistory, stReport.Status, node.Random));
}
else
{
runner.SetNoResult(round, node);
await UpdateNodeState(dkgContext, node, (short)stReport.Status, round.Id);
await UpdateRoundState(round);
return _400NoResult(round.Id, node.Name, node.PublicKey);
}
}
}
internal async Task<ObjectResult> AcceptFailed(Round? round, Node node, NodesRoundHistory? lastRoundHistory, StatusReport stReport)
Expand Down Expand Up @@ -514,15 +522,16 @@ public async Task<ActionResult<StatusResponse>> Status(StatusReport statusReport
RStatus? rStatus = null;
if (round != null)
{
await UpdateRoundState(round);
// await UpdateRoundState(round);
rStatus = round.Status;
}

if (actionMap.TryGetValue((rStatus, statusReport.Status), out var function))
{
logger.LogDebug("State transition round [{id}] node [{name}] : ({rStatus}, {nStatus}) -> {f}",
logger.LogDebug("State transition round [{id}] node [{name}] : ({rStatus}, {nStatus}) -> {f}(Data: {data})",
(round != null ? round.Id.ToString() : "null"),
node.Name, rStatus, statusReport.Status, function.Method.Name);
node.Name, rStatus, statusReport.Status, function.Method.Name,
statusReport.Data.Length == 0 ? "empty" : statusReport.Data);
res = await function(round, node, lastRoundHistory, statusReport);

}
Expand Down
78 changes: 26 additions & 52 deletions dkgServiceNode/Data/DbEnsure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace dkgServiceNode.Data
{
public static class DbEnsure
{
readonly static string sqlScript_0_8_0 = @"
readonly static string sqlScript_0_12_1 = @"
START TRANSACTION;
DROP TABLE IF EXISTS ""users"";
Expand Down Expand Up @@ -91,50 +91,6 @@ public static class DbEnsure
CREATE INDEX ""idx_nodes_round_history_round_id"" ON ""nodes_round_history"" (""round_id"");
CREATE INDEX ""idx_nodes_round_history_node_id"" ON ""nodes_round_history"" (""node_id"");
CREATE OR REPLACE FUNCTION update_nodes_round_history() RETURNS TRIGGER AS $$
BEGIN
IF OLD.round_id IS NOT NULL AND NEW.round_id IS NULL THEN
-- Check if a record already exists in nodes_round_history
IF EXISTS (SELECT 1 FROM nodes_round_history WHERE node_id = OLD.id AND round_id = OLD.round_id) THEN
-- Update the existing record
UPDATE nodes_round_history
SET node_final_status = OLD.status
WHERE node_id = OLD.id AND round_id = OLD.round_id;
ELSE
-- Insert a new record
INSERT INTO nodes_round_history (round_id, node_id, node_final_status, node_random)
VALUES (OLD.round_id, OLD.id, OLD.status, OLD.random);
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER nodes_before_update_trigger
BEFORE UPDATE ON nodes
FOR EACH ROW
EXECUTE PROCEDURE update_nodes_round_history();
DROP TABLE IF EXISTS ""versions"";
CREATE TABLE ""versions"" (
""id"" SERIAL PRIMARY KEY,
""version"" VARCHAR(16) NOT NULL,
""date"" DATE NOT NULL DEFAULT now()
);
INSERT INTO ""versions"" (""version"", ""date"") VALUES
('0.8.0', '" + DateTime.Now.ToString("yyyy-MM-dd") + @"');
COMMIT;
";

readonly static string sqlScript_0_12_1 = @"
START TRANSACTION;
DROP TRIGGER IF EXISTS nodes_before_update_trigger ON nodes;
DROP FUNCTION IF EXISTS update_nodes_round_history();
CREATE OR REPLACE PROCEDURE upsert_node_round_history(
p_node_id INT,
p_round_id INT,
Expand Down Expand Up @@ -163,6 +119,14 @@ INSERT INTO nodes_round_history(node_id, round_id, node_final_status, node_rando
END;
$$;
DROP TABLE IF EXISTS ""versions"";
CREATE TABLE ""versions"" (
""id"" SERIAL PRIMARY KEY,
""version"" VARCHAR(16) NOT NULL,
""date"" DATE NOT NULL DEFAULT now()
);
INSERT INTO ""versions"" (""version"", ""date"") VALUES
('0.12.1', '" + DateTime.Now.ToString("yyyy-MM-dd") + @"');
Expand Down Expand Up @@ -199,6 +163,16 @@ GROUP BY
COMMIT;
";

readonly static string sqlScript_0_13_1 = @"
START TRANSACTION;
DROP FUNCTION IF EXISTS update_nodes_round_history();
INSERT INTO ""versions"" (""version"", ""date"") VALUES
('0.13.1', '" + DateTime.Now.ToString("yyyy-MM-dd") + @"');
COMMIT;
";

private static string PuVersionUpdateQuery(string v)
{
return @"
Expand All @@ -220,7 +194,7 @@ private static bool VCheck(string v, NpgsqlConnection connection)
return (rows != null && (long)rows != 0);
}

public static int Ensure_0_8_0(NpgsqlConnection connection)
public static int Ensure_0_12_1(NpgsqlConnection connection)
{
// Check if table 'versions' exists
var sql = "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'versions';";
Expand All @@ -231,14 +205,14 @@ public static int Ensure_0_8_0(NpgsqlConnection connection)

if (rows != null && (long)rows != 0)
{
sql = "SELECT COUNT(*) FROM versions WHERE version = '0.8.0';";
sql = "SELECT COUNT(*) FROM versions WHERE version = '0.12.1';";
command = new NpgsqlCommand(sql, connection);
rows = command.ExecuteScalar();
}

if (rows == null || (long)rows == 0)
{
var scriptCommand = new NpgsqlCommand(sqlScript_0_8_0, connection);
var scriptCommand = new NpgsqlCommand(sqlScript_0_12_1, connection);
r = scriptCommand.ExecuteNonQuery();
}

Expand All @@ -265,12 +239,12 @@ public static void Ensure(NpgsqlConnection connection, ILogger logger)
{
try
{
logger.LogInformation("Initializing database at 0.8.0");
Ensure_0_8_0(connection);
logger.LogInformation("Update to 0.12.1");
EnsureVersion("0.12.1", sqlScript_0_12_1, connection);
logger.LogInformation("Initializing database at 0.12.1");
Ensure_0_12_1(connection);
logger.LogInformation("Update to 0.13.0");
EnsureVersion("0.13.0", sqlScript_0_13_0, connection);
logger.LogInformation("Update to 0.13.1");
EnsureVersion("0.13.1", sqlScript_0_13_1, connection);
}
catch (Exception ex)
{
Expand Down
2 changes: 1 addition & 1 deletion dkgServiceNode/Data/DkgContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public async Task AddRoundAsync(Round round)
{
Rounds.Add(round);
await SaveChangesAsync();
roundsCache.AddRoundToCache(round);
roundsCache.SaveRoundToCache(round);
}
catch (Exception ex)
{
Expand Down
42 changes: 21 additions & 21 deletions dkgServiceNode/Services/Cache/NodesCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace dkgServiceNode.Services.Cache
public class NodesCache
{
private readonly Dictionary<int, Node> _cacheNodes = new();
private readonly ConcurrentDictionary<string, int> _addressToId = new();
private readonly Dictionary<string, int> _addressToId = new();
private readonly object _cacheNodesLock = new();
public void SaveNodeToCacheNoLock(Node node)
{
Expand Down Expand Up @@ -65,11 +65,18 @@ public void SaveNodeToCache(Node node)

public Node? GetNodeByAddress(string address)
{
if (_addressToId.TryGetValue(address, out var id))
Node? res = null;
lock (_cacheNodesLock)
{
return GetNodeById(id);
if (_addressToId.TryGetValue(address, out var id))
{
if (_cacheNodes.TryGetValue(id, out Node? node))
{
res = new Node(node);
}
}
}
return null;
return res;
}

public List<Node> GetAllNodes()
Expand Down Expand Up @@ -128,32 +135,25 @@ private List<Node> GetFilteredNodesInternal(string search)
return filteredNodes;
}

private void RemoveToIdEntries(ConcurrentDictionary<string, int> dictionary, int nodeId)
public void UpdateNodeInCache(Node node)
{
var keysToRemove = dictionary
.Where(kvp => kvp.Value == nodeId)
.Select(kvp => kvp.Key)
.ToList();

foreach (var key in keysToRemove)
lock (_cacheNodesLock)
{
dictionary.TryRemove(key, out _);
var addr = _cacheNodes[node.Id]?.Address;
if (addr != null && addr!= node.Address)
{
_addressToId.Remove(addr);
}
_cacheNodes[node.Id] = new Node(node);
_addressToId[node.Address] = node.Id;
}
}

public void UpdateNodeInCache(Node node)
{
// Update node in the cache
RemoveToIdEntries(_addressToId, node.Id);
SaveNodeToCache(node);
}

public void DeleteNodeFromCache(Node node)
{
// Remove node from the cache
RemoveToIdEntries(_addressToId, node.Id);
lock (_cacheNodesLock)
{
_addressToId.Remove(node.Address);
_cacheNodes.Remove(node.Id);
}
}
Expand Down
Loading

0 comments on commit 5a3fe02

Please sign in to comment.