Skip to content

Commit

Permalink
Merge pull request #2120 from authzed/crdb-emission-strategy
Browse files Browse the repository at this point in the history
introduce emission strategy into CockroachDB Watch API
  • Loading branch information
vroldanbet authored Nov 7, 2024
2 parents 554777d + 6c4ceb2 commit ef6e183
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 20 deletions.
9 changes: 9 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureSupported,
},
}, nil
}

Expand All @@ -509,6 +512,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureSupported,
},
}, nil
}

Expand All @@ -532,6 +538,9 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureSupported,
},
}
if cds.supportsIntegrity {
features.IntegrityData.Status = datastore.FeatureSupported
Expand Down
165 changes: 145 additions & 20 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"strings"
"time"

"github.com/jackc/pgx/v5"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/authzed/spicedb/internal/datastore/common"
Expand Down Expand Up @@ -72,15 +74,23 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev

features, err := cds.Features(ctx)
if err != nil {
close(updates)
errs <- err
return updates, errs
}

if features.Watch.Status != datastore.FeatureSupported {
close(updates)
errs <- datastore.NewWatchDisabledErr(fmt.Sprintf("%s. See https://spicedb.dev/d/enable-watch-api-crdb", features.Watch.Reason))
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy && !(options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints) {
close(updates)
errs <- errors.New("EmitImmediatelyStrategy requires WatchCheckpoints to be set")
return updates, errs
}

go cds.watch(ctx, afterRevision, options, updates, errs)

return updates, errs
Expand Down Expand Up @@ -161,10 +171,10 @@ func (cds *crdbDatastore) watch(
watchBufferWriteTimeout = cds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change *datastore.RevisionChanges) error {
select {
case updates <- change:
return true
return nil

default:
// If we cannot immediately write, setup the timer and try again.
Expand All @@ -175,11 +185,10 @@ func (cds *crdbDatastore) watch(

select {
case updates <- change:
return true
return nil

case <-timer.C:
errs <- datastore.NewWatchDisconnectedErr()
return false
return datastore.NewWatchDisconnectedErr()
}
}

Expand All @@ -193,7 +202,130 @@ func (cds *crdbDatastore) watch(
// no return value so we're not really losing anything.
defer func() { go changes.Close() }()

tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize)
cds.processChanges(ctx, changes, sendError, sendChange, opts, opts.EmissionStrategy == datastore.EmitImmediatelyStrategy)
}

// changeTracker takes care of accumulating received from CockroachDB until a checkpoint is emitted
type changeTracker[R datastore.Revision, K comparable] interface {
FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error)
AddRelationshipChange(ctx context.Context, rev R, rel tuple.Relationship, op tuple.UpdateOperation) error
AddChangedDefinition(ctx context.Context, rev R, def datastore.SchemaDefinition) error
AddDeletedNamespace(ctx context.Context, rev R, namespaceName string) error
AddDeletedCaveat(ctx context.Context, rev R, caveatName string) error
SetRevisionMetadata(ctx context.Context, rev R, metadata map[string]any) error
}

// streamingChangeProvider is a changeTracker that streams changes as they are processed. Instead of accumulating
// changes in memory before a checkpoint is reached, it leaves the responsibility of accumulating, deduplicating,
// normalizing changes, and waiting for a checkpoints to the caller.
//
// It's used when WatchOptions.EmissionStrategy is set to EmitImmediatelyStrategy.
type streamingChangeProvider struct {
content datastore.WatchContent
sendChange sendChangeFunc
sendError sendErrorFunc
}

func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) ([]datastore.RevisionChanges, error) {
// we do not accumulate in this implementation, but stream right away
return nil, nil
}

func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev revisions.HLCRevision, rel tuple.Relationship, op tuple.UpdateOperation) error {
if s.content&datastore.WatchRelationships != datastore.WatchRelationships {
return nil
}

changes := datastore.RevisionChanges{
Revision: rev,
}
switch op {
case tuple.UpdateOperationCreate:
changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Create(rel))
case tuple.UpdateOperationTouch:
changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Touch(rel))
case tuple.UpdateOperationDelete:
changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Delete(rel))
default:
return spiceerrors.MustBugf("unknown change operation")
}

return s.sendChange(&changes)
}

