Skip to content

Commit

Permalink
Retry initial connection
Browse files Browse the repository at this point in the history
  • Loading branch information
naggingant committed Sep 29, 2021
1 parent b32065f commit e4cde5f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
30 changes: 28 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package amqp
import (
"fmt"
"context"
"time"
"sync"

"github.com/streadway/amqp"
Expand All @@ -12,6 +13,11 @@ import (
"github.com/luraproject/lura/proxy"
)

const (
retryInterval = 3 * time.Second
maxRetries = 15
)

func NewBackendFactory(ctx context.Context, logger logging.Logger, bf proxy.BackendFactory) proxy.BackendFactory {
f := backendFactory{
logger: logger,
Expand Down Expand Up @@ -48,8 +54,28 @@ func (f backendFactory) New(remote *config.Backend) proxy.Proxy {
return f.bf(remote)
}

func (f backendFactory) newChannel(path string) (*amqp.Channel, closer, error) {
conn, err := amqp.Dial(path)
func (f backendFactory) newChannel(ctx context.Context, path string) (*amqp.Channel, closer, error) {
var (
retries int
conn *amqp.Connection
err error
)
for {
conn, err = amqp.Dial(path)
if err == nil {
break
}
retries += 1
f.logger.Error(fmt.Sprintf("AMQP: connection attempt #%d: %s", retries, err.Error()))
if retries > maxRetries {
break
}
select {
case <-time.After(retryInterval):
case <-ctx.Done():
break
}
}
if err != nil {
return nil, nopCloser, err
}
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (f backendFactory) initConsumer(ctx context.Context, remote *config.Backend
return consumerBackend(remote, msgs), nil
}

ch, close, err := f.newChannel(dns)
ch, close, err := f.newChannel(ctx, dns)
if err != nil {
f.logger.Error(fmt.Sprintf("AMQP: getting the channel for %s/%s: %s", dns, cfg.Name, err.Error()))
return proxy.NoopProxy, err
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (f backendFactory) initProducer(ctx context.Context, remote *config.Backend
return proxy.NoopProxy, err
}

ch, close, err := f.newChannel(dns)
ch, close, err := f.newChannel(ctx, dns)
if err != nil {
f.logger.Error(fmt.Sprintf("AMQP: getting the channel for %s/%s: %s", dns, cfg.Name, err.Error()))
return proxy.NoopProxy, err
Expand Down
9 changes: 3 additions & 6 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *rpcChannel) Load() *amqp.Channel {
}

func (f backendFactory) initRpcChannel(ctx context.Context, cfg *rpcCfg, dns string, rch *rpcChannel) error {
ch, close, err := f.newChannel(dns)
ch, close, err := f.newChannel(ctx, dns)
if err != nil {
f.logger.Error(fmt.Sprintf("AMQP: getting the channel for %s/%s: %s", dns, cfg.Name, err.Error()))
return err
Expand Down Expand Up @@ -107,11 +107,8 @@ func (f backendFactory) initRpcChannel(ctx context.Context, cfg *rpcCfg, dns str
if !ok {
break
}
for {
time.Sleep(time.Duration(3) * time.Second)
if err := f.initRpcChannel(ctx, cfg, dns, rch); err == nil {
break
}
if err := f.initRpcChannel(ctx, cfg, dns, rch); err == nil {
break
}
case reply, ok := <-replies:
if !ok {
Expand Down

0 comments on commit e4cde5f

Please sign in to comment.