From 2df9bf99e780ed1eb2e124b74cfe65da9c67843f Mon Sep 17 00:00:00 2001 From: Andrew Gillis Date: Tue, 12 Sep 2023 22:25:43 -0700 Subject: [PATCH] Replace registry goroutine and channel with mutex (#2243) * Replace registry goroutine and channel with mutex This simplifies the registry code and makes it more efficient. Originally, the registry was going to run many functions asynchronously in its own goroutine so that callers would not have to wait for the processing to finish. Over time, all the calls to registry functions became synchronous, so the channel and goroutine were nothing more than a synchronization mechanism. This PR replaces that goroutine with a simpler mutex. --- internal/registry/registry.go | 432 +++++++++++++---------------- internal/registry/registry_test.go | 9 +- 2 files changed, 198 insertions(+), 243 deletions(-) diff --git a/internal/registry/registry.go b/internal/registry/registry.go index fba5d6ae4..2eaa6b03a 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -40,13 +40,12 @@ var log = logging.Logger("indexer/registry") // Registry stores information about discovered providers type Registry struct { - actions chan func() - closed chan struct{} closeOnce sync.Once closing chan struct{} dstore datastore.Datastore filterIPs bool freezer *freeze.Freezer + provMutex sync.Mutex pollDone chan struct{} providers map[peer.ID]*ProviderInfo sequences *sequences @@ -273,8 +272,6 @@ func New(ctx context.Context, cfg config.Discovery, dstore datastore.Datastore, } r := &Registry{ - actions: make(chan func()), - closed: make(chan struct{}), closing: make(chan struct{}), filterIPs: cfg.FilterIPs, policy: regPolicy, @@ -338,8 +335,6 @@ func New(ctx context.Context, cfg config.Discovery, dstore datastore.Datastore, go r.runPollCheck(poll, pollOverrides) } - go r.run() - return r, nil } @@ -393,7 +388,7 @@ func makePollOverrideMap(poll polling, cfgPollOverrides []config.Polling) (map[p return pollOverrides, nil } -// Close waits for polling and actions to finish and then stops the registry. +// Close stops the registry and waits for polling to finish. func (r *Registry) Close() { r.closeOnce.Do(func() { if r.freezer != nil { @@ -403,31 +398,13 @@ func (r *Registry) Close() { if r.pollDone != nil { <-r.pollDone } - // Stop the main run goroutine. - close(r.actions) }) - <-r.closed } func (r *Registry) SyncChan() <-chan *ProviderInfo { return r.syncChan } -// run executes functions that need to be executed on the same goroutine -// -// Running actions here is a substitute for mutex-locking the sections of code -// run as an action and allows the caller to decide whether or not to wait for -// the code to finish running. -// -// All functions named using the prefix "sync" must be run on this goroutine. -func (r *Registry) run() { - defer close(r.closed) - - for action := range r.actions { - action() - } -} - func (r *Registry) runPollCheck(poll polling, pollOverrides map[peer.ID]polling) { retryAfter := poll.retryAfter for i := range pollOverrides { @@ -451,31 +428,21 @@ running: } } - // Check that pollProviders is finished and close sync channel. - done := make(chan struct{}) - r.actions <- func() { - close(done) - } - <-done - close(r.syncChan) close(r.pollDone) } // Saw indicates that a provider was seen. func (r *Registry) Saw(provider peer.ID) { - done := make(chan struct{}) - r.actions <- func() { - if _, ok := r.providers[provider]; ok { - pinfo := r.providers[provider] - pinfo.lastContactTime = time.Now() - pinfo.inactive = false - log.Infow("Saw provider", "provider", provider, "publisher", pinfo.Publisher, "time", pinfo.lastContactTime) + r.provMutex.Lock() + defer r.provMutex.Unlock() - } - close(done) + pinfo, ok := r.providers[provider] + if ok { + pinfo.lastContactTime = time.Now() + pinfo.inactive = false + log.Infow("Saw provider", "provider", provider, "publisher", pinfo.Publisher, "time", pinfo.lastContactTime) } - <-done } // Allowed checks if the peer is allowed by policy. If configured to work with @@ -681,8 +648,8 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo var newPublisher bool - info, _ := r.ProviderInfo(provider.ID) - if info != nil { + info, ok := r.ProviderInfo(provider.ID) + if ok { info = &ProviderInfo{ AddrInfo: info.AddrInfo, LastAdvertisement: info.LastAdvertisement, @@ -730,7 +697,7 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo AddrInfo: provider, } if extendedProviders != nil { - if err := validateExtProviders(extendedProviders); err != nil { + if err = validateExtProviders(extendedProviders); err != nil { return err } info.ExtendedProviders = extendedProviders @@ -755,9 +722,9 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo } if info.Publisher.Validate() == nil { - // Use new publisher addrs if any given. Otherwise, keep existing. - // If no existing publisher addrs, and publisher ID is same as - // provider ID, then use provider addresses if any. + // Use new publisher addrs if any given. Otherwise, keep existing. If + // no existing publisher addrs, and publisher ID is same as provider + // ID, then use provider addresses if any. if len(publisher.Addrs) != 0 { info.PublisherAddr = publisher.Addrs[0] } else if info.PublisherAddr == nil && publisher.ID == info.AddrInfo.ID { @@ -773,74 +740,61 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo } info.lastContactTime = now - err := r.register(ctx, info) - if err != nil { - return err - } - log.Debugw("Updated registered provider info", "id", info.AddrInfo.ID, "addrs", info.AddrInfo.Addrs) - return nil + return r.register(ctx, info) } func (r *Registry) register(ctx context.Context, info *ProviderInfo) error { - errCh := make(chan error, 1) - r.actions <- func() { - errCh <- r.syncRegister(ctx, info) + r.provMutex.Lock() + defer r.provMutex.Unlock() + + r.providers[info.AddrInfo.ID] = info + err := r.syncPersistProvider(ctx, info) + if err != nil { + err = fmt.Errorf("could not persist provider: %s", err) + return apierror.New(err, http.StatusInternalServerError) } - return <-errCh + return nil } // IsRegistered checks if the provider is in the registry func (r *Registry) IsRegistered(providerID peer.ID) bool { - done := make(chan struct{}) - var found bool - r.actions <- func() { - _, found = r.providers[providerID] - close(done) - } - <-done + r.provMutex.Lock() + _, found := r.providers[providerID] + r.provMutex.Unlock() return found } // ProviderInfo returns information for a registered provider. func (r *Registry) ProviderInfo(providerID peer.ID) (*ProviderInfo, bool) { - infoChan := make(chan *ProviderInfo) - r.actions <- func() { - info, ok := r.providers[providerID] - if ok { - infoChan <- info - } - close(infoChan) - } - pinfo := <-infoChan - if pinfo == nil { + r.provMutex.Lock() + pinfo, ok := r.providers[providerID] + r.provMutex.Unlock() + if !ok { return nil, false } - return pinfo, r.policy.Allowed(providerID) } // AllProviderInfo returns information for all registered providers that are // active and allowed. func (r *Registry) AllProviderInfo() []*ProviderInfo { - infosCh := make(chan []*ProviderInfo) - r.actions <- func() { - infos := make([]*ProviderInfo, 0, len(r.providers)) - for _, info := range r.providers { - if r.assigned != nil { - r.assignMutex.Lock() - _, ok := r.assigned[info.Publisher] - r.assignMutex.Unlock() - if !ok { - // Skip providers whose publisher is not assigned, if using - // assigner service. - continue - } + r.provMutex.Lock() + infos := make([]*ProviderInfo, 0, len(r.providers)) + for _, info := range r.providers { + if r.assigned != nil { + r.assignMutex.Lock() + _, ok := r.assigned[info.Publisher] + r.assignMutex.Unlock() + if !ok { + // Skip providers whose publisher is not assigned, if using + // assigner service. + continue } - infos = append(infos, info) } - infosCh <- infos + infos = append(infos, info) } - infos := <-infosCh + r.provMutex.Unlock() + // Stats tracks the number of active, allowed providers. stats.Record(context.Background(), metrics.ProviderCount.M(int64(len(infos)))) return infos @@ -871,7 +825,8 @@ func (r *Registry) Handoff(ctx context.Context, publisherID, frozenID peer.ID, f return err } - // Iterate through the providers to find the one with the publisher being handed off. + // Iterate through providers to find the one with the publisher being + // handed off. var provInfo *model.ProviderInfo for _, pInfo := range provs { if pInfo.Publisher.ID == publisherID { @@ -912,7 +867,6 @@ func (r *Registry) Handoff(ctx context.Context, publisherID, frozenID peer.ID, f // sync with the frozen at ad as the ad to stop at. if provInfo.FrozenAt != cid.Undef { regInfo.stopCid = provInfo.FrozenAt - select { case r.syncChan <- regInfo: case <-r.closing: @@ -989,24 +943,28 @@ func (r *Registry) ImportProviders(ctx context.Context, fromURL *url.URL) (int, func (r *Registry) RemoveProvider(ctx context.Context, providerID peer.ID) error { var pinfo *ProviderInfo - errChan := make(chan error) - r.actions <- func() { - pinfo = r.providers[providerID] - // Remove provider from datastore and memory. - errChan <- r.syncRemoveProvider(ctx, providerID) + var err error + + r.provMutex.Lock() + pinfo, ok := r.providers[providerID] + if !ok { + r.provMutex.Unlock() + return nil } - err := <-errChan + // Remove provider from datastore and memory. + err = r.syncRemoveProvider(ctx, providerID) + r.provMutex.Unlock() + if err != nil { return err } - if pinfo != nil { - // Tell ingester to delete its provider data. - pinfo.deleted = true - select { - case r.syncChan <- pinfo: - case <-r.closing: - return errors.New("shutdown") - } + + // Tell ingester to delete its provider data. + pinfo.deleted = true + select { + case r.syncChan <- pinfo: + case <-r.closing: + return errors.New("shutdown") } return nil } @@ -1016,22 +974,26 @@ func (r *Registry) SetLastError(providerID peer.ID, err error) { if err != nil { now = time.Now() } - r.actions <- func() { - pinfo, ok := r.providers[providerID] - if !ok { - return - } - var errMsg string - if err != nil { - errMsg = err.Error() - } else if pinfo.LastError == "" { - return - } - pinfoCpy := *pinfo - pinfoCpy.LastError = errMsg - pinfoCpy.LastErrorTime = now - r.providers[providerID] = &pinfoCpy + + r.provMutex.Lock() + defer r.provMutex.Unlock() + + pinfo, ok := r.providers[providerID] + if !ok { + return } + + var errMsg string + if err != nil { + errMsg = err.Error() + } else if pinfo.LastError == "" { + // Last error is also empty; nothing to update. + return + } + pinfoCpy := *pinfo + pinfoCpy.LastError = errMsg + pinfoCpy.LastErrorTime = now + r.providers[providerID] = &pinfoCpy } func (r *Registry) CheckSequence(peerID peer.ID, seq uint64) error { @@ -1051,14 +1013,45 @@ func (r *Registry) Freeze() error { // freeze is called by the Freezer to record the last advertisement ingested // for each provider at the time the indexer becomes frozen. func (r *Registry) freeze() error { + if err := r.freezeProviders(); err != nil { + return fmt.Errorf("cannot freeze providers: %w", err) + } + return nil +} + +func (r *Registry) freezeProviders() error { + ctx := context.Background() now := time.Now() - errCh := make(chan error) - r.actions <- func() { - errCh <- r.syncFreeze(now) + + r.provMutex.Lock() + defer r.provMutex.Unlock() + + for id, info := range r.providers { + frozenInfo := *info + frozenInfo.FrozenAt = info.LastAdvertisement + if info.LastAdvertisementTime.IsZero() { + frozenInfo.FrozenAtTime = now + } else { + frozenInfo.FrozenAtTime = info.LastAdvertisementTime + } + r.providers[id] = &frozenInfo + + if r.dstore == nil { + continue + } + + value, err := json.Marshal(&frozenInfo) + if err != nil { + return err + } + + dsKey := info.dsKey() + if err = r.dstore.Put(ctx, dsKey, value); err != nil { + return err + } } - err := <-errCh - if err != nil { - return fmt.Errorf("cannot freeze providers: %w", err) + if r.dstore != nil { + return r.dstore.Sync(ctx, datastore.NewKey(providerKeyPath)) } return nil } @@ -1127,48 +1120,6 @@ func Unfreeze(ctx context.Context, freezeDirs []string, freezeAtPercent float64, return unfrozen, nil } -func (r *Registry) syncFreeze(now time.Time) error { - ctx := context.Background() - for id, info := range r.providers { - frozenInfo := *info - frozenInfo.FrozenAt = info.LastAdvertisement - if info.LastAdvertisementTime.IsZero() { - frozenInfo.FrozenAtTime = now - } else { - frozenInfo.FrozenAtTime = info.LastAdvertisementTime - } - r.providers[id] = &frozenInfo - - if r.dstore == nil { - continue - } - - value, err := json.Marshal(&frozenInfo) - if err != nil { - return err - } - - dsKey := info.dsKey() - if err = r.dstore.Put(ctx, dsKey, value); err != nil { - return err - } - } - if r.dstore != nil { - return r.dstore.Sync(ctx, datastore.NewKey(providerKeyPath)) - } - return nil -} - -func (r *Registry) syncRegister(ctx context.Context, info *ProviderInfo) error { - r.providers[info.AddrInfo.ID] = info - err := r.syncPersistProvider(ctx, info) - if err != nil { - err = fmt.Errorf("could not persist provider: %s", err) - return apierror.New(err, http.StatusInternalServerError) - } - return nil -} - func (r *Registry) syncPersistProvider(ctx context.Context, info *ProviderInfo) error { if r.dstore == nil { return nil @@ -1186,78 +1137,79 @@ func (r *Registry) syncPersistProvider(ctx context.Context, info *ProviderInfo) } func (r *Registry) pollProviders(normalPoll polling, pollOverrides map[peer.ID]polling) { - r.actions <- func() { - now := time.Now() - for peerID, info := range r.providers { - // Reset poll in case previously overridden. - poll := normalPoll - // If the provider is not allowed, then do not poll or de-list. - if !r.policy.Allowed(peerID) { - continue - } - if info.Publisher.Validate() != nil || !r.policy.Allowed(info.Publisher) { - // No publisher. - continue - } - // If using assigner service, and the provider's publisher is not - // assigned, then do not poll. - if r.assigned != nil { - r.assignMutex.Lock() - _, ok := r.assigned[info.Publisher] - r.assignMutex.Unlock() - if !ok { - continue - } - } - override, ok := pollOverrides[peerID] - if ok { - poll = override - } - if info.lastContactTime.IsZero() { - // There has been no contact since startup. Poll during next - // call to this function if no update for provider. - info.lastContactTime = now.Add(-poll.interval) - continue - } - noContactTime := now.Sub(info.lastContactTime) - if noContactTime < poll.interval { - // Had recent enough contact, no need to poll. + now := time.Now() + + r.provMutex.Lock() + defer r.provMutex.Unlock() + + for peerID, info := range r.providers { + // Reset poll in case previously overridden. + poll := normalPoll + // If the provider is not allowed, then do not poll or de-list. + if !r.policy.Allowed(peerID) { + continue + } + if info.Publisher.Validate() != nil || !r.policy.Allowed(info.Publisher) { + // No publisher. + continue + } + // If using assigner service, and the provider's publisher is not + // assigned, then do not poll. + if r.assigned != nil { + r.assignMutex.Lock() + _, ok := r.assigned[info.Publisher] + r.assignMutex.Unlock() + if !ok { continue } - sincePollingStarted := noContactTime - poll.interval - // If more than stopAfter time has elapsed since polling started, - // then the publisher is considered permanently unresponsive, so - // remove it. - if sincePollingStarted >= poll.stopAfter { - // Too much time since last contact. - log.Warnw("Lost contact with provider, too long with no updates", - "publisher", info.Publisher, - "provider", info.AddrInfo.ID, - "since", info.lastContactTime, - "sincePollingStarted", sincePollingStarted, - "stopAfter", poll.stopAfter) - // Remove the dead provider from the registry. - if err := r.syncRemoveProvider(context.Background(), peerID); err != nil { - log.Errorw("Failed to update deleted provider info", "err", err) - } - // Tell the ingester to remove data for the provider. - info.deleted = true - } else if sincePollingStarted >= poll.deactivateAfter { - // Still polling after deactivateAfter, so mark inactive. - // This will exclude the provider from find responses. - log.Infow("Deactivating provider, too long with no updates", - "publisher", info.Publisher, - "provider", info.AddrInfo.ID, - "since", info.lastContactTime, - "sincePollingStarted", sincePollingStarted, - "deactivateAfter", poll.deactivateAfter) - info.inactive = true - } - select { - case r.syncChan <- info: - default: - log.Debugw("Sync channel blocked, skipping auto-sync", "publisher", info.Publisher) + } + override, ok := pollOverrides[peerID] + if ok { + poll = override + } + if info.lastContactTime.IsZero() { + // There has been no contact since startup. Poll during next call + // to this function if no update for provider. + info.lastContactTime = now.Add(-poll.interval) + continue + } + noContactTime := now.Sub(info.lastContactTime) + if noContactTime < poll.interval { + // Had recent enough contact, no need to poll. + continue + } + sincePollingStarted := noContactTime - poll.interval + // If more than stopAfter time has elapsed since polling started, then + // the publisher is considered permanently unresponsive, so remove it. + if sincePollingStarted >= poll.stopAfter { + // Too much time since last contact. + log.Warnw("Lost contact with provider, too long with no updates", + "publisher", info.Publisher, + "provider", info.AddrInfo.ID, + "since", info.lastContactTime, + "sincePollingStarted", sincePollingStarted, + "stopAfter", poll.stopAfter) + // Remove the dead provider from the registry. + if err := r.syncRemoveProvider(context.Background(), peerID); err != nil { + log.Errorw("Failed to update deleted provider info", "err", err) } + // Tell the ingester to remove data for the provider. + info.deleted = true + } else if sincePollingStarted >= poll.deactivateAfter { + // Still polling after deactivateAfter, so mark inactive. This will + // exclude the provider from find responses. + log.Infow("Deactivating provider, too long with no updates", + "publisher", info.Publisher, + "provider", info.AddrInfo.ID, + "since", info.lastContactTime, + "sincePollingStarted", sincePollingStarted, + "deactivateAfter", poll.deactivateAfter) + info.inactive = true + } + select { + case r.syncChan <- info: + default: + log.Debugw("Sync channel blocked, skipping auto-sync", "publisher", info.Publisher) } } } diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go index 089018ef4..51801a9f0 100644 --- a/internal/registry/registry_test.go +++ b/internal/registry/registry_test.go @@ -410,16 +410,19 @@ func TestPollProvider(t *testing.T) { t.Fatal("Expected sync channel to be written") } - // Check that actions chan is not blocked by unread auto-sync channel. + // Check that registry is not blocked by unread auto-sync channel. poll.retryAfter = 0 poll.deactivateAfter = 0 r.pollProviders(poll, nil) r.pollProviders(poll, nil) r.pollProviders(poll, nil) done := make(chan struct{}) - r.actions <- func() { + go func() { + _, ok := r.ProviderInfo(peerID) + require.True(t, ok) close(done) - } + }() + select { case <-done: case <-timeout: