Skip to content

Commit

Permalink
OpWindow: Redux (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattayes authored Dec 27, 2023
1 parent 8d7d896 commit a9ffe31
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 212 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@

# coverage.txt (created by go.test.sh)
coverage.txt

# vscode settings
.vscode/settings.json
4 changes: 0 additions & 4 deletions opset.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package inflight

import "time"

// OpSet represents the set of Ops that have been merged in an OpQueue,
// It provides convenience functions for appending new Ops and for completing them.
type OpSet struct {
set []*Op
// used by the opWindow determine when it's ok to dequeue
enqueuedAt time.Time
}

func newOpSet(op *Op) *OpSet {
Expand Down
295 changes: 126 additions & 169 deletions opwindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,211 +3,168 @@ package inflight
import (
"container/list"
"context"
"fmt"
"sync"
"time"
)

// OpWindow is a thread-safe duplicate operation suppression queue,
// that combines duplicate operations (queue entires) into sets
// that will be dequeued together.
//
// For example, If you enqueue an item with a key that already exists,
// then that item will be appended to that key's set of items. Otherwise
// the item is inserted into the head of the list as a new item.
//
// On Dequeue a SET is returned of all items that share a key in the
// queue. It blocks on dequeue if the queue is empty, but returns an
// error if the queue is full during enqueue.
// OpWindow is a windowed, microbatching priority queue.
// Operations for the same ID and time window form a microbatch. Microbatches are dequeued in FIFO order.
// OpWindow provides backpressure for both depth (i.e., number of microbatches in queue) and width (i.e., number of operations in a microbatch).
// OpWindow is safe for concurrent use. Its zero value is not safe to use, use NewOpWindow().
type OpWindow struct {
cond *sync.Cond
ctx context.Context
can context.CancelFunc
mu sync.Mutex
q list.List // *queueItem
m map[ID]*queueItem

// These are selectable sync.Cond: use blocking read for Wait() and non-blocking write for Signal().
queueHasItems chan struct{}
queueHasSpace chan struct{}

once sync.Once
done chan struct{}

depth int
width int
windowedBy int64

q *list.List
entries map[ID]*OpSet
windowedBy time.Duration
}

// NewOpWindow create a new OpWindow. Close by calling Close or cancaling
// the provided context.
func NewOpWindow(ctx context.Context, depth, width int, windowedBy time.Duration) *OpWindow {
cond := sync.NewCond(&sync.Mutex{})
myctx, can := context.WithCancel(ctx)
// NewOpWindow creates a new OpWindow.
//
// depth: maximum number of entries in a queue
// width: maximum number of entries in a microbatch.
// windowedBy: window size.
func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
q := &OpWindow{
cond: cond,
ctx: myctx,
can: can,
depth: depth,
width: width,
q: list.New(),
entries: map[ID]*OpSet{},
windowedBy: windowedBy.Nanoseconds(),
queueHasItems: make(chan struct{}),
queueHasSpace: make(chan struct{}),
done: make(chan struct{}),
depth: depth,
width: width,
windowedBy: windowedBy,
m: make(map[ID]*queueItem),
}
go func() {
t := time.NewTicker(250 * time.Millisecond)
for {
select {
case <-q.ctx.Done():
return
case n := <-t.C:
// signal someone to wake up incase we have any Items falling
// out of the window.
q.cond.L.Lock()
if q.itemsReady(n) {
q.cond.Signal()
}
q.cond.L.Unlock()
}
}
}()
q.q.Init()
return q
}

// Close releases resources associated with this callgroup, by canceling
// the context. The owner of this OpWindow should either call Close or cancel
// the context, both are equivalent.
// Close provides graceful shutdown: no new ops will be enqueued.
func (q *OpWindow) Close() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.can()
// alert all dequeue calls that they should wake up and return.
q.windowedBy = 0 // turn off windowing so everything is dequeue
q.cond.Broadcast()
return
q.once.Do(func() {
q.mu.Lock()
defer q.mu.Unlock()
close(q.done)
// HACK (2023-12) (mh): Set depth to zero so new entries are rejected.
q.depth = 0
})
}

// Len returns the number of uniq IDs in the queue, that is the depth of the
// queue.
func (q *OpWindow) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.q.Len()
}
// Enqueue op into queue, blocking until first of: op is enqueued, ID has hit max width, context is done, or queue is closed.
func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
q.mu.Lock() // locked on returns below

