Skip to content

Commit

Permalink
feature/11: Adding stats collector implementation (#12)
Browse files Browse the repository at this point in the history
* feature/11: Adding stats tracker

* Adjustments

* feature/11: Adding tests

* Updating readme

* Changing lock type
  • Loading branch information
ChrisSandison authored Jul 24, 2023
1 parent 83480ec commit 42d1eea
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 1 deletion.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ $ go get github.com/thinkdata-works/gopipeline
- Dynamic step definition - supply one or more pure go functions to define your pipeline work
- Custom error handler - register a function to handle errors and toggle between halting and non-halting behaviours
- Wait group handling - Register the pipeline with one or more other waitgroups to build this into other asynchronous processes
- Stats reporting - asynchronously track the progress of the pipeline for status reporting or benchmarking

## Quickstart

Expand All @@ -31,7 +32,7 @@ package main

import (
"context"

"time"
"github.com/thinkdata-works/gopipeline/pkg/gopipeline"
)

Expand Down Expand Up @@ -66,6 +67,15 @@ func process(ctx context.Context, resources []resource) []error {
getNewExternalUrl, reSignResource, applyChanges, notifyDownstream,
)

pipeline.RegisterReporter(
5 * time.Second, func(r Report) {
fmt.Printf(
"\n\n=====Begin stats=====\nTotal items finished: %d\nTotal items in pipeline: %d\nAverage items per second: %.6f\n=====End stats=====\n\n",
r.TotalFinished, r.TotalInPipeline, r.ItemsPerSecond,
)
},
)

err := pipeline.Work(ctx)
if err != nil {
errs = append(errs, err)
Expand Down
36 changes: 36 additions & 0 deletions pkg/gopipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"
)

type Pipeline[I any] interface {
Expand Down Expand Up @@ -42,8 +43,17 @@ type Pipeline[I any] interface {
*/
RegisterWaitGroups(...*sync.WaitGroup)

/*
Register a time interval in which to run a stats reporter on the pipeline. Also register a function which will receive the stats report. You may use this function for printing the status of the pipeline or for benchmarking.
Subsequent calls will replace the reporter
*/
RegisterReporter(reportInterval time.Duration, reporter func(r Report))

/*
Begins pipeline execution. Returns errors if they occur during setup
PLEASE NOTE that this function is not idempotent. Running the work function twice will not produce the same result. For subsequent runs, please create a new pipeline
*/
Work(ctx context.Context) error
}
Expand All @@ -62,6 +72,9 @@ type pipeline[I any] struct {

// internals
pipelineWg *sync.WaitGroup

// stats
stats *stats
}

func NewPipeline[I any](concurrencyLevel, bufferSize int) Pipeline[I] {
Expand Down Expand Up @@ -94,7 +107,18 @@ func (p *pipeline[I]) RegisterWaitGroups(groups ...*sync.WaitGroup) {
}
}

// RegisterReporter installs a stats tracker that invokes reporter every
// reportInterval while the pipeline is working. Subsequent calls replace
// any previously registered reporter.
func (p *pipeline[I]) RegisterReporter(reportInterval time.Duration, reporter func(r Report)) {
	p.stats = newStatsTracker(reportInterval, reporter)
}

func (p *pipeline[I]) Work(ctx context.Context) error {
if p.stats != nil {
p.stats.start(time.Now())
defer func() {
p.stats.done()
}()
}

defer func() {
for _, g := range p.listeningwgs {
g.Done()
Expand All @@ -110,6 +134,14 @@ func (p *pipeline[I]) Work(ctx context.Context) error {
return fmt.Errorf("must register at least one step")
}

if p.stats != nil {
// Append a step to the end where the item is completed
p.steps = append(p.steps, func(ctx context.Context, i I) (I, error) {
p.stats.registerItemComplete()
return i, nil
})
}

// Create the channel that inputs will be passed to
p.inputStream = make(chan I, p.bufferSize)

Expand Down Expand Up @@ -207,6 +239,10 @@ func (e *executor[I]) work(ctx context.Context) {
defer close(e.topChannel)

for item := range e.pipeline.inputStream {
// If we are collecting stats for the pipeline then register each new item
if e.pipeline.stats != nil {
e.pipeline.stats.registerNewItem()
}
e.topChannel <- item
}
}
35 changes: 35 additions & 0 deletions pkg/gopipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -133,6 +134,40 @@ func TestPipeline_ErrorHandler_Halting(t *testing.T) {
assertItemsWithStepTwoHaltingError(t, items)
}

// TestPipeline_Reporter verifies that a registered reporter is invoked
// periodically while the pipeline runs and that its reported totals
// increase monotonically across reports.
func TestPipeline_Reporter(t *testing.T) {
	pipeline := NewPipeline[*testItem](5, 20)

	// create placeholder for test vals
	items := make([]testItem, 1000)
	pipeline.RegisterInputProvider(testPipelineProvider(&items))
	// The extra 5ms-sleep step stretches the run so that several 50ms
	// report ticks fire before the pipeline drains.
	pipeline.RegisterSteps(
		stepOne, stepTwo, func(ctx context.Context, ti *testItem) (*testItem, error) {
			time.Sleep(5 * time.Millisecond)
			return ti, nil
		}, stepThree,
	)

	var reportCalled int
	previousTotal := int64(0)
	pipeline.RegisterReporter(50*time.Millisecond, func(r Report) {
		reportCalled++

		// Numbers should be increasing
		assert.Greater(t, r.TotalFinished, previousTotal)
		previousTotal = r.TotalFinished

		// Set minimum
		// NOTE(review): these thresholds (and the >=20 call count below)
		// depend on wall-clock timing and may be flaky on slow or heavily
		// loaded machines — confirm stability in CI.
		assert.Greater(t, r.ItemsPerSecond, float64(50))

		// No sensible assertions for TotalInPipeline
	})
	err := pipeline.Work(context.Background())
	assert.NoError(t, err)

	assertItems(t, items)
	assert.GreaterOrEqual(t, reportCalled, 20)
}

func assertItems(t *testing.T, items []testItem) {
for i, item := range items {
assert.Equal(t, i, item.id)
Expand Down
97 changes: 97 additions & 0 deletions pkg/gopipeline/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package gopipeline

import (
"sync"
"time"
)

// stats tracks pipeline throughput counters and periodically delivers a
// snapshot of them to a user-supplied reporter function.
type stats struct {
	// mutex guards the counter fields below; report() takes the read
	// lock while the register* methods take the write lock.
	mutex *sync.RWMutex
	// startedAt is the instant the pipeline began work, used to derive
	// the items-per-second rate. Set once in start().
	startedAt *time.Time

	// ticker fires every interval to trigger a report.
	ticker *time.Ticker
	// doneChan signals the background reporting goroutine to stop.
	doneChan chan bool

	// Current number of items being processed by the pipeline
	totalInPipeline int64
	// Total items that have finished processing by the pipeline
	totalFinished int64
	// Number of items processed per second by the pipeline
	itemsPerSecond float64

	// Function to report the stats
	reporter func(r Report)
	// interval is the period between automatic reports.
	interval time.Duration
}

// Report is the immutable snapshot of pipeline statistics handed to the
// reporter function registered via RegisterReporter.
type Report struct {
	// Current number of items being processed by the pipeline
	TotalInPipeline int64
	// Total items that have finished processing by the pipeline
	TotalFinished int64
	// Number of items processed per second by the pipeline
	ItemsPerSecond float64
}

// newStatsTracker builds a stats tracker that, once started, will invoke
// reporter every interval.
func newStatsTracker(interval time.Duration, reporter func(r Report)) *stats {
	return &stats{
		mutex:    &sync.RWMutex{},
		interval: interval,
		reporter: reporter,
	}
}

// start records the pipeline start time and launches a background
// goroutine that emits a report every interval until done() is called.
func (s *stats) start(t time.Time) {
	s.startedAt = &t

	s.ticker = time.NewTicker(s.interval)
	s.doneChan = make(chan bool)

	go s.reportLoop()
}

// reportLoop services ticker and shutdown events for the reporting
// goroutine started by start().
func (s *stats) reportLoop() {
	for {
		select {
		case <-s.doneChan:
			return
		case <-s.ticker.C:
			s.report()
		}
	}
}

// registerNewItem counts one item entering the pipeline.
func (s *stats) registerNewItem() {
	s.mutex.Lock()
	s.totalInPipeline++
	s.mutex.Unlock()
}

// registerItemComplete counts one item leaving the pipeline and refreshes
// the running items-per-second rate.
//
// BUG FIX: the rate is now computed while still holding the write lock.
// The original updated itemsPerSecond after Unlock(), racing with
// report(), which reads the field under the read lock.
func (s *stats) registerItemComplete() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.totalFinished++
	s.totalInPipeline--
	// startedAt is assigned once in start() before any items flow, so
	// dereferencing it here is safe.
	s.itemsPerSecond = float64(s.totalFinished) / time.Since(*s.startedAt).Seconds()
}

// done stops the background reporting goroutine and emits one final
// report so the caller sees the end-of-run totals.
func (s *stats) done() {
	// Guard against start() never having been called: ticker would be
	// nil (panic) and the send on doneChan would block forever.
	if s.ticker == nil {
		return
	}
	s.ticker.Stop()
	s.doneChan <- true // unblock and terminate the reporting goroutine
	s.report()         // one last report
}

// report delivers a snapshot of the current stats to the registered
// reporter, if one exists.
func (s *stats) report() {
	// Exit if there is nothing to report to
	if s.reporter == nil {
		return
	}

	// Snapshot the counters under the read lock, then release it before
	// invoking the user callback. The original held RLock across the
	// callback, so a reporter that touched the stats (or simply ran
	// slowly) could deadlock or stall item registration.
	s.mutex.RLock()
	r := Report{
		ItemsPerSecond:  s.itemsPerSecond,
		TotalFinished:   s.totalFinished,
		TotalInPipeline: s.totalInPipeline,
	}
	s.mutex.RUnlock()

	s.reporter(r)
}

0 comments on commit 42d1eea

Please sign in to comment.