From 17569ef9918ce422dadaa285a7a9d89b7ce5524c Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 23 Sep 2024 21:02:26 +0530 Subject: [PATCH 1/2] relay: make only 1 reservation per peer We only need 1 reservation per peer. This PR also proactively removes relay reservations on disconnecting from the peer. This helps when the relay client is restarting frequently. --- p2p/protocol/circuitv2/relay/constraints.go | 90 ++++++++++--------- .../circuitv2/relay/constraints_test.go | 49 +++++----- p2p/protocol/circuitv2/relay/relay.go | 15 ++-- p2p/protocol/circuitv2/relay/resources.go | 2 + 4 files changed, 84 insertions(+), 72 deletions(-) diff --git a/p2p/protocol/circuitv2/relay/constraints.go b/p2p/protocol/circuitv2/relay/constraints.go index 28698d9c06..0e6b2bc313 100644 --- a/p2p/protocol/circuitv2/relay/constraints.go +++ b/p2p/protocol/circuitv2/relay/constraints.go @@ -2,6 +2,7 @@ package relay import ( "errors" + "slices" "sync" "time" @@ -12,24 +13,25 @@ import ( manet "github.com/multiformats/go-multiaddr/net" ) -var validity = 30 * time.Minute - var ( - errTooManyReservations = errors.New("too many reservations") - errTooManyReservationsForPeer = errors.New("too many reservations for peer") - errTooManyReservationsForIP = errors.New("too many peers for IP address") - errTooManyReservationsForASN = errors.New("too many peers for ASN") + errTooManyReservations = errors.New("too many reservations") + errTooManyReservationsForIP = errors.New("too many peers for IP address") + errTooManyReservationsForASN = errors.New("too many peers for ASN") ) +type peerWithExpiry struct { + Expiry time.Time + Peer peer.ID +} + // constraints implements various reservation constraints type constraints struct { rc *Resources mutex sync.Mutex - total []time.Time - peers map[peer.ID][]time.Time - ips map[string][]time.Time - asns map[uint32][]time.Time + total []peerWithExpiry + ips map[string][]peerWithExpiry + asns map[uint32][]peerWithExpiry } // newConstraints creates a new constraints object. @@ -37,21 +39,22 @@ type constraints struct { // is required. func newConstraints(rc *Resources) *constraints { return &constraints{ - rc: rc, - peers: make(map[peer.ID][]time.Time), - ips: make(map[string][]time.Time), - asns: make(map[uint32][]time.Time), + rc: rc, + ips: make(map[string][]peerWithExpiry), + asns: make(map[uint32][]peerWithExpiry), } } // AddReservation adds a reservation for a given peer with a given multiaddr. // If adding this reservation violates IP constraints, an error is returned. -func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error { +func (c *constraints) Reserve(p peer.ID, a ma.Multiaddr, expiry time.Time) error { c.mutex.Lock() defer c.mutex.Unlock() now := time.Now() c.cleanup(now) + // To handle refreshes correctly, remove the existing reservation for the peer. + c.cleanupPeer(p) if len(c.total) >= c.rc.MaxReservations { return errTooManyReservations @@ -62,17 +65,12 @@ func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error { return errors.New("no IP address associated with peer") } - peerReservations := c.peers[p] - if len(peerReservations) >= c.rc.MaxReservationsPerPeer { - return errTooManyReservationsForPeer - } - ipReservations := c.ips[ip.String()] if len(ipReservations) >= c.rc.MaxReservationsPerIP { return errTooManyReservationsForIP } - var asnReservations []time.Time + var asnReservations []peerWithExpiry var asn uint32 if ip.To4() == nil { asn = asnutil.AsnForIPv6(ip) @@ -84,42 +82,52 @@ func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error { } } - expiry := now.Add(validity) - c.total = append(c.total, expiry) - - peerReservations = append(peerReservations, expiry) - c.peers[p] = peerReservations + c.total = append(c.total, peerWithExpiry{Expiry: expiry, Peer: p}) - ipReservations = append(ipReservations, expiry) + ipReservations = append(ipReservations, peerWithExpiry{Expiry: expiry, Peer: p}) c.ips[ip.String()] = ipReservations if asn != 0 { - asnReservations = append(asnReservations, expiry) + asnReservations = append(asnReservations, peerWithExpiry{Expiry: expiry, Peer: p}) c.asns[asn] = asnReservations } return nil } -func (c *constraints) cleanupList(l []time.Time, now time.Time) []time.Time { - var index int - for i, t := range l { - if t.After(now) { - break +func (c *constraints) cleanup(now time.Time) { + expireFunc := func(pe peerWithExpiry) bool { + return pe.Expiry.Before(now) + } + c.total = slices.DeleteFunc(c.total, expireFunc) + for k, ipReservations := range c.ips { + c.ips[k] = slices.DeleteFunc(ipReservations, expireFunc) + if len(c.ips[k]) == 0 { + delete(c.ips, k) + } + } + for k, asnReservations := range c.asns { + c.asns[k] = slices.DeleteFunc(asnReservations, expireFunc) + if len(c.asns[k]) == 0 { + delete(c.asns, k) } - index = i + 1 } - return l[index:] } -func (c *constraints) cleanup(now time.Time) { - c.total = c.cleanupList(c.total, now) - for k, peerReservations := range c.peers { - c.peers[k] = c.cleanupList(peerReservations, now) +func (c *constraints) cleanupPeer(p peer.ID) { + removeFunc := func(pe peerWithExpiry) bool { + return pe.Peer == p } + c.total = slices.DeleteFunc(c.total, removeFunc) for k, ipReservations := range c.ips { - c.ips[k] = c.cleanupList(ipReservations, now) + c.ips[k] = slices.DeleteFunc(ipReservations, removeFunc) + if len(c.ips[k]) == 0 { + delete(c.ips, k) + } } for k, asnReservations := range c.asns { - c.asns[k] = c.cleanupList(asnReservations, now) + c.asns[k] = slices.DeleteFunc(asnReservations, removeFunc) + if len(c.asns[k]) == 0 { + delete(c.asns, k) + } } } diff --git a/p2p/protocol/circuitv2/relay/constraints_test.go b/p2p/protocol/circuitv2/relay/constraints_test.go index 352fe22405..a40b50803d 100644 --- a/p2p/protocol/circuitv2/relay/constraints_test.go +++ b/p2p/protocol/circuitv2/relay/constraints_test.go @@ -34,35 +34,40 @@ func TestConstraints(t *testing.T) { } } const limit = 7 + expiry := time.Now().Add(30 * time.Minute) t.Run("total reservations", func(t *testing.T) { res := infResources() res.MaxReservations = limit c := newConstraints(res) for i := 0; i < limit; i++ { - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil { t.Fatal(err) } } - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations { + if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != errTooManyReservations { t.Fatalf("expected to run into total reservation limit, got %v", err) } }) - t.Run("reservations per peer", func(t *testing.T) { + t.Run("updates reservations on the same peer", func(t *testing.T) { p := test.RandPeerIDFatal(t) + p2 := test.RandPeerIDFatal(t) res := infResources() - res.MaxReservationsPerPeer = limit + res.MaxReservationsPerIP = 1 c := newConstraints(res) - for i := 0; i < limit; i++ { - if err := c.AddReservation(p, randomIPv4Addr(t)); err != nil { - t.Fatal(err) - } + + ipAddr := randomIPv4Addr(t) + if err := c.Reserve(p, ipAddr, expiry); err != nil { + t.Fatal(err) } - if err := c.AddReservation(p, randomIPv4Addr(t)); err != errTooManyReservationsForPeer { + if err := c.Reserve(p2, ipAddr, expiry); err != errTooManyReservationsForIP { t.Fatalf("expected to run into total reservation limit, got %v", err) } - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + if err := c.Reserve(p, randomIPv4Addr(t), expiry); err != nil { + t.Fatalf("expected to update reservation for peer, got %v", err) + } + if err := c.Reserve(p2, ipAddr, expiry); err != nil { t.Fatalf("expected reservation for different peer to be possible, got %v", err) } }) @@ -73,14 +78,14 @@ func TestConstraints(t *testing.T) { res.MaxReservationsPerIP = limit c := newConstraints(res) for i := 0; i < limit; i++ { - if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != nil { + if err := c.Reserve(test.RandPeerIDFatal(t), ip, expiry); err != nil { t.Fatal(err) } } - if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != errTooManyReservationsForIP { + if err := c.Reserve(test.RandPeerIDFatal(t), ip, expiry); err != errTooManyReservationsForIP { t.Fatalf("expected to run into total reservation limit, got %v", err) } - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil { t.Fatalf("expected reservation for different IP to be possible, got %v", err) } }) @@ -101,25 +106,23 @@ func TestConstraints(t *testing.T) { const ipv6Prefix = "2a03:2880:f003:c07:face:b00c::" for i := 0; i < limit; i++ { addr := getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, i+1))) - if err := c.AddReservation(test.RandPeerIDFatal(t), addr); err != nil { + if err := c.Reserve(test.RandPeerIDFatal(t), addr, expiry); err != nil { t.Fatal(err) } } - if err := c.AddReservation(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42)))); err != errTooManyReservationsForASN { + if err := c.Reserve(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42))), expiry); err != errTooManyReservationsForASN { t.Fatalf("expected to run into total reservation limit, got %v", err) } - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil { t.Fatalf("expected reservation for different IP to be possible, got %v", err) } }) } func TestConstraintsCleanup(t *testing.T) { - origValidity := validity - defer func() { validity = origValidity }() - validity = 500 * time.Millisecond - const limit = 7 + validity := 500 * time.Millisecond + expiry := time.Now().Add(validity) res := &Resources{ MaxReservations: limit, MaxReservationsPerPeer: math.MaxInt32, @@ -128,16 +131,16 @@ func TestConstraintsCleanup(t *testing.T) { } c := newConstraints(res) for i := 0; i < limit; i++ { - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil { t.Fatal(err) } } - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations { + if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != errTooManyReservations { t.Fatalf("expected to run into total reservation limit, got %v", err) } time.Sleep(validity + time.Millisecond) - if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil { t.Fatalf("expected old reservations to have been garbage collected, %v", err) } } diff --git a/p2p/protocol/circuitv2/relay/relay.go b/p2p/protocol/circuitv2/relay/relay.go index 1629a6d1a1..f98da96c30 100644 --- a/p2p/protocol/circuitv2/relay/relay.go +++ b/p2p/protocol/circuitv2/relay/relay.go @@ -202,18 +202,16 @@ func (r *Relay) handleReserve(s network.Stream) pbv2.Status { return pbv2.Status_PERMISSION_DENIED } now := time.Now() + expire := now.Add(r.rc.ReservationTTL) _, exists := r.rsvp[p] - if !exists { - if err := r.constraints.AddReservation(p, a); err != nil { - r.mx.Unlock() - log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err) - r.handleError(s, pbv2.Status_RESERVATION_REFUSED) - return pbv2.Status_RESERVATION_REFUSED - } + if err := r.constraints.Reserve(p, a, expire); err != nil { + r.mx.Unlock() + log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err) + r.handleError(s, pbv2.Status_RESERVATION_REFUSED) + return pbv2.Status_RESERVATION_REFUSED } - expire := now.Add(r.rc.ReservationTTL) r.rsvp[p] = expire r.host.ConnManager().TagPeer(p, "relay-reservation", ReservationTagWeight) r.mx.Unlock() @@ -673,6 +671,7 @@ func (r *Relay) disconnected(n network.Network, c network.Conn) { if ok { delete(r.rsvp, p) } + r.constraints.cleanupPeer(p) r.mx.Unlock() if ok && r.metricsTracer != nil { diff --git a/p2p/protocol/circuitv2/relay/resources.go b/p2p/protocol/circuitv2/relay/resources.go index 0ae0169899..d47daa3112 100644 --- a/p2p/protocol/circuitv2/relay/resources.go +++ b/p2p/protocol/circuitv2/relay/resources.go @@ -22,6 +22,8 @@ type Resources struct { // MaxReservationsPerPeer is the maximum number of reservations originating from the same // peer; default is 4. + // + // Deprecated: We only need 1 reservation per peer. MaxReservationsPerPeer int // MaxReservationsPerIP is the maximum number of reservations originating from the same // IP address; default is 8. From 9a2212b1a6ed07770a321362699c0604cebdc8b6 Mon Sep 17 00:00:00 2001 From: sukun Date: Fri, 4 Oct 2024 16:09:14 +0530 Subject: [PATCH 2/2] fix stale comments --- p2p/protocol/circuitv2/relay/constraints.go | 4 ++-- p2p/protocol/circuitv2/relay/constraints_test.go | 4 ++-- p2p/protocol/circuitv2/relay/resources.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/protocol/circuitv2/relay/constraints.go b/p2p/protocol/circuitv2/relay/constraints.go index 0e6b2bc313..4cce1f0176 100644 --- a/p2p/protocol/circuitv2/relay/constraints.go +++ b/p2p/protocol/circuitv2/relay/constraints.go @@ -45,8 +45,8 @@ func newConstraints(rc *Resources) *constraints { } } -// AddReservation adds a reservation for a given peer with a given multiaddr. -// If adding this reservation violates IP constraints, an error is returned. +// Reserve adds a reservation for a given peer with a given multiaddr. +// If adding this reservation violates IP, ASN, or total reservation constraints, an error is returned. func (c *constraints) Reserve(p peer.ID, a ma.Multiaddr, expiry time.Time) error { c.mutex.Lock() defer c.mutex.Unlock() diff --git a/p2p/protocol/circuitv2/relay/constraints_test.go b/p2p/protocol/circuitv2/relay/constraints_test.go index a40b50803d..bced8e4097 100644 --- a/p2p/protocol/circuitv2/relay/constraints_test.go +++ b/p2p/protocol/circuitv2/relay/constraints_test.go @@ -62,10 +62,10 @@ func TestConstraints(t *testing.T) { t.Fatal(err) } if err := c.Reserve(p2, ipAddr, expiry); err != errTooManyReservationsForIP { - t.Fatalf("expected to run into total reservation limit, got %v", err) + t.Fatalf("expected to run into IP reservation limit as this IP has already been reserved by a different peer, got %v", err) } if err := c.Reserve(p, randomIPv4Addr(t), expiry); err != nil { - t.Fatalf("expected to update reservation for peer, got %v", err) + t.Fatalf("expected to update existing reservation for peer, got %v", err) } if err := c.Reserve(p2, ipAddr, expiry); err != nil { t.Fatalf("expected reservation for different peer to be possible, got %v", err) diff --git a/p2p/protocol/circuitv2/relay/resources.go b/p2p/protocol/circuitv2/relay/resources.go index d47daa3112..dc0207bca6 100644 --- a/p2p/protocol/circuitv2/relay/resources.go +++ b/p2p/protocol/circuitv2/relay/resources.go @@ -53,7 +53,7 @@ func DefaultResources() Resources { MaxCircuits: 16, BufferSize: 2048, - MaxReservationsPerPeer: 4, + MaxReservationsPerPeer: 1, MaxReservationsPerIP: 8, MaxReservationsPerASN: 32, }