Skip to content

Commit

Permalink
(3.1.1 backport) CBG-3130 notify on request plus unused sequence docs (
Browse files Browse the repository at this point in the history
…#6326) (#6329)

* CBG-3130 notify on request plus skipped sequence docs
  • Loading branch information
torcolvin authored Jun 30, 2023
1 parent 0acdc95 commit 709bf6c
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 11 deletions.
9 changes: 9 additions & 0 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
DefaultCachePendingSeqMaxWait = 5 * time.Second // Max time we'll wait for a pending sequence before sending to missed queue
DefaultSkippedSeqMaxWait = 60 * time.Minute // Max time we'll wait for an entry in the missing before purging
QueryTombstoneBatch = 250 // Max number of tombstones checked per query during Compact
unusedSeqKey = "_unusedSeqKey" // Key used by ChangeWaiter to mark unused sequences
unusedSeqCollectionID = 0 // Collection ID used by ChangeWaiter to mark unused sequences
)

// Enable keeping a channel-log for the "*" channel (channel.UserStarChannel). The only time this channel is needed is if
Expand Down Expand Up @@ -549,6 +551,13 @@ func (c *changeCache) releaseUnusedSequence(sequence uint64, timeReceived time.T
// Since processEntry may unblock pending sequences, if there were any changed channels we need
// to notify any change listeners that are working changes feeds for these channels
changedChannels := c.processEntry(change)
unusedSeq := channels.NewID(unusedSeqKey, unusedSeqCollectionID)
if changedChannels == nil {
changedChannels = channels.SetOfNoValidate(unusedSeq)
} else {
changedChannels.Add(unusedSeq)
}
c.channelCache.AddSkippedSequence(change)
if c.notifyChange != nil && len(changedChannels) > 0 {
c.notifyChange(changedChannels)
}
Expand Down
2 changes: 1 addition & 1 deletion db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func TestChannelCacheBufferingWithUserDoc(t *testing.T) {
// Start wait for doc in ABC
chans := channels.SetOfNoValidate(
channels.NewID("ABC", collectionID))
waiter := db.mutationListener.NewWaiterWithChannels(chans, nil)
waiter := db.mutationListener.NewWaiterWithChannels(chans, nil, false)

successChan := make(chan bool)
go func() {
Expand Down
19 changes: 14 additions & 5 deletions db/change_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,19 +341,22 @@ type ChangeWaiter struct {
lastCounter uint64
lastTerminateCheckCounter uint64
lastUserCount uint64
trackUnusedSequences bool // track unused sequences in Wait functions
}

// Creates a new ChangeWaiter that will wait for changes for the given document keys.
func (listener *changeListener) NewWaiter(keys []string) *ChangeWaiter {
// NewWaiter a new ChangeWaiter that will wait for changes for the given document keys, and will optionally track unused sequences.
func (listener *changeListener) NewWaiter(keys []string, trackUnusedSequences bool) *ChangeWaiter {
return &ChangeWaiter{
listener: listener,
keys: keys,
lastCounter: listener.CurrentCount(keys),
lastTerminateCheckCounter: listener.terminateCheckCounter,
trackUnusedSequences: trackUnusedSequences,
}
}

func (listener *changeListener) NewWaiterWithChannels(chans channels.Set, user auth.User) *ChangeWaiter {
// NewWaiterWithChannels creates ChangeWaiter for a given channel and user, and will optionally track unused sequences.
func (listener *changeListener) NewWaiterWithChannels(chans channels.Set, user auth.User, trackUnusedSequences bool) *ChangeWaiter {
waitKeys := make([]string, 0, 5)
for channel := range chans {
waitKeys = append(waitKeys, channel.String())
Expand All @@ -366,7 +369,8 @@ func (listener *changeListener) NewWaiterWithChannels(chans channels.Set, user a
}
waitKeys = append(waitKeys, userKeys...)
}
waiter := listener.NewWaiter(waitKeys)
waiter := listener.NewWaiter(waitKeys, trackUnusedSequences)

waiter.userKeys = userKeys
if userKeys != nil {
waiter.lastUserCount = listener.CurrentCount(userKeys)
Expand Down Expand Up @@ -418,6 +422,9 @@ func (waiter *ChangeWaiter) UpdateChannels(collectionID uint32, timedSet channel
for channelName, _ := range timedSet {
updatedKeys = append(updatedKeys, channels.NewID(channelName, collectionID).String())
}
if waiter.trackUnusedSequences {
updatedKeys = append(updatedKeys, channels.NewID(unusedSeqKey, unusedSeqCollectionID).String())
}
if len(waiter.userKeys) > 0 {
updatedKeys = append(updatedKeys, waiter.userKeys...)
}
Expand Down Expand Up @@ -445,6 +452,8 @@ func (waiter *ChangeWaiter) RefreshUserKeys(user auth.User, metaKeys *base.Metad
}
}

// NewUserWaiter creates a change waiter with all keys for the matching user.
func (db *Database) NewUserWaiter() *ChangeWaiter {
return db.mutationListener.NewWaiterWithChannels(channels.Set{}, db.User())
trackUnusedSequences := false
return db.mutationListener.NewWaiterWithChannels(channels.Set{}, db.User(), trackUnusedSequences)
}
9 changes: 4 additions & 5 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ func (db *DatabaseCollectionWithUser) MultiChangesFeed(ctx context.Context, chan

}

func (db *DatabaseCollectionWithUser) startChangeWaiter() *ChangeWaiter {
return db.mutationListener().NewWaiterWithChannels(channels.Set{}, db.user)
func (db *DatabaseCollectionWithUser) startChangeWaiter(trackUnusedSequences bool) *ChangeWaiter {
return db.mutationListener().NewWaiterWithChannels(channels.Set{}, db.user, trackUnusedSequences)
}

func (db *DatabaseCollectionWithUser) appendUserFeed(feeds []<-chan *ChangeEntry, options ChangesOptions) []<-chan *ChangeEntry {
Expand Down Expand Up @@ -644,7 +644,8 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex

// If changes feed requires more than one ChangesLoop iteration, initialize changeWaiter
if options.Wait || options.RequestPlusSeq > currentCachedSequence {
changeWaiter = col.startChangeWaiter() // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop
trackUnusedSequences := options.RequestPlusSeq > 0
changeWaiter = col.startChangeWaiter(trackUnusedSequences) // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop
userCounter = changeWaiter.CurrentUserCount()
// Reload user to pick up user changes that happened between auth and the change waiter
// initialization. Without this, notification for user doc changes in that window (a) won't be
Expand Down Expand Up @@ -1038,7 +1039,6 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex
}
if userChanged && col.user != nil {
newChannelsSince, _ := col.user.FilterToAvailableCollectionChannels(col.ScopeName, col.Name, chans)

changedChannels = newChannelsSince.CompareKeys(channelsSince)

if len(changedChannels) > 0 {
Expand All @@ -1059,7 +1059,6 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex

}
}()

return output, nil
}

Expand Down
8 changes: 8 additions & 0 deletions db/channel_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type ChannelCache interface {
// Notifies the cache of a principal update. Updates the cache's high sequence
AddPrincipal(change *LogEntry)

// Notifies the cache of a skipped sequence update. Updates the cache's high sequence
AddSkippedSequence(change *LogEntry)

// Remove purges the given doc IDs from all channel caches and returns the number of items removed.
Remove(collectionID uint32, docIDs []string, startTime time.Time) (count int)

Expand Down Expand Up @@ -186,6 +189,11 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) {
c.updateHighCacheSequence(change.Sequence)
}

// AddSkipedSequence notifies the cache of a skipped sequence update. Updates the cache's high sequence
func (c *channelCacheImpl) AddSkippedSequence(change *LogEntry) {
c.updateHighCacheSequence(change.Sequence)
}

// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence
// flag indicates whether it was a change arriving out of sequence
func (c *channelCacheImpl) AddToCache(change *LogEntry) (updatedChannels []channels.ID) {
Expand Down
66 changes: 66 additions & 0 deletions rest/changes_request_plus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package rest

import (
"encoding/json"
"fmt"
"net/http"
"testing"

"github.com/couchbase/sync_gateway/channels"
"github.com/couchbase/sync_gateway/db"
"github.com/stretchr/testify/require"
)

// TestRequirePlusSkippedSequence makes sure that a final skipped sequence in a request_plus request will not hang the request
func TestRequestPlusSkippedSequence(t *testing.T) {

defer db.SuspendSequenceBatching()()
restTesterConfig := RestTesterConfig{SyncFn: channels.DocChannelsSyncFunction}

// JWT claim based grants do not support named collections
rt := NewRestTester(t, &restTesterConfig)
defer rt.Close()

const (
username = "alice"
channel = "foo"
)
rt.CreateUser(username, []string{channel})

// add a single document for the user
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", fmt.Sprintf(`{"channels":"%s"}`, channel))
RequireStatus(t, resp, http.StatusCreated)
docSeq := rt.GetDocumentSequence("doc1")

require.NoError(t, rt.WaitForPendingChanges())

// add an unused sequence
unusedSeq, err := db.AllocateTestSequence(rt.GetDatabase())
require.NoError(t, err)

caughtUpCount := rt.GetDatabase().DbStats.CBLReplicationPull().NumPullReplCaughtUp.Value()

requestFinished := make(chan struct{})
// make sure this request doesn't hang
go func() {
resp = rt.SendUserRequest(http.MethodGet, fmt.Sprintf("/{{.keyspace}}/_changes?since=%d&request_plus=true", docSeq), "", username)
RequireStatus(t, resp, http.StatusOK)
close(requestFinished)
}()
require.NoError(t, rt.GetDatabase().WaitForCaughtUp(caughtUpCount+1))
// the request should finish once the sequence is released
err = db.ReleaseTestSequence(rt.GetDatabase(), unusedSeq)
require.NoError(t, err)
<-requestFinished
var changesResp ChangesResults
require.NoError(t, json.Unmarshal(resp.BodyBytes(), &changesResp))
require.Len(t, changesResp.Results, 0)
}

0 comments on commit 709bf6c

Please sign in to comment.