From 035b47baaa196e491b29a260afaf7d28550a72f1 Mon Sep 17 00:00:00 2001 From: Mario Vega Date: Wed, 6 Mar 2024 14:26:56 -0600 Subject: [PATCH] fix: Defer topic handle close, better handle errors on block request (#41) * fix: Defer topic handle close * add more debugging info --- blobber.go | 38 ++++++++++++++++++++------------------ p2p/broadcast.go | 35 ++++++++++++++++++++++++----------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/blobber.go b/blobber.go index c32a5b5..b95e818 100644 --- a/blobber.go +++ b/blobber.go @@ -364,17 +364,18 @@ func (b *Blobber) genValidatorBlockHandler(cl *beacon_client.BeaconClient, id in return false, errors.Wrap(err, "failed to unmarshal slot") } blockVersion, blockBlobResponse, err := ParseResponse(response) - if err != nil { + if err != nil || blockBlobResponse == nil { logrus.WithFields(logrus.Fields{ - "proxy_id": id, - "version": version, - "slot": slot, - "response": string(response), + "proxy_id": id, + "version": version, + "slot": slot, + "requestURL": request.URL.String(), + "response": string(response), }).Debug("Failed to parse response") - return false, errors.Wrap(err, "failed to parse response") - } - if blockBlobResponse == nil { - return false, errors.Wrap(err, "response is nil") + if err != nil { + return false, errors.Wrap(err, "failed to parse response") + } + return false, errors.New("failed to parse response") } var validatorKey *keys.ValidatorKey if b.ValidatorKeys != nil { @@ -412,28 +413,29 @@ func (b *Blobber) genValidatorBlockHandler(cl *beacon_client.BeaconClient, id in } type BlockDataStruct struct { - Version string `json:"version"` - Data json.RawMessage `json:"data"` + Version *string `json:"version"` + Data *json.RawMessage `json:"data"` } func ParseResponse(response []byte) (string, *deneb.BlockContents, error) { var ( blockDataStruct BlockDataStruct ) - if err := json.Unmarshal(response, &blockDataStruct); err != nil { - return blockDataStruct.Version, nil, errors.Wrap(err, "failed to unmarshal response into BlockDataStruct") + if err := json.Unmarshal(response, &blockDataStruct); err != nil || blockDataStruct.Version == nil || blockDataStruct.Data == nil { + return "", nil, errors.Wrap(err, "failed to unmarshal response into BlockDataStruct") } - if blockDataStruct.Version != "deneb" { + if *blockDataStruct.Version != "deneb" { logrus.WithField("version", blockDataStruct.Version).Warn("Unsupported version, skipping actions") - return blockDataStruct.Version, nil, nil + logrus.WithField("response", string(response)).Debug("Unsupported version, skipping actions") + return *blockDataStruct.Version, nil, nil } - decoder := json.NewDecoder(bytes.NewReader(blockDataStruct.Data)) + decoder := json.NewDecoder(bytes.NewReader(*blockDataStruct.Data)) data := new(deneb.BlockContents) if err := decoder.Decode(&data); err != nil { - return blockDataStruct.Version, nil, errors.Wrap(err, "failed to decode block contents") + return *blockDataStruct.Version, nil, errors.Wrap(err, "failed to decode block contents") } - return blockDataStruct.Version, data, nil + return *blockDataStruct.Version, data, nil } diff --git a/p2p/broadcast.go b/p2p/broadcast.go index 07aa717..d4b95b8 100644 --- a/p2p/broadcast.go +++ b/p2p/broadcast.go @@ -25,24 +25,25 @@ var ( ) func PublishTopic(ctx context.Context, topicHandle *pubsub.Topic, data []byte, opts ...pubsub.PubOpt) error { + // Publish the message to the topic, retrying until we have peers to send the message to + // or the context is cancelled + start := time.Now() for { if len(topicHandle.ListPeers()) > 0 { // Log list of peers we are sending the message to - peerIDs := make([]string, len(topicHandle.ListPeers())) - for i, peer := range topicHandle.ListPeers() { - peerIDs[i] = peer.String() - } - logrus.WithFields(logrus.Fields{ + debugFields := logrus.Fields{ "topic": topicHandle.String(), - "peers": peerIDs, "data-length": len(data), - }).Debug("sending message to peers") - + } + for i, peer := range topicHandle.ListPeers() { + debugFields[fmt.Sprintf("peer_%d", i)] = peer.String() + } + logrus.WithFields(debugFields).Debug("sending message to peers") return topicHandle.Publish(ctx, data, opts...) } select { case <-ctx.Done(): - return errors.Wrap(ctx.Err(), "topic list of peers was always empty") + return errors.Wrapf(ctx.Err(), "topic list of peers was always empty, waited for %s", time.Since(start)) case <-time.After(1 * time.Millisecond): } } @@ -89,6 +90,7 @@ func (p *TestPeer) BroadcastSignedBeaconBlock(spec *common.Spec, signedBeaconBlo if err != nil { return errors.Wrap(err, "failed to join topic") } + defer topicHandle.Close() blockRoot := signedBeaconBlock.Message.HashTreeRoot(spec, tree.GetHashFn()) debugFields := logrus.Fields{ "id": p.ID, @@ -107,9 +109,14 @@ func (p *TestPeer) BroadcastSignedBeaconBlock(spec *common.Spec, signedBeaconBlo logrus.WithFields(debugFields).Debug("Broadcasting signed beacon block deneb") if err := PublishTopic(timeoutCtx, topicHandle, buf); err != nil { + debugFields := logrus.Fields{} + for i, peer := range p.Host.Network().Peers() { + debugFields[fmt.Sprintf("peer_%d", i)] = peer.String() + } + logrus.WithFields(debugFields).Debug("connected network peers") return errors.Wrap(err, "failed to publish topic") } - return topicHandle.Close() + return nil } func (p TestPeers) BroadcastSignedBeaconBlock(spec *common.Spec, signedBeaconBlockDeneb *deneb.SignedBeaconBlock) error { @@ -149,6 +156,7 @@ func (p *TestPeer) BroadcastBlobSidecar(spec *common.Spec, blobSidecar *deneb.Bl if err != nil { return errors.Wrap(err, "failed to join topic") } + defer topicHandle.Close() blockRoot := blobSidecar.SignedBlockHeader.Message.HashTreeRoot(tree.GetHashFn()) logrus.WithFields(logrus.Fields{ @@ -162,9 +170,14 @@ func (p *TestPeer) BroadcastBlobSidecar(spec *common.Spec, blobSidecar *deneb.Bl }).Debug("Broadcasting blob sidecar with signed block header") if err := PublishTopic(timeoutCtx, topicHandle, buf); err != nil { + debugFields := logrus.Fields{} + for i, peer := range p.Host.Network().Peers() { + debugFields[fmt.Sprintf("peer_%d", i)] = peer.String() + } + logrus.WithFields(debugFields).Debug("connected network peers") return errors.Wrap(err, "failed to publish topic") } - return topicHandle.Close() + return nil } func (p *TestPeer) BroadcastBlobSidecars(spec *common.Spec, blobSidecars ...*deneb.BlobSidecar) error {