Skip to content

Commit

Permalink
feat: retry a fixed number of times in stage batches
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 11, 2024
1 parent 561a4b7 commit eb97b3f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
19 changes: 15 additions & 4 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,17 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.Ful
socketErr error = nil
connected bool = c.conn != nil
)

count := 0
for {

select {
case <-c.ctx.Done():
log.Warn("[Datastream client] Context done - stopping")
return nil, errorCode, nil
default:
}
if count > 5 {
return nil, -1, errors.New("failed to get the L2 block within 5 attempts")
}
if connected {
if fullBLock, errorCode, err, socketErr = c.getL2BlockByNumber(blockNum); err != nil {
return nil, errorCode, err
Expand All @@ -127,6 +129,7 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.Ful
}
time.Sleep(1 * time.Second)
connected = c.handleSocketError(socketErr)
count++
}

return fullBLock, types.CmdErrOK, nil
Expand Down Expand Up @@ -187,14 +190,17 @@ func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error
socketErr error = nil
connected bool = c.conn != nil
)

count := 0
for {
select {
case <-c.ctx.Done():
log.Warn("[Datastream client] Context done - stopping")
return nil, nil
default:
}
if count > 5 {
return nil, errors.New("failed to get the latest L2 block within 5 attempts")
}
if connected {
if l2Block, err, socketErr = c.getLatestL2Block(); err != nil {
return nil, err
Expand All @@ -206,6 +212,7 @@ func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error

time.Sleep(1 * time.Second)
connected = c.handleSocketError(socketErr)
count++
}
return l2Block, nil
}
Expand Down Expand Up @@ -399,14 +406,17 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
socketErr error = nil
connected bool = c.conn != nil
)

count := 0
for {
select {
case <-c.ctx.Done():
log.Warn("[Datastream client] Context done - stopping")
return nil
default:
}
if count > 5 {
return errors.New("failed to read all entries within 5 attempts")
}
if connected {
if err, socketErr = c.readAllEntriesToChannel(); err != nil {
return err
Expand All @@ -418,6 +428,7 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {

time.Sleep(1 * time.Second)
connected = c.handleSocketError(socketErr)
count++
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func SpawnStageBatches(
}
highestDSL2Block, err = dsQueryClient.GetLatestL2Block()
if err != nil {
return fmt.Errorf("failed to retrieve the latest datastream l2 block: %w", err)
// if we return error, stage will replay and block all other stages
log.Warn(fmt.Sprintf("[%s] Failed to get latest l2 block from datastream: %v", logPrefix, err))
return nil
}

// a lower block should also break the loop because that means the datastream was unwound
Expand Down

0 comments on commit eb97b3f

Please sign in to comment.