From a5ff6e5906808e45a713ed874c147a0fb00c9bdd Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Mon, 7 Aug 2023 10:54:14 -0500 Subject: [PATCH] Use RunWithScissors --- node/pkg/processor/processor.go | 53 +++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 5613fdd3dd..b63ecc0189 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -227,6 +227,8 @@ func NewProcessor( } func (p *Processor) Run(ctx context.Context) error { + errC := make(chan error) + if p.workerFactor < 0.0 { return fmt.Errorf("workerFactor must be positive or zero") } @@ -241,33 +243,34 @@ func (p *Processor) Run(ctx context.Context) error { } // Start the routine to do housekeeping tasks that don't need to be distributed to the workers. - var w sync.WaitGroup - w.Add(1) - go func(ctx context.Context) { - err := p.runHousekeeper(ctx) - if err != nil { - p.logger.Error("housekeeper failed", zap.Error(err)) - } - }(ctx) + common.RunWithScissors(ctx, errC, "processor_housekeeper", + func(ctx context.Context) error { + err := p.runHousekeeper(ctx) + if err != nil { + errC <- fmt.Errorf("processor housekeeper failed: %v", err) + } + return err + }) // Start the workers. - w.Add(numWorkers) for workerId := 1; workerId <= numWorkers; workerId++ { - go func(ctx context.Context, workerId int) { - p.logger.Info("processor worker started", zap.Int("workerId", workerId)) - err := p.runWorker(ctx) - if err != nil { - p.logger.Error("processor worker failed", zap.Int("workerId", workerId), zap.Error(err)) - } - p.logger.Info("processor worker done", zap.Int("workerId", workerId)) - w.Done() - }(ctx, workerId) + workerId := workerId + common.RunWithScissors(ctx, errC, fmt.Sprintf("processor_worker_%d", workerId), + func(ctx context.Context) error { + err := p.runWorker(ctx) + if err != nil { + errC <- fmt.Errorf("processor worker %d failed: %v", workerId, err) + } + return err + }) } - w.Wait() - - if p.acct != nil { - p.acct.Close() + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case e := <-errC: + err = e } // Leaving this here for easy debugging. @@ -279,7 +282,11 @@ func (p *Processor) Run(ctx context.Context) error { // metric = &dto.Metric{} // _ = observationTotalDelay.Write(metric) // p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationProcessingDelay", metric.String())) - return nil + + if p.acct != nil { + p.acct.Close() + } + return err } // runHousekeeper performs general tasks that do not need to be distributed to the workers. There will always be exactly one instance of this.