Skip to content

Commit

Permalink
various fixes in metadata updater
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Oct 25, 2024
1 parent 0089277 commit 08ba08e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 24 deletions.
30 changes: 8 additions & 22 deletions operator/metadata/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (

const (
defaultUpdateInterval = 12 * time.Minute
defaultStreamInterval = 2 * time.Second
defaultBatchSize = 512
streamInterval = 2 * time.Second
batchSize = 512
streamChanSize = 1024
)

type Updater struct {
Expand All @@ -30,8 +31,6 @@ type Updater struct {
beaconNetwork beacon.BeaconNetwork
fetcher *Fetcher
updateInterval time.Duration
streamInterval time.Duration
batchSize int
}

type shareStorage interface {
Expand All @@ -53,7 +52,6 @@ func NewUpdater(
beaconNetwork: beaconNetwork,
fetcher: NewFetcher(logger, beaconNode),
updateInterval: defaultUpdateInterval,
streamInterval: defaultStreamInterval,
}

for _, opt := range opts {
Expand All @@ -71,18 +69,6 @@ func WithUpdateInterval(interval time.Duration) Option {
}
}

func WithStreamInterval(interval time.Duration) Option {
return func(u *Updater) {
u.updateInterval = interval
}
}

func WithBatchSize(batchSize int) Option {
return func(u *Updater) {
u.batchSize = batchSize
}
}

func (u *Updater) RetrieveInitialMetadata(ctx context.Context) (map[spectypes.ValidatorPK]*beacon.ValidatorMetadata, error) {
// Load non-liquidated shares.
shares := u.shareStorage.List(nil, registrystorage.ByNotLiquidated())
Expand Down Expand Up @@ -155,7 +141,7 @@ type Update struct {
}

func (u *Updater) Stream(ctx context.Context) <-chan Update {
metadataUpdates := make(chan Update)
metadataUpdates := make(chan Update, streamChanSize)

go func() {
defer close(metadataUpdates)
Expand Down Expand Up @@ -194,8 +180,8 @@ func (u *Updater) Stream(ctx context.Context) <-chan Update {
}

// Only sleep if there aren't more validators to fetch metadata for.
if len(shares) < u.batchSize {
time.Sleep(u.streamInterval)
if len(shares) < batchSize {
time.Sleep(streamInterval)
}
}
}
Expand All @@ -215,12 +201,12 @@ func (u *Updater) sharesForUpdate() []*ssvtypes.SSVShare {
} else if time.Since(share.MetadataLastUpdated()) > u.updateInterval {
existingShares = append(existingShares, share)
}
return len(newShares) < u.batchSize
return len(newShares) < batchSize
})

// Combine validators up to batchSize, prioritizing the new ones.
shares := newShares
if remainder := u.batchSize - len(shares); remainder > 0 {
if remainder := batchSize - len(shares); remainder > 0 {
end := remainder
if end > len(existingShares) {
end = len(existingShares)
Expand Down
3 changes: 1 addition & 2 deletions operator/validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,8 +1086,7 @@ func (c *controller) ForkListener(logger *zap.Logger) {
}

func (c *controller) HandleMetadataUpdates(ctx context.Context) {
updates := c.metadataUpdater.Stream(ctx)
for update := range updates {
for update := range c.metadataUpdater.Stream(ctx) {
if err := c.handleMetadataUpdate(ctx, update); err != nil {
c.logger.Warn("could not handle metadata update", zap.Error(err))
}
Expand Down

0 comments on commit 08ba08e

Please sign in to comment.