func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev revisions.HLCRevision, def datastore.SchemaDefinition) error {
if s.content&datastore.WatchSchema != datastore.WatchSchema {
return nil
}

changes := datastore.RevisionChanges{
Revision: rev,
ChangedDefinitions: []datastore.SchemaDefinition{def},
}

return s.sendChange(&changes)
}

func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revisions.HLCRevision, namespaceName string) error {
if s.content&datastore.WatchSchema != datastore.WatchSchema {
return nil
}

changes := datastore.RevisionChanges{
Revision: rev,
DeletedNamespaces: []string{namespaceName},
}

return s.sendChange(&changes)
}

func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisions.HLCRevision, caveatName string) error {
if s.content&datastore.WatchSchema != datastore.WatchSchema {
return nil
}

changes := datastore.RevisionChanges{
Revision: rev,
DeletedCaveats: []string{caveatName},
}

return s.sendChange(&changes)
}

func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revisions.HLCRevision, metadata map[string]any) error {
if len(metadata) > 0 {
parsedMetadata, err := structpb.NewStruct(metadata)
if err != nil {
return spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err)
}

changes := datastore.RevisionChanges{
Revision: rev,
Metadata: parsedMetadata,
}

return s.sendChange(&changes)
}

return nil
}

type (
sendChangeFunc func(change *datastore.RevisionChanges) error
sendErrorFunc func(err error)
)

func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, sendError sendErrorFunc, sendChange sendChangeFunc, opts datastore.WatchOptions, streaming bool) {
var tracked changeTracker[revisions.HLCRevision, revisions.HLCRevision]
if streaming {
tracked = &streamingChangeProvider{
sendChange: sendChange,
sendError: sendError,
content: opts.Content,
}
} else {
tracked = common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize)
}

for changes.Next() {
var tableNameBytes []byte
Expand Down Expand Up @@ -229,19 +361,22 @@ func (cds *crdbDatastore) watch(

for _, revChange := range filtered {
revChange := revChange
if !sendChange(&revChange) {
if err := sendChange(&revChange); err != nil {
sendError(err)
return
}
}

if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints {
if !sendChange(&datastore.RevisionChanges{
if err := sendChange(&datastore.RevisionChanges{
Revision: rev,
IsCheckpoint: true,
}) {
}); err != nil {
sendError(err)
return
}
}

continue
}

Expand Down Expand Up @@ -427,17 +562,7 @@ func (cds *crdbDatastore) watch(
}

if changes.Err() != nil {
if errors.Is(ctx.Err(), context.Canceled) {
closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer closeCancel()
if err := conn.Close(closeCtx); err != nil {
errs <- err
return
}
errs <- datastore.NewWatchCanceledErr()
} else {
errs <- changes.Err()
}
sendError(changes.Err())
return
}
}
3 changes: 3 additions & 0 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ func (mdb *memdbDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in MemDB")
return updates, errs
}

watchBufferWriteTimeout := options.WatchBufferWriteTimeout
if watchBufferWriteTimeout == 0 {
watchBufferWriteTimeout = mdb.watchBufferWriteTimeout
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,9 @@ func (mds *Datastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
close(updates)
errs <- errors.New("schema watch unsupported in MySQL")
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in MySQL")
return updates, errs
}

afterRevision, ok := afterRevisionRaw.(revisions.TransactionIDRevision)
if !ok {
errs <- datastore.NewInvalidRevisionErr(afterRevisionRaw, datastore.CouldNotDetermineRevision)
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,9 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,17 @@ func (pgd *pgDatastore) Watch(
errs := make(chan error, 1)

if !pgd.watchEnabled {
close(updates)
errs <- datastore.NewWatchDisabledErr("postgres must be run with track_commit_timestamp=on for watch to be enabled. See https://spicedb.dev/d/enable-watch-api-postgres")
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in Postgres")
return updates, errs
}

afterRevision := afterRevisionRaw.(postgresRevision)
watchSleep := options.CheckpointInterval
if watchSleep < minimumWatchSleep {
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ func (sd *spannerDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/datastore/spanner/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.R
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if opts.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in Spanner")
return updates, errs
}

go sd.watch(ctx, afterRevision, opts, updates, errs)

return updates, errs
Expand Down
Loading

0 comments on commit ef6e183

Please sign in to comment.