Skip to content

Commit

Permalink
CBG-3557 [3.1.2 backport] Implement nextSequenceGreaterThan to reduce…
Browse files Browse the repository at this point in the history
… incr and unused seq traffic (#6549)

Backports CBG-3516 to 3.1.2.
  • Loading branch information
adamcfraser authored Oct 23, 2023
1 parent f63068e commit 37e3612
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 34 deletions.
32 changes: 27 additions & 5 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,37 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
} else {
changedChannels.Add(unusedSeq)
}
c.channelCache.AddSkippedSequence(change)
c.channelCache.AddUnusedSequence(change)
if c.notifyChange != nil && len(changedChannels) > 0 {
c.notifyChange(ctx, changedChannels)
}
}

// releaseUnusedSequenceRange feeds every sequence in [fromSequence, toSequence] (inclusive) through
// processEntry and registers it with the channel cache as unused, then sends one combined change
// notification instead of one per sequence.
func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequence uint64, toSequence uint64, timeReceived time.Time) {
	base.InfofCtx(ctx, base.KeyCache, "Received #%d-#%d (unused sequence range)", fromSequence, toSequence)

	// The notification set always starts with the unused-sequence channel; channels reported by
	// processEntry are merged into it as the range is walked.
	notifySet := channels.SetOfNoValidate(channels.NewID(unusedSeqKey, unusedSeqCollectionID))

	for seq := fromSequence; seq <= toSequence; seq++ {
		entry := &LogEntry{
			Sequence:     seq,
			TimeReceived: timeReceived,
		}

		// processEntry may unblock pending sequences; any channels it reports as changed must be
		// included in the notification so listeners working changes feeds for them wake up.
		notifySet = notifySet.Update(c.processEntry(ctx, entry))
		c.channelCache.AddUnusedSequence(entry)
	}

	if c.notifyChange == nil {
		return
	}
	c.notifyChange(ctx, notifySet)
}

// Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
func (c *changeCache) processUnusedSequenceRange(ctx context.Context, docID string) {
// _sync:unusedSequences:fromSeq:toSeq
Expand All @@ -601,10 +626,7 @@ func (c *changeCache) processUnusedSequenceRange(ctx context.Context, docID stri
return
}

// TODO: There should be a more efficient way to do this
for seq := fromSequence; seq <= toSequence; seq++ {
c.releaseUnusedSequence(ctx, seq, time.Now())
}
c.releaseUnusedSequenceRange(ctx, fromSequence, toSequence, time.Now())
}

func (c *changeCache) processPrincipalDoc(ctx context.Context, docID string, docJSON []byte, isUser bool, timeReceived time.Time) {
Expand Down
8 changes: 4 additions & 4 deletions db/channel_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type ChannelCache interface {
// Notifies the cache of a principal update. Updates the cache's high sequence
AddPrincipal(change *LogEntry)

// Notifies the cache of a skipped sequence update. Updates the cache's high sequence
AddSkippedSequence(change *LogEntry)
// Notifies the cache of an unused sequence update. Updates the cache's high sequence
AddUnusedSequence(change *LogEntry)

// Remove purges the given doc IDs from all channel caches and returns the number of items removed.
Remove(ctx context.Context, collectionID uint32, docIDs []string, startTime time.Time) (count int)
Expand Down Expand Up @@ -191,8 +191,8 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) {
c.updateHighCacheSequence(change.Sequence)
}

// AddSkipedSequence notifies the cache of a skipped sequence update. Updates the cache's high sequence
func (c *channelCacheImpl) AddSkippedSequence(change *LogEntry) {
// AddUnusedSequence notifies the cache of an unused sequence update. Updates the cache's high sequence
// by passing the entry's sequence to updateHighCacheSequence.
func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) {
	unusedSeq := change.Sequence
	c.updateHighCacheSequence(unusedSeq)
}

Expand Down
26 changes: 13 additions & 13 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,22 +1546,22 @@ func (db *DatabaseContext) assignSequence(ctx context.Context, docSequence uint6
unusedSequences = append(unusedSequences, docSequence)
}

for {
var err error
if docSequence, err = db.sequences.nextSequence(ctx); err != nil {
return unusedSequences, err
}
var err error
if docSequence, err = db.sequences.nextSequence(ctx); err != nil {
return unusedSequences, err
}

if docSequence > doc.Sequence {
break
} else {
if err := db.sequences.releaseSequence(ctx, docSequence); err != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, err)
}
// If the assigned sequence is less than or equal to the previous sequence on the document, release
// the assigned sequence and acquire one using nextSequenceGreaterThan
if docSequence <= doc.Sequence {
if err = db.sequences.releaseSequence(ctx, docSequence); err != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, err)
}
docSequence, err = db.sequences.nextSequenceGreaterThan(ctx, doc.Sequence)
if err != nil {
return unusedSequences, err
}
}
// Could add a db.Sequences.nextSequenceGreaterThan(doc.Sequence) to push the work down into the sequence allocator
// - sequence allocator is responsible for releasing unused sequences, could optimize to do that in bulk if needed
}

