Improve processor stopped state management
* Introduce a new `Stopped` state for processors. Since a processor
  cannot be restarted after it has been run, this state makes it clear
  that the processor has reached its terminal state.
* Expose a new `Done()` method on the processor to allow waiting for a
  processor to complete. This avoids the need to layer additional "done"
  channels on top of the processor when running processors in goroutines
  (see the sketch below).
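
As an illustration of the new API, here is a minimal, hedged sketch (not part of this commit) that waits on `Done()` instead of closing a hand-rolled channel. The broker address and the "example-group" topology are made up for the example:

```go
package main

import (
	"context"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	// A made-up single-input group graph, just enough to run a processor.
	g := goka.DefineGroup("example-group",
		goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) {}),
	)

	p, err := goka.NewProcessor([]string{"localhost:9092"}, g)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		// Before this change, callers typically closed their own "done"
		// channel here so that other goroutines could wait for shutdown.
		if err := p.Run(ctx); err != nil {
			log.Printf("processor terminated with error: %v", err)
		}
	}()

	p.WaitForReady()
	p.Stop()

	// The new Done() channel blocks until the processor has fully stopped.
	<-p.Done()
}
```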
jalaziz committed Apr 29, 2024
1 parent 85b1720 commit d0d6fef
Showing 2 changed files with 120 additions and 3 deletions.
12 changes: 10 additions & 2 deletions processor.go
@@ -10,6 +10,7 @@ import (

"github.com/IBM/sarama"
"github.com/hashicorp/go-multierror"

"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
)
@@ -25,6 +26,8 @@ const (
ProcStateRunning
// ProcStateStopping indicates a stopping partition processor
ProcStateStopping
// ProcStateStopped indicates a stopped partition processor
ProcStateStopped
)

// ProcessCallback function is called for every message received by the
@@ -128,7 +131,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)

graph: gg,

state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping).SetState(ProcStateIdle),
state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping, ProcStateStopped).SetState(ProcStateIdle),
done: make(chan struct{}),
}

@@ -711,7 +714,7 @@ func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error {
defer g.log.Debugf("Cleaning up for %d ... done", session.GenerationID())

g.state.SetState(ProcStateStopping)
defer g.state.SetState(ProcStateIdle)
defer g.state.SetState(ProcStateStopped)
errg, _ := multierr.NewErrGroup(session.Context())
g.mTables.RLock()
for part, partition := range g.partitions {
@@ -926,6 +929,11 @@ func (g *Processor) Stop() {
g.cancel()
}

// Done returns a channel that is closed when the processor is stopped.
func (g *Processor) Done() <-chan struct{} {
return g.done
}

// VisitAllWithStats visits all keys in parallel by passing the visit request
// to all partitions.
// The optional argument "meta" will be forwarded to the visit-function of each key of the table.
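For context, a hedged sketch (not in the diff) of how `Done()` combines with the existing `StateReader()` to observe the new terminal state; the package and helper names are made up:

```go
package procutil

import "github.com/lovoo/goka"

// StopAndConfirm stops p, waits for it to wind down, and reports whether
// it reached the terminal state introduced by this change.
func StopAndConfirm(p *goka.Processor) bool {
	p.Stop()
	// Done() is closed once the processor has stopped.
	<-p.Done()
	// The processor now ends in ProcStateStopped instead of reverting to
	// ProcStateIdle, which wrongly implied it could be run again.
	return p.StateReader().State() == goka.ProcStateStopped
}
```

Ending in `ProcStateStopped` rather than `ProcStateIdle` makes the one-shot lifecycle explicit to callers.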
111 changes: 110 additions & 1 deletion processor_test.go
@@ -10,9 +10,10 @@ import (

"github.com/IBM/sarama"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/storage"
"github.com/stretchr/testify/require"
)

func createMockBuilder(t *testing.T) (*gomock.Controller, *builderMock) {
@@ -324,3 +325,111 @@ func TestProcessor_StateReader(t *testing.T) {

require.Equal(t, ProcStateRunning, p.StateReader().State())
}

func TestProcessor_Stop(t *testing.T) {
t.Run("expected-state", func(t *testing.T) {
ctrl, bm := createMockBuilder(t)
defer ctrl.Finish()

bm.tmgr.EXPECT().Close().Times(1)
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
bm.producer.EXPECT().Close().Times(1)

groupBuilder, _ := createTestConsumerGroupBuilder(t)
consBuilder, _ := createTestConsumerBuilder(t)

graph := DefineGroup("test",
Input("input", new(codec.Int64), func(ctx Context, msg interface{}) {
// Do nothing
}),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
defer cancel()

newProc, err := NewProcessor([]string{"localhost:9092"}, graph,
bm.createProcessorOptions(consBuilder, groupBuilder)...,
)
require.NoError(t, err)
var (
procErr error
done = make(chan struct{})
)

go func() {
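// run the processor in the background; the test-local done channel is
// closed only after Run returns, so procErr can be read safely once
// done is closed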
defer close(done)
procErr = newProc.Run(ctx)
}()

newProc.WaitForReady()

// if there was an error during startup, no point in continuing
// and waiting for the processor to be stopped
select {
case <-done:
require.NoError(t, procErr)
default:
}

require.Equal(t, ProcStateRunning, newProc.StateReader().State())

// shutdown
newProc.Stop()

select {
case <-done:
require.Equal(t, ProcStateStopped, newProc.StateReader().State())
require.NoError(t, procErr)
case <-time.After(10 * time.Second):
t.Errorf("processor did not shut down as expected")
}
})
}

func TestProcessor_Done(t *testing.T) {
t.Run("done-closes", func(t *testing.T) {
ctrl, bm := createMockBuilder(t)
defer ctrl.Finish()

bm.tmgr.EXPECT().Close().Times(1)
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
bm.producer.EXPECT().Close().Times(1)

groupBuilder, _ := createTestConsumerGroupBuilder(t)
consBuilder, _ := createTestConsumerBuilder(t)

graph := DefineGroup("test",
Input("input", new(codec.Int64), func(ctx Context, msg interface{}) {
// Do nothing
}),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
defer cancel()

newProc, err := NewProcessor([]string{"localhost:9092"}, graph,
bm.createProcessorOptions(consBuilder, groupBuilder)...,
)
require.NoError(t, err)
var (
procErr error
runReturned = make(chan struct{})
)

go func() {
defer close(runReturned)
procErr = newProc.Run(ctx)
}()

newProc.WaitForReady()

// shutdown
newProc.Stop()

select {
case <-newProc.Done():
// Done() may close before Run has returned; wait for the goroutine to
// finish so reading procErr does not race with its assignment
<-runReturned
require.NoError(t, procErr)
case <-time.After(10 * time.Second):
t.Errorf("processor did not shut down as expected")
}
})
}
