diff --git a/conn.go b/conn.go index 321da690..c6134d4b 100644 --- a/conn.go +++ b/conn.go @@ -3,7 +3,6 @@ package nsq import ( "bufio" "bytes" - "compress/flate" "crypto/tls" "encoding/json" "errors" @@ -15,7 +14,11 @@ import ( "sync/atomic" "time" + // The golang snappy appears to be strictly better than the klauspost + // version, but the klauspost flate also appears to be strictly better + // than compress/flate. "github.com/golang/snappy" + "github.com/klauspost/compress/flate" ) // IdentifyResponse represents the metadata @@ -119,8 +122,7 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn { // The logger parameter is an interface that requires the following // method to be implemented (such as the the stdlib log.Logger): // -// Output(calldepth int, s string) -// +// Output(calldepth int, s string) func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) { c.logGuard.Lock() defer c.logGuard.Unlock() @@ -468,7 +470,7 @@ func (c *Conn) upgradeSnappy() error { conn = c.tlsConn } c.r = snappy.NewReader(conn) - c.w = snappy.NewWriter(conn) + c.w = snappy.NewBufferedWriter(conn) frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize) if err != nil { return err diff --git a/consumer_test.go b/consumer_test.go index dcb34df0..1954fb29 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -2,10 +2,13 @@ package nsq import ( "bytes" + goflate "compress/flate" + "crypto/rand" "crypto/tls" "encoding/json" "errors" "fmt" + "io" "io/ioutil" "log" "net" @@ -13,10 +16,277 @@ import ( "net/http/httptest" "strconv" "strings" + "sync" + "sync/atomic" "testing" "time" + + gosnappy "github.com/golang/snappy" + "github.com/klauspost/compress/flate" + "github.com/klauspost/compress/snappy" ) +var ipsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce ut aliquet ante. Donec consequat mollis nulla, dictum pulvinar lacus fringilla non. Curabitur euismod sagittis elit a efficitur. Maecenas laoreet lobortis metus ac viverra. Duis nec mattis quam, nec maximus odio. Aenean euismod, ante et lacinia tristique, neque nibh imperdiet quam, in aliquet nibh nibh in dui. Quisque vestibulum dui sit amet dui pellentesque interdum. Ut aliquam odio nec tortor euismod, ac iaculis urna ultrices. Ut quam libero, tristique nec sollicitudin et, suscipit id ex. Cras egestas quam eget egestas lobortis. Donec ultrices consectetur turpis, vel pharetra odio dictum sit amet. Integer pulvinar ullamcorper urna, a pellentesque ipsum rutrum sed. Aenean ac molestie lorem, pulvinar facilisis metus. In hac habitasse platea dictumst. Quisque sed leo tincidunt ligula accumsan ultrices eu in ex. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; + +Sed sollicitudin est eget lobortis aliquam. Integer tellus leo, porta vitae tempus id, sodales ac dolor. Mauris cursus pharetra lorem a venenatis. Ut facilisis eu purus vel semper. Sed viverra, ipsum sit amet auctor pulvinar, risus nibh consequat velit, euismod egestas metus nisl ac mauris. Aliquam quis auctor dolor. Mauris fermentum urna at erat fringilla, at gravida ex luctus. Phasellus quis lacus volutpat massa vestibulum ornare. Nullam maximus ullamcorper ipsum non dignissim. Nulla id mi sed nulla gravida sagittis. Nam ullamcorper massa at consequat porta. Integer in est ut nulla tincidunt finibus. Ut id urna euismod, accumsan velit at, feugiat odio. + +Fusce vehicula nisl augue, ut ultrices enim scelerisque in. Integer vitae erat in lacus accumsan molestie in ac leo. Donec eleifend auctor quam eget scelerisque. Curabitur posuere erat sed erat eleifend scelerisque. Duis tortor diam, lacinia quis hendrerit vitae, varius et sapien. Cras at velit tempor, dapibus odio sed, viverra neque. Integer in ullamcorper ante. Curabitur et dictum dolor, ut faucibus risus. Nulla vel libero nulla. + +Donec odio erat, vulputate in vulputate tempus, rutrum at ipsum. Phasellus diam eros, accumsan posuere orci ac, pellentesque ornare erat. Donec vestibulum ante maximus mollis vestibulum. Phasellus aliquet semper ligula, ut rhoncus enim luctus eu. Sed quis viverra ex. Donec pretium mollis justo, nec blandit risus dictum commodo. Aliquam nunc metus, facilisis nec fermentum sit amet, ultrices non ante. Cras cursus lacus nec dolor iaculis ornare. Pellentesque nibh lectus, convallis a nisi non, sollicitudin scelerisque tellus. Curabitur elit nulla, sollicitudin at nibh eu, rhoncus tincidunt dui. Aliquam sed interdum risus. Curabitur a interdum augue. Donec finibus enim velit, nec venenatis ante suscipit quis. In id nunc lacus. + +Fusce et mauris et elit aliquam semper accumsan non magna. Fusce nisi ante, pretium quis mollis eu, vehicula non nisi. Proin euismod tincidunt turpis vel vehicula. Fusce ut dui sodales, ullamcorper orci sit amet, euismod nisl. Proin egestas lacus ut mollis cursus. Suspendisse libero ante, tristique vitae mi sit amet, efficitur rhoncus magna. Sed porttitor, velit vitae efficitur finibus, felis leo ultricies elit, eu suscipit purus elit ut elit. + +In sit amet fermentum neque, sit amet fermentum ipsum. Curabitur mollis imperdiet lorem, in auctor ligula consequat finibus. Proin eu mi at mauris pretium suscipit. Fusce leo dolor, feugiat in lacus mattis, iaculis venenatis purus. Vivamus dictum elementum dignissim. Maecenas accumsan blandit eros, at dapibus velit ultricies vitae. Phasellus ut velit vel sapien fermentum volutpat id ac orci. Maecenas lorem diam, lacinia vitae mi et, fringilla lacinia dolor. Nullam purus elit, tempus eu gravida sit amet, scelerisque sit amet quam. Aenean non elit vitae lacus fringilla varius. Sed pharetra nisi sit amet bibendum malesuada. Fusce euismod nulla ac lectus rutrum, ac vehicula nibh maximus. Praesent eu tincidunt orci, nec varius libero. Nam vitae lorem rhoncus, hendrerit sapien placerat, varius turpis. Fusce massa ligula, venenatis nec diam ac, vestibulum commodo libero. Sed id risus ac libero posuere posuere. + +Duis tincidunt condimentum tristique. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Pellentesque elementum risus et viverra faucibus. Morbi et ex vitae ante aliquam auctor ut in felis. Integer sed vehicula eros. Vivamus mattis aliquet felis condimentum ullamcorper. Nullam lacus ipsum, gravida at gravida nec, egestas ac nulla. Vestibulum risus nisi, ullamcorper venenatis tortor sed, ullamcorper fermentum eros. Morbi ultrices, mi sed euismod venenatis, leo nulla ultrices justo, nec placerat enim nunc a elit. Quisque dictum magna quis aliquam vestibulum. Duis hendrerit ante in eros lacinia, at gravida orci congue. Donec nec nibh velit. + +Vivamus aliquam tellus non ipsum vestibulum, nec sollicitudin leo convallis. Nullam risus nulla, bibendum non lacus ut, bibendum pretium felis. Sed sed sed.` + +func BenchmarkCompression(b *testing.B) { + type myOps interface { + Flush() error + Reset(writer io.Writer) + } + + type readResetter interface { + Reset(reader io.Reader) + } + + type oddResetter interface { + Reset(r io.Reader, dict []byte) error + } + + for _, payloadType := range []string{"compressible", "random"} { + b.Run(payloadType, func(b *testing.B) { + for _, size := range []int{100, 4096, 65536, 1048576, 2097152, 5048576} { + b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { + toSend := make([]byte, size) + if payloadType == "compressible" { + for i := 0; i < len(toSend); { + i += copy(toSend[i:], ipsum) + } + } else { + if _, err := rand.Read(toSend); err != nil { + b.Fatal(err) + } + } + buf := bytes.NewBuffer(make([]byte, 0, size+1000)) + + for _, option := range []string{"normal", "deflate1", "godeflate3", "deflate3", "deflate", "deflate9", "snappy", "gosnappy"} { + b.Run(option, func(b *testing.B) { + var w io.Writer + var r io.Reader + + switch option { + case "deflate1": + fw, err := flate.NewWriter(buf, 1) + if err != nil { + b.Fatal(err) + } + fr := flate.NewReader(buf) + w, r = fw, fr + defer fw.Close() + defer fr.Close() + case "deflate": + fw, err := flate.NewWriter(buf, 6) + if err != nil { + b.Fatal(err) + } + fr := flate.NewReader(buf) + w, r = fw, fr + defer fw.Close() + defer fr.Close() + case "deflate9": + fw, err := flate.NewWriter(buf, 9) + if err != nil { + b.Fatal(err) + } + fr := flate.NewReader(buf) + w, r = fw, fr + defer fw.Close() + defer fr.Close() + case "deflate3": + fw, err := flate.NewWriter(buf, 3) + if err != nil { + b.Fatal(err) + } + fr := flate.NewReader(buf) + w, r = fw, fr + defer fw.Close() + defer fr.Close() + case "godeflate3": + fw, err := goflate.NewWriter(buf, 3) + if err != nil { + b.Fatal(err) + } + fr := goflate.NewReader(buf) + w, r = fw, fr + defer fw.Close() + defer fr.Close() + case "snappy": + sw := snappy.NewBufferedWriter(buf) + sr := snappy.NewReader(buf) + w, r = sw, sr + defer sw.Close() + case "gosnappy": + sw := gosnappy.NewBufferedWriter(buf) + sr := gosnappy.NewReader(buf) + w, r = sw, sr + defer sw.Close() + + case "normal": + w = buf + r = buf + } + + in := make([]byte, len(toSend)) + ops, _ := w.(myOps) + rOps, _ := r.(readResetter) + roOps, _ := r.(oddResetter) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + buf.Reset() + if ops != nil { + ops.Reset(buf) + } + if rOps != nil { + rOps.Reset(buf) + } + if roOps != nil { + roOps.Reset(buf, nil) + } + + n, err := w.Write(toSend) + if err != nil { + b.Fatal(err) + } + if n != len(toSend) { + b.Fatalf("expected %d got %d", len(toSend), n) + } + + if ops != nil { + if err := ops.Flush(); err != nil { + b.Fatal(err) + } + } + + b.ReportMetric(float64(buf.Len()), "cmpSizeB") + + n, err = io.ReadFull(r, in) + if err != nil { + b.Fatal(err) + } + if n != len(toSend) { + b.Fatalf("expected %d got %d", len(toSend), n) + } + if !bytes.Equal(in, toSend) { + b.Fatal("Result does not match!") + } + } + }) + } + }) + } + }) + } +} + +type benchConsumer struct { + count int32 + target int32 + wg sync.WaitGroup +} + +func (h *benchConsumer) HandleMessage(message *Message) error { + if atomic.AddInt32(&h.count, 1) == h.target { + h.wg.Done() + } + return nil +} + +func BenchmarkSendReceive(b *testing.B) { + tests := []struct { + size int + option string + }{ + {size: 100, option: "normal"}, + {size: 100, option: "deflate"}, + {size: 100, option: "snappy"}, + {size: 4096, option: "normal"}, + {size: 4096, option: "deflate"}, + {size: 4096, option: "snappy"}, + {size: 65536, option: "normal"}, + {size: 65536, option: "deflate"}, + {size: 65536, option: "snappy"}, + {size: 1048576, option: "normal"}, + {size: 1048576, option: "deflate"}, + {size: 1048576, option: "snappy"}, + {size: 2097152, option: "normal"}, + {size: 2097152, option: "deflate"}, + {size: 2097152, option: "snappy"}, + {size: 5048576, option: "normal"}, + {size: 5048576, option: "deflate"}, + {size: 5048576, option: "snappy"}, + } + + for _, test := range tests { + b.Run(fmt.Sprintf("size=%d/%s", test.size, test.option), func(b *testing.B) { + config := NewConfig() + + switch test.option { + case "deflate": + config.Deflate = true + case "snappy": + config.Snappy = true + case "normal": + } + + topicName := "bench_send_receive" + strconv.Itoa(int(time.Now().Unix())) + q, err := NewConsumer(topicName, "ch", config) + if err != nil { + b.Fatal(err) + } + defer q.Stop() + q.SetLogger(nullLogger, LogLevelInfo) + + h := &benchConsumer{ + target: int32(b.N), + } + h.wg.Add(1) + q.AddHandler(h) + + if err := q.ConnectToNSQD("127.0.0.1:4150"); err != nil { + b.Fatal(err) + } + + p, err := NewProducer("127.0.0.1:4150", config) + if err != nil { + b.Fatal(err) + } + defer p.Stop() + p.SetLogger(nullLogger, LogLevelInfo) + + toSend := make([]byte, test.size) + if _, err := rand.Read(toSend); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if err := p.Publish(topicName, toSend); err != nil { + b.Fatal(err) + } + } + + h.wg.Wait() + }) + } +} + type MyTestHandler struct { t *testing.T q *Consumer diff --git a/go.mod b/go.mod index d6cfc37e..96fa4b88 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/nsqio/go-nsq go 1.17 -require github.com/golang/snappy v0.0.4 +require ( + github.com/golang/snappy v0.0.4 + github.com/klauspost/compress v1.17.8 +) diff --git a/go.sum b/go.sum index 74eae48d..7b7af99d 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=