From d0d6fefb09795d53489109b4d4c1b98b70bfe959 Mon Sep 17 00:00:00 2001 From: Jameel Al-Aziz Date: Fri, 5 Apr 2024 17:09:28 -0700 Subject: [PATCH 1/6] Improve processor stopped state management * Introduce a new `Stopped` state for processors. Since a processor cannot be restarted after it has been run, this state better indicates that the processor has reached its final state. * Expose a new `Done()` method on the processor to allow waiting on a processor to complete. This avoids the need to apply additional "done" channels on top of the processor when running processors in goroutines. --- processor.go | 12 ++++- processor_test.go | 111 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 3 deletions(-) diff --git a/processor.go b/processor.go index db35e1f7..97653ad7 100644 --- a/processor.go +++ b/processor.go @@ -10,6 +10,7 @@ import ( "github.com/IBM/sarama" "github.com/hashicorp/go-multierror" + "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" ) @@ -25,6 +26,8 @@ const ( ProcStateRunning // ProcStateStopping indicates a stopping partition processor ProcStateStopping + // ProcStateStopped indicates a stopped partition processor + ProcStateStopped ) // ProcessCallback function is called for every message received by the @@ -128,7 +131,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) graph: gg, - state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping).SetState(ProcStateIdle), + state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping, ProcStateStopped).SetState(ProcStateIdle), done: make(chan struct{}), } @@ -711,7 +714,7 @@ func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error { defer g.log.Debugf("Cleaning up for %d ... 
done", session.GenerationID()) g.state.SetState(ProcStateStopping) - defer g.state.SetState(ProcStateIdle) + defer g.state.SetState(ProcStateStopped) errg, _ := multierr.NewErrGroup(session.Context()) g.mTables.RLock() for part, partition := range g.partitions { @@ -926,6 +929,11 @@ func (g *Processor) Stop() { g.cancel() } +// Done returns a channel that is closed when the processor is stopped. +func (g *Processor) Done() <-chan struct{} { + return g.done +} + // VisitAllWithStats visits all keys in parallel by passing the visit request // to all partitions. // The optional argument "meta" will be forwarded to the visit-function of each key of the table. diff --git a/processor_test.go b/processor_test.go index 7ec69aff..7962fd92 100644 --- a/processor_test.go +++ b/processor_test.go @@ -10,9 +10,10 @@ import ( "github.com/IBM/sarama" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/lovoo/goka/codec" "github.com/lovoo/goka/storage" - "github.com/stretchr/testify/require" ) func createMockBuilder(t *testing.T) (*gomock.Controller, *builderMock) { @@ -324,3 +325,111 @@ func TestProcessor_StateReader(t *testing.T) { require.Equal(t, ProcStateRunning, p.StateReader().State()) } + +func TestProcessor_Stop(t *testing.T) { + t.Run("expected-state", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, _ := createTestConsumerGroupBuilder(t) + consBuilder, _ := createTestConsumerBuilder(t) + + graph := DefineGroup("test", + Input("input", new(codec.Int64), func(ctx Context, msg interface{}) { + // Do nothing + }), + ) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + 
require.NoError(t, err) + var ( + procErr error + done = make(chan struct{}) + ) + + go func() { + defer close(done) + procErr = newProc.Run(ctx) + }() + + newProc.WaitForReady() + + // if there was an error during startup, no point in continuing + // and waiting for the processor to be stopped + select { + case <-done: + require.NoError(t, procErr) + default: + } + + require.Equal(t, ProcStateRunning, newProc.StateReader().State()) + + // shutdown + newProc.Stop() + <-done + + select { + case <-done: + require.Equal(t, ProcStateStopped, newProc.StateReader().State()) + require.NoError(t, procErr) + case <-time.After(10 * time.Second): + t.Errorf("processor did not shut down as expected") + } + }) +} + +func TestProcessor_Done(t *testing.T) { + t.Run("done-closes", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, _ := createTestConsumerGroupBuilder(t) + consBuilder, _ := createTestConsumerBuilder(t) + + graph := DefineGroup("test", + Input("input", new(codec.Int64), func(ctx Context, msg interface{}) { + // Do nothing + }), + ) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + require.NoError(t, err) + var ( + procErr error + ) + + go func() { + procErr = newProc.Run(ctx) + }() + + newProc.WaitForReady() + + // shutdown + newProc.Stop() + + select { + case <-newProc.Done(): + require.NoError(t, procErr) + case <-time.After(10 * time.Second): + t.Errorf("processor did not shut down as expected") + } + }) +} From 5de45a07599b48c0ef53efab20a4e9e72d8dadbf Mon Sep 17 00:00:00 2001 From: Jameel Al-Aziz Date: Sun, 28 Apr 2024 19:53:55 -0700 Subject: [PATCH 2/6] combine tests --- processor_test.go | 
2 -- 1 file changed, 2 deletions(-) diff --git a/processor_test.go b/processor_test.go index 7962fd92..27372fdb 100644 --- a/processor_test.go +++ b/processor_test.go @@ -385,9 +385,7 @@ func TestProcessor_Stop(t *testing.T) { t.Errorf("processor did not shut down as expected") } }) -} -func TestProcessor_Done(t *testing.T) { t.Run("done-closes", func(t *testing.T) { ctrl, bm := createMockBuilder(t) defer ctrl.Finish() From 39e401570b619d0af3478ec9ebdce74636b2ef03 Mon Sep 17 00:00:00 2001 From: Jameel Al-Aziz Date: Mon, 6 May 2024 20:28:10 -0700 Subject: [PATCH 3/6] capture and return error --- processor.go | 9 ++++++++- processor_test.go | 8 +++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/processor.go b/processor.go index 97653ad7..1686155c 100644 --- a/processor.go +++ b/processor.go @@ -63,6 +63,7 @@ type Processor struct { state *Signal + err error done chan struct{} cancel context.CancelFunc } @@ -263,7 +264,8 @@ func (g *Processor) Run(ctx context.Context) (rerr error) { // collect all errors before leaving var errs *multierror.Error defer func() { - rerr = multierror.Append(errs, rerr).ErrorOrNil() + g.err = multierror.Append(errs, rerr).ErrorOrNil() + rerr = g.err }() var err error @@ -934,6 +936,11 @@ func (g *Processor) Done() <-chan struct{} { return g.done } +// Error returns the error that caused the processor to stop. +func (g *Processor) Error() error { + return g.err +} + // VisitAllWithStats visits all keys in parallel by passing the visit request // to all partitions. // The optional argument "meta" will be forwarded to the visit-function of each key of the table. 
diff --git a/processor_test.go b/processor_test.go index 27372fdb..9df1a863 100644 --- a/processor_test.go +++ b/processor_test.go @@ -315,6 +315,7 @@ func TestProcessor_Run(t *testing.T) { // and waiting for them to be delivered <-done require.True(t, strings.Contains(procErr.Error(), "setup-error")) + require.True(t, strings.Contains(newProc.Error().Error(), "setup-error")) }) } @@ -410,12 +411,9 @@ func TestProcessor_Stop(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - procErr error - ) go func() { - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -425,7 +423,7 @@ func TestProcessor_Stop(t *testing.T) { select { case <-newProc.Done(): - require.NoError(t, procErr) + require.NoError(t, newProc.Error()) case <-time.After(10 * time.Second): t.Errorf("processor did not shut down as expected") } From 4ae7b1026f5f7935f207ec41c1493eac0d1c7636 Mon Sep 17 00:00:00 2001 From: Jameel Al-Aziz Date: Mon, 6 May 2024 20:31:39 -0700 Subject: [PATCH 4/6] error mutex --- processor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/processor.go b/processor.go index 1686155c..32e4e24c 100644 --- a/processor.go +++ b/processor.go @@ -63,6 +63,7 @@ type Processor struct { state *Signal + errMux sync.Mutex err error done chan struct{} cancel context.CancelFunc @@ -264,6 +265,8 @@ func (g *Processor) Run(ctx context.Context) (rerr error) { // collect all errors before leaving var errs *multierror.Error defer func() { + g.errMux.Lock() + defer g.errMux.Unlock() g.err = multierror.Append(errs, rerr).ErrorOrNil() rerr = g.err }() @@ -938,6 +941,8 @@ func (g *Processor) Done() <-chan struct{} { // Error returns the error that caused the processor to stop. 
func (g *Processor) Error() error { + g.errMux.Lock() + defer g.errMux.Unlock() return g.err } From c04b05bfce8717fa261b663789ab56b9b45e84a9 Mon Sep 17 00:00:00 2001 From: Jameel Al-Aziz Date: Tue, 7 May 2024 15:54:24 -0700 Subject: [PATCH 5/6] update tests --- processor_test.go | 145 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 101 insertions(+), 44 deletions(-) diff --git a/processor_test.go b/processor_test.go index 9df1a863..86da01a4 100644 --- a/processor_test.go +++ b/processor_test.go @@ -121,14 +121,9 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - procErr error - done = make(chan struct{}) - ) go func() { - defer close(done) - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -136,8 +131,8 @@ func TestProcessor_Run(t *testing.T) { // if there was an error during startup, no point in sending messages // and waiting for them to be delivered select { - case <-done: - require.NoError(t, procErr) + case <-newProc.Done(): + require.NoError(t, newProc.Error()) default: } @@ -155,8 +150,8 @@ func TestProcessor_Run(t *testing.T) { // shutdown newProc.Stop() - <-done - require.NoError(t, procErr) + <-newProc.Done() + require.NoError(t, newProc.Error()) }) t.Run("loopback", func(t *testing.T) { ctrl, bm := createMockBuilder(t) @@ -197,14 +192,9 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - procErr error - done = make(chan struct{}) - ) go func() { - defer close(done) - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -212,8 +202,8 @@ func TestProcessor_Run(t *testing.T) { // if there was an error during startup, no point in sending messages // and waiting for them to be delivered select { - case <-done: - require.NoError(t, procErr) + case <-newProc.Done(): + require.NoError(t, newProc.Error()) default: } @@ -223,8 +213,8 @@ 
func TestProcessor_Run(t *testing.T) { // shutdown newProc.Stop() - <-done - require.NoError(t, procErr) + <-newProc.Done() + require.NoError(t, newProc.Error()) }) t.Run("consume-error", func(t *testing.T) { ctrl, bm := createMockBuilder(t) @@ -249,14 +239,9 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - procErr error - done = make(chan struct{}) - ) go func() { - defer close(done) - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -264,15 +249,15 @@ func TestProcessor_Run(t *testing.T) { // if there was an error during startup, no point in sending messages // and waiting for them to be delivered select { - case <-done: - require.NoError(t, procErr) + case <-newProc.Done(): + require.NoError(t, newProc.Error()) default: } cg.SendError(fmt.Errorf("test-error")) cancel() - <-done + <-newProc.Done() // the errors sent back by the consumergroup do not lead to a failure of the processor - require.NoError(t, procErr) + require.NoError(t, newProc.Error()) }) t.Run("setup-error", func(t *testing.T) { ctrl, bm := createMockBuilder(t) @@ -297,13 +282,51 @@ func TestProcessor_Run(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) + + cg.FailOnConsume(newErrSetup(123, fmt.Errorf("setup-error"))) + + go func() { + newProc.Run(ctx) + }() + + newProc.WaitForReady() + + // if there was an error during startup, no point in sending messages + // and waiting for them to be delivered + <-newProc.Done() + require.True(t, strings.Contains(newProc.Error().Error(), "setup-error")) + }) + t.Run("returns-error-on-failure", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, cg := createTestConsumerGroupBuilder(t) + consBuilder, _ := 
createTestConsumerBuilder(t) + + graph := DefineGroup("test", + // not really used, we're failing anyway + Input("input", new(codec.Int64), accumulate), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + require.NoError(t, err) + + cg.FailOnConsume(newErrSetup(123, fmt.Errorf("setup-error"))) + var ( procErr error done = make(chan struct{}) ) - cg.FailOnConsume(newErrSetup(123, fmt.Errorf("setup-error"))) - go func() { defer close(done) procErr = newProc.Run(ctx) @@ -315,7 +338,46 @@ func TestProcessor_Run(t *testing.T) { // and waiting for them to be delivered <-done require.True(t, strings.Contains(procErr.Error(), "setup-error")) - require.True(t, strings.Contains(newProc.Error().Error(), "setup-error")) + }) + t.Run("returns-nil-on-success", func(t *testing.T) { + ctrl, bm := createMockBuilder(t) + defer ctrl.Finish() + + bm.tmgr.EXPECT().Close().Times(1) + bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1) + bm.producer.EXPECT().Close().Times(1) + + groupBuilder, _ := createTestConsumerGroupBuilder(t) + consBuilder, _ := createTestConsumerBuilder(t) + + graph := DefineGroup("test", + // not really used, we're failing anyway + Input("input", new(codec.Int64), accumulate), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + newProc, err := NewProcessor([]string{"localhost:9092"}, graph, + bm.createProcessorOptions(consBuilder, groupBuilder)..., + ) + require.NoError(t, err) + + var ( + procErr error + done = make(chan struct{}) + ) + + go func() { + defer close(done) + procErr = newProc.Run(ctx) + }() + + newProc.WaitForReady() + newProc.Stop() + + <-done + require.NoError(t, procErr) }) } @@ -352,14 +414,9 @@ func TestProcessor_Stop(t *testing.T) { bm.createProcessorOptions(consBuilder, groupBuilder)..., ) require.NoError(t, err) - var ( - 
procErr error - done = make(chan struct{}) - ) go func() { - defer close(done) - procErr = newProc.Run(ctx) + newProc.Run(ctx) }() newProc.WaitForReady() @@ -367,8 +424,8 @@ func TestProcessor_Stop(t *testing.T) { // if there was an error during startup, no point in continuing // and waiting for the processor to be stopped select { - case <-done: - require.NoError(t, procErr) + case <-newProc.Done(): + require.NoError(t, newProc.Error()) default: } @@ -376,12 +433,12 @@ func TestProcessor_Stop(t *testing.T) { // shutdown newProc.Stop() - <-done + <-newProc.Done() select { - case <-done: + case <-newProc.Done(): require.Equal(t, ProcStateStopped, newProc.StateReader().State()) - require.NoError(t, procErr) + require.NoError(t, newProc.Error()) case <-time.After(10 * time.Second): t.Errorf("processor did not shut down as expected") } From 63d09bda00f414d3ea4ea020e326ee8fbcde04c8 Mon Sep 17 00:00:00 2001 From: Jameel Al-Aziz Date: Tue, 14 May 2024 17:27:43 -0700 Subject: [PATCH 6/6] reword incorrect comment --- processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor_test.go b/processor_test.go index 86da01a4..b6ac6e29 100644 --- a/processor_test.go +++ b/processor_test.go @@ -351,7 +351,7 @@ func TestProcessor_Run(t *testing.T) { consBuilder, _ := createTestConsumerBuilder(t) graph := DefineGroup("test", - // not really used, we're failing anyway + // not really used, we're stopping the processor before producing Input("input", new(codec.Int64), accumulate), )