From 25b17e487543fa954b7236054892aee1a9d5ef79 Mon Sep 17 00:00:00 2001 From: Artem Poltorzhitskiy Date: Sat, 10 Feb 2024 20:24:56 +0100 Subject: [PATCH] Optimization: blobs query (#125) --- internal/storage/postgres/blob_log.go | 102 ++++++++----- internal/storage/postgres/blob_log_test.go | 158 +++++++++++++++++++++ internal/storage/postgres/storage_test.go | 143 ------------------- 3 files changed, 225 insertions(+), 178 deletions(-) create mode 100644 internal/storage/postgres/blob_log_test.go diff --git a/internal/storage/postgres/blob_log.go b/internal/storage/postgres/blob_log.go index 3cb83c28..915799ad 100644 --- a/internal/storage/postgres/blob_log.go +++ b/internal/storage/postgres/blob_log.go @@ -26,14 +26,19 @@ func NewBlobLog(db *database.Bun) *BlobLog { } func (bl *BlobLog) ByNamespace(ctx context.Context, nsId uint64, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) { - query := bl.DB().NewSelect().Model(&logs). - Where("blob_log.namespace_id = ?", nsId). - Relation("Signer"). - Relation("Tx") - - query = blobLogFilters(query, fltrs) - - err = query.Scan(ctx) + blobsQuery := bl.DB().NewSelect().Model((*storage.BlobLog)(nil)). + Where("blob_log.namespace_id = ?", nsId) + + blobsQuery = blobLogFilters(blobsQuery, fltrs) + + err = bl.DB().NewSelect(). + ColumnExpr("blob_log.*"). + ColumnExpr("signer.address as signer__address"). + ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types"). + TableExpr("(?) as blob_log", blobsQuery). + Join("left join address as signer on signer.id = blob_log.signer_id"). + Join("left join tx on tx.id = blob_log.tx_id"). + Scan(ctx, &logs) return } @@ -42,13 +47,11 @@ func (bl *BlobLog) ByProviders(ctx context.Context, providers []storage.RollupPr return nil, nil } - query := bl.DB().NewSelect().Model(&logs). - Relation("Signer"). - Relation("Namespace"). - Relation("Tx") + blobQuery := bl.DB().NewSelect(). + Model((*storage.BlobLog)(nil)) for i := range providers { - query = query.WhereGroup(" OR ", func(sq *bun.SelectQuery) *bun.SelectQuery { + blobQuery = blobQuery.WhereGroup(" OR ", func(sq *bun.SelectQuery) *bun.SelectQuery { sq.Where("blob_log.signer_id = ?", providers[i].AddressId) if providers[i].NamespaceId > 0 { sq.Where("blob_log.namespace_id = ?", providers[i].NamespaceId) @@ -57,47 +60,76 @@ func (bl *BlobLog) ByProviders(ctx context.Context, providers []storage.RollupPr }) } - query = blobLogFilters(query, fltrs) - - err = query.Scan(ctx) + blobQuery = blobLogFilters(blobQuery, fltrs) + + err = bl.DB().NewSelect(). + ColumnExpr("blob_log.*"). + ColumnExpr("signer.address as signer__address"). + ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time"). + ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types"). + TableExpr("(?) as blob_log", blobQuery). + Join("left join address as signer on signer.id = blob_log.signer_id"). + Join("left join namespace as ns on ns.id = blob_log.namespace_id"). + Join("left join tx on tx.id = blob_log.tx_id"). + Scan(ctx, &logs) return } func (bl *BlobLog) BySigner(ctx context.Context, signerId uint64, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) { - query := bl.DB().NewSelect().Model(&logs). - Relation("Namespace"). - Relation("Tx"). + blobQuery := bl.DB().NewSelect(). + Model((*storage.BlobLog)(nil)). Where("signer_id = ?", signerId) - query = blobLogFilters(query, fltrs) + blobQuery = blobLogFilters(blobQuery, fltrs) - err = query.Scan(ctx) + err = bl.DB().NewSelect(). + ColumnExpr("blob_log.*"). + ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time"). + ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types"). + TableExpr("(?) as blob_log", blobQuery). + Join("left join namespace as ns on ns.id = blob_log.namespace_id"). + Join("left join tx on tx.id = blob_log.tx_id"). + Scan(ctx, &logs) return } func (bl *BlobLog) ByTxId(ctx context.Context, txId uint64, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) { - query := bl.DB().NewSelect().Model(&logs). - Relation("Namespace"). - Relation("Tx"). - Relation("Signer"). + blobLogQuery := bl.DB().NewSelect(). + Model((*storage.BlobLog)(nil)). Where("tx_id = ?", txId) - query = blobLogFilters(query, fltrs) - - err = query.Scan(ctx) + blobLogQuery = blobLogFilters(blobLogQuery, fltrs) + + err = bl.DB().NewSelect(). + ColumnExpr("blob_log.*"). + ColumnExpr("signer.address as signer__address"). + ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time"). + ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types"). + TableExpr("(?) as blob_log", blobLogQuery). + Join("left join address as signer on signer.id = blob_log.signer_id"). + Join("left join namespace as ns on ns.id = blob_log.namespace_id"). + Join("left join tx on tx.id = blob_log.tx_id"). + Scan(ctx, &logs) return } func (bl *BlobLog) ByHeight(ctx context.Context, height types.Level, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) { - query := bl.DB().NewSelect().Model(&logs). - Relation("Namespace"). - Relation("Tx"). - Relation("Signer"). + blobLogQuery := bl.DB().NewSelect(). + Model((*storage.BlobLog)(nil)). Where("blob_log.height = ?", height) - query = blobLogFilters(query, fltrs) - - err = query.Scan(ctx) + blobLogQuery = blobLogFilters(blobLogQuery, fltrs) + + err = bl.DB().NewSelect(). + ColumnExpr("blob_log.*"). + ColumnExpr("signer.address as signer__address"). + ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time"). + ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types"). + TableExpr("(?) as blob_log", blobLogQuery). + Join("left join address as signer on signer.id = blob_log.signer_id"). + Join("left join namespace as ns on ns.id = blob_log.namespace_id"). + Join("left join tx on tx.id = blob_log.tx_id"). + Scan(ctx, &logs) return } diff --git a/internal/storage/postgres/blob_log_test.go b/internal/storage/postgres/blob_log_test.go new file mode 100644 index 00000000..7a2d98e6 --- /dev/null +++ b/internal/storage/postgres/blob_log_test.go @@ -0,0 +1,158 @@ +package postgres + +import ( + "context" + "time" + + "github.com/celenium-io/celestia-indexer/internal/storage" + sdk "github.com/dipdup-net/indexer-sdk/pkg/storage" +) + +func (s *StorageTestSuite) TestBlobLogsByNamespace() { + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + logs, err := s.storage.BlobLogs.ByNamespace(ctx, 2, storage.BlobLogFilters{ + Limit: 2, + Offset: 0, + Sort: sdk.SortOrderAsc, + SortBy: "size", + }) + s.Require().NoError(err) + s.Require().Len(logs, 2) + + log := logs[0] + s.Require().EqualValues(1, log.Id) + s.Require().EqualValues(0, log.Height) + s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) + s.Require().EqualValues(10, log.Size) + s.Require().EqualValues(2, log.NamespaceId) + s.Require().EqualValues(1, log.SignerId) + s.Require().EqualValues(1, log.MsgId) + s.Require().EqualValues(4, log.TxId) + + s.Require().NotNil(log.Signer) + s.Require().EqualValues("celestia1mm8yykm46ec3t0dgwls70g0jvtm055wk9ayal8", log.Signer.Address) + + s.Require().NotNil(log.Tx) + s.Require().EqualValues(4, log.Tx.Id) +} + +func (s *StorageTestSuite) TestBlobLogsSigner() { + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + logs, err := s.storage.BlobLogs.BySigner(ctx, 1, storage.BlobLogFilters{ + Limit: 2, + Offset: 0, + Sort: sdk.SortOrderAsc, + SortBy: "size", + }) + s.Require().NoError(err) + s.Require().Len(logs, 2) + + log := logs[0] + s.Require().EqualValues(1, log.Id) + s.Require().EqualValues(0, log.Height) + s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) + s.Require().EqualValues(10, log.Size) + s.Require().EqualValues(2, log.NamespaceId) + s.Require().EqualValues(1, log.SignerId) + s.Require().EqualValues(1, log.MsgId) + s.Require().EqualValues(4, log.TxId) + s.Require().NotNil(log.Namespace) + s.Require().NotNil(log.TxId) +} + +func (s *StorageTestSuite) TestBlobLogsTx() { + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + logs, err := s.storage.BlobLogs.ByTxId(ctx, 4, storage.BlobLogFilters{ + Limit: 2, + Offset: 0, + Sort: sdk.SortOrderAsc, + SortBy: "size", + }) + s.Require().NoError(err) + s.Require().Len(logs, 2) + + log := logs[0] + s.Require().EqualValues(1, log.Id) + s.Require().EqualValues(0, log.Height) + s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) + s.Require().EqualValues(10, log.Size) + s.Require().EqualValues(2, log.NamespaceId) + s.Require().EqualValues(1, log.SignerId) + s.Require().EqualValues(1, log.MsgId) + s.Require().EqualValues(4, log.TxId) + s.Require().NotNil(log.Namespace) + s.Require().NotNil(log.TxId) + s.Require().NotNil(log.Signer) +} + +func (s *StorageTestSuite) TestCountBlobLogsTx() { + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + count, err := s.storage.BlobLogs.CountByTxId(ctx, 4) + s.Require().NoError(err) + s.Require().EqualValues(count, 2) +} + +func (s *StorageTestSuite) TestBlobLogsByHeight() { + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + logs, err := s.storage.BlobLogs.ByHeight(ctx, 1000, storage.BlobLogFilters{ + Limit: 2, + Offset: 0, + Sort: sdk.SortOrderDesc, + SortBy: "size", + }) + s.Require().NoError(err) + s.Require().Len(logs, 2) + + log := logs[0] + s.Require().EqualValues(2, log.Id) + s.Require().EqualValues(1000, log.Height) + s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) + s.Require().EqualValues(20, log.Size) + s.Require().EqualValues(1, log.NamespaceId) + s.Require().EqualValues(1, log.SignerId) + s.Require().EqualValues(1, log.MsgId) + s.Require().EqualValues(3, log.TxId) + s.Require().NotNil(log.Namespace) + s.Require().NotNil(log.Tx) + s.Require().NotNil(log.Signer) +} + +func (s *StorageTestSuite) TestCountBlobLogsByHeight() { + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + count, err := s.storage.BlobLogs.CountByHeight(ctx, 1000) + s.Require().NoError(err) + s.Require().EqualValues(count, 4) +} + +func (s *StorageTestSuite) TestBlobLogsByProviders() { + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + logs, err := s.storage.BlobLogs.ByProviders(ctx, []storage.RollupProvider{ + { + AddressId: 1, + NamespaceId: 1, + }, + }, storage.BlobLogFilters{ + Limit: 10, + }) + s.Require().NoError(err) + s.Require().Len(logs, 1) + + log := logs[0] + s.Require().NotNil(log.Tx) + s.Require().NotNil(log.Namespace) + s.Require().NotNil(log.Signer) +} diff --git a/internal/storage/postgres/storage_test.go b/internal/storage/postgres/storage_test.go index 730695ca..3d8eed21 100644 --- a/internal/storage/postgres/storage_test.go +++ b/internal/storage/postgres/storage_test.go @@ -645,149 +645,6 @@ func (s *StorageTestSuite) TestTxGas() { s.Require().EqualValues("1", tx1.GasPrice.String()) } -func (s *StorageTestSuite) TestBlobLogsByNamespace() { - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - logs, err := s.storage.BlobLogs.ByNamespace(ctx, 2, storage.BlobLogFilters{ - Limit: 2, - Offset: 0, - Sort: sdk.SortOrderAsc, - SortBy: "size", - }) - s.Require().NoError(err) - s.Require().Len(logs, 2) - - log := logs[0] - s.Require().EqualValues(1, log.Id) - s.Require().EqualValues(0, log.Height) - s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) - s.Require().EqualValues(10, log.Size) - s.Require().EqualValues(2, log.NamespaceId) - s.Require().EqualValues(1, log.SignerId) - s.Require().EqualValues(1, log.MsgId) - s.Require().EqualValues(4, log.TxId) - - s.Require().NotNil(log.Signer) - s.Require().EqualValues("celestia1mm8yykm46ec3t0dgwls70g0jvtm055wk9ayal8", log.Signer.Address) - - s.Require().NotNil(log.Tx) -} - -func (s *StorageTestSuite) TestBlobLogsSigner() { - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - logs, err := s.storage.BlobLogs.BySigner(ctx, 1, storage.BlobLogFilters{ - Limit: 2, - Offset: 0, - Sort: sdk.SortOrderAsc, - SortBy: "size", - }) - s.Require().NoError(err) - s.Require().Len(logs, 2) - - log := logs[0] - s.Require().EqualValues(1, log.Id) - s.Require().EqualValues(0, log.Height) - s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) - s.Require().EqualValues(10, log.Size) - s.Require().EqualValues(2, log.NamespaceId) - s.Require().EqualValues(1, log.SignerId) - s.Require().EqualValues(1, log.MsgId) - s.Require().EqualValues(4, log.TxId) - s.Require().NotNil(log.Namespace) - s.Require().NotNil(log.TxId) -} - -func (s *StorageTestSuite) TestBlobLogsTx() { - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - logs, err := s.storage.BlobLogs.ByTxId(ctx, 4, storage.BlobLogFilters{ - Limit: 2, - Offset: 0, - Sort: sdk.SortOrderAsc, - SortBy: "size", - }) - s.Require().NoError(err) - s.Require().Len(logs, 2) - - log := logs[0] - s.Require().EqualValues(1, log.Id) - s.Require().EqualValues(0, log.Height) - s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) - s.Require().EqualValues(10, log.Size) - s.Require().EqualValues(2, log.NamespaceId) - s.Require().EqualValues(1, log.SignerId) - s.Require().EqualValues(1, log.MsgId) - s.Require().EqualValues(4, log.TxId) - s.Require().NotNil(log.Namespace) - s.Require().NotNil(log.TxId) - s.Require().NotNil(log.Signer) -} - -func (s *StorageTestSuite) TestCountBlobLogsTx() { - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - count, err := s.storage.BlobLogs.CountByTxId(ctx, 4) - s.Require().NoError(err) - s.Require().EqualValues(count, 2) -} - -func (s *StorageTestSuite) TestBlobLogsByHeight() { - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - logs, err := s.storage.BlobLogs.ByHeight(ctx, 1000, storage.BlobLogFilters{ - Limit: 2, - Offset: 0, - Sort: sdk.SortOrderDesc, - SortBy: "size", - }) - s.Require().NoError(err) - s.Require().Len(logs, 2) - - log := logs[0] - s.Require().EqualValues(2, log.Id) - s.Require().EqualValues(1000, log.Height) - s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment) - s.Require().EqualValues(20, log.Size) - s.Require().EqualValues(1, log.NamespaceId) - s.Require().EqualValues(1, log.SignerId) - s.Require().EqualValues(1, log.MsgId) - s.Require().EqualValues(3, log.TxId) - s.Require().NotNil(log.Namespace) - s.Require().NotNil(log.TxId) - s.Require().NotNil(log.Signer) -} - -func (s *StorageTestSuite) TestCountBlobLogsByHeight() { - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - count, err := s.storage.BlobLogs.CountByHeight(ctx, 1000) - s.Require().NoError(err) - s.Require().EqualValues(count, 4) -} - -func (s *StorageTestSuite) TestBlobLogsByProviders() { - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - logs, err := s.storage.BlobLogs.ByProviders(ctx, []storage.RollupProvider{ - { - AddressId: 1, - NamespaceId: 1, - }, - }, storage.BlobLogFilters{ - Limit: 10, - }) - s.Require().NoError(err) - s.Require().Len(logs, 1) -} - func (s *StorageTestSuite) TestValidatorByAddress() { ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) defer ctxCancel()