diff --git a/object/slicer/options.go b/object/slicer/options.go
index ac6d46f4..50540292 100644
--- a/object/slicer/options.go
+++ b/object/slicer/options.go
@@ -19,6 +19,9 @@ type Options struct {
 
 	sessionToken *session.Object
 	bearerToken  *bearer.Token
+
+	payloadSizeFixed bool
+	payloadSize      uint64
 }
 
 // SetObjectPayloadLimit specifies data size limit for produced physically
@@ -61,6 +64,17 @@ func (x *Options) SetPayloadBuffer(payloadBuffer []byte) {
 	x.payloadBuffer = payloadBuffer
 }
 
+// SetPayloadSize allows specifying the object's payload size known in advance.
+// If set, reading functions will read at least size bytes, while writing
+// functions will expect exactly size bytes.
+//
+// If the size is known, the option is recommended as it improves the
+// performance of the application using the [Slicer].
+func (x *Options) SetPayloadSize(size uint64) {
+	x.payloadSizeFixed = true
+	x.payloadSize = size
+}
+
 // ObjectPayloadLimit returns required max object size.
 func (x *Options) ObjectPayloadLimit() uint64 {
 	return x.objectPayloadLimit
diff --git a/object/slicer/slicer.go b/object/slicer/slicer.go
index 73edac75..76b27d53 100644
--- a/object/slicer/slicer.go
+++ b/object/slicer/slicer.go
@@ -213,6 +213,10 @@ func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.R
 		return rootID, fmt.Errorf("read payload chunk: %w", err)
 	}
 
+	if writer.payloadSizeFixed && writer.rootMeta.length < writer.payloadSize {
+		return oid.ID{}, io.ErrUnexpectedEOF
+	}
+
 	if err = writer.Close(); err != nil {
 		return rootID, fmt.Errorf("writer close: %w", err)
 	}
@@ -287,10 +291,16 @@ func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Objec
 		rootMeta:         newDynamicObjectMetadata(opts.withHomoChecksum),
 		childMeta:        newDynamicObjectMetadata(opts.withHomoChecksum),
 		payloadSizeLimit: childPayloadSizeLimit(opts),
+		payloadSizeFixed: opts.payloadSizeFixed,
+		payloadSize:      opts.payloadSize,
 		prmObjectPutInit: prm,
 		stubObject:       &stubObject,
 	}
 
+	if res.payloadSizeFixed && res.payloadSize < res.payloadSizeLimit {
+		res.payloadSizeLimit = res.payloadSize
+	}
+
 	res.payloadBuffer = opts.payloadBuffer
 	res.rootMeta.reset()
 	res.metaWriter = &res.rootMeta
@@ -320,6 +330,8 @@ type PayloadWriter struct {
 
 	// max payload size of produced objects in bytes
 	payloadSizeLimit uint64
+	payloadSizeFixed bool
+	payloadSize      uint64
 
 	metaWriter io.Writer
 
@@ -331,6 +343,8 @@ type PayloadWriter struct {
 	stubObject *object.Object
 }
 
+var errPayloadSizeExceeded = errors.New("payload size exceeded")
+
 // Write writes next chunk of the object data. Concatenation of all chunks forms
 // the payload of the final object. When the data is over, the PayloadWriter
 // should be closed.
@@ -340,6 +354,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
 		return 0, nil
 	}
 
+	if x.payloadSizeFixed && x.rootMeta.length+uint64(len(chunk)) > x.payloadSize {
+		return 0, errPayloadSizeExceeded
+	}
+
 	buffered := x.rootMeta.length
 	if x.withSplit {
 		buffered = x.childMeta.length
@@ -432,6 +450,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
 // Close finalizes object with written payload data, saves the object and closes
 // the stream. Reference to the stored object can be obtained by ID method.
 func (x *PayloadWriter) Close() error {
+	if x.payloadSizeFixed && x.rootMeta.length < x.payloadSize {
+		return io.ErrUnexpectedEOF
+	}
+
 	buffered := x.rootMeta.length
 	if x.withSplit {
 		buffered = x.childMeta.length
diff --git a/object/slicer/slicer_test.go b/object/slicer/slicer_test.go
index e170ed62..f0519bf0 100644
--- a/object/slicer/slicer_test.go
+++ b/object/slicer/slicer_test.go
@@ -254,8 +254,17 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options)
 }
 
 func testSlicer(t *testing.T, size, sizeLimit uint64) {
+	testSlicerWithKnownSize(t, size, sizeLimit, true)
+	testSlicerWithKnownSize(t, size, sizeLimit, false)
+}
+
+func testSlicerWithKnownSize(t *testing.T, size, sizeLimit uint64, known bool) {
 	in, opts := randomInput(t, size, sizeLimit)
 
+	if known {
+		opts.SetPayloadSize(uint64(len(in.payload)))
+	}
+
 	checker := &slicedObjectChecker{
 		opts: opts,
 		tb:   t,
@@ -266,7 +275,7 @@ func testSlicer(t *testing.T, size, sizeLimit uint64) {
 
 	for i := object.TypeRegular; i <= object.TypeLock; i++ {
 		in.objectType = i
-		t.Run("slicer with "+i.EncodeToString(), func(t *testing.T) {
+		t.Run(fmt.Sprintf("slicer with %s,known_size=%t", i.EncodeToString(), known), func(t *testing.T) {
 			testSlicerByHeaderType(t, checker, in, opts)
 		})
 	}
@@ -1008,3 +1017,131 @@ func TestOptions_SetPayloadBuffer(t *testing.T) {
 		})
 	}
 }
+
+func TestKnownPayloadSize(t *testing.T) {
+	ctx := context.Background()
+	t.Run("overflow", func(t *testing.T) {
+		t.Run("read", func(t *testing.T) {
+			in, opts := randomInput(t, 1, 1)
+			obj := objecttest.Object(t)
+			hdr := *obj.CutPayload()
+
+			opts.SetPayloadSize(20)
+			r := bytes.NewReader(make([]byte, 21))
+
+			_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, r, opts)
+			require.ErrorContains(t, err, "payload size exceeded")
+		})
+
+		t.Run("write", func(t *testing.T) {
+			in, opts := randomInput(t, 1, 1)
+			obj := objecttest.Object(t)
+			hdr := *obj.CutPayload()
+
+			opts.SetPayloadSize(20)
+
+			w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts)
+			require.NoError(t, err)
+
+			for i := byte(0); i < 21; i++ {
+				_, err = w.Write([]byte{1})
+				if i < 20 {
+					require.NoError(t, err)
+				} else {
+					require.ErrorContains(t, err, "payload size exceeded")
+				}
+			}
+		})
+	})
+
+	t.Run("flaw", func(t *testing.T) {
+		t.Run("read", func(t *testing.T) {
+			in, opts := randomInput(t, 1, 1)
+			obj := objecttest.Object(t)
+			hdr := *obj.CutPayload()
+
+			opts.SetPayloadSize(20)
+			r := bytes.NewReader(make([]byte, 19))
+
+			_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, r, opts)
+			require.ErrorIs(t, err, io.ErrUnexpectedEOF)
+		})
+
+		t.Run("write", func(t *testing.T) {
+			in, opts := randomInput(t, 1, 1)
+			obj := objecttest.Object(t)
+			hdr := *obj.CutPayload()
+
+			opts.SetPayloadSize(20)
+
+			w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts)
+			require.NoError(t, err)
+
+			_, err = w.Write(make([]byte, 19))
+			require.NoError(t, err)
+
+			err = w.Close()
+			require.ErrorIs(t, err, io.ErrUnexpectedEOF)
+		})
+	})
+}
+
+func BenchmarkKnownPayloadSize(b *testing.B) {
+	ctx := context.Background()
+	for _, tc := range []struct {
+		sizeLimit uint64
+		size      uint64
+	}{
+		{sizeLimit: 1 << 10, size: 1},
+		{sizeLimit: 1 << 10, size: 1 << 10},
+		{sizeLimit: 1 << 10, size: 10 << 10},
+	} {
+		b.Run(fmt.Sprintf("limit=%d,size=%d", tc.sizeLimit, tc.size), func(b *testing.B) {
+			b.Run("read", func(b *testing.B) {
+				obj := objecttest.Object(b)
+				hdr := *obj.CutPayload()
+				signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
+				payload := make([]byte, tc.size)
+				rand.Read(payload)
+
+				var opts slicer.Options
+				opts.SetObjectPayloadLimit(tc.sizeLimit)
+				opts.SetPayloadSize(tc.size)
+
+				b.ReportAllocs()
+				b.ResetTimer()
+
+				for i := 0; i < b.N; i++ {
+					_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, signer, bytes.NewReader(payload), opts)
+					require.NoError(b, err)
+				}
+			})
+
+			b.Run("write", func(b *testing.B) {
+				obj := objecttest.Object(b)
+				hdr := *obj.CutPayload()
+				signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
+				payload := make([]byte, tc.size)
+				rand.Read(payload)
+
+				var opts slicer.Options
+				opts.SetObjectPayloadLimit(tc.sizeLimit)
+				opts.SetPayloadSize(tc.size)
+
+				b.ReportAllocs()
+				b.ResetTimer()
+
+				for i := 0; i < b.N; i++ {
+					w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, signer, opts)
+					require.NoError(b, err)
+
+					_, err = w.Write(payload)
+					if err == nil {
+						err = w.Close()
+					}
+					require.NoError(b, err)
+				}
+			})
+		})
+	}
+}
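
Usage sketch (not part of the patch): the snippet below only illustrates where the new option fits into a typical upload. uploadKnownSize and maxObjSize are hypothetical names, and the prepared ow/hdr/signer values are assumed to exist as in any other slicer.Put call; only SetObjectPayloadLimit, SetPayloadSize, Put and InitPut come from the code changed above.

package main // sketch only; assumes the github.com/nspcc-dev/neofs-sdk-go module

import (
	"bytes"
	"context"

	"github.com/nspcc-dev/neofs-sdk-go/object"
	"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

// uploadKnownSize is a hypothetical helper showing SetPayloadSize in use when
// the whole payload is already in memory and its size is known in advance.
func uploadKnownSize(ctx context.Context, ow slicer.ObjectWriter, hdr object.Object,
	signer user.Signer, payload []byte, maxObjSize uint64) error {
	var opts slicer.Options
	// maxObjSize would normally come from the network's max object size setting.
	opts.SetObjectPayloadLimit(maxObjSize)
	// Declare the exact payload size so the slicer can cap its per-object
	// limit and validate how much data it actually consumes.
	opts.SetPayloadSize(uint64(len(payload)))

	// With the size fixed, Put fails with io.ErrUnexpectedEOF if the reader
	// yields fewer bytes than declared and with a "payload size exceeded"
	// error if it yields more; the InitPut/Write/Close path behaves the same.
	_, err := slicer.Put(ctx, ow, hdr, signer, bytes.NewReader(payload), opts)
	return err
}

The cap of payloadSizeLimit to the declared size suggests the performance gain mainly comes from not over-allocating the payload buffer for objects smaller than the split limit, which is what the added benchmark measures.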