Skip to content

Commit

Permalink
sync: enable rate limiting for servers (#5151)
Browse files Browse the repository at this point in the history
closes: #4977
closes: #4603

this change introduces two configuration parameter for every server:
- requests per interval pace, for example 10 req/s, this caps the maximum bandwidth that every server can use
- queue size, it is set to serve requests within expected latency. every other request is dropped immediately so that client can retry with different node. currently the timeout is set to 10s, so the queue should be roughly 10 times larger then rps

it doesn't provide global limit for bandwidth, but we have limit for the number of peers. and honest peer doesn't run many concurrent queries. so what we really want to handle is peers with intentionally malicious behavior, but thats not a pressing issue 

example configuration:

```json
"fetch": {
        "servers": {
            "ax/1": {"queue": 10, "requests": 1, "interval": "1s"},
            "ld/1": {"queue": 1000, "requests": 100, "interval": "1s"},
            "hs/1": {"queue": 2000, "requests": 200, "interval": "1s"},
            "mh/1": {"queue": 1000, "requests": 100, "interval": "1s"},
            "ml/1": {"queue": 100, "requests": 10, "interval": "1s"},
            "lp/2": {"queue": 10000, "requests": 1000, "interval": "1s"}
        }
    }
```

https://github.com/spacemeshos/go-spacemesh/blob/3cf02146bf27f53c001bffcacffbda05933c27c4/fetch/fetch.go#L130-L144


metrics are per server:

https://github.com/spacemeshos/go-spacemesh/blob/3cf02146bf27f53c001bffcacffbda05933c27c4/p2p/server/metrics.go#L15-L52

have to be enabled for all servers with

```json
"fetch": {
        "servers-metrics": true
    }
```
  • Loading branch information
dshulyak committed Oct 22, 2023
1 parent b168b41 commit adb2849
Show file tree
Hide file tree
Showing 9 changed files with 496 additions and 110 deletions.
110 changes: 82 additions & 28 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,72 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage {
return m
}

type ServerConfig struct {
Queue int `mapstructure:"queue"`
Requests int `mapstructure:"requests"`
Interval time.Duration `mapstructure:"interval"`
}

func (s ServerConfig) toOpts() []server.Opt {
opts := []server.Opt{}
if s.Queue != 0 {
opts = append(opts, server.WithQueueSize(s.Queue))
}
if s.Requests != 0 && s.Interval != 0 {
opts = append(opts, server.WithRequestsPerInterval(s.Requests, s.Interval))
}
return opts
}

// Config is the configuration file of the Fetch component.
type Config struct {
BatchTimeout time.Duration // in milliseconds
BatchTimeout time.Duration
BatchSize, QueueSize int
RequestTimeout time.Duration // in seconds
RequestTimeout time.Duration
MaxRetriesForRequest int
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
EnableServesMetrics bool `mapstructure:"servers-metrics"`
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
}

func (c Config) getServerConfig(protocol string) ServerConfig {
cfg, exists := c.ServersConfig[protocol]
if exists {
return cfg
}
return ServerConfig{
Queue: 10000,
Requests: 100,
Interval: time.Second,
}
}

// DefaultConfig is the default config for the fetch component.
func DefaultConfig() Config {
return Config{
BatchTimeout: time.Millisecond * time.Duration(50),
BatchTimeout: 50 * time.Millisecond,
QueueSize: 20,
BatchSize: 20,
RequestTimeout: time.Second * time.Duration(10),
RequestTimeout: 10 * time.Second,
MaxRetriesForRequest: 100,
PeersRateThreshold: 0.02,
ServersConfig: map[string]ServerConfig{
// serves 1 MB of data
atxProtocol: {Queue: 10, Requests: 1, Interval: time.Second},
// serves 1 KB of data
lyrDataProtocol: {Queue: 1000, Requests: 100, Interval: time.Second},
// serves atxs, ballots, active sets
// atx - 1 KB
// ballots > 300 bytes
// often queried after receiving gossip message
hashProtocol: {Queue: 2000, Requests: 200, Interval: time.Second},
// serves at most 100 hashes - 3KB
meshHashProtocol: {Queue: 1000, Requests: 100, Interval: time.Second},
// serves all malicious ids (id - 32 byte) - 10KB
malProtocol: {Queue: 100, Requests: 10, Interval: time.Second},
// 64 bytes
OpnProtocol: {Queue: 10000, Requests: 1000, Interval: time.Second},
},
PeersRateThreshold: 0.02,
}
}

Expand Down Expand Up @@ -220,34 +268,34 @@ func NewFetch(
}

f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
srvOpts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithLog(f.logger),
}
if len(f.servers) == 0 {
h := newHandler(cdb, bs, msh, b, f.logger)
f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(
host,
lyrDataProtocol,
h.handleLayerDataReq,
srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(
host,
meshHashProtocol,
h.handleMeshHashReq,
srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
f.servers[OpnProtocol] = server.New(
host,
OpnProtocol,
h.handleLayerOpinionsReq2,
srvOpts...)
f.registerServer(host, atxProtocol, h.handleEpochInfoReq)
f.registerServer(host, lyrDataProtocol, h.handleLayerDataReq)
f.registerServer(host, hashProtocol, h.handleHashReq)
f.registerServer(host, meshHashProtocol, h.handleMeshHashReq)
f.registerServer(host, malProtocol, h.handleMaliciousIDsReq)
f.registerServer(host, OpnProtocol, h.handleLayerOpinionsReq2)
}
return f
}

func (f *Fetch) registerServer(
host *p2p.Host,
protocol string,
handler server.Handler,
) {
opts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithLog(f.logger),
}
if f.cfg.EnableServesMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
f.servers[protocol] = server.New(host, protocol, handler, opts...)
}

