Skip to content

Commit

Permalink
emit memdb checkpoints after changes
Browse files Browse the repository at this point in the history
Clients may consider the revision has moved
forward before relationship / the changes were
emitted.
  • Loading branch information
vroldanbet committed Oct 1, 2024
1 parent bfd80f7 commit 20d5807
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
10 changes: 5 additions & 5 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,18 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, opti
changes = append(changes, &change.changes)
}

if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
changes = append(changes, &change.changes)
}

if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision {
changes = append(changes, &datastore.RevisionChanges{
Revision: revisions.NewForTimestamp(change.revisionNanos),
IsCheckpoint: true,
})
}

if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
changes = append(changes, &change.changes)
}

lastRevision = change.revisionNanos
}

Expand Down
18 changes: 17 additions & 1 deletion pkg/datastore/test/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) {
require.NoError(err)

changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
Content: datastore.WatchCheckpoints | datastore.WatchRelationships,
Content: datastore.WatchCheckpoints | datastore.WatchRelationships | datastore.WatchSchema,
CheckpointInterval: 100 * time.Millisecond,
})
require.Zero(len(errchan))
Expand All @@ -735,6 +735,14 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) {
tuple.Parse("document:firstdoc#viewer@user:tom"),
)
require.NoError(err)

verifyCheckpointUpdate(require, afterTouchRevision, changes)

afterTouchRevision, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
return rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{Name: "doesnotexist"})
})
require.NoError(err)

verifyCheckpointUpdate(require, afterTouchRevision, changes)
}

Expand All @@ -743,12 +751,20 @@ func verifyCheckpointUpdate(
expectedRevision datastore.Revision,
changes <-chan *datastore.RevisionChanges,
) {
var relChangeEmitted, schemaChangeEmitted bool
changeWait := time.NewTimer(waitForChangesTimeout)
for {
select {
case change, ok := <-changes:
require.True(ok)
if len(change.ChangedDefinitions) > 0 {
schemaChangeEmitted = true
}
if len(change.RelationshipChanges) > 0 {
relChangeEmitted = true
}
if change.IsCheckpoint {
require.True(relChangeEmitted || schemaChangeEmitted, "expected relationship/schema changes before checkpoint")
require.True(change.Revision.Equal(change.Revision) || change.Revision.GreaterThan(expectedRevision))
return
}
Expand Down

0 comments on commit 20d5807

Please sign in to comment.