diff --git a/confirms.go b/confirms.go index 06cbaa71..38f82682 100644 --- a/confirms.go +++ b/confirms.go @@ -1,13 +1,16 @@ package amqp -import "sync" +import ( + "sync" + "sync/atomic" +) // confirms resequences and notifies one or multiple publisher confirmation listeners type confirms struct { m sync.Mutex listeners []chan Confirmation sequencer map[uint64]Confirmation - published uint64 + published *uint64 expecting uint64 } @@ -15,7 +18,7 @@ type confirms struct { func newConfirms() *confirms { return &confirms{ sequencer: map[uint64]Confirmation{}, - published: 0, + published: new(uint64), expecting: 1, } } @@ -29,11 +32,7 @@ func (c *confirms) Listen(l chan Confirmation) { // publish increments the publishing counter func (c *confirms) Publish() uint64 { - c.m.Lock() - defer c.m.Unlock() - - c.published++ - return c.published + return atomic.AddUint64(c.published, 1) } // confirm confirms one publishing, increments the expecting delivery tag, and @@ -48,7 +47,7 @@ func (c *confirms) confirm(confirmation Confirmation) { // resequence confirms any out of order delivered confirmations func (c *confirms) resequence() { - for c.expecting <= c.published { + for c.expecting <= atomic.LoadUint64(c.published) { sequenced, found := c.sequencer[c.expecting] if !found { return