Skip to content

Commit

Permalink
Use common.StartRunnable
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 15, 2023
1 parent b016515 commit a06d2af
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 5 deletions.
47 changes: 47 additions & 0 deletions node/pkg/common/scissors.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,50 @@ func WrapWithScissors(runnable supervisor.Runnable, name string) supervisor.Runn
return runnable(ctx)
}
}

// StartRunnable runs runnable in a new goroutine and reports failures on errC
// instead of returning them to the caller.
//
// A non-nil error returned by runnable is handled by startRunnable. If
// catchPanics is true, a panic raised by runnable is recovered, converted into
// an error prefixed with name, counted in ScissorsPanicsCaught, and offered to
// errC. If catchPanics is false, the panic is not recovered and propagates
// upward.
//
// The send on errC never blocks: if the send cannot proceed immediately, the
// error is dropped.
func StartRunnable(ctx context.Context, errC chan error, catchPanics bool, name string, runnable supervisor.Runnable) {
	// Touch the labeled counters up front, before anything has been caught.
	ScissorsErrorsCaught.WithLabelValues(name).Add(0)
	if catchPanics {
		ScissorsPanicsCaught.WithLabelValues(name).Add(0)
	}

	go func() {
		if catchPanics {
			defer func() {
				r := recover()
				if r == nil {
					return
				}

				// Preserve the error chain when the panic value is itself an error.
				var err error
				switch x := r.(type) {
				case error:
					err = fmt.Errorf("%s: %w", name, x)
				default:
					err = fmt.Errorf("%s: %v", name, x)
				}

				// Count the panic before notifying the listener, so that anyone
				// woken by the send observes the updated counter.
				ScissorsPanicsCaught.WithLabelValues(name).Inc()

				// We don't want this to hang if the listener has already gone away.
				select {
				case errC <- err:
				default:
				}
			}()
		}
		startRunnable(ctx, errC, name, runnable)
	}()
}

// startRunnable invokes runnable synchronously in the calling goroutine. If it
// returns a non-nil error, the error is counted in ScissorsErrorsCaught under
// the label name and then offered to errC without blocking. Panics are not
// recovered here.
//
// It is used by StartRunnable and is a separate function so tests can call it directly.
func startRunnable(ctx context.Context, errC chan error, name string, runnable supervisor.Runnable) {
	err := runnable(ctx)
	if err == nil {
		return
	}

	// Count the error before notifying the listener, so that anyone woken by
	// the send observes the updated counter.
	ScissorsErrorsCaught.WithLabelValues(name).Inc()

	// We don't want this to hang if the listener has already gone away.
	select {
	case errC <- err:
	default:
	}
}
132 changes: 132 additions & 0 deletions node/pkg/common/scissors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,135 @@ func TestRunWithScissorsErrorDoesNotBlockWhenNoListener(t *testing.T) {
assert.Equal(t, 1.0, getCounterValue(ScissorsErrorsCaught, "TestRunWithScissorsErrorDoesNotBlockWhenNoListener"))
assert.Equal(t, 0.0, getCounterValue(ScissorsPanicsCaught, "TestRunWithScissorsErrorDoesNotBlockWhenNoListener"))
}

// TestStartRunnable_CleanExit checks that a runnable which returns nil leaves
// both the error and the panic counters at zero.
func TestStartRunnable_CleanExit(t *testing.T) {
	const name = "TestStartRunnable_CleanExit"

	var (
		ctx  = context.Background()
		errC = make(chan error)
		ran  = make(chan bool, 1)
	)

	cleanRunnable := func(ctx context.Context) error {
		ran <- true
		return nil
	}
	StartRunnable(ctx, errC, true, name, cleanRunnable)

	didRun := <-ran
	require.Equal(t, true, didRun)

	// Give the scissors code time to finish so we know it did not hang.
	time.Sleep(100 * time.Millisecond)

	assert.Equal(t, 0.0, getCounterValue(ScissorsErrorsCaught, name))
	assert.Equal(t, 0.0, getCounterValue(ScissorsPanicsCaught, name))
}

// TestStartRunnable_OnError checks that an error returned by the runnable is
// delivered on the error channel and counted as an error, not as a panic.
func TestStartRunnable_OnError(t *testing.T) {
	// Bound the wait: context.Background() is never done, so without a deadline
	// a dropped error would hang the test instead of failing it.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	errC := make(chan error)

	itRan := make(chan bool, 1)
	StartRunnable(ctx, errC, true, "TestStartRunnable_OnError", func(ctx context.Context) error {
		itRan <- true
		return fmt.Errorf("Some random error")
	})

	var err error
	select {
	case <-ctx.Done():
	case err = <-errC:
	}

	// Stop here if nothing arrived; the checks below dereference err.
	require.Error(t, err)

	shouldHaveRun := <-itRan
	require.Equal(t, true, shouldHaveRun)
	assert.Equal(t, "Some random error", err.Error())
	assert.Equal(t, 1.0, getCounterValue(ScissorsErrorsCaught, "TestStartRunnable_OnError"))
	assert.Equal(t, 0.0, getCounterValue(ScissorsPanicsCaught, "TestStartRunnable_OnError"))
}

