diff --git a/.gitignore b/.gitignore
index bf142f7..3ce0214 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,6 @@
 # coverage.txt (created by go.test.sh)
 coverage.txt
+
+# vscode settings
+.vscode/settings.json
\ No newline at end of file
diff --git a/opset.go b/opset.go
index b95b606..4a6d957 100644
--- a/opset.go
+++ b/opset.go
@@ -1,13 +1,9 @@
 package inflight
 
-import "time"
-
 // OpSet represents the set of Ops that have been merged in an OpQueue,
 // It provides convenience functions for appending new Ops and for completing them.
 type OpSet struct {
 	set []*Op
-
-	// used by the opWindow determine when it's ok to dequeue
-	enqueuedAt time.Time
 }
 
 func newOpSet(op *Op) *OpSet {
diff --git a/opwindow.go b/opwindow.go
index 1a55403..bb4eabb 100644
--- a/opwindow.go
+++ b/opwindow.go
@@ -3,211 +3,168 @@ package inflight
 import (
 	"container/list"
 	"context"
+	"fmt"
 	"sync"
 	"time"
 )
 
-// OpWindow is a thread-safe duplicate operation suppression queue,
-// that combines duplicate operations (queue entires) into sets
-// that will be dequeued together.
-//
-// For example, If you enqueue an item with a key that already exists,
-// then that item will be appended to that key's set of items. Otherwise
-// the item is inserted into the head of the list as a new item.
-//
-// On Dequeue a SET is returned of all items that share a key in the
-// queue. It blocks on dequeue if the queue is empty, but returns an
-// error if the queue is full during enqueue.
+// OpWindow is a windowed, microbatching priority queue.
+// Operations for the same ID and time window form a microbatch. Microbatches are dequeued in FIFO order.
+// OpWindow provides backpressure for both depth (i.e., the number of microbatches in the queue) and width (i.e., the number of operations in a microbatch).
+// OpWindow is safe for concurrent use. Its zero value is not safe to use; call NewOpWindow().
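+//
+// A minimal usage sketch (ctx and op are assumed to come from the caller;
+// see Op and CallGroup elsewhere in this package):
+//
+//	w := NewOpWindow(1024, 16, 100*time.Millisecond) // depth, width, window
+//	defer w.Close()
+//	if err := w.Enqueue(ctx, op.Key, op); err != nil {
+//		return err // saturated, context done, or closed
+//	}
+//	set, err := w.Dequeue(ctx) // blocks until a microbatch is ready
+//	if err != nil {
+//		return err
+//	}
+//	for _, o := range set.Ops() {
+//		_ = o // process the batch
+//	}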
 type OpWindow struct {
-	cond *sync.Cond
-	ctx  context.Context
-	can  context.CancelFunc
+	mu sync.Mutex
+	q  list.List // *queueItem
+	m  map[ID]*queueItem
+
+	// These channels act as a selectable sync.Cond: a blocking receive is
+	// Wait() and a non-blocking send is Signal().
+	queueHasItems chan struct{}
+	queueHasSpace chan struct{}
+
+	once sync.Once
+	done chan struct{}
 
 	depth int
 	width int
-	windowedBy int64
-
-	q       *list.List
-	entries map[ID]*OpSet
+	windowedBy time.Duration
 }
 
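+// The two channels above stand in for a sync.Cond so that waits can also be
+// abandoned on cancellation or Close. The idiom, as used throughout this
+// file (sketch):
+//
+//	// Signal: non-blocking send; if nobody is waiting, drop the wakeup.
+//	select {
+//	case q.queueHasSpace <- struct{}{}:
+//	default:
+//	}
+//
+//	// Wait: blocking receive that, unlike sync.Cond.Wait, can also stop
+//	// when the context or the queue is done.
+//	select {
+//	case <-q.queueHasSpace:
+//	case <-ctx.Done():
+//	case <-q.done:
+//	}
+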
-// NewOpWindow create a new OpWindow. Close by calling Close or cancaling
-// the provided context.
-func NewOpWindow(ctx context.Context, depth, width int, windowedBy time.Duration) *OpWindow {
-	cond := sync.NewCond(&sync.Mutex{})
-	myctx, can := context.WithCancel(ctx)
+// NewOpWindow creates a new OpWindow.
+//
+// depth: maximum number of microbatches in the queue.
+// width: maximum number of operations in a microbatch.
+// windowedBy: window size.
+func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
 	q := &OpWindow{
-		cond:       cond,
-		ctx:        myctx,
-		can:        can,
-		depth:      depth,
-		width:      width,
-		q:          list.New(),
-		entries:    map[ID]*OpSet{},
-		windowedBy: windowedBy.Nanoseconds(),
+		queueHasItems: make(chan struct{}),
+		queueHasSpace: make(chan struct{}),
+		done:          make(chan struct{}),
+		depth:         depth,
+		width:         width,
+		windowedBy:    windowedBy,
+		m:             make(map[ID]*queueItem),
 	}
-	go func() {
-		t := time.NewTicker(250 * time.Millisecond)
-		for {
-			select {
-			case <-q.ctx.Done():
-				return
-			case n := <-t.C:
-				// signal someone to wake up incase we have any Items falling
-				// out of the window.
-				q.cond.L.Lock()
-				if q.itemsReady(n) {
-					q.cond.Signal()
-				}
-				q.cond.L.Unlock()
-			}
-		}
-	}()
+	q.q.Init()
 	return q
 }
 
-// Close releases resources associated with this callgroup, by canceling
-// the context. The owner of this OpWindow should either call Close or cancel
-// the context, both are equivalent.
+// Close provides graceful shutdown: no new ops will be enqueued.
 func (q *OpWindow) Close() {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
-
-	q.can()
-	// alert all dequeue calls that they should wake up and return.
-	q.windowedBy = 0 // turn off windowing so everything is dequeue
-	q.cond.Broadcast()
-	return
+	q.once.Do(func() {
+		q.mu.Lock()
+		defer q.mu.Unlock()
+		close(q.done)
+		// HACK (2023-12) (mh): Set depth to zero so new entries are rejected.
+		q.depth = 0
+	})
 }
 
-// Len returns the number of uniq IDs in the queue, that is the depth of the
-// queue.
-func (q *OpWindow) Len() int {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
-	return q.q.Len()
-}
-
-// Enqueue add the op to the queue. If the ID already exists then the Op
-// is added to the existing OpSet for this ID, otherwise it's inserted as
-// a new OpSet.
-//
-// Enqueue doesn't block if the queue if full, instead it returns a
-// ErrQueueSaturated error.
-func (q *OpWindow) Enqueue(id ID, op *Op) error {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
-
-	if q.q.Len() >= q.depth {
-		return ErrQueueSaturatedDepth
-	}
-
-	set, ok := q.entries[id]
-	if !ok {
-		// This is a new item, so we need to insert it into the queue.
-		q.newEntry(id, op)
-
-		// Signal one waiting go routine to wake up and Dequeue
-		// I believe we only need to signal if we enqueue a new item.
-		// Consider the following possible states the queue could be in :
-		//   1. if no one is currently waiting in Dequeue, the signal isn't
-		//      needed and all items will be dequeued on the next call to
-		//      Dequeue.
-		//   2. One or Many go-routines are waiting in Dequeue because it's
-		//      empty, and calling Signal will wake up one. Which will
-		//      dequeue the item and return.
-		//   3. At most One go-routine is in the act of Dequeueing existing
-		//      items from the queue (i.e. only one can have the lock and be
-		//      in the "if OK" condition within the forloop in Dequeue). In
-		//      which cause the signal is ignored and after returning we
-		//      return to condition (1) above.
-		// Note signaled waiting go-routines will not be able the acquire
-		// the condition lock until this method call returns, finishing
-		// its append of the new operation.
-		q.cond.Signal()
-	} else if len(set.Ops()) >= q.width {
-		return ErrQueueSaturatedWidth
-	} else {
-		set.append(op)
-	}
-	return nil
-}
+// Enqueue adds op to the queue, blocking until the first of: the op is
+// enqueued, the ID hits max width, the context is done, or the queue is closed.
+func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
+	q.mu.Lock() // unlocked on each return below
 
-// Dequeue removes the oldest OpSet from the queue and returns it.
-// Dequeue will block if the Queue is empty. An Enqueue will wake the
-// go routine up and it will continue on.
-//
-// If the OpWindow is closed, then Dequeue will return false
-// for the second parameter.
-func (q *OpWindow) Dequeue() (*OpSet, bool) {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
-
-	for {
-		if set, ok := q.dequeue(); ok {
-			return set, true
-		}
-
-		select {
-		case <-q.ctx.Done():
-			return nil, false
-		default:
-		}
-
-		// release the lock and wait until signaled. On awake we'll acquire the lock.
-		// After wait acquires the lock we have to recheck the wait condition,
-		// because it's possible that someone else
-		// drained the queue while, we were reacquiring the lock.
-		q.cond.Wait()
-	}
-}
-
-type queElement struct {
-	id              ID
-	enqueuedAtUnixN int64
-}
-
-func (q *OpWindow) newEntry(id ID, op *Op) {
-	set := newOpSet(op)
-	q.entries[id] = set
-
-	eq := &queElement{id, time.Now().UnixNano()}
-	q.q.PushBack(eq)
-}
-
+	for {
+		item, ok := q.m[id]
+		if ok {
+			if len(item.OpSet.set) >= q.width {
+				select {
+				case <-item.IsFull:
+					// already closed by an earlier saturated Enqueue
+				default:
+					close(item.IsFull) // wake Dequeue: the batch is full
+				}
+				q.mu.Unlock()
+				return ErrQueueSaturatedWidth
+			}
+			item.OpSet.append(op)
+			q.mu.Unlock()
+			return nil
+		}
+
+		if q.q.Len() >= q.depth {
+			q.mu.Unlock()
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("%w: %w", ErrQueueSaturatedDepth, ctx.Err())
+			case <-q.done:
+				return ErrQueueClosed
+			case <-q.queueHasSpace:
+				q.mu.Lock()
+				continue
+			}
+		}
+
+		item = &queueItem{
+			ID:        id,
+			ProcessAt: time.Now().Add(q.windowedBy),
+			OpSet:     newOpSet(op),
+			IsFull:    make(chan struct{}),
+		}
+		q.m[id] = item
+		q.q.PushBack(item)
+		q.mu.Unlock()
+
+		select {
+		case q.queueHasItems <- struct{}{}:
+		default:
+		}
+
+		return nil
+	}
+}
+
+// Dequeue removes and returns the oldest OpSet whose window has passed,
+// blocking until the first of: an OpSet is ready, the context is canceled,
+// or the queue is closed.
+func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
+	q.mu.Lock() // unlocked on each return below
+
+	var item *queueItem
+	for item == nil {
+		elem := q.q.Front()
+		if elem == nil {
+			q.mu.Unlock()
+			select {
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			case <-q.done:
+				return nil, ErrQueueClosed
+			case <-q.queueHasItems:
+				q.mu.Lock()
+				continue
+			}
+		}
+		item = q.q.Remove(elem).(*queueItem) // next caller will wait for a different item
+	}
 
-func (q *OpWindow) itemsReady(tim time.Time) bool {
-	elem := q.q.Front()
-	if elem == nil {
-		return false
-	}
-	eq := elem.Value.(*queElement)
-	qt := tim.UnixNano() - eq.enqueuedAtUnixN
-
-	if qt < q.windowedBy {
-		return false
-	}
-	return true
-}
-
+	waitFor := time.Until(item.ProcessAt)
+	if waitFor > 0 {
+		q.mu.Unlock()
+		// NOTE (2023-12) (mh): Do we need to pool these?
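+		// Block until the first of: window expiry (timer), batch full
+		// (IsFull), queue closed (done), or caller cancellation (ctx).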
+		timer := time.NewTimer(waitFor)
+		defer timer.Stop()
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-q.done:
+			// process right away
+		case <-item.IsFull:
+			// process once full, regardless of windowing
+		case <-timer.C:
+		}
+		q.mu.Lock()
+	}
+
+	ops := item.OpSet
+	delete(q.m, item.ID)
+	q.mu.Unlock()
+	item = nil // gc
+
+	select {
+	case q.queueHasSpace <- struct{}{}:
+	default:
+	}
+	return ops, nil
+}
 
-func (q *OpWindow) dequeue() (*OpSet, bool) {
-	elem := q.q.Front()
-	if elem == nil {
-		return nil, false
-	}
-
-	eq := elem.Value.(*queElement)
-	qt := time.Now().UnixNano() - eq.enqueuedAtUnixN
-	if qt < q.windowedBy {
-		return nil, false
-	}
-
-	q.q.Remove(elem)
-	id := eq.id
-
-	set, ok := q.entries[id]
-	if !ok {
-		panic("invariant broken: we dequeued a value that isn't in the map")
-	}
-	delete(q.entries, id)
-	return set, true
-}
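+// queueItem is one pending microbatch: the ops enqueued for ID since its
+// window opened. IsFull is closed when an Enqueue finds the batch already at
+// full width, which lets Dequeue release the batch before ProcessAt arrives.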
+type queueItem struct {
+	ID        ID
+	ProcessAt time.Time
+	OpSet     *OpSet
+	IsFull    chan struct{}
+}
diff --git a/opwindow_test.go b/opwindow_test.go
index e2ef1e8..56c40e5 100644
--- a/opwindow_test.go
+++ b/opwindow_test.go
@@ -23,17 +23,18 @@ func TestOpWindow(t *testing.T) {
 		1 * time.Second,
 	}
 
-	for _, winTimeT := range winTimes {
-		winTime := winTimeT // scope it locally so it can be correctly captured
+	for _, winTime := range winTimes {
+		winTime := winTime // scope it locally so it can be correctly captured
 		t.Run(fmt.Sprintf("windowed_by_%v", winTime), func(t *testing.T) {
 			t.Parallel()
+			ctx := context.Background()
 			completed1 := 0
 			completed2 := 0
-			cg1 := NewCallGroup(func(finalState map[ID]*Response) {
+			cg1 := NewCallGroup(func(map[ID]*Response) {
 				completed1++
 			})
-			cg2 := NewCallGroup(func(finalState map[ID]*Response) {
+			cg2 := NewCallGroup(func(map[ID]*Response) {
 				completed2++
 			})
 
@@ -44,29 +45,29 @@ func TestOpWindow(t *testing.T) {
 			op2_1 := cg2.Add(1, &tsMsg{123, now})
 			op2_2 := cg2.Add(2, &tsMsg{111, now})
 
-			window := NewOpWindow(context.Background(), 3, 3, winTime)
+			window := NewOpWindow(3, 3, winTime)
+			t.Cleanup(window.Close)
 
-			defer window.Close()
 			st := time.Now()
 			{
-				err := window.Enqueue(op1_1.Key, op1_1)
-				assert.Equal(t, nil, err)
-				err = window.Enqueue(op2_1.Key, op2_1)
-				assert.Equal(t, nil, err)
-				err = window.Enqueue(op1_2.Key, op1_2)
-				assert.Equal(t, nil, err)
-				err = window.Enqueue(op2_2.Key, op2_2)
-				assert.Equal(t, nil, err)
+				err := window.Enqueue(ctx, op1_1.Key, op1_1)
+				require.NoError(t, err)
+				err = window.Enqueue(ctx, op2_1.Key, op2_1)
+				require.NoError(t, err)
+				err = window.Enqueue(ctx, op1_2.Key, op1_2)
+				require.NoError(t, err)
+				err = window.Enqueue(ctx, op2_2.Key, op2_2)
+				require.NoError(t, err)
 			}
-			require.Equal(t, 2, window.Len()) // only 2 unique keys
+			require.Equal(t, 2, window.q.Len()) // only 2 unique keys
 
-			_, ok := window.Dequeue()
-			assert.True(t, ok)
-			_, ok = window.Dequeue()
-			assert.True(t, ok)
+			_, err := window.Dequeue(ctx)
+			assert.NoError(t, err)
+			_, err = window.Dequeue(ctx)
+			assert.NoError(t, err)
 
-			rt := time.Now().Sub(st)
+			rt := time.Since(st)
 			assert.Greater(t, rt, winTime)
 		})
 	}
@@ -74,11 +75,12 @@ func TestOpWindow(t *testing.T) {
 
 func TestOpWindowClose(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	winTime := 100 * time.Hour // we want everything to hang until we close the queue.
 
-	cg1 := NewCallGroup(func(finalState map[ID]*Response) {})
-	cg2 := NewCallGroup(func(finalState map[ID]*Response) {})
+	cg1 := NewCallGroup(func(map[ID]*Response) {})
+	cg2 := NewCallGroup(func(map[ID]*Response) {})
 
 	now := time.Now()
 
@@ -87,16 +89,16 @@ func TestOpWindowClose(t *testing.T) {
 	op2_1 := cg2.Add(1, &tsMsg{123, now})
 	op2_2 := cg2.Add(2, &tsMsg{111, now})
 
-	window := NewOpWindow(context.Background(), 3, 3, winTime)
+	window := NewOpWindow(3, 3, winTime)
 
-	err := window.Enqueue(op1_1.Key, op1_1)
-	assert.Equal(t, nil, err)
-	err = window.Enqueue(op2_1.Key, op2_1)
-	assert.Equal(t, nil, err)
-	err = window.Enqueue(op1_2.Key, op1_2)
-	assert.Equal(t, nil, err)
-	err = window.Enqueue(op2_2.Key, op2_2)
-	assert.Equal(t, nil, err)
+	err := window.Enqueue(ctx, op1_1.Key, op1_1)
+	require.NoError(t, err)
+	err = window.Enqueue(ctx, op2_1.Key, op2_1)
+	require.NoError(t, err)
+	err = window.Enqueue(ctx, op1_2.Key, op1_2)
+	require.NoError(t, err)
+	err = window.Enqueue(ctx, op2_2.Key, op2_2)
+	require.NoError(t, err)
 
 	var ops uint64
 	var closes uint64
@@ -104,16 +106,13 @@ func TestOpWindowClose(t *testing.T) {
 	for i := 0; i < workers; i++ {
 		go func() {
 			for {
-				e, ok := window.Dequeue()
-				if e != nil {
-					assert.True(t, ok)
-					atomic.AddUint64(&ops, 1)
-				} else {
-					assert.False(t, ok)
-					break
+				if _, err := window.Dequeue(ctx); err != nil {
+					require.ErrorIs(t, err, ErrQueueClosed)
+					atomic.AddUint64(&closes, 1)
+					return
 				}
+				atomic.AddUint64(&ops, 1)
 			}
-			atomic.AddUint64(&closes, 1)
 		}()
 	}
 
@@ -124,4 +123,92 @@ func TestOpWindowClose(t *testing.T) {
 	time.Sleep(1000 * time.Millisecond)
 	assert.Equal(t, uint64(workers), atomic.LoadUint64(&closes))
 	assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // 2 uniq keys are enqueued
+
+	err = window.Enqueue(ctx, op1_1.Key, op1_1)
+	require.ErrorIs(t, err, ErrQueueClosed)
+}
+
+func TestOpWindowErrQueueSaturatedWidth(t *testing.T) {
+	t.Parallel()
+	cg := NewCallGroup(func(map[ID]*Response) {})
+	now := time.Now()
+
+	op1 := cg.Add(1, &tsMsg{123, now})
+	op2 := cg.Add(1, &tsMsg{123, now})
+
+	window := NewOpWindow(2, 1, time.Millisecond)
+	ctx := context.Background()
+	err := window.Enqueue(ctx, op1.Key, op1)
+	require.NoError(t, err)
+
+	err = window.Enqueue(ctx, op2.Key, op2)
+	require.ErrorIs(t, err, ErrQueueSaturatedWidth)
+
+	_, err = window.Dequeue(ctx)
+	require.NoError(t, err)
+
+	err = window.Enqueue(ctx, op2.Key, op2)
+	require.NoError(t, err)
+}
+
+func TestOpWindowErrQueueSaturatedDepth(t *testing.T) {
+	t.Parallel()
+	cg := NewCallGroup(func(map[ID]*Response) {})
+	now := time.Now()
+	op1 := cg.Add(1, &tsMsg{123, now})
+	op2 := cg.Add(2, &tsMsg{234, now})
+
+	window := NewOpWindow(1, 1, time.Millisecond)
+	ctx := context.Background()
+	err := window.Enqueue(ctx, op1.Key, op1)
+	require.NoError(t, err)
+
+	ctx1, cancel := context.WithTimeout(ctx, time.Millisecond) // let it run for a sec for coverage ¯\_(ツ)_/¯
+	defer cancel()
+	ctx, cancel = context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		<-ctx1.Done()
+		window.queueHasSpace <- struct{}{} // emulate queue having space then filling back up
+		cancel()
+	}()
+
+	err = window.Enqueue(ctx, op2.Key, op2)
+	require.ErrorIs(t, err, ErrQueueSaturatedDepth)
+
+	_, err = window.Dequeue(ctx)
+	require.NoError(t, err)
+
+	err = window.Enqueue(ctx, op2.Key, op2)
+	require.NoError(t, err)
+}
+
+func TestOpWindowErrQueueSaturatedDepthClose(t *testing.T) {
+	t.Parallel()
+	cg := NewCallGroup(func(map[ID]*Response) {})
+	now := time.Now()
+	op1 := cg.Add(1, &tsMsg{123, now})
+	op2 := cg.Add(2, &tsMsg{234, now})
+
+	window := NewOpWindow(1, 1, time.Millisecond)
+	ctx := context.Background()
+	err := window.Enqueue(ctx, op1.Key, op1)
+	require.NoError(t, err)
+
+	go func() {
+		time.Sleep(time.Millisecond)
+		window.Close()
+	}()
+
+	err = window.Enqueue(ctx, op2.Key, op2)
+	require.ErrorIs(t, err, ErrQueueClosed)
+}
+
+func TestOpWindowDequeueEmptyQueue(t *testing.T) {
+	t.Parallel()
+	window := NewOpWindow(1, 1, time.Hour)
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	_, err := window.Dequeue(ctx)
+	require.ErrorIs(t, err, ctx.Err())
+}