type dataValidators struct {
atx SyncValidator
poet SyncValidator
Expand Down Expand Up @@ -295,6 +343,12 @@ func (f *Fetch) Start() error {
f.loop()
return nil
})
for _, srv := range f.servers {
srv := srv
f.eg.Go(func() error {
return srv.Run(f.shutdownCtx)
})
}
})
return nil
}
Expand Down
12 changes: 11 additions & 1 deletion fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -65,6 +66,9 @@ func createFetch(tb testing.TB) *testFetch {
mTxProposalH: mocks.NewMockSyncValidator(ctrl),
mPoetH: mocks.NewMockSyncValidator(ctrl),
}
for _, srv := range []*mocks.Mockrequester{tf.mMalS, tf.mAtxS, tf.mLyrS, tf.mHashS, tf.mMHashS, tf.mOpn2S} {
srv.EXPECT().Run(gomock.Any()).AnyTimes()
}
cfg := Config{
BatchTimeout: 2 * time.Second, // make sure we never hit the batch timeout
BatchSize: 3,
Expand Down Expand Up @@ -373,7 +377,13 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
}
return result, nil
}
server.New(badPeerHost, hashProtocol, badPeerHandler)
badsrv := server.New(badPeerHost, hashProtocol, badPeerHandler)
var eg errgroup.Group
eg.Go(func() error {
badsrv.Run(ctx)
return nil
})
defer eg.Wait()

fetcher := NewFetch(datastore.NewCachedDB(sql.InMemory(), lg), nil, nil, h,
WithContext(ctx),
Expand Down
1 change: 1 addition & 0 deletions fetch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go

type requester interface {
Run(context.Context) error
Request(context.Context, p2p.Peer, []byte, func([]byte), func(error)) error
}

Expand Down
38 changes: 38 additions & 0 deletions fetch/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.4.0
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
Expand Down Expand Up @@ -206,7 +207,6 @@ require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
78 changes: 78 additions & 0 deletions p2p/server/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package server

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/spacemeshos/go-spacemesh/metrics"
)

const (
namespace = "server"
protoLabel = "protocol"
)

var (
targetQueue = metrics.NewGauge(
"target_queue",
namespace,
"target size of the queue",
[]string{protoLabel},
)
queue = metrics.NewGauge(
"queue",
namespace,
"actual size of the queue",
[]string{protoLabel},
)
targetRps = metrics.NewGauge(
"rps",
namespace,
"target requests per second",
[]string{protoLabel},
)
requests = metrics.NewCounter(
"requests",
namespace,
"requests counter",
[]string{protoLabel, "state"},
)
clientLatency = metrics.NewHistogramWithBuckets(
"client_latency_seconds",
namespace,
"latency since initiating a request",
[]string{protoLabel, "result"},
prometheus.ExponentialBuckets(0.01, 2, 10),
)
serverLatency = metrics.NewHistogramWithBuckets(
"server_latency_seconds",
namespace,
"latency since accepting new stream",
[]string{protoLabel},
prometheus.ExponentialBuckets(0.01, 2, 10),
)
)

func newTracker(protocol string) *tracker {
return &tracker{
targetQueue: targetQueue.WithLabelValues(protocol),
queue: queue.WithLabelValues(protocol),
targetRps: targetRps.WithLabelValues(protocol),
completed: requests.WithLabelValues(protocol, "completed"),
accepted: requests.WithLabelValues(protocol, "accepted"),
dropped: requests.WithLabelValues(protocol, "dropped"),
serverLatency: serverLatency.WithLabelValues(protocol),
clientLatency: clientLatency.WithLabelValues(protocol, "success"),
clientLatencyFailure: clientLatency.WithLabelValues(protocol, "failure"),
}
}

type tracker struct {
targetQueue prometheus.Gauge
queue prometheus.Gauge
targetRps prometheus.Gauge
completed prometheus.Counter
accepted prometheus.Counter
dropped prometheus.Counter
serverLatency prometheus.Observer
clientLatency, clientLatencyFailure prometheus.Observer
}
Loading

0 comments on commit adb2849

Please sign in to comment.