diff --git a/internal/registry/registry.go b/internal/registry/registry.go index fba5d6ae4..2eaa6b03a 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -40,13 +40,12 @@ var log = logging.Logger("indexer/registry") // Registry stores information about discovered providers type Registry struct { - actions chan func() - closed chan struct{} closeOnce sync.Once closing chan struct{} dstore datastore.Datastore filterIPs bool freezer *freeze.Freezer + provMutex sync.Mutex pollDone chan struct{} providers map[peer.ID]*ProviderInfo sequences *sequences @@ -273,8 +272,6 @@ func New(ctx context.Context, cfg config.Discovery, dstore datastore.Datastore, } r := &Registry{ - actions: make(chan func()), - closed: make(chan struct{}), closing: make(chan struct{}), filterIPs: cfg.FilterIPs, policy: regPolicy, @@ -338,8 +335,6 @@ func New(ctx context.Context, cfg config.Discovery, dstore datastore.Datastore, go r.runPollCheck(poll, pollOverrides) } - go r.run() - return r, nil } @@ -393,7 +388,7 @@ func makePollOverrideMap(poll polling, cfgPollOverrides []config.Polling) (map[p return pollOverrides, nil } -// Close waits for polling and actions to finish and then stops the registry. +// Close stops the registry and waits for polling to finish. func (r *Registry) Close() { r.closeOnce.Do(func() { if r.freezer != nil { @@ -403,31 +398,13 @@ func (r *Registry) Close() { if r.pollDone != nil { <-r.pollDone } - // Stop the main run goroutine. - close(r.actions) }) - <-r.closed } func (r *Registry) SyncChan() <-chan *ProviderInfo { return r.syncChan } -// run executes functions that need to be executed on the same goroutine -// -// Running actions here is a substitute for mutex-locking the sections of code -// run as an action and allows the caller to decide whether or not to wait for -// the code to finish running. -// -// All functions named using the prefix "sync" must be run on this goroutine. -func (r *Registry) run() { - defer close(r.closed) - - for action := range r.actions { - action() - } -} - func (r *Registry) runPollCheck(poll polling, pollOverrides map[peer.ID]polling) { retryAfter := poll.retryAfter for i := range pollOverrides { @@ -451,31 +428,21 @@ running: } } - // Check that pollProviders is finished and close sync channel. - done := make(chan struct{}) - r.actions <- func() { - close(done) - } - <-done - close(r.syncChan) close(r.pollDone) } // Saw indicates that a provider was seen. func (r *Registry) Saw(provider peer.ID) { - done := make(chan struct{}) - r.actions <- func() { - if _, ok := r.providers[provider]; ok { - pinfo := r.providers[provider] - pinfo.lastContactTime = time.Now() - pinfo.inactive = false - log.Infow("Saw provider", "provider", provider, "publisher", pinfo.Publisher, "time", pinfo.lastContactTime) + r.provMutex.Lock() + defer r.provMutex.Unlock() - } - close(done) + pinfo, ok := r.providers[provider] + if ok { + pinfo.lastContactTime = time.Now() + pinfo.inactive = false + log.Infow("Saw provider", "provider", provider, "publisher", pinfo.Publisher, "time", pinfo.lastContactTime) } - <-done } // Allowed checks if the peer is allowed by policy. If configured to work with @@ -681,8 +648,8 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo var newPublisher bool - info, _ := r.ProviderInfo(provider.ID) - if info != nil { + info, ok := r.ProviderInfo(provider.ID) + if ok { info = &ProviderInfo{ AddrInfo: info.AddrInfo, LastAdvertisement: info.LastAdvertisement, @@ -730,7 +697,7 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo AddrInfo: provider, } if extendedProviders != nil { - if err := validateExtProviders(extendedProviders); err != nil { + if err = validateExtProviders(extendedProviders); err != nil { return err } info.ExtendedProviders = extendedProviders @@ -755,9 +722,9 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo } if info.Publisher.Validate() == nil { - // Use new publisher addrs if any given. Otherwise, keep existing. - // If no existing publisher addrs, and publisher ID is same as - // provider ID, then use provider addresses if any. + // Use new publisher addrs if any given. Otherwise, keep existing. If + // no existing publisher addrs, and publisher ID is same as provider + // ID, then use provider addresses if any. if len(publisher.Addrs) != 0 { info.PublisherAddr = publisher.Addrs[0] } else if info.PublisherAddr == nil && publisher.ID == info.AddrInfo.ID { @@ -773,74 +740,61 @@ func (r *Registry) Update(ctx context.Context, provider, publisher peer.AddrInfo } info.lastContactTime = now - err := r.register(ctx, info) - if err != nil { - return err - } - log.Debugw("Updated registered provider info", "id", info.AddrInfo.ID, "addrs", info.AddrInfo.Addrs) - return nil + return r.register(ctx, info) } func (r *Registry) register(ctx context.Context, info *ProviderInfo) error { - errCh := make(chan error, 1) - r.actions <- func() { - errCh <- r.syncRegister(ctx, info) + r.provMutex.Lock() + defer r.provMutex.Unlock() + + r.providers[info.AddrInfo.ID] = info + err := r.syncPersistProvider(ctx, info) + if err != nil { + err = fmt.Errorf("could not persist provider: %s", err) + return apierror.New(err, http.StatusInternalServerError) } - return <-errCh + return nil } // IsRegistered checks if the provider is in the registry func (r *Registry) IsRegistered(providerID peer.ID) bool { - done := make(chan struct{}) - var found bool - r.actions <- func() { - _, found = r.providers[providerID] - close(done) - } - <-done + r.provMutex.Lock() + _, found := r.providers[providerID] + r.provMutex.Unlock() return found } // ProviderInfo returns information for a registered provider. func (r *Registry) ProviderInfo(providerID peer.ID) (*ProviderInfo, bool) { - infoChan := make(chan *ProviderInfo) - r.actions <- func() { - info, ok := r.providers[providerID] - if ok { - infoChan <- info - } - close(infoChan) - } - pinfo := <-infoChan - if pinfo == nil { + r.provMutex.Lock() + pinfo, ok := r.providers[providerID] + r.provMutex.Unlock() + if !ok { return nil, false } - return pinfo, r.policy.Allowed(providerID) } // AllProviderInfo returns information for all registered providers that are // active and allowed. func (r *Registry) AllProviderInfo() []*ProviderInfo { - infosCh := make(chan []*ProviderInfo) - r.actions <- func() { - infos := make([]*ProviderInfo, 0, len(r.providers)) - for _, info := range r.providers { - if r.assigned != nil { - r.assignMutex.Lock() - _, ok := r.assigned[info.Publisher] - r.assignMutex.Unlock() - if !ok { - // Skip providers whose publisher is not assigned, if using - // assigner service. - continue - } + r.provMutex.Lock() + infos := make([]*ProviderInfo, 0, len(r.providers)) + for _, info := range r.providers { + if r.assigned != nil { + r.assignMutex.Lock() + _, ok := r.assigned[info.Publisher] + r.assignMutex.Unlock() + if !ok { + // Skip providers whose publisher is not assigned, if using + // assigner service. + continue } - infos = append(infos, info) } - infosCh <- infos + infos = append(infos, info) } - infos := <-infosCh + r.provMutex.Unlock() + // Stats tracks the number of active, allowed providers. stats.Record(context.Background(), metrics.ProviderCount.M(int64(len(infos)))) return infos @@ -871,7 +825,8 @@ func (r *Registry) Handoff(ctx context.Context, publisherID, frozenID peer.ID, f return err } - // Iterate through the providers to find the one with the publisher being handed off. + // Iterate through providers to find the one with the publisher being + // handed off. var provInfo *model.ProviderInfo for _, pInfo := range provs { if pInfo.Publisher.ID == publisherID { @@ -912,7 +867,6 @@ func (r *Registry) Handoff(ctx context.Context, publisherID, frozenID peer.ID, f // sync with the frozen at ad as the ad to stop at. if provInfo.FrozenAt != cid.Undef { regInfo.stopCid = provInfo.FrozenAt - select { case r.syncChan <- regInfo: case <-r.closing: @@ -989,24 +943,28 @@ func (r *Registry) ImportProviders(ctx context.Context, fromURL *url.URL) (int, func (r *Registry) RemoveProvider(ctx context.Context, providerID peer.ID) error { var pinfo *ProviderInfo - errChan := make(chan error) - r.actions <- func() { - pinfo = r.providers[providerID] - // Remove provider from datastore and memory. - errChan <- r.syncRemoveProvider(ctx, providerID) + var err error + + r.provMutex.Lock() + pinfo, ok := r.providers[providerID] + if !ok { + r.provMutex.Unlock() + return nil } - err := <-errChan + // Remove provider from datastore and memory. + err = r.syncRemoveProvider(ctx, providerID) + r.provMutex.Unlock() + if err != nil { return err } - if pinfo != nil { - // Tell ingester to delete its provider data. - pinfo.deleted = true - select { - case r.syncChan <- pinfo: - case <-r.closing: - return errors.New("shutdown") - } + + // Tell ingester to delete its provider data. + pinfo.deleted = true + select { + case r.syncChan <- pinfo: + case <-r.closing: + return errors.New("shutdown") } return nil } @@ -1016,22 +974,26 @@ func (r *Registry) SetLastError(providerID peer.ID, err error) { if err != nil { now = time.Now() } - r.actions <- func() { - pinfo, ok := r.providers[providerID] - if !ok { - return - } - var errMsg string - if err != nil { - errMsg = err.Error() - } else if pinfo.LastError == "" { - return - } - pinfoCpy := *pinfo - pinfoCpy.LastError = errMsg - pinfoCpy.LastErrorTime = now - r.providers[providerID] = &pinfoCpy + + r.provMutex.Lock() + defer r.provMutex.Unlock() + + pinfo, ok := r.providers[providerID] + if !ok { + return } + + var errMsg string + if err != nil { + errMsg = err.Error() + } else if pinfo.LastError == "" { + // Last error is also empty; nothing to update. + return + } + pinfoCpy := *pinfo + pinfoCpy.LastError = errMsg + pinfoCpy.LastErrorTime = now + r.providers[providerID] = &pinfoCpy } func (r *Registry) CheckSequence(peerID peer.ID, seq uint64) error { @@ -1051,14 +1013,45 @@ func (r *Registry) Freeze() error { // freeze is called by the Freezer to record the last advertisement ingested // for each provider at the time the indexer becomes frozen. func (r *Registry) freeze() error { + if err := r.freezeProviders(); err != nil { + return fmt.Errorf("cannot freeze providers: %w", err) + } + return nil +} + +func (r *Registry) freezeProviders() error { + ctx := context.Background() now := time.Now() - errCh := make(chan error) - r.actions <- func() { - errCh <- r.syncFreeze(now) + + r.provMutex.Lock() + defer r.provMutex.Unlock() + + for id, info := range r.providers { + frozenInfo := *info + frozenInfo.FrozenAt = info.LastAdvertisement + if info.LastAdvertisementTime.IsZero() { + frozenInfo.FrozenAtTime = now + } else { + frozenInfo.FrozenAtTime = info.LastAdvertisementTime + } + r.providers[id] = &frozenInfo + + if r.dstore == nil { + continue + } + + value, err := json.Marshal(&frozenInfo) + if err != nil { + return err + } + + dsKey := info.dsKey() + if err = r.dstore.Put(ctx, dsKey, value); err != nil { + return err + } } - err := <-errCh - if err != nil { - return fmt.Errorf("cannot freeze providers: %w", err) + if r.dstore != nil { + return r.dstore.Sync(ctx, datastore.NewKey(providerKeyPath)) } return nil } @@ -1127,48 +1120,6 @@ func Unfreeze(ctx context.Context, freezeDirs []string, freezeAtPercent float64, return unfrozen, nil } -func (r *Registry) syncFreeze(now time.Time) error { - ctx := context.Background() - for id, info := range r.providers { - frozenInfo := *info - frozenInfo.FrozenAt = info.LastAdvertisement - if info.LastAdvertisementTime.IsZero() { - frozenInfo.FrozenAtTime = now - } else { - frozenInfo.FrozenAtTime = info.LastAdvertisementTime - } - r.providers[id] = &frozenInfo - - if r.dstore == nil { - continue - } - - value, err := json.Marshal(&frozenInfo) - if err != nil { - return err - } - - dsKey := info.dsKey() - if err = r.dstore.Put(ctx, dsKey, value); err != nil { - return err - } - } - if r.dstore != nil { - return r.dstore.Sync(ctx, datastore.NewKey(providerKeyPath)) - } - return nil -} - -func (r *Registry) syncRegister(ctx context.Context, info *ProviderInfo) error { - r.providers[info.AddrInfo.ID] = info - err := r.syncPersistProvider(ctx, info) - if err != nil { - err = fmt.Errorf("could not persist provider: %s", err) - return apierror.New(err, http.StatusInternalServerError) - } - return nil -} - func (r *Registry) syncPersistProvider(ctx context.Context, info *ProviderInfo) error { if r.dstore == nil { return nil @@ -1186,78 +1137,79 @@ func (r *Registry) syncPersistProvider(ctx context.Context, info *ProviderInfo) } func (r *Registry) pollProviders(normalPoll polling, pollOverrides map[peer.ID]polling) { - r.actions <- func() { - now := time.Now() - for peerID, info := range r.providers { - // Reset poll in case previously overridden. - poll := normalPoll - // If the provider is not allowed, then do not poll or de-list. - if !r.policy.Allowed(peerID) { - continue - } - if info.Publisher.Validate() != nil || !r.policy.Allowed(info.Publisher) { - // No publisher. - continue - } - // If using assigner service, and the provider's publisher is not - // assigned, then do not poll. - if r.assigned != nil { - r.assignMutex.Lock() - _, ok := r.assigned[info.Publisher] - r.assignMutex.Unlock() - if !ok { - continue - } - } - override, ok := pollOverrides[peerID] - if ok { - poll = override - } - if info.lastContactTime.IsZero() { - // There has been no contact since startup. Poll during next - // call to this function if no update for provider. - info.lastContactTime = now.Add(-poll.interval) - continue - } - noContactTime := now.Sub(info.lastContactTime) - if noContactTime < poll.interval { - // Had recent enough contact, no need to poll. + now := time.Now() + + r.provMutex.Lock() + defer r.provMutex.Unlock() + + for peerID, info := range r.providers { + // Reset poll in case previously overridden. + poll := normalPoll + // If the provider is not allowed, then do not poll or de-list. + if !r.policy.Allowed(peerID) { + continue + } + if info.Publisher.Validate() != nil || !r.policy.Allowed(info.Publisher) { + // No publisher. + continue + } + // If using assigner service, and the provider's publisher is not + // assigned, then do not poll. + if r.assigned != nil { + r.assignMutex.Lock() + _, ok := r.assigned[info.Publisher] + r.assignMutex.Unlock() + if !ok { continue } - sincePollingStarted := noContactTime - poll.interval - // If more than stopAfter time has elapsed since polling started, - // then the publisher is considered permanently unresponsive, so - // remove it. - if sincePollingStarted >= poll.stopAfter { - // Too much time since last contact. - log.Warnw("Lost contact with provider, too long with no updates", - "publisher", info.Publisher, - "provider", info.AddrInfo.ID, - "since", info.lastContactTime, - "sincePollingStarted", sincePollingStarted, - "stopAfter", poll.stopAfter) - // Remove the dead provider from the registry. - if err := r.syncRemoveProvider(context.Background(), peerID); err != nil { - log.Errorw("Failed to update deleted provider info", "err", err) - } - // Tell the ingester to remove data for the provider. - info.deleted = true - } else if sincePollingStarted >= poll.deactivateAfter { - // Still polling after deactivateAfter, so mark inactive. - // This will exclude the provider from find responses. - log.Infow("Deactivating provider, too long with no updates", - "publisher", info.Publisher, - "provider", info.AddrInfo.ID, - "since", info.lastContactTime, - "sincePollingStarted", sincePollingStarted, - "deactivateAfter", poll.deactivateAfter) - info.inactive = true - } - select { - case r.syncChan <- info: - default: - log.Debugw("Sync channel blocked, skipping auto-sync", "publisher", info.Publisher) + } + override, ok := pollOverrides[peerID] + if ok { + poll = override + } + if info.lastContactTime.IsZero() { + // There has been no contact since startup. Poll during next call + // to this function if no update for provider. + info.lastContactTime = now.Add(-poll.interval) + continue + } + noContactTime := now.Sub(info.lastContactTime) + if noContactTime < poll.interval { + // Had recent enough contact, no need to poll. + continue + } + sincePollingStarted := noContactTime - poll.interval + // If more than stopAfter time has elapsed since polling started, then + // the publisher is considered permanently unresponsive, so remove it. + if sincePollingStarted >= poll.stopAfter { + // Too much time since last contact. + log.Warnw("Lost contact with provider, too long with no updates", + "publisher", info.Publisher, + "provider", info.AddrInfo.ID, + "since", info.lastContactTime, + "sincePollingStarted", sincePollingStarted, + "stopAfter", poll.stopAfter) + // Remove the dead provider from the registry. + if err := r.syncRemoveProvider(context.Background(), peerID); err != nil { + log.Errorw("Failed to update deleted provider info", "err", err) } + // Tell the ingester to remove data for the provider. + info.deleted = true + } else if sincePollingStarted >= poll.deactivateAfter { + // Still polling after deactivateAfter, so mark inactive. This will + // exclude the provider from find responses. + log.Infow("Deactivating provider, too long with no updates", + "publisher", info.Publisher, + "provider", info.AddrInfo.ID, + "since", info.lastContactTime, + "sincePollingStarted", sincePollingStarted, + "deactivateAfter", poll.deactivateAfter) + info.inactive = true + } + select { + case r.syncChan <- info: + default: + log.Debugw("Sync channel blocked, skipping auto-sync", "publisher", info.Publisher) } } } diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go index 089018ef4..51801a9f0 100644 --- a/internal/registry/registry_test.go +++ b/internal/registry/registry_test.go @@ -410,16 +410,19 @@ func TestPollProvider(t *testing.T) { t.Fatal("Expected sync channel to be written") } - // Check that actions chan is not blocked by unread auto-sync channel. + // Check that registry is not blocked by unread auto-sync channel. poll.retryAfter = 0 poll.deactivateAfter = 0 r.pollProviders(poll, nil) r.pollProviders(poll, nil) r.pollProviders(poll, nil) done := make(chan struct{}) - r.actions <- func() { + go func() { + _, ok := r.ProviderInfo(peerID) + require.True(t, ok) close(done) - } + }() + select { case <-done: case <-timeout: