diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
index 9a7d9c5dcc7..87b46c9785b 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
@@ -29,11 +29,15 @@ const (
 	blockHistorySize = int64(256)
 )
 
+var (
+	BlockSubscriberServiceName = "BlockSubscriber"
+)
+
 type BlockSubscriber struct {
-	sync     utils.StartStopOnce
+	utils.StartStopOnce
+	threadCtrl utils.ThreadControl
+
 	mu       sync.RWMutex
-	ctx      context.Context
-	cancel   context.CancelFunc
 	hb       httypes.HeadBroadcaster
 	lp       logpoller.LogPoller
 	headC    chan *evmtypes.Head
@@ -53,6 +57,7 @@ var _ ocr2keepers.BlockSubscriber = &BlockSubscriber{}
 
 func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr logger.Logger) *BlockSubscriber {
 	return &BlockSubscriber{
+		threadCtrl: utils.NewThreadControl(),
 		hb:         hb,
 		lp:         lp,
 		headC:      make(chan *evmtypes.Head, channelSize),
@@ -81,8 +86,8 @@ func (bs *BlockSubscriber) getBlockRange(ctx context.Context) ([]uint64, error)
 	return blocks, nil
 }
 
-func (bs *BlockSubscriber) initializeBlocks(blocks []uint64) error {
-	logpollerBlocks, err := bs.lp.GetBlocksRange(bs.ctx, blocks, pg.WithParentCtx(bs.ctx))
+func (bs *BlockSubscriber) initializeBlocks(ctx context.Context, blocks []uint64) error {
+	logpollerBlocks, err := bs.lp.GetBlocksRange(ctx, blocks)
 	if err != nil {
 		return err
 	}
@@ -127,67 +132,61 @@ func (bs *BlockSubscriber) cleanup() {
 	bs.lggr.Infof("lastClearedBlock is set to %d", bs.lastClearedBlock)
 }
 
-func (bs *BlockSubscriber) Start(_ context.Context) error {
-	bs.lggr.Info("block subscriber started.")
-	return bs.sync.StartOnce("BlockSubscriber", func() error {
-		bs.mu.Lock()
-		defer bs.mu.Unlock()
-		bs.ctx, bs.cancel = context.WithCancel(context.Background())
-		// initialize the blocks map with the recent blockSize blocks
-		blocks, err := bs.getBlockRange(bs.ctx)
-		if err != nil {
-			bs.lggr.Errorf("failed to get block range", err)
-		}
-		err = bs.initializeBlocks(blocks)
-		if err != nil {
-			bs.lggr.Errorf("failed to get log poller blocks", err)
-		}
-
-		_, bs.unsubscribe = bs.hb.Subscribe(&headWrapper{headC: bs.headC, lggr: bs.lggr})
+func (bs *BlockSubscriber) initialize(ctx context.Context) {
+	bs.mu.Lock()
+	defer bs.mu.Unlock()
+	// initialize the blocks map with the recent blockSize blocks
+	blocks, err := bs.getBlockRange(ctx)
+	if err != nil {
+		bs.lggr.Errorf("failed to get block range", err)
+	}
+	err = bs.initializeBlocks(ctx, blocks)
+	if err != nil {
+		bs.lggr.Errorf("failed to get log poller blocks", err)
+	}
+	_, bs.unsubscribe = bs.hb.Subscribe(&headWrapper{headC: bs.headC, lggr: bs.lggr})
+}
 
+func (bs *BlockSubscriber) Start(ctx context.Context) error {
+	return bs.StartOnce(BlockSubscriberServiceName, func() error {
+		bs.lggr.Info("block subscriber started.")
+		bs.initialize(ctx)
 		// poll from head broadcaster channel and push to subscribers
-		{
-			go func(ctx context.Context) {
-				for {
-					select {
-					case h := <-bs.headC:
-						if h != nil {
-							bs.processHead(h)
-						}
-					case <-ctx.Done():
-						return
-					}
-				}
-			}(bs.ctx)
-		}
-
-		// clean up block maps
-		{
-			go func(ctx context.Context) {
-				ticker := time.NewTicker(cleanUpInterval)
-				for {
-					select {
-					case <-ticker.C:
-						bs.cleanup()
-					case <-ctx.Done():
-						ticker.Stop()
-						return
-					}
-				}
-			}(bs.ctx)
-		}
+		bs.threadCtrl.Go(func(ctx context.Context) {
+			for {
+				select {
+				case h := <-bs.headC:
+					if h != nil {
+						bs.processHead(h)
+					}
+				case <-ctx.Done():
+					return
+				}
+			}
+		})
+		// cleanup old blocks
+		bs.threadCtrl.Go(func(ctx context.Context) {
+			ticker := time.NewTicker(cleanUpInterval)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ticker.C:
+					bs.cleanup()
+				case <-ctx.Done():
+					return
+				}
+			}
+		})
 
 		return nil
 	})
 }
 
 func (bs *BlockSubscriber) Close() error {
-	bs.lggr.Info("stop block subscriber")
-	return bs.sync.StopOnce("BlockSubscriber", func() error {
-		bs.mu.Lock()
-		defer bs.mu.Unlock()
-
-		bs.cancel()
+	return bs.StopOnce(BlockSubscriberServiceName, func() error {
+		bs.lggr.Info("stop block subscriber")
+		bs.threadCtrl.Close()
 		bs.unsubscribe()
 		return nil
 	})
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go
index 19dfa7d9281..23fcf3f6695 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go
@@ -158,7 +158,7 @@ func TestBlockSubscriber_InitializeBlocks(t *testing.T) {
 			bs := NewBlockSubscriber(hb, lp, lggr)
 			bs.blockHistorySize = historySize
 			bs.blockSize = blockSize
-			err := bs.initializeBlocks(tc.Blocks)
+			err := bs.initializeBlocks(testutils.Context(t), tc.Blocks)
 
 			if tc.Error != nil {
 				assert.Equal(t, tc.Error.Error(), err.Error())
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go
index 50e7e85e3b6..b62fb370847 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go
@@ -22,9 +22,12 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/logger"
 	"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core"
 	"github.com/smartcontractkit/chainlink/v2/core/services/pg"
+	"github.com/smartcontractkit/chainlink/v2/core/utils"
 )
 
 var (
+	LogProviderServiceName = "LogEventProvider"
+
 	ErrHeadNotAvailable   = fmt.Errorf("head not available")
 	ErrBlockLimitExceeded = fmt.Errorf("block limit exceeded")
 
@@ -78,9 +81,10 @@ var _ LogEventProviderTest = &logEventProvider{}
 
 // logEventProvider manages log filters for upkeeps and enables to read the log events.
 type logEventProvider struct {
-	lggr logger.Logger
+	utils.StartStopOnce
+	threadCtrl utils.ThreadControl
 
-	cancel context.CancelFunc
+	lggr logger.Logger
 
 	poller logpoller.LogPoller
 
@@ -99,8 +103,9 @@ type logEventProvider struct {
 
 func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
 	return &logEventProvider{
-		packer: packer,
+		threadCtrl: utils.NewThreadControl(),
 		lggr:       lggr.Named("KeepersRegistry.LogEventProvider"),
+		packer:     packer,
 		buffer:     newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
 		poller:     poller,
 		opts:       opts,
@@ -109,33 +114,22 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
 }
 
 func (p *logEventProvider) Start(context.Context) error {
-	ctx, cancel := context.WithCancel(context.Background())
+	return p.StartOnce(LogProviderServiceName, func() error {
-	p.lock.Lock()
-	if p.cancel != nil {
-		p.lock.Unlock()
-		cancel() // Cancel the created context
-		return errors.New("already started")
-	}
-	p.cancel = cancel
-	p.lock.Unlock()
+		readQ := make(chan []*big.Int, readJobQueueSize)
 
-	readQ := make(chan []*big.Int, readJobQueueSize)
+		p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads)
 
-	p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads)
+		for i := 0; i < readerThreads; i++ {
+			p.threadCtrl.Go(func(ctx context.Context) {
+				p.startReader(ctx, readQ)
+			})
+		}
 
-	{ // start readers
-		go func(ctx context.Context) {
-			for i := 0; i < readerThreads; i++ {
-				go p.startReader(ctx, readQ)
-			}
-		}(ctx)
-	}
+		p.threadCtrl.Go(func(ctx context.Context) {
+			lggr := p.lggr.With("where", "scheduler")
 
-	{ // start scheduler
-		lggr := p.lggr.With("where", "scheduler")
-		go func(ctx context.Context) {
-			err := p.scheduleReadJobs(ctx, func(ids []*big.Int) {
+			p.scheduleReadJobs(ctx, func(ids []*big.Int) {
 				select {
 				case readQ <- ids:
 				case <-ctx.Done():
@@ -143,35 +137,21 @@ func (p *logEventProvider) Start(context.Context) error {
 					lggr.Warnw("readQ is full, dropping ids", "ids", ids)
 				}
 			})
-			// if the context was canceled, we don't need to log the error
-			if ctx.Err() != nil {
-				return
-			}
-			if err != nil {
-				lggr.Warnw("stopped scheduling read jobs with error", "err", err)
-			}
-			lggr.Debug("stopped scheduling read jobs")
-		}(ctx)
-	}
+		})
 
-	return nil
+		return nil
+	})
 }
 
 func (p *logEventProvider) Close() error {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	if cancel := p.cancel; cancel != nil {
-		p.cancel = nil
-		cancel()
-	} else {
-		return errors.New("already stopped")
-	}
-	return nil
+	return p.StopOnce(LogProviderServiceName, func() error {
+		p.threadCtrl.Close()
+		return nil
+	})
 }
 
-func (p *logEventProvider) Name() string {
-	return p.lggr.Name()
+func (p *logEventProvider) HealthReport() map[string]error {
+	return map[string]error{LogProviderServiceName: p.Healthy()}
 }
 
 func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) {
@@ -237,7 +217,7 @@ func (p *logEventProvider) CurrentPartitionIdx() uint64 {
 }
 
 // scheduleReadJobs starts a scheduler that pushed ids to readQ for reading logs in the background.
-func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([]*big.Int)) error {
+func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([]*big.Int)) {
 	ctx, cancel := context.WithCancel(pctx)
 	defer cancel()
 
@@ -265,7 +245,7 @@ func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([
 			partitionIdx++
 			atomic.StoreUint64(&p.currentPartitionIdx, partitionIdx)
 		case <-ctx.Done():
-			return ctx.Err()
+			return
 		}
 	}
 }
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go
index 47070d71aed..db22886cbb7 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go
@@ -193,7 +193,7 @@ func TestLogEventProvider_ScheduleReadJobs(t *testing.T) {
 			reads := make(chan []*big.Int, 100)
 
 			go func(ctx context.Context) {
-				_ = p.scheduleReadJobs(ctx, func(ids []*big.Int) {
+				p.scheduleReadJobs(ctx, func(ids []*big.Int) {
 					select {
 					case reads <- ids:
 					default:
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go
index c1065b7e870..06a3b257065 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go
@@ -26,6 +26,8 @@ import (
 )
 
 var (
+	LogRecovererServiceName = "LogRecoverer"
+
 	// RecoveryInterval is the interval at which the recovery scanning processing is triggered
 	RecoveryInterval = 5 * time.Second
 	// RecoveryCacheTTL is the time to live for the recovery cache
@@ -57,9 +59,10 @@ type visitedRecord struct {
 }
 
 type logRecoverer struct {
-	lggr logger.Logger
+	utils.StartStopOnce
+	threadCtrl utils.ThreadControl
 
-	cancel context.CancelFunc
+	lggr logger.Logger
 
 	lookbackBlocks *atomic.Int64
 	blockTime      *atomic.Int64
@@ -82,7 +85,9 @@ var _ LogRecoverer = &logRecoverer{}
 
 func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client client.Client, stateStore core.UpkeepStateReader, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logRecoverer {
 	rec := &logRecoverer{
-		lggr: lggr.Named("LogRecoverer"),
+		lggr: lggr.Named(LogRecovererServiceName),
+
+		threadCtrl: utils.NewThreadControl(),
 
 		blockTime:      &atomic.Int64{},
 		lookbackBlocks: &atomic.Int64{},
@@ -104,63 +109,75 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie
 	return rec
 }
 
-func (r *logRecoverer) Start(_ context.Context) error {
-	ctx, cancel := context.WithCancel(context.Background())
+// Start starts the log recoverer, which runs 3 threads in the background:
+// 1. Recovery thread: scans for logs that were missed by the log poller
+// 2. Cleanup thread: cleans up the cache of logs that were already processed
+// 3. Block time thread: updates the block time of the chain
+func (r *logRecoverer) Start(ctx context.Context) error {
+	return r.StartOnce(LogRecovererServiceName, func() error {
+		r.updateBlockTime(ctx)
 
-	r.lock.Lock()
-	if r.cancel != nil {
-		r.lock.Unlock()
-		cancel() // Cancel the created context
-		return errors.New("already started")
-	}
-	r.cancel = cancel
-	r.lock.Unlock()
-
-	r.updateBlockTime(ctx)
+		r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval)
 
-	r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval)
-
-	{
-		go func(ctx context.Context, interval time.Duration) {
-			recoverTicker := time.NewTicker(interval)
-			defer recoverTicker.Stop()
-			gcTicker := time.NewTicker(utils.WithJitter(GCInterval))
-			defer gcTicker.Stop()
-			blockTimeUpdateTicker := time.NewTicker(blockTimeUpdateCadence)
-			defer blockTimeUpdateTicker.Stop()
+		r.threadCtrl.Go(func(ctx context.Context) {
+			recoveryTicker := time.NewTicker(r.interval)
+			defer recoveryTicker.Stop()
 
 			for {
 				select {
-				case <-recoverTicker.C:
+				case <-recoveryTicker.C:
 					if err := r.recover(ctx); err != nil {
 						r.lggr.Warnw("failed to recover logs", "err", err)
 					}
-				case <-gcTicker.C:
+				case <-ctx.Done():
+					return
+				}
+			}
+		})
+
+		r.threadCtrl.Go(func(ctx context.Context) {
+			cleanupTicker := time.NewTicker(utils.WithJitter(GCInterval))
+			defer cleanupTicker.Stop()
+
+			for {
+				select {
+				case <-cleanupTicker.C:
 					r.clean(ctx)
-					gcTicker.Reset(utils.WithJitter(GCInterval))
-				case <-blockTimeUpdateTicker.C:
+					cleanupTicker.Reset(utils.WithJitter(GCInterval))
+				case <-ctx.Done():
+					return
+				}
+			}
+		})
+
+		r.threadCtrl.Go(func(ctx context.Context) {
+			blockTimeTicker := time.NewTicker(blockTimeUpdateCadence)
+			defer blockTimeTicker.Stop()
+
+			for {
+				select {
+				case <-blockTimeTicker.C:
 					r.updateBlockTime(ctx)
+					blockTimeTicker.Reset(utils.WithJitter(blockTimeUpdateCadence))
 				case <-ctx.Done():
 					return
 				}
 			}
-		}(ctx, r.interval)
-	}
+		})
 
-	return nil
+		return nil
+	})
 }
 
 func (r *logRecoverer) Close() error {
-	r.lock.Lock()
-	defer r.lock.Unlock()
+	return r.StopOnce(LogRecovererServiceName, func() error {
+		r.threadCtrl.Close()
+		return nil
+	})
+}
 
-	if cancel := r.cancel; cancel != nil {
-		r.cancel = nil
-		cancel()
-	} else {
-		return errors.New("already stopped")
-	}
-	return nil
+func (r *logRecoverer) HealthReport() map[string]error {
+	return map[string]error{LogRecovererServiceName: r.Healthy()}
 }
 
 func (r *logRecoverer) GetProposalData(ctx context.Context, proposal ocr2keepers.CoordinatedBlockProposal) ([]byte, error) {
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
index 751b94bde5c..a4684e67078 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
@@ -41,6 +41,8 @@ const (
 )
 
 var (
+	RegistryServiceName = "AutomationRegistry"
+
 	ErrLogReadFailure        = fmt.Errorf("failure reading logs")
 	ErrHeadNotAvailable      = fmt.Errorf("head not available")
 	ErrInitializationFailure = fmt.Errorf("failed to initialize registry")
@@ -83,6 +85,8 @@ func NewEvmRegistry(
 	finalityDepth uint32,
 ) *EvmRegistry {
 	return &EvmRegistry{
+		ctx:          context.Background(),
+		threadCtrl:   utils.NewThreadControl(),
 		lggr:         lggr.Named("EvmRegistry"),
 		poller:       client.LogPoller(),
 		addr:         addr,
@@ -124,7 +128,8 @@ type MercuryConfig struct {
 }
 
 type EvmRegistry struct {
-	sync          utils.StartStopOnce
+	utils.StartStopOnce
+	threadCtrl    utils.ThreadControl
 	lggr          logger.Logger
 	poller        logpoller.LogPoller
 	addr          common.Address
@@ -134,13 +139,11 @@ type EvmRegistry struct {
 	abi           abi.ABI
 	packer        encoding.Packer
 	chLog         chan logpoller.Log
-	reInit        *time.Timer
 	mu            sync.RWMutex
 	logProcessed  map[string]bool
 	active        ActiveUpkeepList
 	lastPollBlock int64
 	ctx           context.Context
-	cancel        context.CancelFunc
 	headFunc      func(ocr2keepers.BlockKey)
 	runState      int
 	runError      error
@@ -155,110 +158,83 @@ func (r *EvmRegistry) Name() string {
 	return r.lggr.Name()
 }
 
-func (r *EvmRegistry) Start(_ context.Context) error {
-	return r.sync.StartOnce("AutomationRegistry", func() error {
-		r.mu.Lock()
-		defer r.mu.Unlock()
-		r.ctx, r.cancel = context.WithCancel(context.Background())
-		r.reInit = time.NewTimer(refreshInterval)
-
+func (r *EvmRegistry) Start(ctx context.Context) error {
+	return r.StartOnce(RegistryServiceName, func() error {
 		if err := r.registerEvents(r.chainID, r.addr); err != nil {
 			return fmt.Errorf("logPoller error while registering automation events: %w", err)
 		}
 
-		// refresh the active upkeep keys; if the reInit timer returns, do it again
-		{
-			go func(cx context.Context, tmr *time.Timer, lggr logger.Logger, f func() error) {
-				err := f()
-				if err != nil {
-					lggr.Errorf("failed to initialize upkeeps", err)
-				}
-
-				for {
-					select {
-					case <-tmr.C:
-						err = f()
-						if err != nil {
-							lggr.Errorf("failed to re-initialize upkeeps", err)
-						}
-						tmr.Reset(refreshInterval)
-					case <-cx.Done():
-						return
-					}
-				}
-			}(r.ctx, r.reInit, r.lggr, r.refreshActiveUpkeeps)
-		}
-
-		// start polling logs on an interval
-		{
-			go func(cx context.Context, lggr logger.Logger, f func() error) {
-				ticker := time.NewTicker(time.Second)
-				for {
-					select {
-					case <-ticker.C:
-						err := f()
-						if err != nil {
-							lggr.Errorf("failed to poll logs for upkeeps", err)
-						}
-					case <-cx.Done():
-						ticker.Stop()
-						return
-					}
-				}
-			}(r.ctx, r.lggr, r.pollUpkeepStateLogs)
-		}
-
-		// run process to process logs from log channel
-		{
-			go func(cx context.Context, ch chan logpoller.Log, lggr logger.Logger, f func(logpoller.Log) error) {
-				for {
-					select {
-					case l := <-ch:
-						err := f(l)
-						if err != nil {
-							lggr.Errorf("failed to process log for upkeep", err)
-						}
-					case <-cx.Done():
-						return
-					}
-				}
-			}(r.ctx, r.chLog, r.lggr, r.processUpkeepStateLog)
-		}
+		r.threadCtrl.Go(func(ctx context.Context) {
+			lggr := r.lggr.With("where", "upkeeps_refresh")
+			err := r.refreshActiveUpkeeps()
+			if err != nil {
+				lggr.Errorf("failed to initialize upkeeps", err)
+			}
+
+			ticker := time.NewTicker(refreshInterval)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ticker.C:
+					err = r.refreshActiveUpkeeps()
+					if err != nil {
+						lggr.Errorf("failed to refresh upkeeps", err)
+					}
+				case <-ctx.Done():
+					return
+				}
+			}
+		})
+
+		r.threadCtrl.Go(func(ctx context.Context) {
+			lggr := r.lggr.With("where", "logs_polling")
+			ticker := time.NewTicker(time.Second)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ticker.C:
+					err := r.pollUpkeepStateLogs()
+					if err != nil {
+						lggr.Errorf("failed to poll logs for upkeeps", err)
+					}
+				case <-ctx.Done():
+					return
+				}
+			}
+		})
+
+		r.threadCtrl.Go(func(ctx context.Context) {
+			lggr := r.lggr.With("where", "logs_processing")
+			ch := r.chLog
+
+			for {
+				select {
+				case l := <-ch:
+					err := r.processUpkeepStateLog(l)
+					if err != nil {
+						lggr.Errorf("failed to process log for upkeep", err)
+					}
+				case <-ctx.Done():
+					return
+				}
+			}
+		})
 
-		r.runState = 1
 		return nil
 	})
 }
 
 func (r *EvmRegistry) Close() error {
-	return r.sync.StopOnce("AutomationRegistry", func() error {
-		r.mu.Lock()
-		defer r.mu.Unlock()
-		r.cancel()
-		r.runState = 0
-		r.runError = nil
+	return r.StopOnce(RegistryServiceName, func() error {
+		r.threadCtrl.Close()
 		return nil
 	})
 }
 
-func (r *EvmRegistry) Ready() error {
-	r.mu.RLock()
-	defer r.mu.RUnlock()
-
-	if r.runState == 1 {
-		return nil
-	}
-	return r.sync.Ready()
-}
-
 func (r *EvmRegistry) HealthReport() map[string]error {
-	r.mu.RLock()
-	defer r.mu.RUnlock()
-
-	if r.runState > 1 {
-		r.sync.SvcErrBuffer.Append(fmt.Errorf("failed run state: %w", r.runError))
-	}
-	return map[string]error{r.Name(): r.sync.Healthy()}
+	return map[string]error{RegistryServiceName: r.Healthy()}
 }
 
 func (r *EvmRegistry) refreshActiveUpkeeps() error {
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go
index ce1b50de229..cd123212376 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go
@@ -2,7 +2,6 @@ package upkeepstate
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"math/big"
@@ -18,10 +17,12 @@ import (
 )
 
 const (
+	UpkeepStateStoreServiceName = "UpkeepStateStore"
 	// CacheExpiration is the amount of time that we keep a record in the cache.
 	CacheExpiration = 24 * time.Hour
 	// GCInterval is the amount of time between cache cleanups.
-	GCInterval = 2 * time.Hour
+	GCInterval = 2 * time.Hour
+	// flushCadence is the amount of time between flushes to the DB.
 	flushCadence         = 30 * time.Second
 	concurrentBatchCalls = 10
 )
@@ -58,12 +59,13 @@ type upkeepStateRecord struct {
 // It stores the state of ineligible upkeeps in a local, in-memory cache.
 // In addition, performed events are fetched by the scanner on demand.
 type upkeepStateStore struct {
-	// dependencies
+	utils.StartStopOnce
+	threadCtrl utils.ThreadControl
+
 	orm     ORM
 	lggr    logger.Logger
 	scanner PerformedLogsScanner
 
-	// configuration
 	retention    time.Duration
 	cleanCadence time.Duration
 
@@ -73,20 +75,18 @@ type upkeepStateStore struct {
 	pendingRecords []persistedStateRecord
 	sem            chan struct{}
 	batchSize      int
-
-	// service values
-	cancel context.CancelFunc
 }
 
 // NewUpkeepStateStore creates a new state store
 func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore {
 	return &upkeepStateStore{
 		orm:            orm,
-		lggr:           lggr.Named("UpkeepStateStore"),
+		lggr:           lggr.Named(UpkeepStateStoreServiceName),
 		cache:          map[string]*upkeepStateRecord{},
 		scanner:        scanner,
 		retention:      CacheExpiration,
 		cleanCadence:   GCInterval,
+		threadCtrl:     utils.NewThreadControl(),
 		pendingRecords: []persistedStateRecord{},
 		sem:            make(chan struct{}, concurrentBatchCalls),
 		batchSize:      batchSize,
@@ -94,32 +94,18 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann
 }
 
 // Start starts the upkeep state store.
-// it does background cleanup of the cache.
+// it does background cleanup of the cache every GCInterval,
+// and flushes records to the DB every flushCadence.
 func (u *upkeepStateStore) Start(pctx context.Context) error {
-	if u.retention == 0 {
-		return errors.New("pruneDepth must be greater than zero")
-	}
-
-	u.mu.Lock()
-	if u.cancel != nil {
-		u.mu.Unlock()
-		return fmt.Errorf("already started")
-	}
-
-	ctx, cancel := context.WithCancel(context.Background())
-
-	u.cancel = cancel
-	u.mu.Unlock()
-
-	if err := u.scanner.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start scanner")
-	}
+	return u.StartOnce(UpkeepStateStoreServiceName, func() error {
+		if err := u.scanner.Start(pctx); err != nil {
+			return fmt.Errorf("failed to start scanner")
+		}
 
-	u.lggr.Debug("Starting upkeep state store")
+		u.lggr.Debug("Starting upkeep state store")
 
-	{
-		go func(ctx context.Context) {
-			ticker := time.NewTicker(u.cleanCadence)
+		u.threadCtrl.Go(func(ctx context.Context) {
+			ticker := time.NewTicker(utils.WithJitter(u.cleanCadence))
 			defer ticker.Stop()
 
 			flushTicker := newTickerFn(utils.WithJitter(flushCadence))
@@ -131,19 +117,18 @@ func (u *upkeepStateStore) Start(pctx context.Context) error {
 					if err := u.cleanup(ctx); err != nil {
 						u.lggr.Errorw("unable to clean old state values", "err", err)
 					}
-					ticker.Reset(utils.WithJitter(u.cleanCadence))
 				case <-flushTicker.C:
 					u.flush(ctx)
+					flushTicker.Reset(utils.WithJitter(flushCadence))
 				case <-ctx.Done():
 					u.flush(ctx)
 					return
 				}
 			}
-		}(ctx)
-	}
-
-	return nil
+		})
+		return nil
+	})
 }
 
 func (u *upkeepStateStore) flush(ctx context.Context) {
@@ -175,20 +160,14 @@ func (u *upkeepStateStore) flush(ctx context.Context) {
 
 // Close stops the service of pruning stale data; implements io.Closer
 func (u *upkeepStateStore) Close() error {
-	u.mu.Lock()
-	defer u.mu.Unlock()
-
-	if cancel := u.cancel; cancel != nil {
-		u.cancel = nil
-		cancel()
-	} else {
-		return fmt.Errorf("already stopped")
-	}
-	if err := u.scanner.Close(); err != nil {
-		return fmt.Errorf("failed to start scanner")
-	}
+	return u.StopOnce(UpkeepStateStoreServiceName, func() error {
+		u.threadCtrl.Close()
+		return nil
+	})
+}
 
-	return nil
+func (u *upkeepStateStore) HealthReport() map[string]error {
+	return map[string]error{UpkeepStateStoreServiceName: u.Healthy()}
 }
 
 // SelectByWorkIDs returns the current state of the upkeep for the provided ids.
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go
index 9e7ba81e5eb..cc4f6d0a23f 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store_test.go
@@ -310,14 +310,13 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) {
 			ctx := testutils.Context(t)
 
 			tickerCh := make(chan time.Time)
+			oldNewTickerFn := newTickerFn
 			oldFlushSize := batchSize
-			defer func() {
-			}()
 			newTickerFn = func(d time.Duration) *time.Ticker {
-				return &time.Ticker{
-					C: tickerCh,
-				}
+				t := time.NewTicker(d)
+				t.C = tickerCh
+				return t
 			}
 			batchSize = test.flushSize
 			defer func() {
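
Every service touched by this diff lands on the same lifecycle shape: embed utils.StartStopOnce so Start and Close are idempotent, and hand each background goroutine to utils.ThreadControl instead of keeping ctx/cancel fields behind a mutex. A minimal sketch of that shape, using only the calls that appear above (NewThreadControl, Go, Close, StartOnce, StopOnce, Healthy); the service name and the ticker work are hypothetical placeholders, not code from this change:

package example

import (
	"context"
	"time"

	"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const exampleServiceName = "ExampleService" // hypothetical name, for illustration

// exampleService mirrors the pattern above: StartStopOnce guards the
// lifecycle, ThreadControl owns every background goroutine.
type exampleService struct {
	utils.StartStopOnce
	threadCtrl utils.ThreadControl
}

func newExampleService() *exampleService {
	return &exampleService{threadCtrl: utils.NewThreadControl()}
}

func (s *exampleService) Start(context.Context) error {
	return s.StartOnce(exampleServiceName, func() error {
		// ThreadControl supplies each worker with a context that it
		// cancels when Close is called.
		s.threadCtrl.Go(func(ctx context.Context) {
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					// periodic background work goes here
				case <-ctx.Done():
					return
				}
			}
		})
		return nil
	})
}

func (s *exampleService) Close() error {
	return s.StopOnce(exampleServiceName, func() error {
		// Expected to cancel the worker contexts and wait for the workers
		// to exit, replacing the per-service mu/ctx/cancel bookkeeping
		// deleted throughout the diff.
		s.threadCtrl.Close()
		return nil
	})
}

func (s *exampleService) HealthReport() map[string]error {
	return map[string]error{exampleServiceName: s.Healthy()}
}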