doc.Sequence = docSequence
Expand Down
50 changes: 50 additions & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"log"
"testing"
"time"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
Expand Down Expand Up @@ -1629,3 +1630,52 @@ func TestPutStampClusterUUID(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 32, len(xattr["cluster_uuid"]))
}

// TestAssignSequenceReleaseLoop repros conditions seen in CBG-3516 (where each sequence between nextSequence and docSequence has an unusedSeq doc).
// It rewrites a document's stored sequence to a higher value (simulating a write from another cluster), updates
// the document, and verifies that the newly assigned sequence exceeds the simulated one and that the expected
// number of sequences were released along the way.
func TestAssignSequenceReleaseLoop(t *testing.T) {

	if base.UnitTestUrlIsWalrus() {
		t.Skip("This test won't work under walrus")
	}

	base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges, base.KeyCRUD, base.KeyDCP)

	// import disabled
	db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{})
	defer db.Close(ctx)

	// positive sequence gap (other cluster's sequencing is higher)
	const otherClusterSequenceOffset = 10

	startReleasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value()

	collection := GetSingleDatabaseCollectionWithUser(t, db)
	rev, doc, err := collection.Put(ctx, "doc1", Body{"foo": "bar"})
	require.NoError(t, err)
	t.Logf("doc sequence: %d", doc.Sequence)

	// Capture the simulated sequence now: doc is reassigned by the second Put below, and the
	// post-update assertion must compare against this value rather than the bare offset.
	otherClusterSequence := doc.SyncData.Sequence + otherClusterSequenceOffset

	// but we can fiddle with the sequence in the metadata of the doc write to simulate a doc from a different cluster (with a higher sequence)
	var newSyncData map[string]interface{}
	sd, err := json.Marshal(doc.SyncData)
	require.NoError(t, err)
	err = json.Unmarshal(sd, &newSyncData)
	require.NoError(t, err)
	newSyncData["sequence"] = otherClusterSequence

	subdocXattrStore, ok := base.AsSubdocXattrStore(collection.dataStore)
	require.True(t, ok)
	_, err = subdocXattrStore.SubdocUpdateXattr(doc.ID, base.SyncXattrName, 0, doc.Cas, newSyncData)
	require.NoError(t, err)

	_, doc, err = collection.Put(ctx, "doc1", Body{"foo": "buzz", BodyRev: rev})
	require.NoError(t, err)
	require.Greaterf(t, doc.Sequence, otherClusterSequence, "Expected new doc sequence %d to be greater than other cluster's sequence %d", doc.Sequence, otherClusterSequence)

	// wait for the doc to be received
	err = db.changeCache.waitForSequence(ctx, doc.Sequence, time.Second*30)
	require.NoError(t, err)

	expectedReleasedSequenceCount := otherClusterSequenceOffset
	releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount
	assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount)
}
128 changes: 116 additions & 12 deletions db/sequence_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,11 @@ func (s *sequenceAllocator) lastSequence(ctx context.Context) (uint64, error) {
// If no previously reserved sequences are available, reserves new batch.
func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64, err error) {
s.mutex.Lock()
sequencesReserved := false
if s.last >= s.max {
if err := s._reserveSequenceRange(ctx); err != nil {
s.mutex.Unlock()
return 0, err
}
sequencesReserved = true
}
s.last++
sequence = s.last
sequence, sequencesReserved, err := s._nextSequence(ctx)
s.mutex.Unlock()
if err != nil {
return 0, err
}

// If sequences were reserved, send notification to the release sequence monitor, to start the clock for releasing these sequences.
// Must be done after mutex is released.
Expand All @@ -188,8 +182,109 @@ func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64,
return sequence, nil
}

