From 191971b16659414b70a43cd1f51e9621ad9a82d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Thu, 24 Oct 2024 19:40:33 +0100 Subject: [PATCH] fixes CRDB watch implementation accumulating past updates when the timestamp to start emitting updates from is way in the past, CRDB will not emit checkpoints. As a consequence, every update in the w.r.t to the moment the changefeed was created will accumulate in memory and OOM the process with a large enough backlog. The proposed solution is to compute one checkpoint from the real-time stream of updates, and then use that as the high-watermark for the backlog of changes from the past. We need to emit all the updates before we can emit a checkpoint, so downstream callers would have to handle it accordingly. --- internal/datastore/crdb/watch.go | 59 +++++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 883d14978a..92ee1d76c0 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -81,7 +81,34 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev return updates, errs } - go cds.watch(ctx, afterRevision, options, updates, errs) + // CRDB does not emit checkpoints for historical, non-real time data. If "afterRevision" is way in the past, + // that would lead to accumulating ALL the changes until the current time before starting to emit them. + // To address that, we compute the current checkpoint, and emit all revisions between "afterRevision" and the + // current checkpoint without waiting for checkpointing from CRDB's side. + var mostRecentCheckpoint datastore.Revision + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // FIXME we should probably not pass the provided afterRevision and instead get "HeadRevision". + go cds.watch(cancelCtx, afterRevision, datastore.WatchOptions{Content: datastore.WatchCheckpoints}, updates, errs, nil) + select { + case updt := <-updates: + if updt.IsCheckpoint { + mostRecentCheckpoint = updt.Revision + } + case err := <-errs: + fmt.Printf("error: %v", err) + return nil, nil // FIXME + case <-cancelCtx.Done(): + return nil, nil // FIXME + } + + cancel() + + updates = make(chan *datastore.RevisionChanges, watchBufferLength) + errs = make(chan error, 1) + + go cds.watch(ctx, afterRevision, options, updates, errs, mostRecentCheckpoint) return updates, errs } @@ -92,6 +119,7 @@ func (cds *crdbDatastore) watch( opts datastore.WatchOptions, updates chan *datastore.RevisionChanges, errs chan error, + mostRecentCheckpoint datastore.Revision, ) { defer close(updates) defer close(errs) @@ -195,6 +223,7 @@ func (cds *crdbDatastore) watch( tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize) + var updated revisions.HLCRevision for changes.Next() { var tableNameBytes []byte var changeJSON []byte @@ -245,6 +274,28 @@ func (cds *crdbDatastore) watch( continue } + if mostRecentCheckpoint != nil && updated != (revisions.HLCRevision{}) && updated.LessThan(mostRecentCheckpoint) { + rev := mostRecentCheckpoint.(revisions.HLCRevision) + + filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev) + if err != nil { + sendError(err) + return + } + + for _, revChange := range filtered { + revChange := revChange + if !sendChange(&revChange) { + return + } + } + + updated = revisions.HLCRevision{} + // FIXME send checkpoint? + + // need to continue + } + // Otherwise, this a notification of a row change. tableName := string(tableNameBytes) @@ -309,19 +360,19 @@ func (cds *crdbDatastore) watch( OptionalIntegrity: integrity, } - rev, err := revisions.HLCRevisionFromString(details.Updated) + updated, err = revisions.HLCRevisionFromString(details.Updated) if err != nil { sendError(fmt.Errorf("malformed update timestamp: %w", err)) return } if details.After == nil { - if err := tracked.AddRelationshipChange(ctx, rev, relationship, tuple.UpdateOperationDelete); err != nil { + if err := tracked.AddRelationshipChange(ctx, updated, relationship, tuple.UpdateOperationDelete); err != nil { sendError(err) return } } else { - if err := tracked.AddRelationshipChange(ctx, rev, relationship, tuple.UpdateOperationTouch); err != nil { + if err := tracked.AddRelationshipChange(ctx, updated, relationship, tuple.UpdateOperationTouch); err != nil { sendError(err) return }