Skip to content

Commit

Permalink
feat: add ctx close in datastream reconnections
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 11, 2024
1 parent bcf8a1a commit c2f7f11
Showing 1 changed file with 35 additions and 13 deletions.
48 changes: 35 additions & 13 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,24 @@ func (c *StreamClient) GetEntryNumberLimit() uint64 {
func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.FullL2Block, errorCode int, err error) {
var (
socketErr error = nil
connected bool = true
connected bool = c.conn != nil
)

for {

select {
case <-c.ctx.Done():
log.Warn("[Datastream client] Context done - stopping")
return nil, errorCode, nil
default:
}
if connected {
if fullBLock, errorCode, err, socketErr = c.getL2BlockByNumber(blockNum); err != nil {
return nil, errorCode, err
}
}
if socketErr == nil {
break
if socketErr == nil {
break
}
}
time.Sleep(1 * time.Second)
connected = c.handleSocketError(socketErr)
Expand Down Expand Up @@ -178,18 +185,25 @@ func (c *StreamClient) getL2BlockByNumber(blockNum uint64) (l2Block *types.FullL
func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error) {
var (
socketErr error = nil
connected bool = true
connected bool = c.conn != nil
)

for {
select {
case <-c.ctx.Done():
log.Warn("[Datastream client] Context done - stopping")
return nil, nil
default:
}
if connected {
if l2Block, err, socketErr = c.getLatestL2Block(); err != nil {
return nil, err
}
if socketErr == nil {
break
}
}
if socketErr == nil {
break
}

time.Sleep(1 * time.Second)
connected = c.handleSocketError(socketErr)
}
Expand Down Expand Up @@ -383,17 +397,23 @@ func (c *StreamClient) RenewEntryChannel() {
func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
var (
socketErr error = nil
connected bool = true
connected bool = c.conn != nil
)

for {
select {
case <-c.ctx.Done():
log.Warn("[Datastream client] Context done - stopping")
return nil
default:
}
if connected {
if err, socketErr = c.readAllEntriesToChannel(); err != nil {
return err
}
}
if socketErr == nil {
break
if socketErr == nil {
break
}
}

time.Sleep(1 * time.Second)
Expand All @@ -404,7 +424,9 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
}

func (c *StreamClient) handleSocketError(socketErr error) bool {
log.Warn(fmt.Sprintf("Socket error: %s", socketErr))
if socketErr != nil {
log.Warn(fmt.Sprintf("Socket error: %s", socketErr))
}
if err := c.tryReConnect(); err != nil {
log.Warn(fmt.Sprintf("Failed to reconnect the datastream client: %s", err))
return false
Expand Down

0 comments on commit c2f7f11

Please sign in to comment.