Skip to content

Commit

Permalink
Merge pull request #2 from jabolina/change-rabbitmq
Browse files Browse the repository at this point in the history
Change rabbitmq
  • Loading branch information
jabolina authored Jul 5, 2020
2 parents b2ac967 + de4835b commit 597159c
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 137 deletions.
242 changes: 111 additions & 131 deletions pkg/relt/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@ type core struct {
// Configuration for both Relt and AMQP.
configuration ReltConfiguration

// When connected or when a sessions arrives, publish into
// the channel.
connections chan chan session
// Channel to access the RabbitMQ broker.
broker *amqp.Channel

// Connection to the RabbitMQ broker.
connection *amqp.Connection

// Channel for publishing received messages.
received chan Recv

// Channel for receiving messages to be published.
sending chan Send

// Use to emit when subscribed to a queue.
subscribed chan bool
}

// Subscribe to a queue and starts consuming.
Expand All @@ -56,48 +55,28 @@ type core struct {
//
// To any messages received, it will be publish onto the messages channel, and
// this method will be executed until the connections channel is not closed.
func (c core) subscribe() {
for conn := range c.connections {
var sub session
func (c core) subscribe(consumer <-chan amqp.Delivery) {
defer func() {
log.Println("closing rabbitmq consumer")
}()

for {
select {
case sub = <-conn:
case packet, ok := <-consumer:
if !ok {
log.Printf("consumer channel closed.")
break
}
for err := c.broker.Ack(packet.DeliveryTag, false); err != nil; {
log.Printf("failed acking. %v", err)
}
c.received <- Recv{
Data: packet.Body,
Error: nil,
}
case <-c.cancellable.Done():
return
}

if _, err := sub.QueueDeclare(c.configuration.Name, false, true, true, true, nil); err != nil {
log.Fatalf("failed declaring queue %s: %v", c.configuration.Name, err)
}

if err := sub.QueueBind(c.configuration.Name, "*", string(c.configuration.Exchange), false, nil); err != nil {
log.Fatalf("failed binding queue %s: %v", c.configuration.Name, err)
}

consume, err := sub.Consume(c.configuration.Name, "", false, true, false, false, nil)
if err != nil {
log.Fatalf("failed consuming queue %s: %v", c.configuration.Name, err)
}

c.subscribed <- true

Consume:
for {
select {
case packet, ok := <-consume:
if !ok {
break Consume
}
for err := sub.Ack(packet.DeliveryTag, false); err != nil; {
log.Printf("failed acking. %v", err)
}
c.received <- Recv{
Data: packet.Body,
Error: nil,
}
case <-c.cancellable.Done():
return
}
}
}
}

Expand All @@ -108,88 +87,30 @@ func (c core) subscribe() {
//
// This will keep polling until the context is cancelled, and
// will receive messages to be published through the channel.
func (c core) publish() {
for conn := range c.connections {
var pending = make(chan Send, 1)
var pub session

select {
case pub = <-conn:
case <-c.cancellable.Done():
return
}

confirm := make(chan amqp.Confirmation, 1)
if pub.Confirm(false) == nil {
pub.NotifyPublish(confirm)
} else {
close(confirm)
}

Publish:
for {
select {
case _, ok := <-confirm:
if !ok {
break Publish
}
case body := <-pending:
err := pub.Publish(string(body.Address), "*", false, false, amqp.Publishing{
Body: body.Data,
})
if err != nil {
log.Printf("failed publishing %#v. %v", body, err)
pending <- body
pub.Connection.Close()
break Publish
}
case body, running := <-c.sending:
if !running {
return
}
pending <- body
case <-c.cancellable.Done():
return
}
}
}
}

// Keeps running forever providing connections
// through the channel.
// This method will stop when the core context
// is done.
func (c core) connect() {
sess := make(chan session)
func (c core) publish(confirm <-chan amqp.Confirmation) {
defer func() {
close(sess)
close(c.connections)
log.Println("closing rabbitmq publisher")
}()

for {
select {
case c.connections <- sess:
case <-c.cancellable.Done():
return
}

conn, err := amqp.Dial(c.configuration.Url)
if err != nil {
log.Fatalf("failed connection [%s]: %v", c.configuration.Url, err)
}

ch, err := conn.Channel()
if err != nil {
log.Fatalf("could not defined channel: %v", err)
}

err = ch.ExchangeDeclare(string(c.configuration.Exchange), "fanout", true, false, false, false, nil)
if err != nil {
log.Fatalf("error declaring exchange %s: %v", c.configuration.Exchange, err)
}
case _, ok := <-confirm:
if !ok {
log.Println("failed confirmation")
continue
}
case body, running := <-c.sending:
if !running {
return
}
err := c.broker.Publish(string(body.Address), "*", false, false, amqp.Publishing{
Body: body.Data,
})

select {
case sess <- session{conn, ch}:
if err != nil {
log.Printf("failed publishing %#v. %v", body, err)
break
}
case <-c.cancellable.Done():
return
}
Expand All @@ -205,17 +126,23 @@ func (c core) start() {
defer func() {
close(c.sending)
close(c.received)
c.broker.Close()
c.connection.Close()
}()

c.ctx.spawn(c.connect)
c.ctx.spawn(c.subscribe)
c.ctx.spawn(c.publish)
fails := make(chan *amqp.Error)
c.connection.NotifyClose(fails)

for {
for !c.connection.IsClosed() {
select {
case <-c.subscribed:
case <-c.cancellable.Done():
return
case err := <-fails:
log.Printf("error from connection. %v", err)
c.received <- Recv{
Data: nil,
Error: err,
}
}
}
}
Expand All @@ -225,21 +152,74 @@ func (c core) close() {
c.cancel()
}

func (c *core) declarations() error {
conn, err := amqp.Dial(c.configuration.Url)
if err != nil {
return err
}

ch, err := conn.Channel()
if err != nil {
return err
}

c.connection = conn
c.broker = ch

err = ch.ExchangeDeclare(string(c.configuration.Exchange), "fanout", false, true, false, false, nil)
if err != nil {
log.Fatalf("error declaring exchange %s: %v", c.configuration.Exchange, err)
}

_, err = ch.QueueDeclare(c.configuration.Name, false, true, true, false, nil)
if err != nil {
return err
}

if err = ch.QueueBind(c.configuration.Name, "*", string(c.configuration.Exchange), false, nil); err != nil {
return err
}

consumer, err := ch.Consume(c.configuration.Name, "", false, true, false, false, nil)
if err != nil {
return err
}

confirm := make(chan amqp.Confirmation, 1)
if ch.Confirm(false) == nil {
ch.NotifyPublish(confirm)
} else {
close(confirm)
}

c.ctx.spawn(func() {
c.subscribe(consumer)
})

c.ctx.spawn(func() {
c.publish(confirm)
})

c.ctx.spawn(c.start)

return nil
}

// Creates a new instance of the core structure.
// This will also start running and consuming messages.
func newCore(relt Relt) *core {
func newCore(relt Relt) (*core, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &core{
ctx: relt.ctx,
cancellable: ctx,
cancel: cancel,
configuration: relt.configuration,
connections: make(chan chan session),
received: make(chan Recv),
sending: make(chan Send),
subscribed: make(chan bool),
}
c.ctx.spawn(c.start)
<-c.subscribed
return c
err := c.declarations()
if err != nil {
return nil, err
}
return c, nil
}
10 changes: 7 additions & 3 deletions pkg/relt/relt.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (r *Relt) Close() {

// Creates a new instance of the reliable transport,
// and start all needed routines.
func NewRelt(configuration ReltConfiguration) *Relt {
func NewRelt(configuration ReltConfiguration) (*Relt, error) {
ctx, done := context.WithCancel(context.Background())
relt := &Relt{
ctx: &invoker{
Expand All @@ -103,7 +103,11 @@ func NewRelt(configuration ReltConfiguration) *Relt {
finish: done,
configuration: configuration,
}
relt.core = newCore(*relt)
c, err := newCore(*relt)
if err != nil {
return nil, err
}
relt.core = c
relt.ctx.spawn(relt.run)
return relt
return relt, nil
}
14 changes: 11 additions & 3 deletions test/relt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,22 @@ import (

func TestRelt_StartAndStop(t *testing.T) {
conf := relt.DefaultReltConfiguration()
relt := relt.NewRelt(*conf)
relt, err := relt.NewRelt(*conf)
if err != nil {
t.Fatalf("failed connecting. %v", err)
return
}
relt.Close()
}

func TestRelt_PublishAndReceiveMessage(t *testing.T) {
conf := relt.DefaultReltConfiguration()
conf.Name = "random-test-name"
r := relt.NewRelt(*conf)
r, err := relt.NewRelt(*conf)
if err != nil {
t.Fatalf("failed connecting. %v", err)
return
}
defer r.Close()

data := []byte("hello")
Expand All @@ -35,7 +43,7 @@ func TestRelt_PublishAndReceiveMessage(t *testing.T) {
}
}()

err := r.Broadcast(relt.Send{
err = r.Broadcast(relt.Send{
Address: conf.Exchange,
Data: data,
})
Expand Down

0 comments on commit 597159c

Please sign in to comment.