Skip to content

Commit

Permalink
fixes CRDB watch implementation accumulating past updates
Browse files Browse the repository at this point in the history
when the timestamp to start emitting updates from
is way in the past, CRDB will not emit checkpoints.

As a consequence, every update in the w.r.t to the
moment the changefeed was created will accumulate
in memory and OOM the process with a large enough
backlog.

The proposed solution is to compute one checkpoint
from the real-time stream of updates, and then use
that as the high-watermark for the backlog of
changes from the past. We need to emit all the updates
before we can emit a checkpoint, so downstream callers
would have to handle it accordingly.
  • Loading branch information
vroldanbet committed Oct 24, 2024
1 parent b02eed1 commit 5735bbb
Showing 1 changed file with 66 additions and 8 deletions.
74 changes: 66 additions & 8 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
log "github.com/authzed/spicedb/internal/logging"
"strconv"
"strings"
"time"
Expand All @@ -24,7 +25,7 @@ import (
)

const (
queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';"
queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0s';"
queryChangefeedPreV22 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s';"
)

Expand Down Expand Up @@ -81,7 +82,34 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev
return updates, errs
}

go cds.watch(ctx, afterRevision, options, updates, errs)
// CRDB does not emit checkpoints for historical, non-real time data. If "afterRevision" is way in the past,
// that would lead to accumulating ALL the changes until the current time before starting to emit them.
// To address that, we compute the current checkpoint, and emit all revisions between "afterRevision" and the
// current checkpoint without waiting for checkpointing from CRDB's side.
var mostRecentCheckpoint datastore.Revision
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

// FIXME we should probably not pass the provided afterRevision and instead get "HeadRevision".
go cds.watch(cancelCtx, afterRevision, datastore.WatchOptions{Content: datastore.WatchCheckpoints}, updates, errs, nil)
select {
case updt := <-updates:
if updt.IsCheckpoint {
mostRecentCheckpoint = updt.Revision
}
case err := <-errs:
fmt.Printf("error: %v", err)
return nil, nil // FIXME
case <-cancelCtx.Done():
return nil, nil // FIXME
}

cancel()

updates = make(chan *datastore.RevisionChanges, watchBufferLength)
errs = make(chan error, 1)

go cds.watch(ctx, afterRevision, options, updates, errs, mostRecentCheckpoint)

return updates, errs
}
Expand All @@ -92,6 +120,7 @@ func (cds *crdbDatastore) watch(
opts datastore.WatchOptions,
updates chan *datastore.RevisionChanges,
errs chan error,
mostRecentCheckpoint datastore.Revision,
) {
defer close(updates)
defer close(errs)
Expand Down Expand Up @@ -195,6 +224,8 @@ func (cds *crdbDatastore) watch(

tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize)

var count int
var updated revisions.HLCRevision
for changes.Next() {
var tableNameBytes []byte
var changeJSON []byte
Expand All @@ -215,13 +246,16 @@ func (cds *crdbDatastore) watch(
// Resolved indicates that the specified revision is "complete"; no additional updates can come in before or at it.
// Therefore, at this point, we issue tracked updates from before that time, and the checkpoint update.
if details.Resolved != "" {
rev, err := revisions.HLCRevisionFromString(details.Resolved)
log.Info().Int("count", count).Msg("resolving changes via resolved highwatermark")
count = 0

resolved, err := revisions.HLCRevisionFromString(details.Resolved)
if err != nil {
sendError(fmt.Errorf("malformed resolved timestamp: %w", err))
return
}

filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev)
filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, resolved)
if err != nil {
sendError(err)
return
Expand All @@ -236,7 +270,7 @@ func (cds *crdbDatastore) watch(

if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints {
if !sendChange(&datastore.RevisionChanges{
Revision: rev,
Revision: resolved,
IsCheckpoint: true,
}) {
return
Expand All @@ -245,6 +279,30 @@ func (cds *crdbDatastore) watch(
continue
}

if mostRecentCheckpoint != nil && updated != (revisions.HLCRevision{}) && updated.LessThan(mostRecentCheckpoint) {
log.Info().Int("count", count).Msg("resolving backlog update")
count = 0
rev := mostRecentCheckpoint.(revisions.HLCRevision)

filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev)
if err != nil {
sendError(err)
return
}

for _, revChange := range filtered {
revChange := revChange
if !sendChange(&revChange) {
return
}
}

updated = revisions.HLCRevision{}
// FIXME send checkpoint?

// need to continue
}

// Otherwise, this a notification of a row change.
tableName := string(tableNameBytes)

Expand Down Expand Up @@ -309,19 +367,19 @@ func (cds *crdbDatastore) watch(
OptionalIntegrity: integrity,
}

rev, err := revisions.HLCRevisionFromString(details.Updated)
updated, err = revisions.HLCRevisionFromString(details.Updated)
if err != nil {
sendError(fmt.Errorf("malformed update timestamp: %w", err))
return
}

if details.After == nil {
if err := tracked.AddRelationshipChange(ctx, rev, relationship, tuple.UpdateOperationDelete); err != nil {
if err := tracked.AddRelationshipChange(ctx, updated, relationship, tuple.UpdateOperationDelete); err != nil {
sendError(err)
return
}
} else {
if err := tracked.AddRelationshipChange(ctx, rev, relationship, tuple.UpdateOperationTouch); err != nil {
if err := tracked.AddRelationshipChange(ctx, updated, relationship, tuple.UpdateOperationTouch); err != nil {
sendError(err)
return
}
Expand Down

0 comments on commit 5735bbb

Please sign in to comment.