Skip to content

Commit

Permalink
More tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mattayes committed Dec 21, 2023
1 parent c7cf74a commit 0e0085e
Showing 1 changed file with 58 additions and 4 deletions.
62 changes: 58 additions & 4 deletions opwindow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func TestOpWindow(t *testing.T) {
ctx := context.Background()
completed1 := 0
completed2 := 0
cg1 := NewCallGroup(func(finalState map[ID]*Response) {
cg1 := NewCallGroup(func(map[ID]*Response) {
completed1++
})

cg2 := NewCallGroup(func(finalState map[ID]*Response) {
cg2 := NewCallGroup(func(map[ID]*Response) {
completed2++
})

Expand Down Expand Up @@ -79,8 +79,8 @@ func TestOpWindowClose(t *testing.T) {

winTime := 100 * time.Hour // we want everything to hang until we close the queue.

cg1 := NewCallGroup(func(finalState map[ID]*Response) {})
cg2 := NewCallGroup(func(finalState map[ID]*Response) {})
cg1 := NewCallGroup(func(map[ID]*Response) {})
cg2 := NewCallGroup(func(map[ID]*Response) {})

now := time.Now()

Expand Down Expand Up @@ -123,4 +123,58 @@ func TestOpWindowClose(t *testing.T) {
time.Sleep(1000 * time.Millisecond)
assert.Equal(t, uint64(workers), atomic.LoadUint64(&closes))
assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // 2 uniq keys are enqueued

err = window.Enqueue(ctx, op1_1.Key, op1_1)
require.ErrorIs(t, err, ErrQueueClosed)
}

func TestOpWindowErrQueueSaturatedWidth(t *testing.T) {
t.Parallel()
cg := NewCallGroup(func(map[ID]*Response) {})
now := time.Now()

op1 := cg.Add(1, &tsMsg{123, now})
op2 := cg.Add(1, &tsMsg{123, now})

window := NewOpWindow(2, 1, time.Millisecond)
ctx := context.Background()
err := window.Enqueue(ctx, op1.Key, op1)
require.NoError(t, err)

err = window.Enqueue(ctx, op2.Key, op2)
require.ErrorIs(t, err, ErrQueueSaturatedWidth)

_, err = window.Dequeue(ctx)
require.NoError(t, err)

err = window.Enqueue(ctx, op2.Key, op2)
require.NoError(t, err)
}

func TestOpWindowErrQueueSaturatedDepth(t *testing.T) {
t.Parallel()
cg := NewCallGroup(func(map[ID]*Response) {})
now := time.Now()
op1 := cg.Add(1, &tsMsg{123, now})
op2 := cg.Add(2, &tsMsg{234, now})

window := NewOpWindow(1, 1, time.Millisecond)
err := window.Enqueue(context.Background(), op1.Key, op1)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) // let it run for a sec for coverage ¯\_(ツ)_/¯
defer cancel()
go func() {
<-ctx.Done()
// pretend we dequeued but were full again
window.fullCond.Signal()
}()
err = window.Enqueue(ctx, op2.Key, op2)
require.ErrorIs(t, err, ErrQueueSaturatedDepth)

_, err = window.Dequeue(context.Background())
require.NoError(t, err)

err = window.Enqueue(context.Background(), op2.Key, op2)
require.NoError(t, err)
}

0 comments on commit 0e0085e

Please sign in to comment.