Skip to content

Commit

Permalink
Correct gossipsub mesh and connected peer inconsistencies (#6244)
Browse files Browse the repository at this point in the history
* Handle gossipsub promises gracefully

* Apply a forgotten patch which sync the fanout with unsubscriptions

* Merge remote-tracking branch 'network/unstable' into supress-invalid-gossipsub-error

* Update beacon_node/lighthouse_network/gossipsub/src/behaviour.rs

Co-authored-by: João Oliveira <hello@jxs.pt>

* Add changelog entry

* Merge latest unstable

* Merge branch 'unstable' into supress-invalid-gossipsub-error

* Merge branch 'unstable' into supress-invalid-gossipsub-error
  • Loading branch information
AgeManning authored Oct 30, 2024
1 parent 48aa353 commit 8d7b3dd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
3 changes: 3 additions & 0 deletions beacon_node/lighthouse_network/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
- Remove the beta tag from the v1.2 upgrade.
See [PR 6344](https://github.com/sigp/lighthouse/pull/6344)

- Correct state inconsistencies with the mesh and connected peers due to the fanout mapping.
See [PR 6244](https://github.com/sigp/lighthouse/pull/6244)

- Implement IDONTWANT messages as per [spec](https://github.com/libp2p/specs/pull/548).
See [PR 5422](https://github.com/sigp/lighthouse/pull/5422)

Expand Down
38 changes: 22 additions & 16 deletions beacon_node/lighthouse_network/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ where
}
} else {
tracing::error!(peer_id = %peer_id,
"Could not PUBLISH, peer doesn't exist in connected peer list");
"Could not send PUBLISH, peer doesn't exist in connected peer list");
}
}

Expand Down Expand Up @@ -1066,7 +1066,7 @@ where
});
} else {
tracing::error!(peer = %peer_id,
"Could not GRAFT, peer doesn't exist in connected peer list");
"Could not send GRAFT, peer doesn't exist in connected peer list");
}

// If the peer did not previously exist in any mesh, inform the handler
Expand Down Expand Up @@ -1165,7 +1165,7 @@ where
peer.sender.prune(prune);
} else {
tracing::error!(peer = %peer_id,
"Could not PRUNE, peer doesn't exist in connected peer list");
"Could not send PRUNE, peer doesn't exist in connected peer list");
}

// If the peer did not previously exist in any mesh, inform the handler
Expand Down Expand Up @@ -1344,7 +1344,7 @@ where
}
} else {
tracing::error!(peer = %peer_id,
"Could not IWANT, peer doesn't exist in connected peer list");
"Could not send IWANT, peer doesn't exist in connected peer list");
}
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
Expand All @@ -1367,7 +1367,7 @@ where

for id in iwant_msgs {
// If we have it and the IHAVE count is not above the threshold,
// foward the message.
// forward the message.
if let Some((msg, count)) = self
.mcache
.get_with_iwant_counts(&id, peer_id)
Expand Down Expand Up @@ -1407,7 +1407,7 @@ where
}
} else {
tracing::error!(peer = %peer_id,
"Could not IWANT, peer doesn't exist in connected peer list");
"Could not send IWANT, peer doesn't exist in connected peer list");
}
}
}
Expand Down Expand Up @@ -2050,8 +2050,11 @@ where
}
}

// remove unsubscribed peers from the mesh if it exists
// remove unsubscribed peers from the mesh and fanout if they exist there.
for (peer_id, topic_hash) in unsubscribed_peers {
self.fanout
.get_mut(&topic_hash)
.map(|peers| peers.remove(&peer_id));
self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
}

Expand All @@ -2075,7 +2078,7 @@ where
}
} else {
tracing::error!(peer = %propagation_source,
"Could not GRAFT, peer doesn't exist in connected peer list");
"Could not send GRAFT, peer doesn't exist in connected peer list");
}

// Notify the application of the subscriptions
Expand All @@ -2093,9 +2096,12 @@ where
fn apply_iwant_penalties(&mut self) {
if let Some((peer_score, ..)) = &mut self.peer_score {
for (peer, count) in self.gossip_promises.get_broken_promises() {
peer_score.add_penalty(&peer, count);
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_score_penalty(Penalty::BrokenPromise);
// We do not apply penalties to nodes that have disconnected.
if self.connected_peers.contains_key(&peer) {
peer_score.add_penalty(&peer, count);
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_score_penalty(Penalty::BrokenPromise);
}
}
}
}
Expand Down Expand Up @@ -2590,7 +2596,7 @@ where
}
} else {
tracing::error!(peer = %peer_id,
"Could not IHAVE, peer doesn't exist in connected peer list");
"Could not send IHAVE, peer doesn't exist in connected peer list");
}
}
}
Expand Down Expand Up @@ -2676,7 +2682,7 @@ where
peer.sender.prune(prune);
} else {
tracing::error!(peer = %peer_id,
"Could not PRUNE, peer doesn't exist in connected peer list");
"Could not send PRUNE, peer doesn't exist in connected peer list");
}

// inform the handler
Expand Down Expand Up @@ -2713,8 +2719,8 @@ where

for peer_id in recipient_peers {
let Some(peer) = self.connected_peers.get_mut(peer_id) else {
tracing::error!(peer = %peer_id,
"Could not IDONTWANT, peer doesn't exist in connected peer list");
// It can be the case that promises to disconnected peers appear here. In this case
// we simply ignore the peer-id.
continue;
};

Expand Down Expand Up @@ -2979,7 +2985,7 @@ where
}
} else {
tracing::error!(peer = %peer_id,
"Could not SUBSCRIBE, peer doesn't exist in connected peer list");
"Could not send SUBSCRIBE, peer doesn't exist in connected peer list");
}
}

Expand Down

0 comments on commit 8d7b3dd

Please sign in to comment.