Skip to content

Commit

Permalink
Lints and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Malusev <dusan@dusanmalusev.dev>
  • Loading branch information
CodeLieutenant committed Apr 16, 2024
1 parent a45e393 commit 6e11ac5
Show file tree
Hide file tree
Showing 21 changed files with 299 additions and 276 deletions.
18 changes: 9 additions & 9 deletions amqp_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package amqp
package amqp_test

import (
"context"
Expand Down Expand Up @@ -33,7 +33,7 @@ func handler(_ context.Context, msg Message) error {
func ExampleConsumer() {
c, err := consumer.NewFunc(handler,
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithConnectionOptions[Message](connection.Config{
Expand Down Expand Up @@ -63,18 +63,18 @@ func ExampleConsumer() {

type MyHandler struct{}

func (h MyHandler) Handle(ctx context.Context, msg Message) error {
func (h MyHandler) Handle(_ context.Context, msg Message) error {
defer cnt.Add(1)
//nolint:forbidigo
fmt.Printf("[INFO] Message received: %d %s\n", cnt.Load(), msg.Name)
_, _ = fmt.Printf("[INFO] Message received: %d %s\n", cnt.Load(), msg.Name)
return nil
}

func ExampleConsumerWithHandler() {
c, err := consumer.New[Message](MyHandler{},
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithConnectionOptions[Message](connection.Config{
Host: "127.0.0.1",
Expand Down Expand Up @@ -110,7 +110,7 @@ func ExampleConsumerWithSignal() {

c, err := consumer.NewFunc(handler,
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithContext[Message](ctx),
Expand Down Expand Up @@ -143,7 +143,7 @@ func ExampleConsumerWithSignal() {

type MyRawHandler struct{}

func (h MyRawHandler) Handle(ctx context.Context, d *amqp091.Delivery) error {
func (h MyRawHandler) Handle(_ context.Context, d *amqp091.Delivery) error {
defer cnt.Add(1)
var msg Message

Expand All @@ -158,7 +158,7 @@ func (h MyRawHandler) Handle(ctx context.Context, d *amqp091.Delivery) error {
func Example_ConsumerWithRawHandler() {
c, err := consumer.NewRaw(MyRawHandler{},
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithConnectionOptions[Message](connection.Config{
Expand Down
Binary file removed bin/task
Binary file not shown.
53 changes: 24 additions & 29 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ var DefaultConfig = Config{
ReconnectRetry: 10,
Channels: 100,
ReconnectInterval: 1 * time.Second,
FrameSize: 8192,
}

var ErrRetriesExhausted = errors.New("number of retries to acquire connection exhausted")

type (
Connection struct {
cancel context.CancelFunc
conn atomic.Pointer[amqp091.Connection]
*Config
cancel context.CancelFunc
conn atomic.Pointer[amqp091.Connection]
config *Config
onBeforeConnectionReady OnReconnectingFunc
onConnectionReady OnConnectionReady
onError OnErrorFunc
Expand Down Expand Up @@ -60,13 +59,13 @@ type (

func New(ctx context.Context, config Config, events Events) (*Connection, error) {
if events.OnConnectionReady == nil {
return nil, fmt.Errorf("OnConnectionReady is required")
return nil, ErrOnConnectionReady
}

ctx, cancel := context.WithCancel(ctx)

c := &Connection{
Config: &config,
config: &config,
cancel: cancel,
onBeforeConnectionReady: events.OnBeforeConnectionReady,
onConnectionReady: events.OnConnectionReady,
Expand All @@ -78,28 +77,28 @@ func New(ctx context.Context, config Config, events Events) (*Connection, error)
c.connectionDispose()
})

return c.reconnect(ctx, &config)
return c.reconnect(ctx)
}

func (c *Connection) reconnect(ctx context.Context, config *Config) (*Connection, error) {
func (c *Connection) reconnect(ctx context.Context) (*Connection, error) {
connect := c.connect()

if err := connect(ctx); err == nil {
return c, nil
}

var err error
timer := time.NewTimer(config.ReconnectInterval)
timer := time.NewTimer(c.config.ReconnectInterval)
defer timer.Stop()

for i := 0; i < config.ReconnectRetry; i++ {
for i := 0; i < c.config.ReconnectRetry; i++ {
select {
case <-timer.C:
if err = connect(ctx); err == nil {
return c, nil
}

timer.Reset(config.ReconnectInterval)
timer.Reset(c.config.ReconnectInterval)
case <-ctx.Done():
return c, ctx.Err()
}
Expand All @@ -108,10 +107,6 @@ func (c *Connection) reconnect(ctx context.Context, config *Config) (*Connection
return c, err
}

func (c *Connection) hasConnectionClosed(err error) bool {
return !errors.Is(err, amqp091.ErrClosed) && c.conn.Load().IsClosed()
}

func (c *Connection) hasChannelClosed(err error) bool {
return errors.Is(err, amqp091.ErrClosed) && !c.conn.Load().IsClosed()
}
Expand Down Expand Up @@ -139,7 +134,7 @@ func (c *Connection) handleReconnect(ctx context.Context, connection *amqp091.Co

c.connectionDispose()

if _, err := c.reconnect(ctx, c.Config); err != nil {
if _, err := c.reconnect(ctx); err != nil {
return
}
}
Expand All @@ -149,13 +144,13 @@ func (c *Connection) handleReconnect(ctx context.Context, connection *amqp091.Co
func (c *Connection) connect() func(ctx context.Context) error {
connectionURI := fmt.Sprintf(
"amqp://%s:%s@%s",
c.User,
c.Password,
net.JoinHostPort(c.Host, strconv.FormatInt(int64(c.Port), 10)),
c.config.User,
c.config.Password,
net.JoinHostPort(c.config.Host, strconv.FormatInt(int64(c.config.Port), 10)),
)

properties := amqp091.NewConnectionProperties()
properties.SetClientConnectionName(c.ConnectionName)
properties.SetClientConnectionName(c.config.ConnectionName)
if err := properties.Validate(); err != nil {
panic("Invalid connection properties: " + err.Error())
}
Expand All @@ -165,26 +160,26 @@ func (c *Connection) connect() func(ctx context.Context) error {

config := amqp091.Config{
SASL: nil,
Vhost: c.Vhost,
ChannelMax: c.Channels,
FrameSize: c.FrameSize,
Vhost: c.config.Vhost,
ChannelMax: c.config.Channels,
FrameSize: c.config.FrameSize,
Heartbeat: 3 * time.Second,
Properties: properties,
Dial: amqp091.DefaultDial(c.ReconnectInterval),
Dial: amqp091.DefaultDial(c.config.ReconnectInterval),
}

if c.onBeforeConnectionReady != nil {
if err := c.onBeforeConnectionReady(ctx); err != nil {
if c.onError != nil {
c.onError(&OnBeforeConnectError{inner: err})
c.onError(&OnBeforeConnectError{Inner: err})
}
return err
}
}

conn, err := amqp091.DialConfig(connectionURI, config)
if err != nil {
c.onError(&ConnectInitError{inner: err})
c.onError(&ConnectInitError{Inner: err})
return err
}

Expand All @@ -194,7 +189,7 @@ func (c *Connection) connect() func(ctx context.Context) error {

if err = c.onConnectionReady(ctx, conn); err != nil {
if c.onError != nil {
c.onError(&ConnectInitError{inner: err})
c.onError(&ConnectInitError{Inner: err})
}

return err
Expand All @@ -212,7 +207,7 @@ func (c *Connection) connectionDispose() {
}

if err := conn.Close(); err != nil && c.onError != nil {
c.onError(&OnConnectionCloseError{inner: err})
c.onError(&OnConnectionCloseError{Inner: err})
}
}

Expand Down
22 changes: 15 additions & 7 deletions connection/errors.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
package connection

import "fmt"
import (
"errors"
"fmt"
)

var (
ErrOnConnectionReady = errors.New("onConnectionReady is required")
ErrRetriesExhausted = errors.New("number of retries to acquire connection exhausted")
)

type OnBeforeConnectError struct {
inner error
Inner error
}

type ConnectInitError struct {
inner error
Inner error
}

type OnConnectionCloseError struct {
inner error
Inner error
}

func (e OnBeforeConnectError) Error() string {
return fmt.Sprintf("non library error before reconnecting: %v", e.inner)
return fmt.Sprintf("non library error before reconnecting: %v", e.Inner)
}

func (e ConnectInitError) Error() string {
return fmt.Sprintf("non library error after reconnect: %v", e.inner)
return fmt.Sprintf("non library error after reconnect: %v", e.Inner)
}

func (e OnConnectionCloseError) Error() string {
return fmt.Sprintf("error on closing previous connection: %v", e.inner)
return fmt.Sprintf("error on closing previous connection: %v", e.Inner)
}
23 changes: 12 additions & 11 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package consumer
import (
"context"
"errors"
"golang.org/x/sync/semaphore"
"io"
"reflect"

"golang.org/x/sync/semaphore"

"github.com/nano-interactive/go-amqp/v3/connection"
"github.com/nano-interactive/go-amqp/v3/serializer"
)
Expand All @@ -17,7 +18,7 @@ var (
)

type (
Message interface{}
Message any

Consumer[T Message] struct {
watcher *semaphore.Weighted
Expand All @@ -31,7 +32,7 @@ func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ..
var msg T

if reflect.ValueOf(msg).Kind() == reflect.Ptr {
return Consumer[T]{}, errors.New("message type must be a value type")
return Consumer[T]{}, ErrMessageTypeInvalid
}

cfg := Config[T]{
Expand All @@ -40,7 +41,7 @@ func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ..
Workers: 1,
},
retryCount: 1,
serializer: serializer.JsonSerializer[T]{},
serializer: serializer.JSON[T]{},
ctx: context.Background(),
onError: func(err error) {
if errors.Is(err, connection.ErrRetriesExhausted) {
Expand All @@ -58,11 +59,11 @@ func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ..
}

if queueDeclare.QueueName == "" {
return Consumer[T]{}, errors.New("q name is required... Please call WithQueueName(queueName) option function")
return Consumer[T]{}, ErrQueueNameRequired
}

if cfg.onMessageError == nil {
panic("onMessageError is required")
return Consumer[T]{}, ErrOnMessageCallbackRequired
}

return Consumer[T]{
Expand All @@ -74,7 +75,7 @@ func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ..
}

func NewRawFunc[T Message](h RawHandlerFunc, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
return NewRaw[T](h, queueDeclare, options...)
return NewRaw(h, queueDeclare, options...)
}

func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
Expand All @@ -90,7 +91,7 @@ func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ...
)

if cfg.serializer == nil {
s = serializer.JsonSerializer[T]{}
s = serializer.JSON[T]{}
} else {
s = cfg.serializer
}
Expand All @@ -109,7 +110,7 @@ func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ...
rawHandler = privHandler
}

return NewRaw[T](rawHandler, queueDeclare, options...)
return NewRaw(rawHandler, queueDeclare, options...)
}

func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
Expand All @@ -125,7 +126,7 @@ func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T
)

if cfg.serializer == nil {
s = serializer.JsonSerializer[T]{}
s = serializer.JSON[T]{}
} else {
s = cfg.serializer
}
Expand All @@ -144,7 +145,7 @@ func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T
rawHandler = privHandler
}

return NewRaw[T](rawHandler, queueDeclare, options...)
return NewRaw(rawHandler, queueDeclare, options...)
}

func (c Consumer[T]) Close() error {
Expand Down
25 changes: 25 additions & 0 deletions consumer/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package consumer

import (
"errors"
"fmt"
)

var (
ErrQueueNameRequired = errors.New("queue name is required... Please call WithQueueName(queueName) option function")
ErrOnMessageCallbackRequired = errors.New("onMessageError is required")
ErrMessageTypeInvalid = errors.New("message type must be a value type")
)

type (
QueueDeclarationError struct{ Inner error }
ListenerStartFailedError struct{ Inner error }
)

func (e *QueueDeclarationError) Error() string {
return fmt.Sprintf("queue declaration error: %v", e.Inner)
}

func (e *ListenerStartFailedError) Error() string {
return fmt.Sprintf("failed to start listener: %v", e.Inner)
}
Loading

0 comments on commit 6e11ac5

Please sign in to comment.