Skip to content

Commit

Permalink
fix: Defer topic handle close, better handle errors on block request (#…
Browse files Browse the repository at this point in the history
…41)

* fix: Defer topic handle close

* add more debugging info
  • Loading branch information
marioevz authored Mar 6, 2024
1 parent d0e18e4 commit 035b47b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 29 deletions.
38 changes: 20 additions & 18 deletions blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,17 +364,18 @@ func (b *Blobber) genValidatorBlockHandler(cl *beacon_client.BeaconClient, id in
return false, errors.Wrap(err, "failed to unmarshal slot")
}
blockVersion, blockBlobResponse, err := ParseResponse(response)
if err != nil {
if err != nil || blockBlobResponse == nil {
logrus.WithFields(logrus.Fields{
"proxy_id": id,
"version": version,
"slot": slot,
"response": string(response),
"proxy_id": id,
"version": version,
"slot": slot,
"requestURL": request.URL.String(),
"response": string(response),
}).Debug("Failed to parse response")
return false, errors.Wrap(err, "failed to parse response")
}
if blockBlobResponse == nil {
return false, errors.Wrap(err, "response is nil")
if err != nil {
return false, errors.Wrap(err, "failed to parse response")
}
return false, errors.New("failed to parse response")
}
var validatorKey *keys.ValidatorKey
if b.ValidatorKeys != nil {
Expand Down Expand Up @@ -412,28 +413,29 @@ func (b *Blobber) genValidatorBlockHandler(cl *beacon_client.BeaconClient, id in
}

type BlockDataStruct struct {
Version string `json:"version"`
Data json.RawMessage `json:"data"`
Version *string `json:"version"`
Data *json.RawMessage `json:"data"`
}

func ParseResponse(response []byte) (string, *deneb.BlockContents, error) {
var (
blockDataStruct BlockDataStruct
)
if err := json.Unmarshal(response, &blockDataStruct); err != nil {
return blockDataStruct.Version, nil, errors.Wrap(err, "failed to unmarshal response into BlockDataStruct")
if err := json.Unmarshal(response, &blockDataStruct); err != nil || blockDataStruct.Version == nil || blockDataStruct.Data == nil {
return "", nil, errors.Wrap(err, "failed to unmarshal response into BlockDataStruct")
}

if blockDataStruct.Version != "deneb" {
if *blockDataStruct.Version != "deneb" {
logrus.WithField("version", blockDataStruct.Version).Warn("Unsupported version, skipping actions")
return blockDataStruct.Version, nil, nil
logrus.WithField("response", string(response)).Debug("Unsupported version, skipping actions")
return *blockDataStruct.Version, nil, nil
}

decoder := json.NewDecoder(bytes.NewReader(blockDataStruct.Data))
decoder := json.NewDecoder(bytes.NewReader(*blockDataStruct.Data))
data := new(deneb.BlockContents)
if err := decoder.Decode(&data); err != nil {
return blockDataStruct.Version, nil, errors.Wrap(err, "failed to decode block contents")
return *blockDataStruct.Version, nil, errors.Wrap(err, "failed to decode block contents")
}

return blockDataStruct.Version, data, nil
return *blockDataStruct.Version, data, nil
}
35 changes: 24 additions & 11 deletions p2p/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,25 @@ var (
)

func PublishTopic(ctx context.Context, topicHandle *pubsub.Topic, data []byte, opts ...pubsub.PubOpt) error {
// Publish the message to the topic, retrying until we have peers to send the message to
// or the context is cancelled
start := time.Now()
for {
if len(topicHandle.ListPeers()) > 0 {
// Log list of peers we are sending the message to
peerIDs := make([]string, len(topicHandle.ListPeers()))
for i, peer := range topicHandle.ListPeers() {
peerIDs[i] = peer.String()
}
logrus.WithFields(logrus.Fields{
debugFields := logrus.Fields{
"topic": topicHandle.String(),
"peers": peerIDs,
"data-length": len(data),
}).Debug("sending message to peers")

}
for i, peer := range topicHandle.ListPeers() {
debugFields[fmt.Sprintf("peer_%d", i)] = peer.String()
}
logrus.WithFields(debugFields).Debug("sending message to peers")
return topicHandle.Publish(ctx, data, opts...)
}
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "topic list of peers was always empty")
return errors.Wrapf(ctx.Err(), "topic list of peers was always empty, waited for %s", time.Since(start))
case <-time.After(1 * time.Millisecond):
}
}
Expand Down Expand Up @@ -89,6 +90,7 @@ func (p *TestPeer) BroadcastSignedBeaconBlock(spec *common.Spec, signedBeaconBlo
if err != nil {
return errors.Wrap(err, "failed to join topic")
}
defer topicHandle.Close()
blockRoot := signedBeaconBlock.Message.HashTreeRoot(spec, tree.GetHashFn())
debugFields := logrus.Fields{
"id": p.ID,
Expand All @@ -107,9 +109,14 @@ func (p *TestPeer) BroadcastSignedBeaconBlock(spec *common.Spec, signedBeaconBlo
logrus.WithFields(debugFields).Debug("Broadcasting signed beacon block deneb")

if err := PublishTopic(timeoutCtx, topicHandle, buf); err != nil {
debugFields := logrus.Fields{}
for i, peer := range p.Host.Network().Peers() {
debugFields[fmt.Sprintf("peer_%d", i)] = peer.String()
}
logrus.WithFields(debugFields).Debug("connected network peers")
return errors.Wrap(err, "failed to publish topic")
}
return topicHandle.Close()
return nil
}

func (p TestPeers) BroadcastSignedBeaconBlock(spec *common.Spec, signedBeaconBlockDeneb *deneb.SignedBeaconBlock) error {
Expand Down Expand Up @@ -149,6 +156,7 @@ func (p *TestPeer) BroadcastBlobSidecar(spec *common.Spec, blobSidecar *deneb.Bl
if err != nil {
return errors.Wrap(err, "failed to join topic")
}
defer topicHandle.Close()

blockRoot := blobSidecar.SignedBlockHeader.Message.HashTreeRoot(tree.GetHashFn())
logrus.WithFields(logrus.Fields{
Expand All @@ -162,9 +170,14 @@ func (p *TestPeer) BroadcastBlobSidecar(spec *common.Spec, blobSidecar *deneb.Bl
}).Debug("Broadcasting blob sidecar with signed block header")

if err := PublishTopic(timeoutCtx, topicHandle, buf); err != nil {
debugFields := logrus.Fields{}
for i, peer := range p.Host.Network().Peers() {
debugFields[fmt.Sprintf("peer_%d", i)] = peer.String()
}
logrus.WithFields(debugFields).Debug("connected network peers")
return errors.Wrap(err, "failed to publish topic")
}
return topicHandle.Close()
return nil
}

func (p *TestPeer) BroadcastBlobSidecars(spec *common.Spec, blobSidecars ...*deneb.BlobSidecar) error {
Expand Down

0 comments on commit 035b47b

Please sign in to comment.