Skip to content

Commit

Permalink
Optimization: blobs query (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky authored Feb 10, 2024
1 parent bab4939 commit 25b17e4
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 178 deletions.
102 changes: 67 additions & 35 deletions internal/storage/postgres/blob_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ func NewBlobLog(db *database.Bun) *BlobLog {
}

func (bl *BlobLog) ByNamespace(ctx context.Context, nsId uint64, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) {
query := bl.DB().NewSelect().Model(&logs).
Where("blob_log.namespace_id = ?", nsId).
Relation("Signer").
Relation("Tx")

query = blobLogFilters(query, fltrs)

err = query.Scan(ctx)
blobsQuery := bl.DB().NewSelect().Model((*storage.BlobLog)(nil)).
Where("blob_log.namespace_id = ?", nsId)

blobsQuery = blobLogFilters(blobsQuery, fltrs)

err = bl.DB().NewSelect().
ColumnExpr("blob_log.*").
ColumnExpr("signer.address as signer__address").
ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types").
TableExpr("(?) as blob_log", blobsQuery).
Join("left join address as signer on signer.id = blob_log.signer_id").
Join("left join tx on tx.id = blob_log.tx_id").
Scan(ctx, &logs)
return
}

Expand All @@ -42,13 +47,11 @@ func (bl *BlobLog) ByProviders(ctx context.Context, providers []storage.RollupPr
return nil, nil
}

query := bl.DB().NewSelect().Model(&logs).
Relation("Signer").
Relation("Namespace").
Relation("Tx")
blobQuery := bl.DB().NewSelect().
Model((*storage.BlobLog)(nil))

for i := range providers {
query = query.WhereGroup(" OR ", func(sq *bun.SelectQuery) *bun.SelectQuery {
blobQuery = blobQuery.WhereGroup(" OR ", func(sq *bun.SelectQuery) *bun.SelectQuery {
sq.Where("blob_log.signer_id = ?", providers[i].AddressId)
if providers[i].NamespaceId > 0 {
sq.Where("blob_log.namespace_id = ?", providers[i].NamespaceId)
Expand All @@ -57,47 +60,76 @@ func (bl *BlobLog) ByProviders(ctx context.Context, providers []storage.RollupPr
})
}

query = blobLogFilters(query, fltrs)

err = query.Scan(ctx)
blobQuery = blobLogFilters(blobQuery, fltrs)

err = bl.DB().NewSelect().
ColumnExpr("blob_log.*").
ColumnExpr("signer.address as signer__address").
ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time").
ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types").
TableExpr("(?) as blob_log", blobQuery).
Join("left join address as signer on signer.id = blob_log.signer_id").
Join("left join namespace as ns on ns.id = blob_log.namespace_id").
Join("left join tx on tx.id = blob_log.tx_id").
Scan(ctx, &logs)
return
}

func (bl *BlobLog) BySigner(ctx context.Context, signerId uint64, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) {
query := bl.DB().NewSelect().Model(&logs).
Relation("Namespace").
Relation("Tx").
blobQuery := bl.DB().NewSelect().
Model((*storage.BlobLog)(nil)).
Where("signer_id = ?", signerId)

query = blobLogFilters(query, fltrs)
blobQuery = blobLogFilters(blobQuery, fltrs)

err = query.Scan(ctx)
err = bl.DB().NewSelect().
ColumnExpr("blob_log.*").
ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time").
ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types").
TableExpr("(?) as blob_log", blobQuery).
Join("left join namespace as ns on ns.id = blob_log.namespace_id").
Join("left join tx on tx.id = blob_log.tx_id").
Scan(ctx, &logs)
return
}

func (bl *BlobLog) ByTxId(ctx context.Context, txId uint64, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) {
query := bl.DB().NewSelect().Model(&logs).
Relation("Namespace").
Relation("Tx").
Relation("Signer").
blobLogQuery := bl.DB().NewSelect().
Model((*storage.BlobLog)(nil)).
Where("tx_id = ?", txId)

query = blobLogFilters(query, fltrs)

err = query.Scan(ctx)
blobLogQuery = blobLogFilters(blobLogQuery, fltrs)

err = bl.DB().NewSelect().
ColumnExpr("blob_log.*").
ColumnExpr("signer.address as signer__address").
ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time").
ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types").
TableExpr("(?) as blob_log", blobLogQuery).
Join("left join address as signer on signer.id = blob_log.signer_id").
Join("left join namespace as ns on ns.id = blob_log.namespace_id").
Join("left join tx on tx.id = blob_log.tx_id").
Scan(ctx, &logs)
return
}

func (bl *BlobLog) ByHeight(ctx context.Context, height types.Level, fltrs storage.BlobLogFilters) (logs []storage.BlobLog, err error) {
query := bl.DB().NewSelect().Model(&logs).
Relation("Namespace").
Relation("Tx").
Relation("Signer").
blobLogQuery := bl.DB().NewSelect().
Model((*storage.BlobLog)(nil)).
Where("blob_log.height = ?", height)

query = blobLogFilters(query, fltrs)

err = query.Scan(ctx)
blobLogQuery = blobLogFilters(blobLogQuery, fltrs)

err = bl.DB().NewSelect().
ColumnExpr("blob_log.*").
ColumnExpr("signer.address as signer__address").
ColumnExpr("ns.id as namespace__id, ns.size as namespace__size, ns.blobs_count as namespace__blobs_count, ns.version as namespace__version, ns.namespace_id as namespace__namespace_id, ns.reserved as namespace__reserved, ns.pfb_count as namespace__pfb_count, ns.last_height as namespace__last_height, ns.last_message_time as namespace__last_message_time").
ColumnExpr("tx.id as tx__id, tx.height as tx__height, tx.time as tx__time, tx.position as tx__position, tx.gas_wanted as tx__gas_wanted, tx.gas_used as tx__gas_used, tx.timeout_height as tx__timeout_height, tx.events_count as tx__events_count, tx.messages_count as tx__messages_count, tx.fee as tx__fee, tx.status as tx__status, tx.error as tx__error, tx.codespace as tx__codespace, tx.hash as tx__hash, tx.memo as tx__memo, tx.message_types as tx__message_types").
TableExpr("(?) as blob_log", blobLogQuery).
Join("left join address as signer on signer.id = blob_log.signer_id").
Join("left join namespace as ns on ns.id = blob_log.namespace_id").
Join("left join tx on tx.id = blob_log.tx_id").
Scan(ctx, &logs)
return
}

Expand Down
158 changes: 158 additions & 0 deletions internal/storage/postgres/blob_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package postgres

import (
"context"
"time"

"github.com/celenium-io/celestia-indexer/internal/storage"
sdk "github.com/dipdup-net/indexer-sdk/pkg/storage"
)

func (s *StorageTestSuite) TestBlobLogsByNamespace() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

logs, err := s.storage.BlobLogs.ByNamespace(ctx, 2, storage.BlobLogFilters{
Limit: 2,
Offset: 0,
Sort: sdk.SortOrderAsc,
SortBy: "size",
})
s.Require().NoError(err)
s.Require().Len(logs, 2)

log := logs[0]
s.Require().EqualValues(1, log.Id)
s.Require().EqualValues(0, log.Height)
s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment)
s.Require().EqualValues(10, log.Size)
s.Require().EqualValues(2, log.NamespaceId)
s.Require().EqualValues(1, log.SignerId)
s.Require().EqualValues(1, log.MsgId)
s.Require().EqualValues(4, log.TxId)

s.Require().NotNil(log.Signer)
s.Require().EqualValues("celestia1mm8yykm46ec3t0dgwls70g0jvtm055wk9ayal8", log.Signer.Address)

s.Require().NotNil(log.Tx)
s.Require().EqualValues(4, log.Tx.Id)
}

func (s *StorageTestSuite) TestBlobLogsSigner() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

logs, err := s.storage.BlobLogs.BySigner(ctx, 1, storage.BlobLogFilters{
Limit: 2,
Offset: 0,
Sort: sdk.SortOrderAsc,
SortBy: "size",
})
s.Require().NoError(err)
s.Require().Len(logs, 2)

log := logs[0]
s.Require().EqualValues(1, log.Id)
s.Require().EqualValues(0, log.Height)
s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment)
s.Require().EqualValues(10, log.Size)
s.Require().EqualValues(2, log.NamespaceId)
s.Require().EqualValues(1, log.SignerId)
s.Require().EqualValues(1, log.MsgId)
s.Require().EqualValues(4, log.TxId)
s.Require().NotNil(log.Namespace)
s.Require().NotNil(log.TxId)
}

