From 27d276689f8f6babda2e311ad53e514d698b99d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 15 Jul 2020 22:56:28 +0200 Subject: [PATCH 1/2] Increase publish performance by writing all frames in one go By writing all 3 frames required for a publish in one go, and only locking and flushing the output buffer once we increase the performance about 3 times. Messages that spans over multiple body frames are still written one at a time, when messages are that large (>128KB) the locking and flushing is not the bottleneck, and it allows us to not allocate a dynamic array for each publish and allows us to use a fixed size array instead. --- channel.go | 30 +++++++++++++++++++----------- connection.go | 28 ++++++++++++++++++++++++++++ write.go | 14 ++++++++++++++ 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/channel.go b/channel.go index cd19ce7e..7f8b3edd 100644 --- a/channel.go +++ b/channel.go @@ -229,21 +229,18 @@ func (ch *Channel) sendOpen(msg message) (err error) { } else { size = len(body) } + var frames [3]frame - if err = ch.connection.send(&methodFrame{ + frames[0] = &methodFrame{ ChannelId: ch.id, Method: content, - }); err != nil { - return } - if err = ch.connection.send(&headerFrame{ + frames[1] = &headerFrame{ ChannelId: ch.id, ClassId: class, Size: uint64(len(body)), Properties: props, - }); err != nil { - return } // chunk body into size (max frame size - frame header size) @@ -252,11 +249,22 @@ func (ch *Channel) sendOpen(msg message) (err error) { j = len(body) } - if err = ch.connection.send(&bodyFrame{ - ChannelId: ch.id, - Body: body[i:j], - }); err != nil { - return + // Send first body frame together with the publish and header frame + if i == 0 { + frames[2] = &bodyFrame{ + ChannelId: ch.id, + Body: body[i:j], + } + if err = ch.connection.sendFrames(frames); err != nil { + return + } + } else { + if err = ch.connection.send(&bodyFrame{ + ChannelId: ch.id, + Body: body[i:j], + }); err != nil { + return + } } } } else { diff --git a/connection.go b/connection.go index b9d8e8ee..e37c533a 100644 --- a/connection.go +++ b/connection.go @@ -383,6 +383,34 @@ func (c *Connection) send(f frame) error { return err } +func (c *Connection) sendFrames(frames [3]frame) error { + if c.IsClosed() { + return ErrClosed + } + + c.sendM.Lock() + err := c.writer.WriteFrames(frames) + c.sendM.Unlock() + + if err != nil { + // shutdown could be re-entrant from signaling notify chans + go c.shutdown(&Error{ + Code: FrameError, + Reason: err.Error(), + }) + } else { + // Broadcast we sent a frame, reducing heartbeats, only + // if there is something that can receive - like a non-reentrant + // call or if the heartbeater isn't running + select { + case c.sends <- time.Now(): + default: + } + } + + return err +} + func (c *Connection) shutdown(err *Error) { atomic.StoreInt32(&c.closed, 1) diff --git a/write.go b/write.go index 94a46d11..27cec27d 100644 --- a/write.go +++ b/write.go @@ -27,6 +27,20 @@ func (w *writer) WriteFrame(frame frame) (err error) { return } +func (w *writer) WriteFrames(frames [3]frame) (err error) { + for _, frame := range frames { + if err = frame.write(w.w); err != nil { + return + } + } + + if buf, ok := w.w.(*bufio.Writer); ok { + err = buf.Flush() + } + + return +} + func (f *methodFrame) write(w io.Writer) (err error) { var payload bytes.Buffer From f81b9169a6ad92866a14694149f091851425fa08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 15 Jul 2020 23:00:51 +0200 Subject: [PATCH 2/2] Enable Nagle's algorithm to increase publish performance of small message By disabling TCP no delay (enable Nagle's algorithm) many small messages can be sent in a single TCP packet, we increase the publish rate about 100% when messages are small. This will not increase latency in any normal circumstances, but theoretically could if for instance one channel is publishing a message, and another channel is declaring a queue and then waiting for the CreateOK response, then due to a delayed ack from the server a 40ms delay could be added to the wait of the Queue CreateOK. --- connection.go | 1 + 1 file changed, 1 insertion(+) diff --git a/connection.go b/connection.go index e37c533a..50ae8b07 100644 --- a/connection.go +++ b/connection.go @@ -189,6 +189,7 @@ func DialConfig(url string, config Config) (*Connection, error) { if err != nil { return nil, err } + conn.(*net.TCPConn).SetNoDelay(false) if uri.Scheme == "amqps" { if config.TLSClientConfig == nil {