diff --git a/clients/consensus/rpc/beaconstream.go b/clients/consensus/rpc/beaconstream.go index 855f3a21..be6fd92c 100644 --- a/clients/consensus/rpc/beaconstream.go +++ b/clients/consensus/rpc/beaconstream.go @@ -93,7 +93,14 @@ func (bs *BeaconStream) startStream() { Ready: true, } case err := <-stream.Errors: - bs.logger.Warnf("beacon block stream error: %v", err) + if strings.Contains(err.Error(), "INTERNAL_ERROR; received from peer") { + // this seems to be a go bug, silently reconnect to the stream + time.Sleep(10 * time.Millisecond) + stream.RetryNow() + } else { + bs.logger.Warnf("beacon block stream error: %v", err) + } + select { case bs.ReadyChan <- &BeaconStreamStatus{ Ready: false, diff --git a/clients/consensus/rpc/eventstream/eventstream.go b/clients/consensus/rpc/eventstream/eventstream.go index 920e39e2..3afb3f26 100644 --- a/clients/consensus/rpc/eventstream/eventstream.go +++ b/clients/consensus/rpc/eventstream/eventstream.go @@ -1,6 +1,7 @@ package eventstream import ( + "context" "errors" "fmt" "io" @@ -34,6 +35,8 @@ type Stream struct { isClosed bool // isClosedMutex is a mutex protecting concurrent read/write access of isClosed closeMutex sync.Mutex + // retrySleepCancel is a function that can be called to cancel the retry sleep + retrySleepCancel context.CancelFunc } type StreamEvent interface { @@ -114,6 +117,13 @@ func (stream *Stream) Close() { }() } +// RetryNow will force the stream to reconnect a disconnected stream immediately. +func (stream *Stream) RetryNow() { + if cancelFn := stream.retrySleepCancel; cancelFn != nil { + cancelFn() + } +} + // Go's http package doesn't copy headers across when it encounters // redirects so we need to do that manually. func checkRedirect(req *http.Request, via []*http.Request) error { @@ -215,7 +225,11 @@ func (stream *Stream) retryRestartStream() { stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds()) } - time.Sleep(backoff) + ctx, cancel := context.WithTimeout(context.Background(), backoff) + stream.retrySleepCancel = cancel + <-ctx.Done() + + stream.retrySleepCancel = nil if stream.isClosed { return