func (s *StorageTestSuite) TestBlobLogsTx() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

logs, err := s.storage.BlobLogs.ByTxId(ctx, 4, storage.BlobLogFilters{
Limit: 2,
Offset: 0,
Sort: sdk.SortOrderAsc,
SortBy: "size",
})
s.Require().NoError(err)
s.Require().Len(logs, 2)

log := logs[0]
s.Require().EqualValues(1, log.Id)
s.Require().EqualValues(0, log.Height)
s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment)
s.Require().EqualValues(10, log.Size)
s.Require().EqualValues(2, log.NamespaceId)
s.Require().EqualValues(1, log.SignerId)
s.Require().EqualValues(1, log.MsgId)
s.Require().EqualValues(4, log.TxId)
s.Require().NotNil(log.Namespace)
s.Require().NotNil(log.TxId)
s.Require().NotNil(log.Signer)
}

func (s *StorageTestSuite) TestCountBlobLogsTx() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

count, err := s.storage.BlobLogs.CountByTxId(ctx, 4)
s.Require().NoError(err)
s.Require().EqualValues(count, 2)
}

func (s *StorageTestSuite) TestBlobLogsByHeight() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

logs, err := s.storage.BlobLogs.ByHeight(ctx, 1000, storage.BlobLogFilters{
Limit: 2,
Offset: 0,
Sort: sdk.SortOrderDesc,
SortBy: "size",
})
s.Require().NoError(err)
s.Require().Len(logs, 2)