// Enqueue add the op to the queue. If the ID already exists then the Op
// is added to the existing OpSet for this ID, otherwise it's inserted as
// a new OpSet.
//
// Enqueue doesn't block if the queue if full, instead it returns a
// ErrQueueSaturated error.
func (q *OpWindow) Enqueue(id ID, op *Op) error {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.q.Len() >= q.depth {
return ErrQueueSaturatedDepth
}

set, ok := q.entries[id]
if !ok {
// This is a new item, so we need to insert it into the queue.
q.newEntry(id, op)

// Signal one waiting go routine to wake up and Dequeue
// I believe we only need to signal if we enqueue a new item.
// Consider the following possible states the queue could be in :
// 1. if no one is currently waiting in Dequeue, the signal isn't
// needed and all items will be dequeued on the next call to
// Dequeue.
// 2. One or Many go-routines are waiting in Dequeue because it's
// empty, and calling Signal will wake up one. Which will
// dequeue the item and return.
// 3. At most One go-routine is in the act of Dequeueing existing
// items from the queue (i.e. only one can have the lock and be
// in the "if OK" condition within the forloop in Dequeue). In
// which cause the signal is ignored and after returning we
// return to condition (1) above.
// Note signaled waiting go-routines will not be able the acquire
// the condition lock until this method call returns, finishing
// its append of the new operation.
q.cond.Signal()
} else if len(set.Ops()) >= q.width {
return ErrQueueSaturatedWidth
} else {
set.append(op)
}
return nil
}
for {
item, ok := q.m[id]
if ok {
if len(item.OpSet.set) >= q.width {
close(item.IsFull)
q.mu.Unlock()
return ErrQueueSaturatedWidth
}
item.OpSet.append(op)
q.mu.Unlock()
return nil
}

// Dequeue removes the oldest OpSet from the queue and returns it.
// Dequeue will block if the Queue is empty. An Enqueue will wake the
// go routine up and it will continue on.
//
// If the OpWindow is closed, then Dequeue will return false
// for the second parameter.
func (q *OpWindow) Dequeue() (*OpSet, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.q.Len() >= q.depth {
q.mu.Unlock()
select {
case <-ctx.Done():
return fmt.Errorf("%w: %w", ErrQueueSaturatedDepth, ctx.Err())
case <-q.done:
return ErrQueueClosed
case <-q.queueHasSpace:
q.mu.Lock()
continue
}
}

for {
if set, ok := q.dequeue(); ok {
return set, true
item = &queueItem{
ID: id,
ProcessAt: time.Now().Add(q.windowedBy),
OpSet: newOpSet(op),
IsFull: make(chan struct{}),
}
q.m[id] = item
q.q.PushBack(item)
q.mu.Unlock()

select {
case <-q.ctx.Done():
return nil, false
case q.queueHasItems <- struct{}{}:
default:
}
// release the lock and wait until signaled. On awake we'll acquire the lock.
// After wait acquires the lock we have to recheck the wait condition,
// because it's possible that someone else
// drained the queue while, we were reacquiring the lock.
q.cond.Wait()
}
}

type queElement struct {
id ID
enqueuedAtUnixN int64
return nil
}
}

func (q *OpWindow) newEntry(id ID, op *Op) {
set := newOpSet(op)
q.entries[id] = set
// Dequeue removes and returns the oldest OpSet whose window has passed from the queue,
// blocking until first of: OpSet is ready, context is canceled, or queue is closed.
func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
q.mu.Lock() // unlocked on returns below

eq := &queElement{id, time.Now().UnixNano()}
q.q.PushBack(eq)
}
var item *queueItem
for item == nil {
elem := q.q.Front()
if elem == nil {
q.mu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
return nil, ErrQueueClosed
case <-q.queueHasItems:
q.mu.Lock()
continue
}

func (q *OpWindow) itemsReady(tim time.Time) bool {
elem := q.q.Front()
if elem == nil {
return false
}
item = q.q.Remove(elem).(*queueItem) // next caller will wait for a different item
}

eq := elem.Value.(*queElement)
qt := tim.UnixNano() - eq.enqueuedAtUnixN

if qt < q.windowedBy {
return false
waitFor := time.Until(item.ProcessAt)
if waitFor > 0 {
q.mu.Unlock() //
// NOTE (2023-12) (mh): Do we need to pool these?
timer := time.NewTimer(waitFor)
defer timer.Stop()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
// process right away
case <-item.IsFull:
// process once full, regardless of windowing
case <-timer.C:
}
q.mu.Lock()
}
return true
}

func (q *OpWindow) dequeue() (*OpSet, bool) {
elem := q.q.Front()
if elem == nil {
return nil, false
}
ops := item.OpSet
delete(q.m, item.ID)
q.mu.Unlock()
item = nil // gc

eq := elem.Value.(*queElement)
qt := time.Now().UnixNano() - eq.enqueuedAtUnixN
if qt < q.windowedBy {
return nil, false
select {
case q.queueHasSpace <- struct{}{}:
default:
}
return ops, nil
}

q.q.Remove(elem)
id := eq.id

set, ok := q.entries[id]
if !ok {
panic("invariant broken: we dequeued a value that isn't in the map")
}
delete(q.entries, id)
return set, true
type queueItem struct {
ID ID
ProcessAt time.Time
OpSet *OpSet
IsFull chan struct{}
}
Loading

0 comments on commit a9ffe31

Please sign in to comment.