From 37e3612f6e3488d6fee75a8b8f33158d9d090132 Mon Sep 17 00:00:00 2001
From: Adam Fraser
Date: Mon, 23 Oct 2023 16:36:49 -0700
Subject: [PATCH] CBG-3557 [3.1.2 backport] Implement nextSequenceGreaterThan
 to reduce incr and unused seq traffic (#6549)

Backports CBG-3516 to 3.1.2.
---
 db/change_cache.go            |  32 ++++++--
 db/channel_cache.go           |   8 +-
 db/crud.go                    |  26 +++---
 db/crud_test.go               |  50 ++++++++++++
 db/sequence_allocator.go      | 128 +++++++++++++++++++++++++++---
 db/sequence_allocator_test.go | 144 ++++++++++++++++++++++++++++++++++
 6 files changed, 354 insertions(+), 34 deletions(-)

diff --git a/db/change_cache.go b/db/change_cache.go
index bddde8ffd5..3a5e291397 100644
--- a/db/change_cache.go
+++ b/db/change_cache.go
@@ -575,12 +575,37 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
 	} else {
 		changedChannels.Add(unusedSeq)
 	}
-	c.channelCache.AddSkippedSequence(change)
+	c.channelCache.AddUnusedSequence(change)
 	if c.notifyChange != nil && len(changedChannels) > 0 {
 		c.notifyChange(ctx, changedChannels)
 	}
 }
 
+// releaseUnusedSequenceRange calls processEntry for each sequence in the range, but only issues a single notify.
+func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequence uint64, toSequence uint64, timeReceived time.Time) {
+
+	base.InfofCtx(ctx, base.KeyCache, "Received #%d-#%d (unused sequence range)", fromSequence, toSequence)
+
+	unusedSeq := channels.NewID(unusedSeqKey, unusedSeqCollectionID)
+	allChangedChannels := channels.SetOfNoValidate(unusedSeq)
+	for sequence := fromSequence; sequence <= toSequence; sequence++ {
+		change := &LogEntry{
+			Sequence:     sequence,
+			TimeReceived: timeReceived,
+		}
+
+		// Since processEntry may unblock pending sequences, if there were any changed channels we need
+		// to notify any change listeners that are working changes feeds for these channels
+		changedChannels := c.processEntry(ctx, change)
+		allChangedChannels = allChangedChannels.Update(changedChannels)
+		c.channelCache.AddUnusedSequence(change)
+	}
+
+	if c.notifyChange != nil {
+		c.notifyChange(ctx, allChangedChannels)
+	}
+}
+
 // Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
 func (c *changeCache) processUnusedSequenceRange(ctx context.Context, docID string) {
 	// _sync:unusedSequences:fromSeq:toSeq
@@ -601,10 +626,7 @@ func (c *changeCache) processUnusedSequenceRange(ctx context.Context, docID stri
 		return
 	}
 
-	// TODO: There should be a more efficient way to do this
-	for seq := fromSequence; seq <= toSequence; seq++ {
-		c.releaseUnusedSequence(ctx, seq, time.Now())
-	}
+	c.releaseUnusedSequenceRange(ctx, fromSequence, toSequence, time.Now())
 }
 
 func (c *changeCache) processPrincipalDoc(ctx context.Context, docID string, docJSON []byte, isUser bool, timeReceived time.Time) {
diff --git a/db/channel_cache.go b/db/channel_cache.go
index edf202819c..8dc64f252c 100644
--- a/db/channel_cache.go
+++ b/db/channel_cache.go
@@ -44,8 +44,8 @@ type ChannelCache interface {
 	// Notifies the cache of a principal update. Updates the cache's high sequence
 	AddPrincipal(change *LogEntry)
 
-	// Notifies the cache of a skipped sequence update. Updates the cache's high sequence
-	AddSkippedSequence(change *LogEntry)
+	// Notifies the cache of an unused sequence update. Updates the cache's high sequence
+	AddUnusedSequence(change *LogEntry)
 
 	// Remove purges the given doc IDs from all channel caches and returns the number of items removed.
 	Remove(ctx context.Context, collectionID uint32, docIDs []string, startTime time.Time) (count int)
@@ -191,8 +191,8 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) {
 	c.updateHighCacheSequence(change.Sequence)
 }
 
-// AddSkipedSequence notifies the cache of a skipped sequence update. Updates the cache's high sequence
-func (c *channelCacheImpl) AddSkippedSequence(change *LogEntry) {
+// AddUnusedSequence notifies the cache of an unused sequence update. Updates the cache's high sequence
+func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) {
 	c.updateHighCacheSequence(change.Sequence)
 }
 
diff --git a/db/crud.go b/db/crud.go
index 03750ab1c4..1714fbaa72 100644
--- a/db/crud.go
+++ b/db/crud.go
@@ -1546,22 +1546,22 @@ func (db *DatabaseContext) assignSequence(ctx context.Context, docSequence uint6
 			unusedSequences = append(unusedSequences, docSequence)
 		}
 
-		for {
-			var err error
-			if docSequence, err = db.sequences.nextSequence(ctx); err != nil {
-				return unusedSequences, err
-			}
+		var err error
+		if docSequence, err = db.sequences.nextSequence(ctx); err != nil {
+			return unusedSequences, err
+		}
 
-			if docSequence > doc.Sequence {
-				break
-			} else {
-				if err := db.sequences.releaseSequence(ctx, docSequence); err != nil {
-					base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, err)
-				}
-			}
+		// If the assigned sequence is less than or equal to the previous sequence on the document, release
+		// the assigned sequence and acquire one using nextSequenceGreaterThan
+		if docSequence <= doc.Sequence {
+			if err = db.sequences.releaseSequence(ctx, docSequence); err != nil {
+				base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, err)
+			}
Error:%v", docSequence, err) + } + docSequence, err = db.sequences.nextSequenceGreaterThan(ctx, doc.Sequence) + if err != nil { + return unusedSequences, err } } - // Could add a db.Sequences.nextSequenceGreaterThan(doc.Sequence) to push the work down into the sequence allocator - // - sequence allocator is responsible for releasing unused sequences, could optimize to do that in bulk if needed } doc.Sequence = docSequence diff --git a/db/crud_test.go b/db/crud_test.go index f3647eb69e..e36ac686ed 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -15,6 +15,7 @@ import ( "encoding/json" "log" "testing" + "time" sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" @@ -1629,3 +1630,52 @@ func TestPutStampClusterUUID(t *testing.T) { require.NoError(t, err) require.Equal(t, 32, len(xattr["cluster_uuid"])) } + +// TestAssignSequenceReleaseLoop repros conditions seen in CBG-3516 (where each sequence between nextSequence and docSequence has an unusedSeq doc) +func TestAssignSequenceReleaseLoop(t *testing.T) { + + if base.UnitTestUrlIsWalrus() { + t.Skip("This test won't work under walrus") + } + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges, base.KeyCRUD, base.KeyDCP) + + // import disabled + db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{}) + defer db.Close(ctx) + + // positive sequence gap (other cluster's sequencing is higher) + const otherClusterSequenceOffset = 10 + + startReleasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() + + collection := GetSingleDatabaseCollectionWithUser(t, db) + rev, doc, err := collection.Put(ctx, "doc1", Body{"foo": "bar"}) + require.NoError(t, err) + t.Logf("doc sequence: %d", doc.Sequence) + + // but we can fiddle with the sequence in the metadata of the doc write to simulate a doc from a different cluster (with a higher sequence) + var newSyncData map[string]interface{} + sd, err := json.Marshal(doc.SyncData) + require.NoError(t, err) + err = json.Unmarshal(sd, &newSyncData) + require.NoError(t, err) + newSyncData["sequence"] = doc.SyncData.Sequence + otherClusterSequenceOffset + + subdocXattrStore, ok := base.AsSubdocXattrStore(collection.dataStore) + require.True(t, ok) + _, err = subdocXattrStore.SubdocUpdateXattr(doc.ID, base.SyncXattrName, 0, doc.Cas, newSyncData) + require.NoError(t, err) + + _, doc, err = collection.Put(ctx, "doc1", Body{"foo": "buzz", BodyRev: rev}) + require.NoError(t, err) + require.Greaterf(t, doc.Sequence, uint64(otherClusterSequenceOffset), "Expected new doc sequence %d to be greater than other cluster's sequence %d", doc.Sequence, otherClusterSequenceOffset) + + // wait for the doc to be received + err = db.changeCache.waitForSequence(ctx, doc.Sequence, time.Second*30) + require.NoError(t, err) + + expectedReleasedSequenceCount := otherClusterSequenceOffset + releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount + assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount) +} diff --git a/db/sequence_allocator.go b/db/sequence_allocator.go index 84c50d363d..4ef253cfdc 100644 --- a/db/sequence_allocator.go +++ b/db/sequence_allocator.go @@ -166,17 +166,11 @@ func (s *sequenceAllocator) lastSequence(ctx context.Context) (uint64, error) { // If no previously reserved sequences are available, reserves new batch. 
 func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64, err error) {
 	s.mutex.Lock()
-	sequencesReserved := false
-	if s.last >= s.max {
-		if err := s._reserveSequenceRange(ctx); err != nil {
-			s.mutex.Unlock()
-			return 0, err
-		}
-		sequencesReserved = true
-	}
-	s.last++
-	sequence = s.last
+	sequence, sequencesReserved, err := s._nextSequence(ctx)
 	s.mutex.Unlock()
+	if err != nil {
+		return 0, err
+	}
 
 	// If sequences were reserved, send notification to the release sequence monitor, to start the clock for releasing these sequences.
 	// Must be done after mutex is released.
@@ -188,8 +182,109 @@ func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64,
 	return sequence, nil
 }
 
-// Reserve a new sequence range. Called by nextSequence when the previously allocated sequences have all been used.
-func (s *sequenceAllocator) _reserveSequenceRange(ctx context.Context) error {
+// nextSequenceGreaterThan increments _sync:seq such that it's greater than existingSequence + s.sequenceBatchSize
+// In the case where our local s.max < _sync:seq (another node has incremented _sync:seq), we may be releasing
+// sequences greater than existingSequence, but we will only ever release sequences allocated by this node's incr operation
+func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existingSequence uint64) (sequence uint64, err error) {
+
+	targetSequence := existingSequence + 1
+	s.mutex.Lock()
+	// If the target sequence is less than or equal to one we've already allocated, we can assign the sequence in the standard way
+	if targetSequence <= s.last {
+		sequence, sequencesReserved, err := s._nextSequence(ctx)
+		s.mutex.Unlock()
+		if err != nil {
+			return 0, err
+		}
+		if sequencesReserved {
+			s.reserveNotify <- struct{}{}
+		}
+		s.dbStats.SequenceAssignedCount.Add(1)
+		return sequence, nil
+	}
+
+	// If the target sequence is in our existing batch (between s.last and s.max), we want to release all unused sequences in the batch earlier
+	// than targetSequence, and then assign as targetSequence
+	if targetSequence <= s.max {
+		releaseFrom := s.last + 1
+		s.last = targetSequence
+		s.mutex.Unlock()
+		if releaseFrom < targetSequence {
+			if err := s.releaseSequenceRange(ctx, releaseFrom, targetSequence-1); err != nil {
+				base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] from existing batch. Will be handled by skipped sequence handling. Error:%v", releaseFrom, targetSequence-1, err)
+			}
+		}
+		s.dbStats.SequenceAssignedCount.Add(1)
+		return targetSequence, nil
+
+	}
+
+	// If the target sequence is greater than the highest in our batch (s.max), we want to:
+	//  (a) Reserve n sequences past _sync:seq, where n = existingSequence - s.max. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and
+	//      updated _sync:seq since we last updated s.max), then
+	//  (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way.
+	//  (c) Release any previously allocated sequences (s.last to s.max)
+	//  (d) Release the reserved sequences from part (a)
+	// We can perform (a) and (b) as a single increment operation, but (c) and (d) aren't necessarily contiguous blocks and must be released
+	// separately
+
+	prevAllocReleaseFrom := s.last + 1
+	prevAllocReleaseTo := s.max
+
+	numberToRelease := existingSequence - s.max
+	numberToAllocate := s.sequenceBatchSize
+	allocatedToSeq, err := s.incrementSequence(numberToRelease + numberToAllocate)
+	if err != nil {
+		base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err)
+		s.mutex.Unlock()
+		return 0, err
+	}
+
+	s.max = allocatedToSeq
+	s.last = allocatedToSeq - numberToAllocate + 1
+	sequence = s.last
+	s.mutex.Unlock()
+
+	// Perform standard batch handling and stats updates
+	s.lastSequenceReserveTime = time.Now()
+	s.reserveNotify <- struct{}{}
+	s.dbStats.SequenceReservedCount.Add(int64(numberToRelease + numberToAllocate))
+	s.dbStats.SequenceAssignedCount.Add(1)
+
+	// Release previously allocated sequences (c), if any
+	err = s.releaseSequenceRange(ctx, prevAllocReleaseFrom, prevAllocReleaseTo)
+	if err != nil {
+		base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] for previously allocated sequences. Will be handled by skipped sequence handling. Error:%v", prevAllocReleaseFrom, prevAllocReleaseTo, err)
+	}
+
+	// Release the newly allocated sequences that were used to catch up to existingSequence (d)
+	if numberToRelease > 0 {
+		releaseTo := allocatedToSeq - numberToAllocate
+		releaseFrom := releaseTo - numberToRelease + 1 // +1, as releaseSequenceRange is inclusive
+		err = s.releaseSequenceRange(ctx, releaseFrom, releaseTo)
+		if err != nil {
+			base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] to reach target sequence. Will be handled by skipped sequence handling. Error:%v", releaseFrom, releaseTo, err)
+		}
+	}
+
+	return sequence, err
+}
+
+// _nextSequence reserves if needed, and then returns the next sequence
+func (s *sequenceAllocator) _nextSequence(ctx context.Context) (sequence uint64, sequencesReserved bool, err error) {
+	if s.last >= s.max {
+		if err := s._reserveSequenceBatch(ctx); err != nil {
+			return 0, false, err
+		}
+		sequencesReserved = true
+	}
+	s.last++
+	sequence = s.last
+	return sequence, sequencesReserved, nil
+}
+
+// Reserve a new sequence range, based on batch size. Called by nextSequence when the previously allocated sequences have all been used.
+func (s *sequenceAllocator) _reserveSequenceBatch(ctx context.Context) error {
 
 	// If the time elapsed since the last reserveSequenceRange invocation reserve is shorter than our target frequency,
 	// this indicates we're making an incr call more frequently than we want to. Triggers an increase in batch size to
@@ -232,6 +327,10 @@ func (s *sequenceAllocator) incrementSequence(numToReserve uint64) (max uint64,
 	return value, err
 }
 
+type seqRange struct {
+	low, high uint64
+}
+
 // ReleaseSequence writes an unused sequence document, used to notify sequence buffering that a sequence has been allocated and not used.
 // Sequence is stored as the document body to avoid null doc issues.
 func (s *sequenceAllocator) releaseSequence(ctx context.Context, sequence uint64) error {
@@ -251,6 +350,11 @@ func (s *sequenceAllocator) releaseSequence(ctx context.Context, sequence uint64
 // fromSeq and toSeq are inclusive (i.e. both fromSeq and toSeq are unused).
 // From and to seq are stored as the document contents to avoid null doc issues.
 func (s *sequenceAllocator) releaseSequenceRange(ctx context.Context, fromSequence, toSequence uint64) error {
+
+	// Exit if there's nothing to release
+	if toSequence == 0 || toSequence < fromSequence {
+		return nil
+	}
 	key := s.metaKeys.UnusedSeqRangeKey(fromSequence, toSequence)
 	body := make([]byte, 16)
 	binary.LittleEndian.PutUint64(body[:8], fromSequence)
diff --git a/db/sequence_allocator_test.go b/db/sequence_allocator_test.go
index c8bcd48ed1..47289f554b 100644
--- a/db/sequence_allocator_test.go
+++ b/db/sequence_allocator_test.go
@@ -234,3 +234,147 @@ func assertNewAllocatorStats(t *testing.T, stats *base.DatabaseStats, incr, rese
 	assert.Equal(t, assigned, stats.SequenceAssignedCount.Value())
 	assert.Equal(t, released, stats.SequenceReleasedCount.Value())
 }
+
+func TestNextSequenceGreaterThanSingleNode(t *testing.T) {
+
+	ctx := base.TestCtx(t)
+	bucket := base.GetTestBucket(t)
+	defer bucket.Close(ctx)
+
+	sgw, err := base.NewSyncGatewayStats()
+	require.NoError(t, err)
+	dbstats, err := sgw.NewDBStats("", false, false, false, nil, nil)
+	require.NoError(t, err)
+	testStats := dbstats.Database()
+
+	// Create a sequence allocator without using the constructor, to test without a releaseSequenceMonitor
+	// Set sequenceBatchSize=10 to test variations of batching
+	a := &sequenceAllocator{
+		datastore:         bucket.GetSingleDataStore(),
+		dbStats:           testStats,
+		sequenceBatchSize: 10,                      // set initial batch size to 10 to support all test cases
+		reserveNotify:     make(chan struct{}, 50), // Buffered to allow multiple allocations without releaseSequenceMonitor
+		metaKeys:          base.DefaultMetadataKeys,
+	}
+
+	initSequence, err := a.lastSequence(ctx)
+	assert.Equal(t, uint64(0), initSequence)
+	assert.NoError(t, err, "error retrieving last sequence")
+
+	// nextSequenceGreaterThan(0) should perform initial batch allocation of size 10, and not release any sequences
+	nextSequence, err := a.nextSequenceGreaterThan(ctx, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(1), nextSequence)
+	assertNewAllocatorStats(t, testStats, 1, 10, 1, 0) // incr, reserved, assigned, released counts
+
+	// Calling the same again should assign from the existing batch
+	nextSequence, err = a.nextSequenceGreaterThan(ctx, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(2), nextSequence)
+	assertNewAllocatorStats(t, testStats, 1, 10, 2, 0)
+
+	// Test case where greaterThan == s.last + 1
+	nextSequence, err = a.nextSequenceGreaterThan(ctx, 2)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(3), nextSequence)
+	assertNewAllocatorStats(t, testStats, 1, 10, 3, 0)
+
+	// When the requested nextSequenceGreaterThan is > s.last + 1, we should release previously allocated sequences but
+	// don't require a new incr
+	nextSequence, err = a.nextSequenceGreaterThan(ctx, 5)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(6), nextSequence)
+	assertNewAllocatorStats(t, testStats, 1, 10, 4, 2)
+
+	// Test when requested nextSequenceGreaterThan == s.max; should release previously allocated sequences and allocate a new batch
+	nextSequence, err = a.nextSequenceGreaterThan(ctx, 10)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(11), nextSequence)
+	assertNewAllocatorStats(t, testStats, 2, 20, 5, 6)
+
+	// Test when requested nextSequenceGreaterThan == s.max + 1; should release previously allocated sequences AND max+1
+	nextSequence, err = a.nextSequenceGreaterThan(ctx, 21)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(22), nextSequence)
+	assertNewAllocatorStats(t, testStats, 3, 31, 6, 16)
+
+	// Test when requested nextSequenceGreaterThan > s.max + batch size; should release 9 previously allocated sequences (23-31)
+	// and 19 in the gap to the requested sequence (32-50)
+	nextSequence, err = a.nextSequenceGreaterThan(ctx, 50)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(51), nextSequence)
+	assertNewAllocatorStats(t, testStats, 4, 60, 7, 44)
+
+}
+
+func TestNextSequenceGreaterThanMultiNode(t *testing.T) {
+
+	ctx := base.TestCtx(t)
+	bucket := base.GetTestBucket(t)
+	defer bucket.Close(ctx)
+
+	// Create two sequence allocators without using the constructor, to test without a releaseSequenceMonitor
+	// Set sequenceBatchSize=10 to test variations of batching
+	stats, err := base.NewSyncGatewayStats()
+	require.NoError(t, err)
+	statsA, err := stats.NewDBStats("A", false, false, false, nil, nil)
+	require.NoError(t, err)
+	statsB, err := stats.NewDBStats("B", false, false, false, nil, nil)
+	require.NoError(t, err)
+	dbStatsA := statsA.DatabaseStats
+	dbStatsB := statsB.DatabaseStats
+
+	require.NoError(t, err)
+	a := &sequenceAllocator{
+		datastore:         bucket.GetSingleDataStore(),
+		dbStats:           dbStatsA,
+		sequenceBatchSize: 10,                      // set initial batch size to 10 to support all test cases
+		reserveNotify:     make(chan struct{}, 50), // Buffered to allow multiple allocations without releaseSequenceMonitor
+		metaKeys:          base.DefaultMetadataKeys,
+	}
+
+	b := &sequenceAllocator{
+		datastore:         bucket.GetSingleDataStore(),
+		dbStats:           dbStatsB,
+		sequenceBatchSize: 10,                      // set initial batch size to 10 to support all test cases
+		reserveNotify:     make(chan struct{}, 50), // Buffered to allow multiple allocations without releaseSequenceMonitor
+		metaKeys:          base.DefaultMetadataKeys,
+	}
+
+	initSequence, err := a.lastSequence(ctx)
+	assert.Equal(t, uint64(0), initSequence)
+	assert.NoError(t, err, "error retrieving last sequence")
+
+	initSequence, err = b.lastSequence(ctx)
+	assert.Equal(t, uint64(0), initSequence)
+	assert.NoError(t, err, "error retrieving last sequence")
+
+	// nextSequenceGreaterThan(0) on A should perform initial batch allocation of size 10 (allocs 1-10), and not release any sequences
+	nextSequence, err := a.nextSequenceGreaterThan(ctx, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(1), nextSequence)
+	assertNewAllocatorStats(t, dbStatsA, 1, 10, 1, 0) // incr, reserved, assigned, released counts
+
+	// nextSequenceGreaterThan(0) on B should perform initial batch allocation of size 10 (allocs 11-20), and not release any sequences
+	nextSequence, err = b.nextSequenceGreaterThan(ctx, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(11), nextSequence)
+	assertNewAllocatorStats(t, dbStatsB, 1, 10, 1, 0)
+
+	// calling nextSequenceGreaterThan(15) on B will assign from the existing batch, and release 12-15
+	nextSequence, err = b.nextSequenceGreaterThan(ctx, 15)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(16), nextSequence)
+	assertNewAllocatorStats(t, dbStatsB, 1, 10, 2, 4)
+
+	// calling nextSequenceGreaterThan(15) on A will increment _sync:seq by 15 (5 to move past its previously allocated max of 10, plus a new batch of 10).
+	// Since node B has already updated _sync:seq to 20, this results in:
+	//   node A releasing sequences 2-10 from its existing buffer
+	//   node A allocating and releasing sequences 21-25
+	//   node A adding sequences 26-35 to its buffer, and assigning 26 to the current request
+	nextSequence, err = a.nextSequenceGreaterThan(ctx, 15)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(26), nextSequence)
+	assertNewAllocatorStats(t, dbStatsA, 2, 25, 2, 14)
+
+}
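
Reviewer note: the branch arithmetic in nextSequenceGreaterThan is easiest to sanity-check against a standalone model. The sketch below is illustrative only (allocState and nextGreaterThan are hypothetical names, not part of the Sync Gateway codebase); it models _sync:seq as a plain field rather than a bucket incr, and omits locking, stats, and batch-exhaustion handling on the standard path.

package main

import "fmt"

// allocState is a hypothetical, simplified model of the sequenceAllocator
// fields used by nextSequenceGreaterThan.
type allocState struct {
	last, max uint64 // this node's batch: next assignable is last+1, batch ends at max
	syncSeq   uint64 // shared _sync:seq value (really updated via a single incr op)
	batchSize uint64
}

// nextGreaterThan mirrors the three branches of nextSequenceGreaterThan,
// returning the assigned sequence and the inclusive [from, to] ranges that
// would be written as unused-sequence documents.
func (s *allocState) nextGreaterThan(existing uint64) (assigned uint64, released [][2]uint64) {
	target := existing + 1
	switch {
	case target <= s.last:
		// Already allocated past the target: plain nextSequence behaviour.
		s.last++
		return s.last, nil
	case target <= s.max:
		// Target is inside the current batch: skip ahead, release the gap.
		if s.last+1 < target {
			released = append(released, [2]uint64{s.last + 1, target - 1})
		}
		s.last = target
		return target, released
	default:
		// Target is beyond the batch: one incr covers the catch-up to
		// existing plus a fresh batch, i.e. incrementSequence(toRelease + batchSize).
		toRelease := existing - s.max
		s.syncSeq += toRelease + s.batchSize
		if s.last+1 <= s.max {
			// (c) release the unused remainder of the old batch
			released = append(released, [2]uint64{s.last + 1, s.max})
		}
		if toRelease > 0 {
			// (d) release the catch-up sequences sitting below the new batch
			released = append(released, [2]uint64{s.syncSeq - s.batchSize - toRelease + 1, s.syncSeq - s.batchSize})
		}
		s.max = s.syncSeq
		s.last = s.syncSeq - s.batchSize + 1
		return s.last, released
	}
}

func main() {
	// Replays the TestNextSequenceGreaterThanMultiNode step where node A
	// (batch 1-10, sequence 1 assigned) calls nextSequenceGreaterThan(15)
	// after node B has already pushed _sync:seq to 20.
	a := &allocState{last: 1, max: 10, syncSeq: 20, batchSize: 10}
	seq, released := a.nextGreaterThan(15)
	fmt.Println(seq, released) // 26 [[2 10] [21 25]]
}

Running main prints 26 [[2 10] [21 25]]: sequences 2-10 from the old batch and catch-up sequences 21-25 are released, and 26 is assigned, matching the released count (14) and assigned sequence asserted in the test above.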