Skip to content

Commit

Permalink
properly index latest pectra structures (consolidation & withdrawal r…
Browse files Browse the repository at this point in the history
…equests)
  • Loading branch information
pk910 committed Aug 7, 2024
1 parent f11cc6b commit fa5ec7b
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 261 deletions.
22 changes: 3 additions & 19 deletions db/consolidations.go → db/consolidation_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,12 @@ import (
"github.com/jmoiron/sqlx"
)

/*
type Consolidation struct {
SlotNumber uint64 `db:"slot_number"`
SlotRoot []byte `db:"slot_root"`
SlotIndex uint64 `db:"slot_index"`
Orphaned bool `db:"orphaned"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
SourceIndex uint64 `db:"source_index"`
SourcePubkey []byte `db:"source_pubkey"`
TargetIndex uint64 `db:"target_index"`
TargetPubkey []byte `db:"target_pubkey"`
TxHash []byte `db:"tx_hash"`
}
*/

func InsertConsolidations(consolidations []*dbtypes.Consolidation, tx *sqlx.Tx) error {
func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO consolidations ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidations ",
dbtypes.DBEnginePgsql: "INSERT INTO consolidation_requests ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_requests ",
}),
"(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash)",
" VALUES ",
Expand Down
74 changes: 1 addition & 73 deletions db/schema/pgsql/20240615012111_pectra-updates.sql
Original file line number Diff line number Diff line change
@@ -1,79 +1,7 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS consolidations (
slot_number INT NOT NULL,
slot_index INT NOT NULL,
slot_root bytea NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
source_index BIGINT NOT NULL,
target_index BIGINT NOT NULL,
epoch BIGINT NOT NULL,
CONSTRAINT consolidation_pkey PRIMARY KEY (slot_index, slot_root)
);

