Skip to content

Commit

Permalink
fix contract-watcher processing (#531)
Browse files Browse the repository at this point in the history
Co-authored-by: walker-16 <agpazos85@gmail.com>
  • Loading branch information
ftocal and walker-16 committed Jul 13, 2023
1 parent 55d4b35 commit 5077426
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 25 deletions.
2 changes: 1 addition & 1 deletion contract-watcher/cmd/service/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func newWatchersForMainnet() *watchersConfig {
evm: 1000,
solana: 3,
terra: 10,
aptos: 3,
aptos: 20,
oasis: 3,
moonbeam: 5,
celo: 3,
Expand Down
12 changes: 7 additions & 5 deletions contract-watcher/watcher/aptos_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (w *AptosWatcher) Start(ctx context.Context) error {
w.logger.Error("cannot get latest block", zap.Error(err))
}
maxBlocks := uint64(w.sizeBlocks)
w.logger.Info("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock))
w.logger.Debug("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock))
w.metrics.SetLastBlock(w.chainID, lastBlock)
if currentBlock < lastBlock {
totalBlocks := (lastBlock-currentBlock)/maxBlocks + 1
Expand All @@ -98,21 +98,23 @@ func (w *AptosWatcher) Start(ctx context.Context) error {
if toBlock > lastBlock {
toBlock = lastBlock
}
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
// process all the blocks between current and last block.
} else {
w.logger.Info("waiting for new blocks")
w.logger.Debug("waiting for new blocks")
select {
case <-ctx.Done():
w.wg.Done()
return nil
case <-time.After(time.Duration(w.waitSeconds) * time.Second):
}
}
currentBlock = lastBlock
if lastBlock > currentBlock {
currentBlock = lastBlock
}
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions contract-watcher/watcher/evm_standard_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,27 +81,30 @@ func (w *EvmStandarWatcher) Start(ctx context.Context) error {
if err != nil {
w.logger.Error("cannot get latest block", zap.Error(err))
}
w.logger.Info("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock))
w.logger.Debug("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock))
w.metrics.SetLastBlock(w.chainID, lastBlock)

if currentBlock < lastBlock {
totalBlocks := getTotalBlocks(lastBlock, currentBlock, w.maxBlocks)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(currentBlock, i, w.maxBlocks, lastBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
// process all the blocks between current and last block.
} else {
w.logger.Info("waiting for new blocks")
w.logger.Debug("waiting for new blocks")
select {
case <-ctx.Done():
w.wg.Done()
return nil
case <-time.After(time.Duration(w.waitSeconds) * time.Second):
}
}
currentBlock = lastBlock
if lastBlock > currentBlock {
currentBlock = lastBlock
}
}
}

Expand Down
10 changes: 6 additions & 4 deletions contract-watcher/watcher/evm_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,23 @@ func (w *EVMWatcher) Start(ctx context.Context) error {
if toBlock > lastBlock {
toBlock = lastBlock
}
w.logger.Info("processing blocks", zap.Int64("from", fromBlock), zap.Int64("to", toBlock))
w.logger.Debug("processing blocks", zap.Int64("from", fromBlock), zap.Int64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Int64("from", fromBlock), zap.Int64("to", toBlock))
w.logger.Debug("blocks processed", zap.Int64("from", fromBlock), zap.Int64("to", toBlock))
}
// process all the blocks between current and last block.
} else {
w.logger.Info("waiting for new blocks")
w.logger.Debug("waiting for new blocks")
select {
case <-ctx.Done():
w.wg.Done()
return nil
case <-time.After(time.Duration(w.waitSeconds) * time.Second):
}
}
currentBlock = lastBlock
if lastBlock > currentBlock {
currentBlock = lastBlock
}
}
}

Expand Down
12 changes: 7 additions & 5 deletions contract-watcher/watcher/solana_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,29 +142,31 @@ func (w *SolanaWatcher) Start(ctx context.Context) error {
maxBlocks := uint64(w.sizeBlocks)
w.metrics.SetLastBlock(w.chainID, lastBlock)
if currentBlock < lastBlock {
w.logger.Info("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock))
w.logger.Debug("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock))
totalBlocks := (lastBlock-currentBlock)/maxBlocks + 1
for i := 0; i < int(totalBlocks); i++ {
fromBlock := currentBlock + uint64(i)*maxBlocks
toBlock := fromBlock + maxBlocks - 1
if toBlock > lastBlock {
toBlock = lastBlock
}
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
// process all the blocks between current and last block.
} else {
w.logger.Info("waiting for new blocks")
w.logger.Debug("waiting for new blocks")
select {
case <-ctx.Done():
w.wg.Done()
return nil
case <-time.After(time.Duration(w.waitSeconds) * time.Second):
}
}
currentBlock = lastBlock
if lastBlock > currentBlock {
currentBlock = lastBlock
}
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions contract-watcher/watcher/terra_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (w *TerraWatcher) Start(ctx context.Context) error {
w.metrics.SetLastBlock(w.chainID, uint64(lastBlock))
// check if there are new blocks to process.
if currentBlock < lastBlock {
w.logger.Info("processing blocks", zap.Int64("from", currentBlock), zap.Int64("to", lastBlock))
w.logger.Debug("processing blocks", zap.Int64("from", currentBlock), zap.Int64("to", lastBlock))
for block := currentBlock; block <= lastBlock; block++ {
w.processBlock(ctx, block)
// update block watcher
Expand All @@ -119,7 +119,9 @@ func (w *TerraWatcher) Start(ctx context.Context) error {
case <-time.After(time.Duration(w.waitSeconds) * time.Second):
}
}
currentBlock = lastBlock
if lastBlock > currentBlock {
currentBlock = lastBlock
}
}
}
}
Expand All @@ -128,7 +130,7 @@ func (w *TerraWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock u
totalBlocks := getTotalBlocks(toBlock, fromBlock, pageSize)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
for block := fromBlock; block <= toBlock; block++ {
w.processBlock(ctx, int64(block))
if persistBlock {
Expand All @@ -141,7 +143,7 @@ func (w *TerraWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock u
w.repository.UpdateWatcherBlock(ctx, w.chainID, watcherBlock)
}
}
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
}

Expand Down
4 changes: 3 additions & 1 deletion deploy/parser/parser-backfiller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ spec:
- --vaa-payload-parser-timeout
- "{{ .VAA_PAYLOAD_PARSER_TIMEOUT }}"
- --page-size
- "50"
- "50"
- --start-time
- "2018-01-01T00:00:00Z"

0 comments on commit 5077426

Please sign in to comment.