Skip to content

Commit

Permalink
sync2: add server options and request rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan4th committed Nov 20, 2024
1 parent f411f7e commit bffee74
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
4 changes: 2 additions & 2 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type ServerConfig struct {
Interval time.Duration `mapstructure:"interval"`
}

func (s ServerConfig) toOpts() []server.Opt {
func (s ServerConfig) ToOpts() []server.Opt {
opts := []server.Opt{}
if s.Queue != 0 {
opts = append(opts, server.WithQueueSize(s.Queue))
Expand Down Expand Up @@ -366,7 +366,7 @@ func (f *Fetch) registerServer(
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
opts = append(opts, f.cfg.getServerConfig(protocol).ToOpts()...)
f.servers[protocol] = server.New(host, protocol, handler, opts...)
}

Expand Down
4 changes: 2 additions & 2 deletions sync2/atxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ func NewATXSyncer(
cfg, enableActiveSync)

Check warning on line 295 in sync2/atxs.go

View check run for this annotation

Codecov / codecov/patch

sync2/atxs.go#L288-L295

Added lines #L288 - L295 were not covered by tests
}

func NewDispatcher(logger *zap.Logger, f Fetcher) *rangesync.Dispatcher {
func NewDispatcher(logger *zap.Logger, f Fetcher, opts []server.Opt) *rangesync.Dispatcher {
d := rangesync.NewDispatcher(logger)
d.SetupServer(f.Host(), proto, server.WithHardTimeout(20*time.Minute))
d.SetupServer(f.Host(), proto, opts...)
return d

Check warning on line 301 in sync2/atxs.go

View check run for this annotation

Codecov / codecov/patch

sync2/atxs.go#L298-L301

Added lines #L298 - L301 were not covered by tests
}

Expand Down
24 changes: 18 additions & 6 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/mesh"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
Expand Down Expand Up @@ -46,11 +47,13 @@ type Config struct {
}

type ReconcSyncConfig struct {
Enable bool `mapstructure:"enable"`
EnableActiveSync bool `mapstructure:"enable-active-sync"`
OldAtxSyncCfg sync2.Config `mapstructure:"old-atx-sync"`
NewAtxSyncCfg sync2.Config `mapstructure:"new-atx-sync"`
ParallelLoadLimit int `mapstructure:"parallel-load-limit"`
Enable bool `mapstructure:"enable"`
EnableActiveSync bool `mapstructure:"enable-active-sync"`
OldAtxSyncCfg sync2.Config `mapstructure:"old-atx-sync"`
NewAtxSyncCfg sync2.Config `mapstructure:"new-atx-sync"`
ParallelLoadLimit int `mapstructure:"parallel-load-limit"`
HardTimeout time.Duration `mapstructure:"hard-timeout"`
ServerConfig fetch.ServerConfig `mapstructure:"server-config"`
}

// DefaultConfig for the syncer.
Expand Down Expand Up @@ -78,6 +81,12 @@ func DefaultConfig() Config {
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: 10 * time.Minute,
ServerConfig: fetch.ServerConfig{
Queue: 200,
Requests: 100,
Interval: time.Second,
},
},
}
}
Expand Down Expand Up @@ -239,7 +248,10 @@ func NewSyncer(
s.lastLayerSynced.Store(s.mesh.LatestLayer().Uint32())
s.lastEpochSynced.Store(types.GetEffectiveGenesis().GetEpoch().Uint32() - 1)
if s.cfg.ReconcSync.Enable && s.asv2 == nil {
s.dispatcher = sync2.NewDispatcher(s.logger, fetcher.(sync2.Fetcher))
serverOpts := append(
s.cfg.ReconcSync.ServerConfig.ToOpts(),
server.WithHardTimeout(s.cfg.ReconcSync.HardTimeout))
s.dispatcher = sync2.NewDispatcher(s.logger, fetcher.(sync2.Fetcher), serverOpts)
hss := sync2.NewATXSyncSource(
s.logger, s.dispatcher, cdb.Database.(sql.StateDatabase),
fetcher.(sync2.Fetcher), s.cfg.ReconcSync.EnableActiveSync)
Expand Down

0 comments on commit bffee74

Please sign in to comment.