From 4a8baaa5577cf419f71f50b46bef4598c463ffc9 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 19 Dec 2023 20:30:08 +0400 Subject: [PATCH] slicer: Allow to specify known size of the full payload Previously, there was no ability to specify pre-known size of payload of the object to be sliced. The main drawback was the slicer's inability to determine the optimal buffer size needed to read the payload. Therefore, the slicer always allocated a buffer of `MaxObjectSize` size. With this behavior, the smaller the size of the loaded payload (down to 0), the more memory was wasted. For example, this could lead to 64MB allocations for 1K objects which is a 65,000-fold excess. Now the slicer supports an optional fixed payload size via `Options.SetPayloadSize` method. When used, this option tunes behavior to allocate payload buffer according to the size. The option could be used with files, in-memory data and other cases to improve application performance. Closes #540. Signed-off-by: Leonard Lyubich --- object/slicer/options.go | 14 ++++ object/slicer/slicer.go | 22 ++++++ object/slicer/slicer_test.go | 139 ++++++++++++++++++++++++++++++++++- 3 files changed, 174 insertions(+), 1 deletion(-) diff --git a/object/slicer/options.go b/object/slicer/options.go index ac6d46f4..50540292 100644 --- a/object/slicer/options.go +++ b/object/slicer/options.go @@ -19,6 +19,9 @@ type Options struct { sessionToken *session.Object bearerToken *bearer.Token + + payloadSizeFixed bool + payloadSize uint64 } // SetObjectPayloadLimit specifies data size limit for produced physically @@ -61,6 +64,17 @@ func (x *Options) SetPayloadBuffer(payloadBuffer []byte) { x.payloadBuffer = payloadBuffer } +// SetPayloadSize specifies the object's payload size when it is known in +// advance. If set, both reading and writing functions expect exactly size +// bytes: less data fails with io.ErrUnexpectedEOF, more data is rejected. 
+// +// If the size is known, the option is recommended as it improves the +// performance of the application using the [Slicer]. +func (x *Options) SetPayloadSize(size uint64) { + x.payloadSizeFixed = true + x.payloadSize = size +} + // ObjectPayloadLimit returns required max object size. func (x *Options) ObjectPayloadLimit() uint64 { return x.objectPayloadLimit diff --git a/object/slicer/slicer.go b/object/slicer/slicer.go index 73edac75..76b27d53 100644 --- a/object/slicer/slicer.go +++ b/object/slicer/slicer.go @@ -213,6 +213,10 @@ func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.R return rootID, fmt.Errorf("read payload chunk: %w", err) } + if writer.payloadSizeFixed && writer.rootMeta.length < writer.payloadSize { + return oid.ID{}, io.ErrUnexpectedEOF + } + if err = writer.Close(); err != nil { return rootID, fmt.Errorf("writer close: %w", err) } @@ -287,10 +291,16 @@ func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Objec rootMeta: newDynamicObjectMetadata(opts.withHomoChecksum), childMeta: newDynamicObjectMetadata(opts.withHomoChecksum), payloadSizeLimit: childPayloadSizeLimit(opts), + payloadSizeFixed: opts.payloadSizeFixed, + payloadSize: opts.payloadSize, prmObjectPutInit: prm, stubObject: &stubObject, } + if res.payloadSizeFixed && res.payloadSize < res.payloadSizeLimit { + res.payloadSizeLimit = res.payloadSize + } + res.payloadBuffer = opts.payloadBuffer res.rootMeta.reset() res.metaWriter = &res.rootMeta @@ -320,6 +330,8 @@ type PayloadWriter struct { // max payload size of produced objects in bytes payloadSizeLimit uint64 + payloadSizeFixed bool + payloadSize uint64 metaWriter io.Writer @@ -331,6 +343,8 @@ type PayloadWriter struct { stubObject *object.Object } +var errPayloadSizeExceeded = errors.New("payload size exceeded") + // Write writes next chunk of the object data. Concatenation of all chunks forms // the payload of the final object. 
When the data is over, the PayloadWriter // should be closed. @@ -340,6 +354,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) { return 0, nil } + if x.payloadSizeFixed && x.rootMeta.length+uint64(len(chunk)) > x.payloadSize { + return 0, errPayloadSizeExceeded + } + buffered := x.rootMeta.length if x.withSplit { buffered = x.childMeta.length @@ -432,6 +450,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) { // Close finalizes object with written payload data, saves the object and closes // the stream. Reference to the stored object can be obtained by ID method. func (x *PayloadWriter) Close() error { + if x.payloadSizeFixed && x.rootMeta.length < x.payloadSize { + return io.ErrUnexpectedEOF + } + buffered := x.rootMeta.length if x.withSplit { buffered = x.childMeta.length diff --git a/object/slicer/slicer_test.go b/object/slicer/slicer_test.go index e170ed62..f0519bf0 100644 --- a/object/slicer/slicer_test.go +++ b/object/slicer/slicer_test.go @@ -254,8 +254,17 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options) } func testSlicer(t *testing.T, size, sizeLimit uint64) { + testSlicerWithKnownSize(t, size, sizeLimit, true) + testSlicerWithKnownSize(t, size, sizeLimit, false) +} + +func testSlicerWithKnownSize(t *testing.T, size, sizeLimit uint64, known bool) { in, opts := randomInput(t, size, sizeLimit) + if known { + opts.SetPayloadSize(uint64(len(in.payload))) + } + checker := &slicedObjectChecker{ opts: opts, tb: t, @@ -266,7 +275,7 @@ func testSlicer(t *testing.T, size, sizeLimit uint64) { for i := object.TypeRegular; i <= object.TypeLock; i++ { in.objectType = i - t.Run("slicer with "+i.EncodeToString(), func(t *testing.T) { + t.Run(fmt.Sprintf("slicer with %s,known_size=%t", i.EncodeToString(), known), func(t *testing.T) { testSlicerByHeaderType(t, checker, in, opts) }) } @@ -1008,3 +1017,131 @@ func TestOptions_SetPayloadBuffer(t *testing.T) { }) } } + +func TestKnownPayloadSize(t *testing.T) { + ctx := 
context.Background() + t.Run("overflow", func(t *testing.T) { + t.Run("read", func(t *testing.T) { + in, opts := randomInput(t, 1, 1) + obj := objecttest.Object(t) + hdr := *obj.CutPayload() + + opts.SetPayloadSize(20) + r := bytes.NewReader(make([]byte, 21)) + + _, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, r, opts) + require.ErrorContains(t, err, "payload size exceeded") + }) + + t.Run("write", func(t *testing.T) { + in, opts := randomInput(t, 1, 1) + obj := objecttest.Object(t) + hdr := *obj.CutPayload() + + opts.SetPayloadSize(20) + + w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts) + require.NoError(t, err) + + for i := byte(0); i < 21; i++ { + _, err = w.Write([]byte{1}) + if i < 20 { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, "payload size exceeded") + } + } + }) + }) + + t.Run("flaw", func(t *testing.T) { + t.Run("read", func(t *testing.T) { + in, opts := randomInput(t, 1, 1) + obj := objecttest.Object(t) + hdr := *obj.CutPayload() + + opts.SetPayloadSize(20) + r := bytes.NewReader(make([]byte, 19)) + + _, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, r, opts) + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + }) + + t.Run("write", func(t *testing.T) { + in, opts := randomInput(t, 1, 1) + obj := objecttest.Object(t) + hdr := *obj.CutPayload() + + opts.SetPayloadSize(20) + + w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts) + require.NoError(t, err) + + _, err = w.Write(make([]byte, 19)) + require.NoError(t, err) + + err = w.Close() + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + }) + }) +} + +func BenchmarkKnownPayloadSize(b *testing.B) { + ctx := context.Background() + for _, tc := range []struct { + sizeLimit uint64 + size uint64 + }{ + {sizeLimit: 1 << 10, size: 1}, + {sizeLimit: 1 << 10, size: 1 << 10}, + {sizeLimit: 1 << 10, size: 10 << 10}, + } { + b.Run(fmt.Sprintf("limit=%d,size=%d", tc.sizeLimit, tc.size), func(b 
*testing.B) { + b.Run("read", func(b *testing.B) { + obj := objecttest.Object(b) + hdr := *obj.CutPayload() + signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b)) + payload := make([]byte, tc.size) + rand.Read(payload) + + var opts slicer.Options + opts.SetObjectPayloadLimit(tc.sizeLimit) + opts.SetPayloadSize(tc.size) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, signer, bytes.NewReader(payload), opts) + require.NoError(b, err) + } + }) + + b.Run("write", func(b *testing.B) { + obj := objecttest.Object(b) + hdr := *obj.CutPayload() + signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b)) + payload := make([]byte, tc.size) + rand.Read(payload) + + var opts slicer.Options + opts.SetObjectPayloadLimit(tc.sizeLimit) + opts.SetPayloadSize(tc.size) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, signer, opts) + require.NoError(b, err) + + _, err = w.Write(payload) + if err == nil { + err = w.Close() + } + require.NoError(b, err) + } + }) + }) + } +}