Skip to content

Commit

Permalink
Merge pull request #4 from nano-interactive/feat/new-publisher
Browse files Browse the repository at this point in the history
Refactor connection options in producer
  • Loading branch information
CodeLieutenant authored Apr 23, 2024
2 parents 342f736 + 0a9f6c7 commit bed98fc
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 114 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
with:
go-version: ${{ matrix.go }}
- name: Install Task
uses: arduino/setup-task@v1
uses: arduino/setup-task@v2
with:
version: 3.x
repo-token: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -45,7 +45,7 @@ jobs:
go install github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
task test
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4.0.1
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: nano-interactive/go-amqp
25 changes: 14 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,25 @@ is places in the library so that the users of the library don't even think about

c, err := consumer.NewFunc(
handler,
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithConnectionOptions[Message](connection.Config{
connection.Config{
Host: "127.0.0.1",
Port: 5672,
User: "guest",
Password: "guest",
ConnectionName: "go-amqp-consumer",
},
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
)

c.Start(context.Background())
go c.Start(context.Background())

time.Sleep(100*time.Second)
// Wait for some event to exit the Consumer
time.Sleep(30*time.Second)

// c.CloseWithContext(context.Background()) -> for timeouts
c.Close()

```
Expand Down Expand Up @@ -134,13 +136,14 @@ Publising message is simple, the abstraction is very simple

```go
pub, err := publisher.New[Message](
"testing_publisher",
publisher.WithConnectionOptions[Message](connection.Config{
context.TODO(),
connection.Config{
Host: "127.0.0.1",
User: "guest",
Password: "guest",
ConnectionName: "go-amqp-publisher",
}),
}
"testing_publisher",
)
if err != nil {
panic(err)
Expand All @@ -156,7 +159,7 @@ message := Message{
Name: "Nano Interactive",
}

if err = pub.Publish(context.Background(), message); err != nil {
if err = pub.Publish(context.TODO(), message); err != nil {
panic(err)
}

Expand Down
8 changes: 2 additions & 6 deletions amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,9 @@ func Example_ConsumerWithRawHandler() {

func ExamplePublisher() {
pub, err := publisher.New[Message](
context.Background(),
connection.DefaultConfig,
"testing_publisher",
publisher.WithConnectionOptions[Message](connection.Config{
Host: "127.0.0.1",
User: "guest",
Password: "guest",
ConnectionName: "go-amqp-publisher",
}),
)
if err != nil {
panic(err)
Expand Down
6 changes: 3 additions & 3 deletions examples/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func main() {

ctx := context.Background()

pub, err := publisher.New(
pub, err := publisher.New[Message](
ctx,
connConfig,
"testing_publisher",
publisher.WithContext[Message](ctx),
publisher.WithConnectionOptions[Message](connConfig),
)
if err != nil {
panic(err)
Expand Down
100 changes: 49 additions & 51 deletions publisher/config.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package publisher

import (
"context"

"github.com/nano-interactive/go-amqp/v3/connection"
"github.com/nano-interactive/go-amqp/v3/serializer"
)

type (
Config[T any] struct {
ctx context.Context
serializer serializer.Serializer[T]
onError connection.OnErrorFunc
exchange ExchangeDeclare
connectionOptions connection.Config
messageBuffering int
// ctx context.Context
serializer serializer.Serializer[T]
onError connection.OnErrorFunc
exchange ExchangeDeclare
// connectionOptions connection.Config
messageBuffering int
}

Option[T any] func(*Config[T])
Expand All @@ -38,49 +36,49 @@ func WithOnErrorFunc[T any](onError connection.OnErrorFunc) Option[T] {
}
}

func WithContext[T any](ctx context.Context) Option[T] {
return func(c *Config[T]) {
c.ctx = ctx
}
}

func WithConnectionOptions[T any](connectionOptions connection.Config) Option[T] {
return func(c *Config[T]) {
if connectionOptions.Channels == 0 {
connectionOptions.Channels = connection.DefaultConfig.Channels
}

if connectionOptions.Vhost == "" {
connectionOptions.Vhost = connection.DefaultConfig.Vhost
}

if connectionOptions.Host == "" {
connectionOptions.Host = connection.DefaultConfig.Host
}

if connectionOptions.Port == 0 {
connectionOptions.Port = connection.DefaultConfig.Port
}

if connectionOptions.User == "" {
connectionOptions.User = connection.DefaultConfig.User
}

if connectionOptions.Password == "" {
connectionOptions.Password = connection.DefaultConfig.Password
}

if connectionOptions.ReconnectInterval == 0 {
connectionOptions.ReconnectInterval = connection.DefaultConfig.ReconnectInterval
}

if connectionOptions.ReconnectRetry == 0 {
connectionOptions.ReconnectRetry = connection.DefaultConfig.ReconnectRetry
}

c.connectionOptions = connectionOptions
}
}
// func WithContext[T any](ctx context.Context) Option[T] {
// return func(c *Config[T]) {
// c.ctx = ctx
// }
// }

// func WithConnectionOptions[T any](connectionOptions connection.Config) Option[T] {
// return func(c *Config[T]) {
// if connectionOptions.Channels == 0 {
// connectionOptions.Channels = connection.DefaultConfig.Channels
// }

// if connectionOptions.Vhost == "" {
// connectionOptions.Vhost = connection.DefaultConfig.Vhost
// }

// if connectionOptions.Host == "" {
// connectionOptions.Host = connection.DefaultConfig.Host
// }

// if connectionOptions.Port == 0 {
// connectionOptions.Port = connection.DefaultConfig.Port
// }

// if connectionOptions.User == "" {
// connectionOptions.User = connection.DefaultConfig.User
// }

// if connectionOptions.Password == "" {
// connectionOptions.Password = connection.DefaultConfig.Password
// }

// if connectionOptions.ReconnectInterval == 0 {
// connectionOptions.ReconnectInterval = connection.DefaultConfig.ReconnectInterval
// }

// if connectionOptions.ReconnectRetry == 0 {
// connectionOptions.ReconnectRetry = connection.DefaultConfig.ReconnectRetry
// }

// c.connectionOptions = connectionOptions
// }
// }

func WithBufferedMessages[T any](capacity int) Option[T] {
return func(c *Config[T]) {
Expand Down
48 changes: 31 additions & 17 deletions publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"time"

"github.com/rabbitmq/amqp091-go"
"golang.org/x/sync/semaphore"

"github.com/nano-interactive/go-amqp/v3/connection"
"github.com/nano-interactive/go-amqp/v3/serializer"
Expand All @@ -35,10 +35,10 @@ type (
serializer serializer.Serializer[T]
conn *connection.Connection
ch atomic.Pointer[amqp091.Channel]
semaphore *semaphore.Weighted
cancel context.CancelFunc
exchangeName string
routingKey string
wg sync.WaitGroup
closing atomic.Bool
gettingCh atomic.Bool
}
Expand Down Expand Up @@ -77,19 +77,20 @@ func (p *Publisher[T]) swapChannel(connection *amqp091.Connection, cfg Config[T]
}

func (p *Publisher[T]) connectionReadyWorker(ctx context.Context, conn *amqp091.Connection, notifyClose chan *amqp091.Error, cfg Config[T]) {
defer p.wg.Done()
errCh := make(chan error)
defer close(errCh)
var err error

defer func() {
close(errCh)
p.semaphore.Release(1)
p.closing.Store(true)
ch := p.ch.Load()
if !ch.IsClosed() {
_ = ch.Close()
}
}()

var err error

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -121,7 +122,10 @@ func (p *Publisher[T]) connectionReadyWorker(ctx context.Context, conn *amqp091.

func (p *Publisher[T]) onConnectionReady(cfg Config[T]) connection.OnConnectionReady {
return func(ctx context.Context, connection *amqp091.Connection) error {
p.wg.Add(1)
if err := p.semaphore.Acquire(ctx, 1); err != nil {
return err
}

notifyClose, err := p.swapChannel(connection, cfg)
if err != nil {
return err
Expand Down Expand Up @@ -150,16 +154,14 @@ func newChannel(
return ch, notifyClose, nil
}

func New[T any](exchangeName string, options ...Option[T]) (*Publisher[T], error) {
func New[T any](ctx context.Context, connectionOpts connection.Config, exchangeName string, options ...Option[T]) (*Publisher[T], error) {
if exchangeName == "" {
return nil, ErrExchangeNameRequired
}

cfg := Config[T]{
serializer: serializer.JSON[T]{},
messageBuffering: 1,
connectionOptions: connection.DefaultConfig,
ctx: context.Background(),
serializer: serializer.JSON[T]{},
messageBuffering: 1,
exchange: ExchangeDeclare{
name: exchangeName,
RoutingKey: "",
Expand All @@ -183,16 +185,17 @@ func New[T any](exchangeName string, options ...Option[T]) (*Publisher[T], error
option(&cfg)
}

ctx, cancel := context.WithCancel(cfg.ctx)
ctx, cancel := context.WithCancel(ctx)

publisher := &Publisher[T]{
serializer: cfg.serializer,
cancel: cancel,
exchangeName: exchangeName,
routingKey: cfg.exchange.RoutingKey,
semaphore: semaphore.NewWeighted(1),
cancel: cancel,
}

conn, err := connection.New(ctx, cfg.connectionOptions, connection.Events{
conn, err := connection.New(ctx, connectionOpts, connection.Events{
OnConnectionReady: publisher.onConnectionReady(cfg),
OnBeforeConnectionReady: func(_ context.Context) error {
publisher.gettingCh.Store(true)
Expand All @@ -205,6 +208,7 @@ func New[T any](exchangeName string, options ...Option[T]) (*Publisher[T], error
}

publisher.conn = conn

return publisher, nil
}

Expand Down Expand Up @@ -239,9 +243,19 @@ func (p *Publisher[T]) Publish(ctx context.Context, msg T, _ ...PublishConfig) e
)
}

func (p *Publisher[T]) Close() error {
func (p *Publisher[T]) CloseWithContext(ctx context.Context) error {
p.closing.Store(true)
p.cancel()
p.wg.Wait()
return p.conn.Close()

if err := p.semaphore.Acquire(ctx, 1); err != nil {
return err
}

defer p.conn.Close()

return nil
}

func (p *Publisher[T]) Close() error {
return p.CloseWithContext(context.Background())
}
Loading

0 comments on commit bed98fc

Please sign in to comment.