Skip to content

Commit

Permalink
add withdrawal indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Oct 8, 2024
1 parent 88db084 commit 7fb76b1
Show file tree
Hide file tree
Showing 3 changed files with 659 additions and 10 deletions.
54 changes: 54 additions & 0 deletions db/withdrawal_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,60 @@ import (
"github.com/jmoiron/sqlx"
)

func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO withdrawal_request_txs ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO withdrawal_request_txs ",
}),
"(block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, amount, tx_hash, tx_sender, tx_target, dequeue_block)",
" VALUES ",
)
argIdx := 0
fieldCount := 12

args := make([]any, len(withdrawalTxs)*fieldCount)
for i, withdrawalTx := range withdrawalTxs {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "(")
for f := 0; f < fieldCount; f++ {
if f > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+f+1)

}
fmt.Fprintf(&sql, ")")

args[argIdx+0] = withdrawalTx.BlockNumber
args[argIdx+1] = withdrawalTx.BlockIndex
args[argIdx+2] = withdrawalTx.BlockTime
args[argIdx+3] = withdrawalTx.BlockRoot
args[argIdx+4] = withdrawalTx.ForkId
args[argIdx+5] = withdrawalTx.SourceAddress
args[argIdx+6] = withdrawalTx.ValidatorPubkey
args[argIdx+7] = withdrawalTx.Amount
args[argIdx+8] = withdrawalTx.TxHash
args[argIdx+9] = withdrawalTx.TxSender
args[argIdx+10] = withdrawalTx.TxTarget
args[argIdx+11] = withdrawalTx.DequeueBlock
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}

func InsertWithdrawalRequests(elRequests []*dbtypes.WithdrawalRequest, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
Expand Down
32 changes: 22 additions & 10 deletions indexer/execution/consolidation_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewConsolidationIndexer(indexer *IndexerCtx) *ConsolidationIndexer {

ci := &ConsolidationIndexer{
indexer: indexer,
logger: indexer.logger.WithField("indexer", "deposit"),
logger: indexer.logger.WithField("indexer", "consolidation"),
batchSize: batchSize,
contractAddress: common.HexToAddress(consolidationContractAddr),
forkStates: map[beacon.ForkKey]*consolidationIndexerForkState{},
Expand Down Expand Up @@ -150,7 +150,7 @@ func (ds *ConsolidationIndexer) loadTransactionByHash(ctx context.Context, clien
return tx, err
}

func (ds *ConsolidationIndexer) loadHeaderByNumber(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Header, error) {
func (ds *ConsolidationIndexer) loadHeaderByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Header, error) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

Expand Down Expand Up @@ -223,7 +223,7 @@ func (ds *ConsolidationIndexer) processFinalizedBlocks(finalizedBlockNumber uint
return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
}

txBlockHeader, err = ds.loadHeaderByNumber(ctx, client, log.BlockHash)
txBlockHeader, err = ds.loadHeaderByHash(ctx, client, log.BlockHash)
if err != nil {
return fmt.Errorf("could not load block details (%v): %v", log.TxHash, err)
}
Expand Down Expand Up @@ -389,6 +389,7 @@ func (ds *ConsolidationIndexer) processRecentBlocksForFork(headFork *forkWithCli
var txHash []byte
var txDetails *types.Transaction
var txBlockHeader *types.Header
var clBlock []*beacon.Block

requestTxs := []*dbtypes.ConsolidationRequestTx{}

Expand Down Expand Up @@ -439,12 +440,23 @@ func (ds *ConsolidationIndexer) processRecentBlocksForFork(headFork *forkWithCli
return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
}

txBlockHeader, err = ds.loadHeaderByNumber(ctx, client, log.BlockHash)
clBlock = ds.indexer.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash))

txHash = log.TxHash[:]
}

var blockTime uint64
if len(clBlock) > 0 {
blockTime = uint64(ds.indexer.chainState.SlotToTime(clBlock[0].Slot).Unix())
} else if txBlockHeader == nil || txBlockHeader.Hash() != log.BlockHash {
var err error

txBlockHeader, err = ds.loadHeaderByHash(ctx, client, log.BlockHash)
if err != nil {
return fmt.Errorf("could not load block details (%v): %v", log.TxHash, err)
}

txHash = log.TxHash[:]
blockTime = txBlockHeader.Time
}

txFrom, err := types.Sender(types.LatestSignerForChainID(txDetails.ChainId()), txDetails)
Expand All @@ -458,13 +470,13 @@ func (ds *ConsolidationIndexer) processRecentBlocksForFork(headFork *forkWithCli
continue
}

if clBlock := ds.indexer.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash)); len(clBlock) > 0 {
if len(clBlock) > 0 {
requestTx.ForkId = uint64(clBlock[0].GetForkId())
} else {
requestTx.ForkId = uint64(headFork.forkId)
}

requestTx.BlockTime = txBlockHeader.Time
requestTx.BlockTime = blockTime
requestTx.TxSender = txFrom[:]
requestTx.TxTarget = txTo[:]

Expand All @@ -483,11 +495,11 @@ func (ds *ConsolidationIndexer) processRecentBlocksForFork(headFork *forkWithCli
}

if len(requestTxs) > 0 {
ds.logger.Infof("crawled recent consolidations for fork %v (%v-%v): %v deposits", headFork.forkId, startBlockNumber, toBlock, len(requestTxs))
ds.logger.Infof("crawled recent consolidations for fork %v (%v-%v): %v consolidations", headFork.forkId, startBlockNumber, toBlock, len(requestTxs))

err := ds.persistRecentRequestTxs(headFork.forkId, queueBlock, startQueueLen, requestTxs)
if err != nil {
return fmt.Errorf("could not persist deposit txs: %v", err)
return fmt.Errorf("could not persist consolidation txs: %v", err)
}

time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -564,7 +576,7 @@ func (ds *ConsolidationIndexer) persistState(tx *sqlx.Tx) error {

err := db.SetExplorerState("indexer.consolidationstate", ds.state, tx)
if err != nil {
return fmt.Errorf("error while updating deposit state: %v", err)
return fmt.Errorf("error while updating consolidation tx indexer state: %v", err)
}

return nil
Expand Down
Loading

0 comments on commit 7fb76b1

Please sign in to comment.