// Reserve a new sequence range. Called by nextSequence when the previously allocated sequences have all been used.
func (s *sequenceAllocator) _reserveSequenceRange(ctx context.Context) error {
// nextSequenceGreaterThan returns a sequence that is greater than existingSequence, incrementing _sync:seq
// past existingSequence (plus a standard batch) when the target lies beyond this node's current batch.
// In the case where our local s.max < _sync:seq (another node has incremented _sync:seq), we may be releasing
// sequences greater than existingSequence, but we will only ever release sequences allocated by this node's incr operation.
//
// Releasing the skipped-over sequences is best-effort: a failure to write an unused sequence range is logged and
// left to skipped sequence handling, and does not fail the allocation. An error is returned only when a sequence
// could not be obtained (batch reservation or increment failure), in which case the returned sequence is 0.
func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existingSequence uint64) (sequence uint64, err error) {

	targetSequence := existingSequence + 1
	s.mutex.Lock()
	// If the target sequence is less than or equal to one we've already allocated, can assign the sequence in the standard way
	if targetSequence <= s.last {
		nextSeq, sequencesReserved, nextErr := s._nextSequence(ctx)
		s.mutex.Unlock()
		if nextErr != nil {
			return 0, nextErr
		}
		// Notification must be sent after the mutex is released.
		if sequencesReserved {
			s.reserveNotify <- struct{}{}
		}
		s.dbStats.SequenceAssignedCount.Add(1)
		return nextSeq, nil
	}

	// If the target sequence is in our existing batch (between s.last and s.max), we want to release all unused sequences in the batch earlier
	// than targetSequence, and then assign as targetSequence
	if targetSequence <= s.max {
		releaseFrom := s.last + 1
		s.last = targetSequence
		s.mutex.Unlock()
		if releaseFrom < targetSequence {
			if releaseErr := s.releaseSequenceRange(ctx, releaseFrom, targetSequence-1); releaseErr != nil {
				base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] from existing batch. Will be handled by skipped sequence handling. Error:%v", releaseFrom, targetSequence-1, releaseErr)
			}
		}
		s.dbStats.SequenceAssignedCount.Add(1)
		return targetSequence, nil
	}

	// If the target sequence is greater than the highest in our batch (s.max), we want to:
	// (a) Reserve n sequences past _sync:seq, where n = existingSequence - s.max. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and
	// updated _sync:seq since we last updated s.max.), then
	// (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way.
	// (c) Release any previously allocated sequences (s.last to s.max)
	// (d) Release the reserved sequences from part (a)
	// We can perform (a) and (b) as a single increment operation, but (c) and (d) aren't necessarily contiguous blocks and must be released
	// separately

	prevAllocReleaseFrom := s.last + 1
	prevAllocReleaseTo := s.max

	numberToRelease := existingSequence - s.max
	numberToAllocate := s.sequenceBatchSize
	allocatedToSeq, incrErr := s.incrementSequence(numberToRelease + numberToAllocate)
	if incrErr != nil {
		base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, incrErr)
		s.mutex.Unlock()
		return 0, incrErr
	}

	s.max = allocatedToSeq
	s.last = allocatedToSeq - numberToAllocate + 1
	sequence = s.last
	// Updated inside the critical section, alongside s.last and s.max, so that concurrent callers of
	// this function do not write the field unsynchronized.
	s.lastSequenceReserveTime = time.Now()
	s.mutex.Unlock()

	// Perform standard batch handling and stats updates. The notify must happen after the mutex is released.
	s.reserveNotify <- struct{}{}
	s.dbStats.SequenceReservedCount.Add(int64(numberToRelease + numberToAllocate))
	s.dbStats.SequenceAssignedCount.Add(1)

	// Release previously allocated sequences (c), if any. Best-effort: failure is logged only.
	if releaseErr := s.releaseSequenceRange(ctx, prevAllocReleaseFrom, prevAllocReleaseTo); releaseErr != nil {
		base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] for previously allocated sequences. Will be handled by skipped sequence handling. Error:%v", prevAllocReleaseFrom, prevAllocReleaseTo, releaseErr)
	}

	// Release the newly allocated sequences that were used to catch up to existingSequence (d). Best-effort as above.
	if numberToRelease > 0 {
		releaseTo := allocatedToSeq - numberToAllocate
		releaseFrom := releaseTo - numberToRelease + 1 // +1, as releaseSequenceRange is inclusive
		if releaseErr := s.releaseSequenceRange(ctx, releaseFrom, releaseTo); releaseErr != nil {
			base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] to reach target sequence. Will be handled by skipped sequence handling. Error:%v", releaseFrom, releaseTo, releaseErr)
		}
	}

	// The sequence was allocated successfully; a failed release above must not surface as an
	// allocation error, otherwise the caller would discard a sequence that has already been reserved.
	return sequence, nil
}

