From 22a512b953df8e90939555e32f881e0563715088 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Mon, 22 May 2023 14:53:09 -0700 Subject: [PATCH] Feat[pool]: Implement io.Closer for bytes buffers --- pool/bytes.go | 31 +++++- pool/bytes_bench_test.go | 173 +++++++++++++++++++++++++++++++ pool/bytes_test.go | 219 +++++++++++---------------------------- 3 files changed, 260 insertions(+), 163 deletions(-) create mode 100644 pool/bytes_bench_test.go diff --git a/pool/bytes.go b/pool/bytes.go index e25b24e..e0554fc 100644 --- a/pool/bytes.go +++ b/pool/bytes.go @@ -2,6 +2,7 @@ package pool import ( "bytes" + "errors" "io" "sync" ) @@ -52,8 +53,8 @@ func (cf BufferFactory) MustPut(buf *Buffer) { // Get returns a buffer from the pool. func (cf BufferFactory) Get() *Buffer { return &Buffer{ - cf.pool.Get().(*bytes.Buffer), - &sync.Once{}, + Buffer: cf.pool.Get().(*bytes.Buffer), + o: &sync.Once{}, } } @@ -61,6 +62,14 @@ func (cf BufferFactory) Get() *Buffer { type Buffer struct { *bytes.Buffer o *sync.Once + p *BufferFactory +} + +// WithParent sets the parent of the buffer. This is useful for chaining factories, and for facilitating +// in-line buffer return with functions like Buffer.Close(). Be mindful, however, that this adds a bit of overhead. +func (c Buffer) WithParent(p *BufferFactory) *Buffer { + c.p = p + return &c } // Bytes returns a slice of length b.Len() holding the unread portion of the buffer. @@ -223,7 +232,10 @@ func (c Buffer) WriteString(str string) (int, error) { return c.Buffer.WriteString(str) } -// Grow grows the buffer's capacity, if necessary, to guarantee space for another n bytes. After Grow(n), at least n bytes can be written to the buffer without another allocation. If n is negative, Grow will panic. If the buffer can't grow it will panic with ErrTooLarge. +// Grow grows the buffer's capacity, if necessary, to guarantee space for another n bytes. +// After Grow(n), at least n bytes can be written to the buffer without another allocation. +// If n is negative, Grow will panic. If the buffer can't grow it will panic with ErrTooLarge. +// // If the buffer has already been returned to the pool, Grow will return ErrBufferReturned. // // *This is from the bytes.Buffer docs.* @@ -410,3 +422,16 @@ func (c Buffer) Next(n int) []byte { } return c.Buffer.Next(n) } + +// Close implements io.Closer. It returns the buffer to the pool. This +func (c Buffer) Close() error { + if c.Buffer == nil { + return errors.New("buffer already returned to pool") + } + if c.p == nil { + return errors.New( + "buffer does not know it's parent pool and therefore cannot return itself, use Buffer.WithParent to set the parent pool", + ) + } + return c.p.Put(&c) +} diff --git a/pool/bytes_bench_test.go b/pool/bytes_bench_test.go new file mode 100644 index 0000000..f7a75b4 --- /dev/null +++ b/pool/bytes_bench_test.go @@ -0,0 +1,173 @@ +package pool + +import ( + "bytes" + "fmt" + "os" + "strings" + "testing" +) + +// ========================================================================= + +// BenchmarkBufferFactory tries to emulate real world usage of a buffer pool. +// It creates a buffer, writes to it, and then returns it to the pool. +// +// Then it repeats this process b.N times. +// This should be a decent way to test the performance of a buffer pool. +// +// See bytes_bench.go for more information. +func BenchmarkBufferFactory(b *testing.B) { + benchmarkBufferFactory(b) +} + +// BenchmarkNotUsingPackage is a benchmark that does not use git.tcp.direct/kayos/common/pool. +// It mimics the behavior of the BufferFactory benchmark, but does not use a buffer pool. +// +// See bytes_test.go for more information. +func BenchmarkNotUsingPackage(b *testing.B) { + benchmarkNewBytesBuffer(b) +} + +// ========================================================================= + +const ( + hello64 = `its me ur new best friend tell me a buf i tell you where it ends` + hello = `hello world, it's me, your new best friend. tell me the buffer and i'll tell you where it ends.` + lip = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed a ante sit amet purus blandit auctor. Nullam ornare enim sed nibh consequat molestie. Duis est lectus, vestibulum vel felis vel, convallis cursus ex. Morbi nec placerat orci. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Praesent a erat sit amet libero convallis ornare a venenatis dolor. Pellentesque euismod risus et metus porttitor, vel consectetur lacus tempus. Integer elit arcu, condimentum quis nisi eget, dapibus imperdiet nulla. Cras sit amet ante in urna varius tempus. Integer tristique sagittis nunc vel tincidunt. Integer non suscipit ligula, et fermentum sem. Duis id odio lorem. Sed id placerat urna, eu vehicula risus. Duis porttitor hendrerit risus. Curabitur id tellus ac arcu aliquet finibus. Pellentesque et nisl ante. Mauris sapien nisl, pretium in ligula tempus, posuere mattis turpis. Proin et tempus enim. Nullam at diam est. Vivamus ut lectus hendrerit, interdum ex id, ultricies sapien. Praesent rhoncus turpis dolor, quis lobortis tortor pellentesque id. Pellentesque eget nisi laoreet, fringilla augue eu, cursus risus. Integer consectetur ornare laoreet. Praesent ligula sem, tincidunt at ligula at, condimentum venenatis tortor. Nam laoreet enim leo, sed finibus lorem egestas vel. Maecenas varius a leo non placerat. Donec scelerisque, risus vel finibus ornare, arcu ligula interdum justo, in ultricies urna mi et neque. Curabitur sed sem dui. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Maecenas eget laoreet nisi. Nam rhoncus sapien ac interdum sagittis. Nulla fermentum sem nec tellus dignissim lacinia. Curabitur ornare lectus non dictum laoreet. Praesent tempor risus at tortor tempor finibus. Cras id dolor mi. Mauris ut mi quis est vehicula molestie. Mauris eu varius urna. Integer sodales nunc at risus rutrum eleifend. In sed bibendum lectus. Morbi ipsum sapien, blandit in dignissim eu, ultrices non odio. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Nulla eget volutpat ligula, at elementum dui. Aliquam sed enim scelerisque, facilisis magna vitae, dignissim enim. Pellentesque non ultricies urna. Proin fermentum erat semper efficitur auctor. Vestibulum posuere non tortor vitae tincidunt.` +) + +var lipTenIcedTea = strings.Repeat(lip, 10) + +func poolbench(f BufferFactory) { + buf := f.Get() + buf.MustWrite([]byte(hello64)) + f.MustPut(buf) + buf = f.Get() + buf.MustWrite([]byte(hello)) + f.MustPut(buf) + buf = f.Get() + buf.MustWrite([]byte(lip)) + f.MustPut(buf) + buf = f.Get() + buf.MustWrite([]byte(lipTenIcedTea)) + f.MustPut(buf) +} + +func parabench(pb *testing.PB, f BufferFactory) { + for pb.Next() { + poolbench(f) + } +} + +func sized(b *testing.B, size int, para int) { + b.ReportAllocs() + + f := NewBufferFactory() + if size != 0 { + f = NewSizedBufferFactory(size) + } + + if para != 0 { + b.SetParallelism(para) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { parabench(pb, f) }) + return + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + poolbench(f) + } +} + +// ------------------------------------------------------------ +// ------------------------------------------------------------ +// ------------------------------------------------------------ + +func sizedbytesbench(initial func() []byte) { + buf := bytes.NewBuffer(initial()) + buf.Write([]byte(hello64)) + buf = bytes.NewBuffer(initial()) + buf.Write([]byte(hello)) + buf = bytes.NewBuffer(initial()) + buf.Write([]byte(lip)) + buf = bytes.NewBuffer(initial()) + buf.Write([]byte(lipTenIcedTea)) +} + +func parabytesbench(pb *testing.PB, size int) { + for pb.Next() { + if size != 0 { + sizedbytesbench(func() []byte { return make([]byte, 0, size) }) + } else { + sizedbytesbench(func() []byte { return nil }) + } + } +} + +func bytessized(b *testing.B, size int, para int) { + b.ReportAllocs() + if para != 0 { + b.SetParallelism(para) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { parabytesbench(pb, size) }) + return + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + sizedbytesbench(func() []byte { return nil }) + } +} + +var divier = []byte(strings.Repeat("-", 130) + "\n") + +func bigDivide(n int) { + if n > 1 { + _, _ = os.Stdout.Write([]byte{'\n'}) + } + for i := 0; i < n; i++ { + _, _ = os.Stdout.Write(divier) + } + + _, _ = os.Stdout.Write([]byte{'\n'}) +} + +func benchmarkBufferFactory(b *testing.B) { + b.ReportAllocs() + concurrency := []int{0, 2, 4, 8} + size := []int{64, 1024, 4096, 65536} + + defer bigDivide(2) + + for _, c := range concurrency { + for _, s := range size { + label := fmt.Sprintf("Concurrent-x%d-%d-bytes", c, s) + if c == 0 { + label = fmt.Sprintf("SingleProc-%d-bytes", s) + } + b.Run(label, func(b *testing.B) { sized(b, s, c) }) + } + _, _ = os.Stdout.Write(divier) + } +} + +func benchmarkNewBytesBuffer(b *testing.B) { + b.ReportAllocs() + concurrency := []int{0, 2, 4, 8} + size := []int{64, 1024, 4096, 65536} + + defer bigDivide(1) + + for _, c := range concurrency { + for _, s := range size { + label := fmt.Sprintf("Concurrent-x%d-%d-bytes", c, s) + if c == 0 { + label = fmt.Sprintf("SingleProc-%d-bytes", s) + } + b.Run(label, func(b *testing.B) { bytessized(b, s, c) }) + } + _, _ = os.Stdout.Write(divier) + } +} diff --git a/pool/bytes_test.go b/pool/bytes_test.go index 9f5434a..35e780f 100644 --- a/pool/bytes_test.go +++ b/pool/bytes_test.go @@ -2,9 +2,7 @@ package pool import ( "bytes" - "fmt" "io" - "os" "strings" "testing" ) @@ -507,167 +505,68 @@ func TestBufferFactory(t *testing.T) { t.Errorf("Expected sized buffer from fresh factory to be cap == 4, got: %d", buf.Cap()) } }) -} - -// ========================================================================= - -// BenchmarkBufferFactory tries to emulate real world usage of a buffer pool. -// It creates a buffer, writes to it, and then returns it to the pool. -// -// Then it repeats this process b.N times. -// This should be a decent way to test the performance of a buffer pool. -// -// See bytes_bench.go for more information. -func BenchmarkBufferFactory(b *testing.B) { - benchmarkBufferFactory(b) -} - -// BenchmarkNotUsingPackage is a benchmark that does not use git.tcp.direct/kayos/common/pool. -// It mimics the behavior of the BufferFactory benchmark, but does not use a buffer pool. -// -// See bytes_test.go for more information. -func BenchmarkNotUsingPackage(b *testing.B) { - benchmarkNewBytesBuffer(b) -} - -// ========================================================================= - -const ( - hello64 = `its me ur new best friend tell me a buf i tell you where it ends` - hello = `hello world, it's me, your new best friend. tell me the buffer and i'll tell you where it ends.` - lip = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed a ante sit amet purus blandit auctor. Nullam ornare enim sed nibh consequat molestie. Duis est lectus, vestibulum vel felis vel, convallis cursus ex. Morbi nec placerat orci. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Praesent a erat sit amet libero convallis ornare a venenatis dolor. Pellentesque euismod risus et metus porttitor, vel consectetur lacus tempus. Integer elit arcu, condimentum quis nisi eget, dapibus imperdiet nulla. Cras sit amet ante in urna varius tempus. Integer tristique sagittis nunc vel tincidunt. Integer non suscipit ligula, et fermentum sem. Duis id odio lorem. Sed id placerat urna, eu vehicula risus. Duis porttitor hendrerit risus. Curabitur id tellus ac arcu aliquet finibus. Pellentesque et nisl ante. Mauris sapien nisl, pretium in ligula tempus, posuere mattis turpis. Proin et tempus enim. Nullam at diam est. Vivamus ut lectus hendrerit, interdum ex id, ultricies sapien. Praesent rhoncus turpis dolor, quis lobortis tortor pellentesque id. Pellentesque eget nisi laoreet, fringilla augue eu, cursus risus. Integer consectetur ornare laoreet. Praesent ligula sem, tincidunt at ligula at, condimentum venenatis tortor. Nam laoreet enim leo, sed finibus lorem egestas vel. Maecenas varius a leo non placerat. Donec scelerisque, risus vel finibus ornare, arcu ligula interdum justo, in ultricies urna mi et neque. Curabitur sed sem dui. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Maecenas eget laoreet nisi. Nam rhoncus sapien ac interdum sagittis. Nulla fermentum sem nec tellus dignissim lacinia. Curabitur ornare lectus non dictum laoreet. Praesent tempor risus at tortor tempor finibus. Cras id dolor mi. Mauris ut mi quis est vehicula molestie. Mauris eu varius urna. Integer sodales nunc at risus rutrum eleifend. In sed bibendum lectus. Morbi ipsum sapien, blandit in dignissim eu, ultrices non odio. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Nulla eget volutpat ligula, at elementum dui. Aliquam sed enim scelerisque, facilisis magna vitae, dignissim enim. Pellentesque non ultricies urna. Proin fermentum erat semper efficitur auctor. Vestibulum posuere non tortor vitae tincidunt.` -) - -var lipTenIcedTea = strings.Repeat(lip, 10) - -func poolbench(f BufferFactory) { - buf := f.Get() - buf.MustWrite([]byte(hello64)) - f.MustPut(buf) - buf = f.Get() - buf.MustWrite([]byte(hello)) - f.MustPut(buf) - buf = f.Get() - buf.MustWrite([]byte(lip)) - f.MustPut(buf) - buf = f.Get() - buf.MustWrite([]byte(lipTenIcedTea)) - f.MustPut(buf) -} - -func parabench(pb *testing.PB, f BufferFactory) { - for pb.Next() { - poolbench(f) - } -} - -func sized(b *testing.B, size int, para int) { - b.ReportAllocs() - - f := NewBufferFactory() - if size != 0 { - f = NewSizedBufferFactory(size) - } - - if para != 0 { - b.SetParallelism(para) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { parabench(pb, f) }) - return - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - poolbench(f) - } -} - -// ------------------------------------------------------------ -// ------------------------------------------------------------ -// ------------------------------------------------------------ - -func sizedbytesbench(initial func() []byte) { - buf := bytes.NewBuffer(initial()) - buf.Write([]byte(hello64)) - buf = bytes.NewBuffer(initial()) - buf.Write([]byte(hello)) - buf = bytes.NewBuffer(initial()) - buf.Write([]byte(lip)) - buf = bytes.NewBuffer(initial()) - buf.Write([]byte(lipTenIcedTea)) -} - -func parabytesbench(pb *testing.PB, size int) { - for pb.Next() { - if size != 0 { - sizedbytesbench(func() []byte { return make([]byte, 0, size) }) - } else { - sizedbytesbench(func() []byte { return nil }) + t.Run("BufferWithParent", func(t *testing.T) { + t.Parallel() + buf := bf.Get() + buf.MustWrite([]byte("hello")) + buf.MustWrite([]byte("world")) + buf.MustWrite([]byte("!")) + if buf.p != nil { + t.Fatalf("The parent is not nil without assignment") } - } -} - -func bytessized(b *testing.B, size int, para int) { - if para != 0 { - b.SetParallelism(para) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { parabytesbench(pb, size) }) - return - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - sizedbytesbench(func() []byte { return nil }) - } -} - -var divier = []byte(strings.Repeat("-", 130) + "\n") - -func bigDivide(n int) { - if n > 1 { - _, _ = os.Stdout.Write([]byte{'\n'}) - } - for i := 0; i < n; i++ { - _, _ = os.Stdout.Write(divier) - } - - _, _ = os.Stdout.Write([]byte{'\n'}) -} - -func benchmarkBufferFactory(b *testing.B) { - b.ReportAllocs() - concurrency := []int{0, 2, 4, 8} - size := []int{64, 1024, 4096, 65536} - - defer bigDivide(2) - - for _, c := range concurrency { - for _, s := range size { - label := fmt.Sprintf("Concurrent-x%d-%d-bytes", c, s) - if c == 0 { - label = fmt.Sprintf("SingleProc-%d-bytes", s) - } - b.Run(label, func(b *testing.B) { sized(b, s, c) }) + buf = buf.WithParent(&bf) + if buf.p == nil { + t.Errorf("The parent is nil after assignment") + } + if buf.p != &bf { + t.Errorf("The parent is not the buffer factory") + } + if buf.String() != "helloworld!" { + t.Fatalf("The string is not 'helloworld!': %v", buf.String()) + } + bf.MustPut(buf) + }) + t.Run("BufferClose", func(t *testing.T) { + t.Parallel() + buf := bf.Get() + buf.MustWrite([]byte("hello")) + buf.MustWrite([]byte("world")) + buf.MustWrite([]byte("!")) + if err := buf.Close(); err == nil { + t.Fatal("The error is nil after closing the buffer with no parent") + } + if buf.String() != "helloworld!" { + t.Fatalf("The string is not 'helloworld!' after unsuccessful close: %v", buf.String()) + } + buf = buf.WithParent(&bf) + if err := buf.Close(); err != nil { + t.Fatal(err) + } + }) + t.Run("BufferCannotClose", func(t *testing.T) { + t.Parallel() + buf := bf.Get() + buf = buf.WithParent(&bf) + buf.MustWrite([]byte("hello")) + buf.MustWrite([]byte("world")) + buf.MustWrite([]byte("!")) + if buf.String() != "helloworld!" { + t.Fatalf("The string is not 'helloworld!': %v", buf.String()) + } + bf.MustPut(buf) + if buf.Close() == nil { + t.Fatal("The error is nil after closing an already returned buffer with parent") } - _, _ = os.Stdout.Write(divier) - } -} - -func benchmarkNewBytesBuffer(b *testing.B) { - b.ReportAllocs() - concurrency := []int{0, 2, 4, 8} - size := []int{64, 1024, 4096, 65536} - - defer bigDivide(1) - for _, c := range concurrency { - for _, s := range size { - label := fmt.Sprintf("Concurrent-x%d-%d-bytes", c, s) - if c == 0 { - label = fmt.Sprintf("SingleProc-%d-bytes", s) + defer func() { + if r := recover(); r == nil { + t.Fatal("The panic is not nil") } - b.Run(label, func(b *testing.B) { bytessized(b, s, c) }) + }() + + buf = nil + if buf.Close() == nil { + t.Fatal("The error is nil after closing a nil buffer") } - _, _ = os.Stdout.Write(divier) - } + }) }