CREATE INDEX IF NOT EXISTS "consolidations_source_idx"
ON public."consolidations"
("source_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidations_target_idx"
ON public."consolidations"
("target_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidations_epoch_idx"
ON public."consolidations"
("epoch" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidations_slot_idx"
ON public."consolidations"
("slot_number" ASC NULLS FIRST);

CREATE TABLE IF NOT EXISTS el_requests (
slot_number INT NOT NULL,
slot_index INT NOT NULL,
slot_root bytea NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
request_type INT NOT NULL,
source_address bytea NULL,
source_index BIGINT NULL,
source_pubkey bytea NULL,
target_index BIGINT NULL,
target_pubkey bytea NULL,
amount BIGINT NULL,
CONSTRAINT el_requests_pkey PRIMARY KEY (slot_index, slot_root)
);

CREATE INDEX IF NOT EXISTS "el_requests_slot_idx"
ON public."el_requests"
("slot_number" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "el_requests_reqtype_idx"
ON public."el_requests"
("request_type" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "el_requests_sourceaddr_idx"
ON public."el_requests"
("source_address" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "el_requests_sourceidx_idx"
ON public."el_requests"
("source_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "el_requests_targetidx_idx"
ON public."el_requests"
("target_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "el_requests_sourcekey_idx"
ON public."el_requests"
("source_pubkey" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "el_requests_targetkey_idx"
ON public."el_requests"
("target_pubkey" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "el_requests_amount_idx"
ON public."el_requests"
("amount" ASC NULLS FIRST);
SELECT 'skipped';

-- +goose StatementEnd
-- +goose Down
Expand Down
22 changes: 11 additions & 11 deletions db/schema/pgsql/20240805095505_pectra-updates2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

DROP TABLE IF EXISTS public."consolidations";

CREATE TABLE IF NOT EXISTS public."consolidations" (
CREATE TABLE IF NOT EXISTS public."consolidation_requests" (
slot_number INT NOT NULL,
slot_root bytea NOT NULL,
slot_index INT NOT NULL,
Expand All @@ -18,24 +18,24 @@ CREATE TABLE IF NOT EXISTS public."consolidations" (
CONSTRAINT consolidation_pkey PRIMARY KEY (slot_root, slot_index)
);

CREATE INDEX IF NOT EXISTS "consolidations_slot_idx"
ON public."consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_slot_idx"
ON public."consolidation_requests"
("slot_number" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidations_source_idx"
ON public."consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_source_idx"
ON public."consolidation_requests"
("source_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidations_target_idx"
ON public."consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_target_idx"
ON public."consolidation_requests"
("target_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidations_source_addr_idx"
ON public."consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_source_addr_idx"
ON public."consolidation_requests"
("source_address" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidations_fork_idx"
ON public."consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_fork_idx"
ON public."consolidation_requests"
("fork_idx" ASC NULLS FIRST);


Expand Down
74 changes: 1 addition & 73 deletions db/schema/sqlite/20240615012111_pectra-updates.sql
Original file line number Diff line number Diff line change
@@ -1,79 +1,7 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS consolidations (
slot_number INT NOT NULL,
slot_index INT NOT NULL,
slot_root BLOB NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
source_index BIGINT NOT NULL,
target_index BIGINT NOT NULL,
epoch BIGINT NOT NULL,
CONSTRAINT consolidation_pkey PRIMARY KEY (slot_index, slot_root)
);

CREATE INDEX IF NOT EXISTS "consolidations_source_idx"
ON "consolidations"
("source_index" ASC);

CREATE INDEX IF NOT EXISTS "consolidations_target_idx"
ON "consolidations"
("target_index" ASC);

CREATE INDEX IF NOT EXISTS "consolidations_epoch_idx"
ON "consolidations"
("epoch" ASC);

CREATE INDEX IF NOT EXISTS "consolidations_slot_idx"
ON "consolidations"
("slot_number" ASC);

CREATE TABLE IF NOT EXISTS el_requests (
slot_number INT NOT NULL,
slot_index INT NOT NULL,
slot_root BLOB NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
request_type INT NOT NULL,
source_address BLOB NULL,
source_index BIGINT NULL,
source_pubkey BLOB NULL,
target_index BIGINT NULL,
target_pubkey BLOB NULL,
amount BIGINT NULL,
CONSTRAINT el_requests_pkey PRIMARY KEY (slot_index, slot_root)
);

CREATE INDEX IF NOT EXISTS "el_requests_slot_idx"
ON "el_requests"
("slot_number" ASC);

CREATE INDEX IF NOT EXISTS "el_requests_reqtype_idx"
ON "el_requests"
("request_type" ASC);

CREATE INDEX IF NOT EXISTS "el_requests_sourceaddr_idx"
ON "el_requests"
("source_address" ASC);

CREATE INDEX IF NOT EXISTS "el_requests_sourceidx_idx"
ON "el_requests"
("source_index" ASC);

CREATE INDEX IF NOT EXISTS "el_requests_targetidx_idx"
ON "el_requests"
("target_index" ASC);

CREATE INDEX IF NOT EXISTS "el_requests_sourcekey_idx"
ON "el_requests"
("source_pubkey" ASC);

CREATE INDEX IF NOT EXISTS "el_requests_targetkey_idx"
ON "el_requests"
("target_pubkey" ASC);

CREATE INDEX IF NOT EXISTS "el_requests_amount_idx"
ON "el_requests"
("amount" ASC);
SELECT 'skipped';

-- +goose StatementEnd
-- +goose Down
Expand Down
22 changes: 11 additions & 11 deletions db/schema/sqlite/20240805095505_pectra-updates2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

DROP TABLE IF EXISTS "consolidations";

CREATE TABLE IF NOT EXISTS "consolidations" (
CREATE TABLE IF NOT EXISTS "consolidation_requests" (
slot_number INT NOT NULL,
slot_root BLOB NOT NULL,
slot_index INT NOT NULL,
Expand All @@ -18,24 +18,24 @@ CREATE TABLE IF NOT EXISTS "consolidations" (
CONSTRAINT consolidation_pkey PRIMARY KEY (slot_root, slot_index)
);

CREATE INDEX IF NOT EXISTS "consolidations_slot_idx"
ON "consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_slot_idx"
ON "consolidation_requests"
("slot_number" ASC);

CREATE INDEX IF NOT EXISTS "consolidations_source_idx"
ON "consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_source_idx"
ON "consolidation_requests"
("source_index" ASC);

CREATE INDEX IF NOT EXISTS "consolidations_target_idx"
ON "consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_target_idx"
ON "consolidation_requests"
("target_index" ASC);

CREATE INDEX IF NOT EXISTS "consolidations_source_addr_idx"
ON "consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_source_addr_idx"
ON "consolidation_requests"
("source_address" ASC);

CREATE INDEX IF NOT EXISTS "consolidations_fork_idx"
ON "consolidations"
CREATE INDEX IF NOT EXISTS "consolidation_requests_fork_idx"
ON "consolidation_requests"
("fork_idx" ASC);


Expand Down
File renamed without changes.
24 changes: 12 additions & 12 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,18 @@ type Slashing struct {
ForkId uint64 `db:"fork_id"`
}

type Consolidation struct {
SlotNumber uint64 `db:"slot_number"`
SlotRoot []byte `db:"slot_root"`
SlotIndex uint64 `db:"slot_index"`
Orphaned bool `db:"orphaned"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
SourceIndex uint64 `db:"source_index"`
SourcePubkey []byte `db:"source_pubkey"`
TargetIndex uint64 `db:"target_index"`
TargetPubkey []byte `db:"target_pubkey"`
TxHash []byte `db:"tx_hash"`
type ConsolidationRequest struct {
SlotNumber uint64 `db:"slot_number"`
SlotRoot []byte `db:"slot_root"`
SlotIndex uint64 `db:"slot_index"`
Orphaned bool `db:"orphaned"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
SourceIndex *uint64 `db:"source_index"`
SourcePubkey []byte `db:"source_pubkey"`
TargetIndex *uint64 `db:"target_index"`
TargetPubkey []byte `db:"target_pubkey"`
TxHash []byte `db:"tx_hash"`
}

type WithdrawalRequest struct {
Expand Down
6 changes: 6 additions & 0 deletions indexer/beacon/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ func (block *Block) GetDbSlashings(indexer *Indexer) []*dbtypes.Slashing {
return indexer.dbWriter.buildDbSlashings(block, orphaned, nil)
}

// GetDbConsolidationRequests returns the database representation of the consolidation requests in this block.
func (block *Block) GetDbConsolidationRequests(indexer *Indexer) []*dbtypes.ConsolidationRequest {
orphaned := !indexer.IsCanonicalBlock(block, nil)
return indexer.dbWriter.buildDbConsolidationRequests(block, orphaned, nil)
}

// GetExecutionExtraData returns the execution extra data of this block.
func (block *Block) GetExecutionExtraData() []byte {
blockBody := block.GetBlock()
Expand Down
46 changes: 46 additions & 0 deletions indexer/beacon/block_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,52 @@ func getBlockExecutionDepositRequests(v *spec.VersionedSignedBeaconBlock) ([]*el
}
}

func getBlockExecutionConsolidationRequests(v *spec.VersionedSignedBeaconBlock) ([]*electra.ConsolidationRequest, error) {
switch v.Version {
case spec.DataVersionPhase0:
return nil, errors.New("no deposit requests in phase0")
case spec.DataVersionAltair:
return nil, errors.New("no deposit requests in altair")
case spec.DataVersionBellatrix:
return nil, errors.New("no deposit requests in bellatrix")
case spec.DataVersionCapella:
return nil, errors.New("no deposit requests in capella")
case spec.DataVersionDeneb:
return nil, errors.New("no deposit requests in deneb")
case spec.DataVersionElectra:
if v.Electra == nil || v.Electra.Message == nil || v.Electra.Message.Body == nil || v.Electra.Message.Body.ExecutionPayload == nil {
return nil, errors.New("no electra block")
}

return v.Electra.Message.Body.ExecutionPayload.ConsolidationRequests, nil
default:
return nil, errors.New("unknown version")
}
}

func getBlockExecutionWithdrawalRequests(v *spec.VersionedSignedBeaconBlock) ([]*electra.WithdrawalRequest, error) {
switch v.Version {
case spec.DataVersionPhase0:
return nil, errors.New("no deposit requests in phase0")
case spec.DataVersionAltair:
return nil, errors.New("no deposit requests in altair")
case spec.DataVersionBellatrix:
return nil, errors.New("no deposit requests in bellatrix")
case spec.DataVersionCapella:
return nil, errors.New("no deposit requests in capella")
case spec.DataVersionDeneb:
return nil, errors.New("no deposit requests in deneb")
case spec.DataVersionElectra:
if v.Electra == nil || v.Electra.Message == nil || v.Electra.Message.Body == nil || v.Electra.Message.Body.ExecutionPayload == nil {
return nil, errors.New("no electra block")
}

return v.Electra.Message.Body.ExecutionPayload.WithdrawalRequests, nil
default:
return nil, errors.New("unknown version")
}
}

// getStateRandaoMixes returns the RANDAO mixes from a versioned beacon state.
func getStateRandaoMixes(v *spec.VersionedBeaconState) ([]phase0.Root, error) {
switch v.Version {
Expand Down
Loading

0 comments on commit fa5ec7b

Please sign in to comment.