// TestStartRunnable_DontCatchPanics_OnPanic checks that startRunnable does not
// swallow a panic raised by the runnable and that neither counter is touched.
func TestStartRunnable_DontCatchPanics_OnPanic(t *testing.T) {
	const name = "TestStartRunnable_DontCatchPanics_OnPanic"

	ctx := context.Background()
	errC := make(chan error)
	ran := make(chan bool, 1)
	panicked := make(chan bool, 1)

	panicky := func(ctx context.Context) error {
		panic("Some random panic")
	}

	// StartRunnable() cannot be used here because a panic in another goroutine
	// cannot be observed from the test. Calling startRunnable() directly lets
	// the panic surface in this goroutine, where the deferred recover sees it.
	invoke := func() {
		defer func() {
			if recover() != nil {
				panicked <- true
			}
			ran <- true
		}()
		startRunnable(ctx, errC, name, panicky)
	}
	invoke()

	var didRun bool
	select {
	case <-ctx.Done():
	case didRun = <-ran:
	}
	require.Equal(t, true, didRun)

	require.Equal(t, 1, len(panicked))
	didPanic := <-panicked
	require.Equal(t, true, didPanic)

	assert.Equal(t, 0.0, getCounterValue(ScissorsErrorsCaught, name))
	assert.Equal(t, 0.0, getCounterValue(ScissorsPanicsCaught, name))
}

// TestStartRunnable_CatchPanics_OnPanic checks that, with catchPanics enabled,
// a panic in the runnable is converted to an error prefixed with the runnable
// name, delivered on the error channel, and counted as a panic.
func TestStartRunnable_CatchPanics_OnPanic(t *testing.T) {
	// Bound the wait: context.Background() is never done, so without a deadline
	// a dropped error would hang the test instead of failing it.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	errC := make(chan error)

	itRan := make(chan bool, 1)
	StartRunnable(ctx, errC, true, "TestStartRunnable_CatchPanics_OnPanic", func(ctx context.Context) error {
		itRan <- true
		panic("Some random panic")
	})

	var err error
	select {
	case <-ctx.Done():
	case err = <-errC:
	}

	// Stop here if nothing arrived; the checks below dereference err.
	require.Error(t, err)

	shouldHaveRun := <-itRan
	require.Equal(t, true, shouldHaveRun)
	assert.Equal(t, "TestStartRunnable_CatchPanics_OnPanic: Some random panic", err.Error())
	assert.Equal(t, 0.0, getCounterValue(ScissorsErrorsCaught, "TestStartRunnable_CatchPanics_OnPanic"))
	assert.Equal(t, 1.0, getCounterValue(ScissorsPanicsCaught, "TestStartRunnable_CatchPanics_OnPanic"))
}

// TestStartRunnable_DoesNotBlockWhenNoListener checks that a recovered panic is
// still counted when nothing is receiving on the error channel.
func TestStartRunnable_DoesNotBlockWhenNoListener(t *testing.T) {
	const name = "TestStartRunnable_DoesNotBlockWhenNoListener"

	var (
		ctx  = context.Background()
		errC = make(chan error)
		ran  = make(chan bool, 1)
	)

	panicky := func(ctx context.Context) error {
		ran <- true
		panic("Some random panic")
	}
	StartRunnable(ctx, errC, true, name, panicky)

	didRun := <-ran
	require.Equal(t, true, didRun)

	// Give the scissors code time to finish so we know it did not hang.
	time.Sleep(100 * time.Millisecond)

	assert.Equal(t, 0.0, getCounterValue(ScissorsErrorsCaught, name))
	assert.Equal(t, 1.0, getCounterValue(ScissorsPanicsCaught, name))
}
9 changes: 4 additions & 5 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,13 @@ func (p *Processor) Run(ctx context.Context) error {
numWorkers = int(math.Ceil(float64(runtime.NumCPU()) * p.workerFactor))
p.logger.Info("processor configured to use workers", zap.Int("numWorkers", numWorkers), zap.Float64("workerFactor", p.workerFactor))
}
// Start the routine to do housekeeping tasks that don't need to be distributed to the workers. We will catch errors but not panics.
common.StartRunnable(ctx, errC, false, "processor_housekeeper", p.runHousekeeper)

// Start the routine to do housekeeping tasks that don't need to be distributed to the workers.
common.RunWithScissors(ctx, errC, "processor_housekeeper", p.runHousekeeper)

// Start the workers.
// Start the workers. We will catch errors but not panics.
for workerId := 1; workerId <= numWorkers; workerId++ {
workerId := workerId
common.RunWithScissors(ctx, errC, fmt.Sprintf("processor_worker_%d", workerId), p.runWorker)
common.StartRunnable(ctx, errC, false, fmt.Sprintf("processor_worker_%d", workerId), p.runWorker)
}

var err error
Expand Down

0 comments on commit a06d2af

Please sign in to comment.