Skip to content

Commit

Permalink
Use RunWithScissors
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 15, 2023
1 parent 3bb5ca0 commit a5ff6e5
Showing 1 changed file with 30 additions and 23 deletions.
53 changes: 30 additions & 23 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ func NewProcessor(
}

func (p *Processor) Run(ctx context.Context) error {
errC := make(chan error)

if p.workerFactor < 0.0 {
return fmt.Errorf("workerFactor must be positive or zero")
}
Expand All @@ -241,33 +243,34 @@ func (p *Processor) Run(ctx context.Context) error {
}

// Start the routine to do housekeeping tasks that don't need to be distributed to the workers.
var w sync.WaitGroup
w.Add(1)
go func(ctx context.Context) {
err := p.runHousekeeper(ctx)
if err != nil {
p.logger.Error("housekeeper failed", zap.Error(err))
}
}(ctx)
common.RunWithScissors(ctx, errC, "processor_housekeeper",
func(ctx context.Context) error {
err := p.runHousekeeper(ctx)
if err != nil {
errC <- fmt.Errorf("processor housekeeper failed: %v", err)
}
return err
})

// Start the workers.
w.Add(numWorkers)
for workerId := 1; workerId <= numWorkers; workerId++ {
go func(ctx context.Context, workerId int) {
p.logger.Info("processor worker started", zap.Int("workerId", workerId))
err := p.runWorker(ctx)
if err != nil {
p.logger.Error("processor worker failed", zap.Int("workerId", workerId), zap.Error(err))
}
p.logger.Info("processor worker done", zap.Int("workerId", workerId))
w.Done()
}(ctx, workerId)
workerId := workerId
common.RunWithScissors(ctx, errC, fmt.Sprintf("processor_worker_%d", workerId),
func(ctx context.Context) error {
err := p.runWorker(ctx)
if err != nil {
errC <- fmt.Errorf("processor worker %d failed: %v", workerId, err)
}
return err
})
}

w.Wait()

if p.acct != nil {
p.acct.Close()
var err error
select {
case <-ctx.Done():
err = ctx.Err()
case e := <-errC:
err = e
}

// Leaving this here for easy debugging.
Expand All @@ -279,7 +282,11 @@ func (p *Processor) Run(ctx context.Context) error {
// metric = &dto.Metric{}
// _ = observationTotalDelay.Write(metric)
// p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationProcessingDelay", metric.String()))
return nil

if p.acct != nil {
p.acct.Close()
}
return err
}

// runHousekeeper performs general tasks that do not need to be distributed to the workers. There will always be exactly one instance of this.
Expand Down

0 comments on commit a5ff6e5

Please sign in to comment.