Skip to content

Commit

Permalink
Merge pull request #449 from jalaziz/improved-processor-state
Browse files Browse the repository at this point in the history
Improve processor stopped state management
  • Loading branch information
frairon authored May 16, 2024
2 parents bd3f866 + 63d09bd commit 90c88b0
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 36 deletions.
26 changes: 23 additions & 3 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/IBM/sarama"
"github.com/hashicorp/go-multierror"

"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
)
Expand All @@ -25,6 +26,8 @@ const (
ProcStateRunning
// ProcStateStopping indicates a stopping partition processor
ProcStateStopping
// ProcStateStopped indicates a stopped partition processor
ProcStateStopped
)

// ProcessCallback function is called for every message received by the
Expand Down Expand Up @@ -60,6 +63,8 @@ type Processor struct {

state *Signal

errMux sync.Mutex
err error
done chan struct{}
cancel context.CancelFunc
}
Expand Down Expand Up @@ -128,7 +133,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)

graph: gg,

state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping).SetState(ProcStateIdle),
state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping, ProcStateStopped).SetState(ProcStateIdle),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -260,7 +265,10 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
// collect all errors before leaving
var errs *multierror.Error
defer func() {
rerr = multierror.Append(errs, rerr).ErrorOrNil()
g.errMux.Lock()
defer g.errMux.Unlock()
g.err = multierror.Append(errs, rerr).ErrorOrNil()
rerr = g.err
}()

var err error
Expand Down Expand Up @@ -711,7 +719,7 @@ func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error {
defer g.log.Debugf("Cleaning up for %d ... done", session.GenerationID())

g.state.SetState(ProcStateStopping)
defer g.state.SetState(ProcStateIdle)
defer g.state.SetState(ProcStateStopped)
errg, _ := multierr.NewErrGroup(session.Context())
g.mTables.RLock()
for part, partition := range g.partitions {
Expand Down Expand Up @@ -926,6 +934,18 @@ func (g *Processor) Stop() {
g.cancel()
}

// Done returns a channel that is closed when the processor is stopped.
func (g *Processor) Done() <-chan struct{} {
return g.done
}

// Error returns the error that caused the processor to stop.
func (g *Processor) Error() error {
g.errMux.Lock()
defer g.errMux.Unlock()
return g.err
}

// VisitAllWithStats visits all keys in parallel by passing the visit request
// to all partitions.
// The optional argument "meta" will be forwarded to the visit-function of each key of the table.
Expand Down
Loading

0 comments on commit 90c88b0

Please sign in to comment.