Skip to content

Commit

Permalink
gopipeline/4: Making error handler threasafe (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSandison authored Apr 21, 2023
1 parent 5948e21 commit f49bc7d
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions pkg/gopipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ type Pipeline[I any] interface {
/*
Register a function that will receive errors returned from any of the pipeline steps.
If the handler returns `true` then execution of the next pipeline step will continue. If it returns false, or if the error handler is not defined, then pipeline will cease execution for the current item
If the handler returns `true` then execution of the next pipeline step will continue. If it returns false, or if the error handler is not defined, then pipeline will cease execution for the current item.
Invoking the error handler users a mutex, so any action it takes will be threadsafe.
Subsequent calls with replace the error handler
*/
Expand Down Expand Up @@ -55,6 +57,7 @@ type pipeline[I any] struct {
inputStream chan I
inputProvider func(context.Context, chan I)
errorHandler func(error) bool
errorMtx *sync.Mutex
listeningwgs []*sync.WaitGroup

// internals
Expand All @@ -66,6 +69,7 @@ func NewPipeline[I any](concurrencyLevel, bufferSize int) Pipeline[I] {
concurrencyLevel: concurrencyLevel,
bufferSize: bufferSize,
pipelineWg: &sync.WaitGroup{},
errorMtx: &sync.Mutex{},
listeningwgs: []*sync.WaitGroup{},
}
return p
Expand Down Expand Up @@ -182,7 +186,7 @@ func (e *executor[I]) runStep(ctx context.Context, upstream, downstream chan I,
if err != nil {
var shouldcontinue bool
if e.pipeline.errorHandler != nil {
shouldcontinue = e.pipeline.errorHandler(err)
shouldcontinue = e.handleError(err)
}

if shouldcontinue {
Expand All @@ -197,6 +201,12 @@ func (e *executor[I]) runStep(ctx context.Context, upstream, downstream chan I,
}
}

func (e *executor[I]) handleError(err error) bool {
e.pipeline.errorMtx.Lock()
defer e.pipeline.errorMtx.Unlock()
return e.pipeline.errorHandler(err)
}

// Kick off the executor by sending items from the input stream
func (e *executor[I]) work(ctx context.Context) {
defer close(e.topChannel)
Expand Down

0 comments on commit f49bc7d

Please sign in to comment.