Skip to content

Commit

Permalink
peerstore: better GC in membacked peerstore (#2960)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: sukun <sukunrt@gmail.com>
  • Loading branch information
MarcoPolo and sukunrt committed Sep 24, 2024
1 parent 6694d4a commit 85eb9ef
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 212 deletions.
67 changes: 22 additions & 45 deletions core/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,10 @@ type AddrBook interface {
PeersWithAddrs() peer.IDSlice
}

// CertifiedAddrBook manages "self-certified" addresses for remote peers.
// Self-certified addresses are contained in peer.PeerRecords
// which are wrapped in a record.Envelope and signed by the peer
// to whom they belong.
// CertifiedAddrBook manages signed peer records and "self-certified" addresses
// contained within them.
// Use this interface with an `AddrBook`.
//
// Certified addresses (CA) are generally more secure than uncertified
// addresses (UA). Consequently, CAs beat and displace UAs. When the
// peerstore learns CAs for a peer, it will reject UAs for the same peer
// (as long as the former haven't expired).
// Furthermore, peer records act like sequenced snapshots of CAs. Therefore,
// processing a peer record that's newer than the last one seen overwrites
// all addresses with the incoming ones.
//
// This interface is most useful when combined with AddrBook.
// To test whether a given AddrBook / Peerstore implementation supports
// certified addresses, callers should use the GetCertifiedAddrBook helper or
// type-assert on the CertifiedAddrBook interface:
Expand All @@ -143,41 +133,28 @@ type AddrBook interface {
// cab.ConsumePeerRecord(signedPeerRecord, aTTL)
// }
type CertifiedAddrBook interface {
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
// a record.Envelope), which will expire after the given TTL.
//
// The 'accepted' return value indicates that the record was successfully processed
// and integrated into the CertifiedAddrBook state. If 'accepted' is false but no
// error is returned, it means that the record was ignored, most likely because
// a newer record exists for the same peer.
//
// Signed records added via this method will be stored without
// alteration as long as the address TTLs remain valid. The Envelopes
// containing the PeerRecords can be retrieved by calling GetPeerRecord(peerID).
//
// If the signed PeerRecord belongs to a peer that already has certified
// addresses in the CertifiedAddrBook, the new addresses will replace the
// older ones, if the new record has a higher sequence number than the
// existing record. Attempting to add a peer record with a
// sequence number that's <= an existing record for the same peer will not
// result in an error, but the record will be ignored, and the 'accepted'
// bool return value will be false.
// ConsumePeerRecord stores a signed peer record and the contained addresses for
// for ttl duration.
// The addresses contained in the signed peer record will expire after ttl. If any
// address is already present in the peer store, it'll expire at max of existing ttl and
// provided ttl.
// The signed peer record itself will be expired when all the addresses associated with the peer,
// self-certified or not, are removed from the AddrBook.
// To delete the signed peer record, use `AddrBook.UpdateAddrs`,`AddrBook.SetAddrs`, or
// `AddrBook.ClearAddrs` with ttl 0.
// Note: Future calls to ConsumePeerRecord will not expire self-certified addresses from the
// previous calls.
//
// If the CertifiedAddrBook is also an AddrBook (which is most likely the case),
// adding certified addresses for a peer will *replace* any
// existing non-certified addresses for that peer, and only the certified
// addresses will be returned from AddrBook.Addrs thereafter.
// The `accepted` return value indicates that the record was successfully processed. If
// `accepted` is false but no error is returned, it means that the record was ignored, most
// likely because a newer record exists for the same peer with a greater seq value.
//
// Likewise, once certified addresses have been added for a given peer,
// any non-certified addresses added via AddrBook.AddAddrs or
// AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used
// to update the TTL of certified addresses that have previously been
// added via ConsumePeerRecord.
// The Envelopes containing the signed peer records can be retrieved by calling
// GetPeerRecord(peerID).
ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error)

// GetPeerRecord returns an Envelope containing a PeerRecord for the
// given peer id, if one exists.
// Returns nil if no signed PeerRecord exists for the peer.
// GetPeerRecord returns an Envelope containing a peer record for the
// peer, or nil if no record exists.
GetPeerRecord(p peer.ID) *record.Envelope
}

Expand All @@ -196,7 +173,7 @@ func GetCertifiedAddrBook(ab AddrBook) (cab CertifiedAddrBook, ok bool) {

// KeyBook tracks the keys of Peers.
type KeyBook interface {
// PubKey stores the public key of a peer.
// PubKey returns the public key of a peer.
PubKey(peer.ID) ic.PubKey

// AddPubKey stores the public key of a peer.
Expand Down
55 changes: 31 additions & 24 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,12 @@ func (h *BasicHost) SignalAddressChange() {
}
}

func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
if prev == nil && current == nil {
return nil
}
prevmap := make(map[string]ma.Multiaddr, len(prev))
evt := event.EvtLocalAddressesUpdated{Diffs: true}
evt := &event.EvtLocalAddressesUpdated{Diffs: true}
addrsAdded := false

for _, addr := range prev {
Expand All @@ -524,7 +527,19 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
return nil
}

return &evt
// Our addresses have changed. Make a new signed peer record.
if !h.disableSignedPeerRecord {
// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(current)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
// drop this change
return nil
}
evt.SignedPeerRecord = sr
}

return evt
}

func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
Expand All @@ -548,34 +563,27 @@ func (h *BasicHost) background() {
var lastAddrs []ma.Multiaddr

emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
// nothing to do if both are nil..defensive check
if currentAddrs == nil && lastAddrs == nil {
return
}

changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs)

changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs)
if changeEvt == nil {
return
}

// Our addresses have changed.
// store the signed peer record in the peer store.
if !h.disableSignedPeerRecord {
// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(currentAddrs)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
return
}
changeEvt.SignedPeerRecord = sr

// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}
}
// update host addresses in the peer store
removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed))
for _, ua := range changeEvt.Removed {
removedAddrs = append(removedAddrs, ua.Address)
}
h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL)
h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0)

// emit addr change event on the bus
// emit addr change event
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
Expand All @@ -587,11 +595,10 @@ func (h *BasicHost) background() {
defer ticker.Stop()

for {
// Update our local IP addresses before checking our current addresses.
if len(h.network.ListenAddresses()) > 0 {
h.updateLocalIpAddr()
}
// Request addresses anyways because, technically, address filters still apply.
// The underlying AllAddrs call is effectively a no-op.
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr
Expand Down
Loading

0 comments on commit 85eb9ef

Please sign in to comment.