From e2a76429499e66d409ea9313404695618ce5ddcf Mon Sep 17 00:00:00 2001 From: jabolina1 Date: Sun, 5 Jul 2020 14:40:03 -0300 Subject: [PATCH 1/2] changes on rabbitmq connection --- pkg/relt/rabbitmq.go | 242 ++++++++++++++++++++----------------------- pkg/relt/relt.go | 10 +- test/relt_test.go | 14 ++- 3 files changed, 129 insertions(+), 137 deletions(-) diff --git a/pkg/relt/rabbitmq.go b/pkg/relt/rabbitmq.go index 31f6e79..de41e9b 100644 --- a/pkg/relt/rabbitmq.go +++ b/pkg/relt/rabbitmq.go @@ -27,18 +27,17 @@ type core struct { // Configuration for both Relt and AMQP. configuration ReltConfiguration - // When connected or when a sessions arrives, publish into - // the channel. - connections chan chan session + // Channel to access the RabbitMQ broker. + broker *amqp.Channel + + // Connection to the RabbitMQ broker. + connection *amqp.Connection // Channel for publishing received messages. received chan Recv // Channel for receiving messages to be published. sending chan Send - - // Use to emit when subscribed to a queue. - subscribed chan bool } // Subscribe to a queue and starts consuming. @@ -56,48 +55,28 @@ type core struct { // // To any messages received, it will be publish onto the messages channel, and // this method will be executed until the connections channel is not closed. -func (c core) subscribe() { - for conn := range c.connections { - var sub session +func (c core) subscribe(consumer <-chan amqp.Delivery) { + defer func() { + log.Println("closing rabbitmq consumer") + }() + + for { select { - case sub = <-conn: + case packet, ok := <-consumer: + if !ok { + log.Printf("consumer channel closed.") + break + } + for err := c.broker.Ack(packet.DeliveryTag, false); err != nil; { + log.Printf("failed acking. %v", err) + } + c.received <- Recv{ + Data: packet.Body, + Error: nil, + } case <-c.cancellable.Done(): return } - - if _, err := sub.QueueDeclare(c.configuration.Name, false, true, true, true, nil); err != nil { - log.Fatalf("failed declaring queue %s: %v", c.configuration.Name, err) - } - - if err := sub.QueueBind(c.configuration.Name, "*", string(c.configuration.Exchange), false, nil); err != nil { - log.Fatalf("failed binding queue %s: %v", c.configuration.Name, err) - } - - consume, err := sub.Consume(c.configuration.Name, "", false, true, false, false, nil) - if err != nil { - log.Fatalf("failed consuming queue %s: %v", c.configuration.Name, err) - } - - c.subscribed <- true - - Consume: - for { - select { - case packet, ok := <-consume: - if !ok { - break Consume - } - for err := sub.Ack(packet.DeliveryTag, false); err != nil; { - log.Printf("failed acking. %v", err) - } - c.received <- Recv{ - Data: packet.Body, - Error: nil, - } - case <-c.cancellable.Done(): - return - } - } } } @@ -108,88 +87,30 @@ func (c core) subscribe() { // // This will keep polling until the context is cancelled, and // will receive messages to be published through the channel. -func (c core) publish() { - for conn := range c.connections { - var pending = make(chan Send, 1) - var pub session - - select { - case pub = <-conn: - case <-c.cancellable.Done(): - return - } - - confirm := make(chan amqp.Confirmation, 1) - if pub.Confirm(false) == nil { - pub.NotifyPublish(confirm) - } else { - close(confirm) - } - - Publish: - for { - select { - case _, ok := <-confirm: - if !ok { - break Publish - } - case body := <-pending: - err := pub.Publish(string(body.Address), "*", false, false, amqp.Publishing{ - Body: body.Data, - }) - if err != nil { - log.Printf("failed publishing %#v. %v", body, err) - pending <- body - pub.Connection.Close() - break Publish - } - case body, running := <-c.sending: - if !running { - return - } - pending <- body - case <-c.cancellable.Done(): - return - } - } - } -} - -// Keeps running forever providing connections -// through the channel. -// This method will stop when the core context -// is done. -func (c core) connect() { - sess := make(chan session) +func (c core) publish(confirm <- chan amqp.Confirmation) { defer func() { - close(sess) - close(c.connections) + log.Println("closing rabbitmq publisher") }() for { select { - case c.connections <- sess: - case <-c.cancellable.Done(): - return - } - - conn, err := amqp.Dial(c.configuration.Url) - if err != nil { - log.Fatalf("failed connection [%s]: %v", c.configuration.Url, err) - } - - ch, err := conn.Channel() - if err != nil { - log.Fatalf("could not defined channel: %v", err) - } - - err = ch.ExchangeDeclare(string(c.configuration.Exchange), "fanout", true, false, false, false, nil) - if err != nil { - log.Fatalf("error declaring exchange %s: %v", c.configuration.Exchange, err) - } + case _, ok := <-confirm: + if !ok { + log.Println("failed confirmation") + continue + } + case body, running := <-c.sending: + if !running { + return + } + err := c.broker.Publish(string(body.Address), "*", false, false, amqp.Publishing{ + Body: body.Data, + }) - select { - case sess <- session{conn, ch}: + if err != nil { + log.Printf("failed publishing %#v. %v", body, err) + break + } case <-c.cancellable.Done(): return } @@ -205,17 +126,23 @@ func (c core) start() { defer func() { close(c.sending) close(c.received) + c.broker.Close() + c.connection.Close() }() - c.ctx.spawn(c.connect) - c.ctx.spawn(c.subscribe) - c.ctx.spawn(c.publish) + fails := make(chan *amqp.Error) + c.connection.NotifyClose(fails) - for { + for !c.connection.IsClosed() { select { - case <-c.subscribed: case <-c.cancellable.Done(): return + case err := <-fails: + log.Printf("error from connection. %v", err) + c.received <- Recv{ + Data: nil, + Error: err, + } } } } @@ -225,21 +152,74 @@ func (c core) close() { c.cancel() } +func (c *core) declarations() error { + conn, err := amqp.Dial(c.configuration.Url) + if err != nil { + return err + } + + ch, err := conn.Channel() + if err != nil { + return err + } + + c.connection = conn + c.broker = ch + + err = ch.ExchangeDeclare(string(c.configuration.Exchange), "fanout", true, false, false, false, nil) + if err != nil { + log.Fatalf("error declaring exchange %s: %v", c.configuration.Exchange, err) + } + + _, err = ch.QueueDeclare(c.configuration.Name, false, true, true, false, nil) + if err != nil { + return err + } + + if err = ch.QueueBind(c.configuration.Name, "*", string(c.configuration.Exchange), false, nil); err != nil { + return err + } + + consumer, err := ch.Consume(c.configuration.Name, "", false, true, false, false, nil) + if err != nil { + return err + } + + confirm := make(chan amqp.Confirmation, 1) + if ch.Confirm(false) == nil { + ch.NotifyPublish(confirm) + } else { + close(confirm) + } + + c.ctx.spawn(func() { + c.subscribe(consumer) + }) + + c.ctx.spawn(func() { + c.publish(confirm) + }) + + c.ctx.spawn(c.start) + + return nil +} + // Creates a new instance of the core structure. // This will also start running and consuming messages. -func newCore(relt Relt) *core { +func newCore(relt Relt) (*core, error) { ctx, cancel := context.WithCancel(context.Background()) c := &core{ ctx: relt.ctx, cancellable: ctx, cancel: cancel, configuration: relt.configuration, - connections: make(chan chan session), received: make(chan Recv), sending: make(chan Send), - subscribed: make(chan bool), } - c.ctx.spawn(c.start) - <-c.subscribed - return c + err := c.declarations() + if err != nil { + return nil, err + } + return c, nil } diff --git a/pkg/relt/relt.go b/pkg/relt/relt.go index dbf7653..09ab6d0 100644 --- a/pkg/relt/relt.go +++ b/pkg/relt/relt.go @@ -93,7 +93,7 @@ func (r *Relt) Close() { // Creates a new instance of the reliable transport, // and start all needed routines. -func NewRelt(configuration ReltConfiguration) *Relt { +func NewRelt(configuration ReltConfiguration) (*Relt, error) { ctx, done := context.WithCancel(context.Background()) relt := &Relt{ ctx: &invoker{ @@ -103,7 +103,11 @@ func NewRelt(configuration ReltConfiguration) *Relt { finish: done, configuration: configuration, } - relt.core = newCore(*relt) + c, err := newCore(*relt) + if err != nil { + return nil, err + } + relt.core = c relt.ctx.spawn(relt.run) - return relt + return relt, nil } diff --git a/test/relt_test.go b/test/relt_test.go index 93887ff..a42909c 100644 --- a/test/relt_test.go +++ b/test/relt_test.go @@ -10,14 +10,22 @@ import ( func TestRelt_StartAndStop(t *testing.T) { conf := relt.DefaultReltConfiguration() - relt := relt.NewRelt(*conf) + relt, err := relt.NewRelt(*conf) + if err != nil { + t.Fatalf("failed connecting. %v", err) + return + } relt.Close() } func TestRelt_PublishAndReceiveMessage(t *testing.T) { conf := relt.DefaultReltConfiguration() conf.Name = "random-test-name" - r := relt.NewRelt(*conf) + r, err := relt.NewRelt(*conf) + if err != nil { + t.Fatalf("failed connecting. %v", err) + return + } defer r.Close() data := []byte("hello") @@ -35,7 +43,7 @@ func TestRelt_PublishAndReceiveMessage(t *testing.T) { } }() - err := r.Broadcast(relt.Send{ + err = r.Broadcast(relt.Send{ Address: conf.Exchange, Data: data, }) From de4835b2f4f17cdea5ac896eb81346f0f2e757a7 Mon Sep 17 00:00:00 2001 From: jabolina1 Date: Sun, 5 Jul 2020 16:19:16 -0300 Subject: [PATCH 2/2] updated rabbitmq connection --- pkg/relt/rabbitmq.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/relt/rabbitmq.go b/pkg/relt/rabbitmq.go index de41e9b..fee1bcb 100644 --- a/pkg/relt/rabbitmq.go +++ b/pkg/relt/rabbitmq.go @@ -87,7 +87,7 @@ func (c core) subscribe(consumer <-chan amqp.Delivery) { // // This will keep polling until the context is cancelled, and // will receive messages to be published through the channel. -func (c core) publish(confirm <- chan amqp.Confirmation) { +func (c core) publish(confirm <-chan amqp.Confirmation) { defer func() { log.Println("closing rabbitmq publisher") }() @@ -166,7 +166,7 @@ func (c *core) declarations() error { c.connection = conn c.broker = ch - err = ch.ExchangeDeclare(string(c.configuration.Exchange), "fanout", true, false, false, false, nil) + err = ch.ExchangeDeclare(string(c.configuration.Exchange), "fanout", false, true, false, false, nil) if err != nil { log.Fatalf("error declaring exchange %s: %v", c.configuration.Exchange, err) }