Skip to content

Commit

Permalink
Allow creating NpgmqClient using a connection object, to allow for be…
Browse files Browse the repository at this point in the history
…tter control over connection lifetime and to support transactions.
  • Loading branch information
brianpursley committed Jun 28, 2024
1 parent 9bcead5 commit 06b9772
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 271 deletions.
58 changes: 45 additions & 13 deletions Npgmq.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,61 @@
using System.Reflection;
using Microsoft.Extensions.Configuration;
using Npgmq;
using Npgsql;

var configuration = new ConfigurationBuilder()
.AddEnvironmentVariables()
.AddUserSecrets(Assembly.GetExecutingAssembly())
.Build();

var npgmq = new NpgmqClient(configuration.GetConnectionString("ExampleDB")!);
var connectionString = configuration.GetConnectionString("ExampleDB")!;

await npgmq.InitAsync();
await npgmq.CreateQueueAsync("example_queue");

var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
// Test Npgmq with connection string
{
Foo = "Test",
Bar = 123
});
Console.WriteLine($"Sent message with id {msgId}");
var npgmq = new NpgmqClient(connectionString);

await npgmq.InitAsync();
await npgmq.CreateQueueAsync("example_queue");

var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
{
Foo = "Connection string test",
Bar = 1
});
Console.WriteLine($"Sent message with id {msgId}");

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
// Test Npgmq with connection object and a transaction
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
var npgmq = new NpgmqClient(connection);

await using (var tx = connection.BeginTransaction())
{
var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
{
Foo = "Connection object test",
Bar = 2
});
Console.WriteLine($"Sent message with id {msgId}");

await tx.CommitAsync();
}

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}

internal class MyMessageType
Expand Down
107 changes: 105 additions & 2 deletions Npgmq.Test/NpgmqClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public NpgmqClientTest()
.Build();

_connectionString = configuration.GetConnectionString("Test")!;

_connection = new NpgsqlConnection(_connectionString);
_sut = new NpgmqClient(_connectionString);
_sut = new NpgmqClient(_connection);
}

public void Dispose()
Expand Down Expand Up @@ -545,6 +544,47 @@ public async Task ReadBatchAsync_should_return_list_of_messages()
});
}

[Fact]
public async Task Client_s()
{
// Arrange
await ResetTestQueueAsync();

// Act
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task ConnectionString_should_be_used_to_connect()
{
// Arrange
await ResetTestQueueAsync();
var sut2 = new NpgmqClient(_connectionString);

// Act
var msgId = await sut2.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_add_message()
{
Expand All @@ -565,6 +605,69 @@ public async Task SendAsync_should_add_message()
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_commit_with_database_transaction()
{
// Arrange
await ResetTestQueueAsync();
await using var connection2 = new NpgsqlConnection(_connectionString);
await connection2.OpenAsync();

// Act
await using var transaction = await _connection.BeginTransactionAsync();
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));

// Act
await transaction.CommitAsync();

// Assert
Assert.Equal(1, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(1, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await connection2.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_rollback_with_database_transaction()
{
// Arrange
await ResetTestQueueAsync();
await using var connection2 = new NpgsqlConnection(_connectionString);
await connection2.OpenAsync();

// Act
await using var transaction = await _connection.BeginTransactionAsync();
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));

// Act
await transaction.RollbackAsync();

// Assert
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
}

[Fact]
public async Task SendAsync_should_add_string_message()
{
Expand Down
1 change: 1 addition & 0 deletions Npgmq.sln
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
ProjectSection(SolutionItems) = preProject
LICENSE = LICENSE
README.md = README.md
global.json = global.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{8C37002D-05C6-4B1F-B4FC-C2F45C5E5328}"
Expand Down
Loading

0 comments on commit 06b9772

Please sign in to comment.