// _nextSequence hands out the next sequence from the current batch, first reserving a new batch via
// _reserveSequenceBatch when the current one is exhausted (s.last >= s.max). sequencesReserved reports
// whether a new batch was reserved by this call. On reservation failure it returns (0, false, err) and
// leaves s.last untouched. The callers visible in this file invoke it while holding s.mutex.
func (s *sequenceAllocator) _nextSequence(ctx context.Context) (sequence uint64, sequencesReserved bool, err error) {
	batchExhausted := s.last >= s.max
	if batchExhausted {
		if reserveErr := s._reserveSequenceBatch(ctx); reserveErr != nil {
			return 0, false, reserveErr
		}
	}

	s.last++
	return s.last, batchExhausted, nil
}

// Reserve a new sequence range, based on batch size. Called by nextSequence when the previously allocated sequences have all been used.
func (s *sequenceAllocator) _reserveSequenceBatch(ctx context.Context) error {

// If the time elapsed since the last _reserveSequenceBatch invocation is shorter than our target frequency,
// this indicates we're making an incr call more frequently than we want to. Triggers an increase in batch size to
Expand Down Expand Up @@ -232,6 +327,10 @@ func (s *sequenceAllocator) incrementSequence(numToReserve uint64) (max uint64,
return value, err
}

// seqRange holds a pair of sequence numbers delimiting a range of sequences.
// NOTE(review): bounds are presumably inclusive on both ends — confirm against usages.
type seqRange struct {
	low  uint64 // lower bound of the range
	high uint64 // upper bound of the range
}

// ReleaseSequence writes an unused sequence document, used to notify sequence buffering that a sequence has been allocated and not used.
// Sequence is stored as the document body to avoid null doc issues.
func (s *sequenceAllocator) releaseSequence(ctx context.Context, sequence uint64) error {
Expand All @@ -251,6 +350,11 @@ func (s *sequenceAllocator) releaseSequence(ctx context.Context, sequence uint64
// fromSeq and toSeq are inclusive (i.e. both fromSeq and toSeq are unused).
// From and to seq are stored as the document contents to avoid null doc issues.
func (s *sequenceAllocator) releaseSequenceRange(ctx context.Context, fromSequence, toSequence uint64) error {

// Exit if there's nothing to release
if toSequence == 0 || toSequence < fromSequence {
return nil
}
key := s.metaKeys.UnusedSeqRangeKey(fromSequence, toSequence)
body := make([]byte, 16)
binary.LittleEndian.PutUint64(body[:8], fromSequence)
Expand Down
Loading

0 comments on commit 37e3612

Please sign in to comment.