Skip to content

Commit

Permalink
Allow creating NpgmqClient using a connection object, to allow for be…
Browse files Browse the repository at this point in the history
…tter control over connection lifetime and to support transactions.

Added exception handling.
  • Loading branch information
brianpursley committed Jun 28, 2024
1 parent 9bcead5 commit 12f86ed
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 124 deletions.
58 changes: 45 additions & 13 deletions Npgmq.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,61 @@
using System.Reflection;
using Microsoft.Extensions.Configuration;
using Npgmq;
using Npgsql;

var configuration = new ConfigurationBuilder()
.AddEnvironmentVariables()
.AddUserSecrets(Assembly.GetExecutingAssembly())
.Build();

var npgmq = new NpgmqClient(configuration.GetConnectionString("ExampleDB")!);
var connectionString = configuration.GetConnectionString("ExampleDB")!;

await npgmq.InitAsync();
await npgmq.CreateQueueAsync("example_queue");

var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
// Test Npgmq with connection string
{
Foo = "Test",
Bar = 123
});
Console.WriteLine($"Sent message with id {msgId}");
var npgmq = new NpgmqClient(connectionString);

await npgmq.InitAsync();
await npgmq.CreateQueueAsync("example_queue");

var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
{
Foo = "Connection string test",
Bar = 1
});
Console.WriteLine($"Sent message with id {msgId}");

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
// Test Npgmq with connection object and a transaction
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
var npgmq = new NpgmqClient(connection);

await using (var tx = connection.BeginTransaction())
{
var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
{
Foo = "Connection object test",
Bar = 2
});
Console.WriteLine($"Sent message with id {msgId}");

await tx.CommitAsync();
}

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}

internal class MyMessageType
Expand Down
89 changes: 86 additions & 3 deletions Npgmq.Test/NpgmqClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public NpgmqClientTest()
.Build();

_connectionString = configuration.GetConnectionString("Test")!;

_connection = new NpgsqlConnection(_connectionString);
_sut = new NpgmqClient(_connectionString);
_sut = new NpgmqClient(_connection);
}

public void Dispose()
Expand Down Expand Up @@ -545,6 +544,27 @@ public async Task ReadBatchAsync_should_return_list_of_messages()
});
}

[Fact]
public async Task ConnectionString_should_be_used_to_connect()
{
// Arrange
await ResetTestQueueAsync();
var sut2 = new NpgmqClient(_connectionString);

// Act
var msgId = await sut2.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_add_message()
{
Expand All @@ -565,6 +585,69 @@ public async Task SendAsync_should_add_message()
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_commit_with_database_transaction()
{
// Arrange
await ResetTestQueueAsync();
await using var connection2 = new NpgsqlConnection(_connectionString);
await connection2.OpenAsync();

// Act
await using var transaction = await _connection.BeginTransactionAsync();
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));

// Act
await transaction.CommitAsync();

// Assert
Assert.Equal(1, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(1, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await connection2.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_rollback_with_database_transaction()
{
// Arrange
await ResetTestQueueAsync();
await using var connection2 = new NpgsqlConnection(_connectionString);
await connection2.OpenAsync();

// Act
await using var transaction = await _connection.BeginTransactionAsync();
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));

// Act
await transaction.RollbackAsync();

// Assert
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
}

[Fact]
public async Task SendAsync_should_add_string_message()
{
Expand Down Expand Up @@ -640,4 +723,4 @@ public async Task SetVtAsync_should_change_vt_for_a_message()
Assert.NotNull(message2);
Assert.Equal(msgId, message2.MsgId);
}
}
}
1 change: 1 addition & 0 deletions Npgmq.sln
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
ProjectSection(SolutionItems) = preProject
LICENSE = LICENSE
README.md = README.md
global.json = global.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{8C37002D-05C6-4B1F-B4FC-C2F45C5E5328}"
Expand Down
Loading

0 comments on commit 12f86ed

Please sign in to comment.