Skip to content

Commit

Permalink
fixes CRDB watch implementation accumulating past updates
Browse files Browse the repository at this point in the history
when the timestamp to start emitting updates from
is way in the past, CRDB will not emit checkpoints.

As a consequence, every update in the w.r.t to the
moment the changefeed was created will accumulate
in memory and OOM the process with a large enough
backlog.

The proposed solution is to compute one checkpoint
from the real-time stream of updates, and then use
that as the high-watermark for the backlog of
changes from the past. We need to emit all the updates
before we can emit a checkpoint, so downstream callers
would have to handle it accordingly.
  • Loading branch information
vroldanbet committed Oct 24, 2024
1 parent b02eed1 commit 191971b
Showing 1 changed file with 55 additions and 4 deletions.
59 changes: 55 additions & 4 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,34 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev
return updates, errs
}

go cds.watch(ctx, afterRevision, options, updates, errs)
// CRDB does not emit checkpoints for historical, non-real time data. If "afterRevision" is way in the past,
// that would lead to accumulating ALL the changes until the current time before starting to emit them.
// To address that, we compute the current checkpoint, and emit all revisions between "afterRevision" and the
// current checkpoint without waiting for checkpointing from CRDB's side.
var mostRecentCheckpoint datastore.Revision
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

// FIXME we should probably not pass the provided afterRevision and instead get "HeadRevision".
go cds.watch(cancelCtx, afterRevision, datastore.WatchOptions{Content: datastore.WatchCheckpoints}, updates, errs, nil)
select {
case updt := <-updates:
if updt.IsCheckpoint {
mostRecentCheckpoint = updt.Revision
}
case err := <-errs:
fmt.Printf("error: %v", err)
return nil, nil // FIXME
case <-cancelCtx.Done():
return nil, nil // FIXME
}

cancel()

updates = make(chan *datastore.RevisionChanges, watchBufferLength)
errs = make(chan error, 1)

go cds.watch(ctx, afterRevision, options, updates, errs, mostRecentCheckpoint)

return updates, errs
}
Expand All @@ -92,6 +119,7 @@ func (cds *crdbDatastore) watch(
opts datastore.WatchOptions,
updates chan *datastore.RevisionChanges,
errs chan error,
mostRecentCheckpoint datastore.Revision,
) {
defer close(updates)
defer close(errs)
Expand Down Expand Up @@ -195,6 +223,7 @@ func (cds *crdbDatastore) watch(

tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize)

var updated revisions.HLCRevision
for changes.Next() {
var tableNameBytes []byte
var changeJSON []byte
Expand Down Expand Up @@ -245,6 +274,28 @@ func (cds *crdbDatastore) watch(
continue
}

if mostRecentCheckpoint != nil && updated != (revisions.HLCRevision{}) && updated.LessThan(mostRecentCheckpoint) {
rev := mostRecentCheckpoint.(revisions.HLCRevision)

filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev)
if err != nil {
sendError(err)
return
}

for _, revChange := range filtered {
revChange := revChange
if !sendChange(&revChange) {
return
}
}

updated = revisions.HLCRevision{}
// FIXME send checkpoint?

// need to continue
}

// Otherwise, this a notification of a row change.
tableName := string(tableNameBytes)

Expand Down Expand Up @@ -309,19 +360,19 @@ func (cds *crdbDatastore) watch(
OptionalIntegrity: integrity,
}

rev, err := revisions.HLCRevisionFromString(details.Updated)
updated, err = revisions.HLCRevisionFromString(details.Updated)
if err != nil {
sendError(fmt.Errorf("malformed update timestamp: %w", err))
return
}

if details.After == nil {
if err := tracked.AddRelationshipChange(ctx, rev, relationship, tuple.UpdateOperationDelete); err != nil {
if err := tracked.AddRelationshipChange(ctx, updated, relationship, tuple.UpdateOperationDelete); err != nil {
sendError(err)
return
}
} else {
if err := tracked.AddRelationshipChange(ctx, rev, relationship, tuple.UpdateOperationTouch); err != nil {
if err := tracked.AddRelationshipChange(ctx, updated, relationship, tuple.UpdateOperationTouch); err != nil {
sendError(err)
return
}
Expand Down

0 comments on commit 191971b

Please sign in to comment.