From 529230a63ca43722dfc3bba866b961a01083eec2 Mon Sep 17 00:00:00 2001 From: Artem Poltorzhitskiy Date: Wed, 26 Jun 2024 12:27:18 +0200 Subject: [PATCH] Fix: don't send to S3 the same blobs (#224) * Fix: don't send to S3 the same blobs * Add test --- pkg/indexer/blob_saver/module.go | 31 ++++++++++++++++----------- pkg/indexer/blob_saver/module_test.go | 10 +++++++++ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/pkg/indexer/blob_saver/module.go b/pkg/indexer/blob_saver/module.go index 09c9f132..772ec125 100644 --- a/pkg/indexer/blob_saver/module.go +++ b/pkg/indexer/blob_saver/module.go @@ -44,6 +44,7 @@ type Module struct { kind string blocks *sync.Map[pkgTypes.Level, *[]blob.Blob] + blobs *sync.Map[string, struct{}] storage blob.Storage head pkgTypes.Level } @@ -57,6 +58,7 @@ func NewModule( m := Module{ BaseModule: modules.New("blob_saver"), blocks: sync.NewMap[pkgTypes.Level, *[]blob.Blob](), + blobs: sync.NewMap[string, struct{}](), kind: kind, } @@ -153,6 +155,7 @@ func (module *Module) processEndOfBlock(ctx context.Context, height pkgTypes.Lev module.head = height module.blocks.Delete(height) + module.blobs.Clear() return nil } @@ -167,20 +170,24 @@ func (module *Module) processBlob(msg *Msg) error { return errors.Wrap(err, "can't create commitment") } + blb := blob.Blob{ + Commitment: commitment, + Blob: msg.Blob, + Height: uint64(msg.Height), + } + key := blb.String() + + // skip blobs with the same commitments in the current block. + if _, ok := module.blobs.Get(key); ok { + return nil + } + module.blobs.Set(key, struct{}{}) + if blobs, ok := module.blocks.Get(msg.Height); ok { - *blobs = append(*blobs, blob.Blob{ - Commitment: commitment, - Blob: msg.Blob, - Height: uint64(msg.Height), - }) + *blobs = append(*blobs, blb) } else { - module.blocks.Set(msg.Height, &[]blob.Blob{ - { - Commitment: commitment, - Blob: msg.Blob, - Height: uint64(msg.Height), - }, - }) + + module.blocks.Set(msg.Height, &[]blob.Blob{blb}) } return nil diff --git a/pkg/indexer/blob_saver/module_test.go b/pkg/indexer/blob_saver/module_test.go index 7f9f71b9..ba963fe5 100644 --- a/pkg/indexer/blob_saver/module_test.go +++ b/pkg/indexer/blob_saver/module_test.go @@ -39,6 +39,12 @@ func TestBlobSaverModule(t *testing.T) { ShareVersion: 0, NamespaceVersion: 0, } + b2Copy := &types.Blob{ + NamespaceId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x67, 0x6d}, + Data: []byte("0x676d"), + ShareVersion: 0, + NamespaceVersion: 0, + } commitment, err := base64.StdEncoding.DecodeString("uwghsElFtoHNqQ3JrsDGj8uLW456izVbegVL/AunMOw=") require.NoError(t, err, "decode commitment") @@ -83,6 +89,10 @@ func TestBlobSaverModule(t *testing.T) { Height: 101, Blob: b2, }) + input.Push(&Msg{ + Height: 101, + Blob: b2Copy, + }) input.Push(&Msg{ Height: 101, EndBlock: true,