diff --git a/processor.go b/processor.go index db35e1f7..32e4e24c 100644 --- a/processor.go +++ b/processor.go @@ -10,6 +10,7 @@ import ( "github.com/IBM/sarama" "github.com/hashicorp/go-multierror" + "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" ) @@ -25,6 +26,8 @@ const ( ProcStateRunning // ProcStateStopping indicates a stopping partition processor ProcStateStopping + // ProcStateStopped indicates a stopped partition processor + ProcStateStopped ) // ProcessCallback function is called for every message received by the @@ -60,6 +63,8 @@ type Processor struct { state *Signal + errMux sync.Mutex + err error done chan struct{} cancel context.CancelFunc } @@ -128,7 +133,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) graph: gg, - state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping).SetState(ProcStateIdle), + state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping, ProcStateStopped).SetState(ProcStateIdle), done: make(chan struct{}), } @@ -260,7 +265,10 @@ func (g *Processor) Run(ctx context.Context) (rerr error) { // collect all errors before leaving var errs *multierror.Error defer func() { - rerr = multierror.Append(errs, rerr).ErrorOrNil() + g.errMux.Lock() + defer g.errMux.Unlock() + g.err = multierror.Append(errs, rerr).ErrorOrNil() + rerr = g.err }() var err error @@ -711,7 +719,7 @@ func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error { defer g.log.Debugf("Cleaning up for %d ... done", session.GenerationID()) g.state.SetState(ProcStateStopping) - defer g.state.SetState(ProcStateIdle) + defer g.state.SetState(ProcStateStopped) errg, _ := multierr.NewErrGroup(session.Context()) g.mTables.RLock() for part, partition := range g.partitions { @@ -926,6 +934,18 @@ func (g *Processor) Stop() { g.cancel() } +// Done returns a channel that is closed when the processor is stopped. +func (g *Processor) Done() <-chan struct{} { + return g.done +} + +// Error returns the error that caused the processor to stop. +func (g *Processor) Error() error { + g.errMux.Lock() + defer g.errMux.Unlock() + return g.err +} + // VisitAllWithStats visits all keys in parallel by passing the visit request // to all partitions. // The optional argument "meta" will be forwarded to the visit-function of each key of the table. diff --git a/processor_test.go b/processor_test.go index 7ec69aff..b6ac6e29 100644 --- a/processor_test.go +++ b/processor_test.go @@ -10,9 +10,10 @@ import ( "github.com/IBM/sarama" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/storage" - "github.com/stretchr/testify/require" ) func createMockBuilder(t *testing.T) (*gomock.Controller, *builderMock) { @@ -120,14 +121,9 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - procErr error - done = make(chan struct{}) - ) go func() { - defer close(done) - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -135,8 +131,8 @@ func TestProcessor_Run(t *testing.T) { // if there was an error during startup, no point in sending messages // and waiting for them to be delivered select { - case <-done: - require.NoError(t, procErr) + case <-newProc.Done(): + require.NoError(t, newProc.Error()) default: } @@ -154,8 +150,8 @@ func TestProcessor_Run(t *testing.T) { // shutdown newProc.Stop() - <-done - require.NoError(t, procErr) + <-newProc.Done() + require.NoError(t, newProc.Error()) }) t.Run("loopback", func(t *testing.T) { ctrl, bm := createMockBuilder(t) @@ -196,14 +192,9 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - procErr error - done = make(chan struct{}) - ) go func() { - defer close(done) - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -211,8 +202,8 @@ func TestProcessor_Run(t *testing.T) { // if there was an error during startup, no point in sending messages // and waiting for them to be delivered select { - case <-done: - require.NoError(t, procErr) + case <-newProc.Done(): + require.NoError(t, newProc.Error()) default: } @@ -222,8 +213,8 @@ func TestProcessor_Run(t *testing.T) { // shutdown newProc.Stop() - <-done - require.NoError(t, procErr) + <-newProc.Done() + require.NoError(t, newProc.Error()) }) t.Run("consume-error", func(t *testing.T) { ctrl, bm := createMockBuilder(t) @@ -248,14 +239,9 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - procErr error - done = make(chan struct{}) - ) go func() { - defer close(done) - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -263,15 +249,15 @@ func TestProcessor_Run(t *testing.T) { // if there was an error during startup, no point in sending messages // and waiting for them to be delivered select { - case <-done: - require.NoError(t, procErr) + case <-newProc.Done(): + require.NoError(t, newProc.Error()) default: } cg.SendError(fmt.Errorf("test-error")) cancel() - <-done + <-newProc.Done() // the errors sent back by the consumergroup do not lead to a failure of the processor - require.NoError(t, procErr) + require.NoError(t, newProc.Error()) }) t.Run("setup-error", func(t *testing.T) { ctrl, bm := createMockBuilder(t) @@ -296,13 +282,51 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) + + cg.FailOnConsume(newErrSetup(123, fmt.Errorf("setup-error"))) + + go func() { + newProc.Run(ctx) + }() + + newProc.WaitForReady() + + // if there was an error during startup, no point in sending messages + // and waiting for them to be delivered + <-newProc.Done() + require.True(t, strings.Contains(newProc.Error().Error(), "setup-error")) + }) + t.Run("returns-error-on-failure", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, cg := createTestConsumerGroupBuilder(t) + consBuilder, _ := createTestConsumerBuilder(t) + + graph := DefineGroup("test", + // not really used, we're failing anyway + Input("input", new(codec.Int64), accumulate), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + require.NoError(t, err) + + cg.FailOnConsume(newErrSetup(123, fmt.Errorf("setup-error"))) + var ( procErr error done = make(chan struct{}) ) - cg.FailOnConsume(newErrSetup(123, fmt.Errorf("setup-error"))) - go func() { defer close(done) procErr = newProc.Run(ctx) @@ -315,6 +339,46 @@ func TestProcessor_Run(t *testing.T) { <-done require.True(t, strings.Contains(procErr.Error(), "setup-error")) }) + t.Run("returns-nil-on-success", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, _ := createTestConsumerGroupBuilder(t) + consBuilder, _ := createTestConsumerBuilder(t) + + graph := DefineGroup("test", + // not really used, we're stopping before the processor before producing + Input("input", new(codec.Int64), accumulate), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + require.NoError(t, err) + + var ( + procErr error + done = make(chan struct{}) + ) + + go func() { + defer close(done) + procErr = newProc.Run(ctx) + }() + + newProc.WaitForReady() + newProc.Stop() + + <-done + require.NoError(t, procErr) + }) } func TestProcessor_StateReader(t *testing.T) { @@ -324,3 +388,101 @@ func TestProcessor_StateReader(t *testing.T) { require.Equal(t, ProcStateRunning, p.StateReader().State()) } + +func TestProcessor_Stop(t *testing.T) { + t.Run("expected-state", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, _ := createTestConsumerGroupBuilder(t) + consBuilder, _ := createTestConsumerBuilder(t) + + graph := DefineGroup("test", + Input("input", new(codec.Int64), func(ctx Context, msg interface{}) { + // Do nothing + }), + ) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + require.NoError(t, err) + + go func() { + newProc.Run(ctx) + }() + + newProc.WaitForReady() + + // if there was an error during startup, no point in continuing + // and waiting for the processor to be stopped + select { + case <-newProc.Done(): + require.NoError(t, newProc.Error()) + default: + } + + require.Equal(t, ProcStateRunning, newProc.StateReader().State()) + + // shutdown + newProc.Stop() + <-newProc.Done() + + select { + case <-newProc.Done(): + require.Equal(t, ProcStateStopped, newProc.StateReader().State()) + require.NoError(t, newProc.Error()) + case <-time.After(10 * time.Second): + t.Errorf("processor did not shut down as expected") + } + }) + + t.Run("done-closes", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, _ := createTestConsumerGroupBuilder(t) + consBuilder, _ := createTestConsumerBuilder(t) + + graph := DefineGroup("test", + Input("input", new(codec.Int64), func(ctx Context, msg interface{}) { + // Do nothing + }), + ) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + require.NoError(t, err) + + go func() { + newProc.Run(ctx) + }() + + newProc.WaitForReady() + + // shutdown + newProc.Stop() + + select { + case <-newProc.Done(): + require.NoError(t, newProc.Error()) + case <-time.After(10 * time.Second): + t.Errorf("processor did not shut down as expected") + } + }) +}