log := logs[0]
s.Require().EqualValues(2, log.Id)
s.Require().EqualValues(1000, log.Height)
s.Require().EqualValues("RWW7eaKKXasSGK/DS8PlpErARbl5iFs1vQIycYEAlk0=", log.Commitment)
s.Require().EqualValues(20, log.Size)
s.Require().EqualValues(1, log.NamespaceId)
s.Require().EqualValues(1, log.SignerId)
s.Require().EqualValues(1, log.MsgId)
s.Require().EqualValues(3, log.TxId)
s.Require().NotNil(log.Namespace)
s.Require().NotNil(log.Tx)
s.Require().NotNil(log.Signer)
}

func (s *StorageTestSuite) TestCountBlobLogsByHeight() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

count, err := s.storage.BlobLogs.CountByHeight(ctx, 1000)
s.Require().NoError(err)
s.Require().EqualValues(count, 4)
}

func (s *StorageTestSuite) TestBlobLogsByProviders() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

logs, err := s.storage.BlobLogs.ByProviders(ctx, []storage.RollupProvider{
{
AddressId: 1,
NamespaceId: 1,
},
}, storage.BlobLogFilters{
Limit: 10,
})
s.Require().NoError(err)
s.Require().Len(logs, 1)

log := logs[0]
s.Require().NotNil(log.Tx)
s.Require().NotNil(log.Namespace)
s.Require().NotNil(log.Signer)
}
Loading

0 comments on commit 25b17e4

Please sign in to comment.