Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Oct 16, 2024
1 parent c9b782e commit 92e53fb
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 40 deletions.
5 changes: 2 additions & 3 deletions internal/datastore/common/tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package common

import (
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/tuple"
)

// NewSliceRelationshipIterator creates a datastore.TupleIterator instance from a materialized slice of tuples.
func NewSliceRelationshipIterator(rels []tuple.Relationship, order options.SortOrder) datastore.RelationshipIterator {
// NewSliceRelationshipIterator creates a datastore.RelationshipIterator instance from a materialized slice of tuples.
func NewSliceRelationshipIterator(rels []tuple.Relationship) datastore.RelationshipIterator {
return func(yield func(tuple.Relationship, error) bool) {
for _, rel := range rels {
if !yield(rel, nil) {
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ func RelationshipIntegrityInfoTest(t *testing.T, tester test.DatastoreTester) {
require.NoError(err)

slice, err := datastore.IteratorToSlice(iter)
require.NoError(err)

rel := slice[0]

require.NotNil(rel.OptionalIntegrity)
Expand Down Expand Up @@ -529,6 +531,8 @@ func BulkRelationshipIntegrityInfoTest(t *testing.T, tester test.DatastoreTester
require.NoError(err)

slice, err := datastore.IteratorToSlice(iter)
require.NoError(err)

rel := slice[0]

require.NotNil(rel.OptionalIntegrity)
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/memdb/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func newSubjectSortedIterator(it memdb.ResultIterator, limit *uint64) (datastore
results = results[0:*limit]
}

return common.NewSliceRelationshipIterator(results, options.BySubject), nil
return common.NewSliceRelationshipIterator(results), nil
}

func noopCursorFilter(_ *relationship) bool {
Expand Down
10 changes: 6 additions & 4 deletions internal/datastore/memdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,13 @@ func (rwt *memdbReadWriteTx) BulkLoad(ctx context.Context, iter datastore.BulkWr
var numCopied uint64
var next *tuple.Relationship
var err error

updates := []tuple.RelationshipUpdate{{
Operation: tuple.UpdateOperationCreate,
}}

for next, err = iter.Next(ctx); next != nil && err == nil; next, err = iter.Next(ctx) {
updates := []tuple.RelationshipUpdate{{
Operation: tuple.UpdateOperationCreate,
Relationship: *next,
}}
updates[0].Relationship = *next
if err := rwt.WriteRelationships(ctx, updates); err != nil {
return 0, err
}
Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -438,6 +439,12 @@ func newMySQLExecutor(tx querier) common.ExecuteQueryFunc {

span.AddEvent("Query issued to database")

relCount := 0

defer func() {
span.AddEvent("Relationships loaded", trace.WithAttributes(attribute.Int("relCount", relCount)))
}()

for rows.Next() {
var resourceObjectType string
var resourceObjectID string
Expand Down Expand Up @@ -468,6 +475,7 @@ func newMySQLExecutor(tx querier) common.ExecuteQueryFunc {
return
}

relCount++
if !yield(tuple.Relationship{
RelationshipReference: tuple.RelationshipReference{
Resource: tuple.ObjectAndRelation{
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ func RevisionInversionTest(t *testing.T, ds datastore.Datastore) {
<-waitToStart

commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
rtu := tuple.Touch(tuple.MustParse("resource:789#reader@user:456"))
rtu := tuple.Touch(tuple.MustParse("resource:789#reader@user:ten"))
return rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{rtu})
})
close(waitToFinish)
Expand Down
7 changes: 3 additions & 4 deletions internal/datastore/proxy/hedging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/authzed/spicedb/internal/datastore/proxy/proxy_test"
"github.com/authzed/spicedb/internal/datastore/revisions"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)
Expand All @@ -32,7 +31,7 @@ var (
revisionKnown = revisions.NewForTransactionID(1)
anotherRevisionKnown = revisions.NewForTransactionID(2)

emptyIterator = common.NewSliceRelationshipIterator(nil, options.Unsorted)
emptyIterator = common.NewSliceRelationshipIterator(nil)
)

type testFunc func(t *testing.T, proxy datastore.Datastore, expectFirst bool)
Expand Down Expand Up @@ -299,12 +298,12 @@ func TestDatastoreE2E(t *testing.T) {

delegateReader.
On("QueryRelationships", mock.Anything, mock.Anything).
Return(common.NewSliceRelationshipIterator(expectedRels, options.Unsorted), nil).
Return(common.NewSliceRelationshipIterator(expectedRels), nil).
WaitUntil(mockTime.After(2 * slowQueryTime)).
Once()
delegateReader.
On("QueryRelationships", mock.Anything, mock.Anything).
Return(common.NewSliceRelationshipIterator(expectedRels, options.Unsorted), nil).
Return(common.NewSliceRelationshipIterator(expectedRels), nil).
Once()

autoAdvance(mockTime, slowQueryTime/2, 2*slowQueryTime)
Expand Down
16 changes: 5 additions & 11 deletions internal/services/v1/preconditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/tuple"
)

var limitOne uint64 = 1
Expand All @@ -31,23 +30,18 @@ func checkPreconditions(
return fmt.Errorf("error reading relationships: %w", err)
}

var first *tuple.Relationship
for rel, err := range iter {
if err != nil {
return fmt.Errorf("error reading relationships from iterator: %w", err)
}

first = &rel
break
_, ok, err := datastore.FirstRelationshipIn(iter)
if err != nil {
return fmt.Errorf("error reading relationships from iterator: %w", err)
}

switch precond.Operation {
case v1.Precondition_OPERATION_MUST_NOT_MATCH:
if first != nil {
if ok {
return NewPreconditionFailedErr(precond)
}
case v1.Precondition_OPERATION_MUST_MATCH:
if first == nil {
if !ok {
return NewPreconditionFailedErr(precond)
}
default:
Expand Down
23 changes: 11 additions & 12 deletions internal/services/v1/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS
select {
case update, ok := <-updates:
if ok {
updates, err := tuple.UpdatesToV1RelationshipUpdates(update.RelationshipChanges)
if err != nil {
return status.Errorf(codes.Internal, "failed to convert updates: %s", err)
}

filtered := filterUpdates(objectTypes, filters, updates)
filtered := filterUpdates(objectTypes, filters, update.RelationshipChanges)
if len(filtered) > 0 {
converted, err := tuple.UpdatesToV1RelationshipUpdates(filtered)
if err != nil {
return status.Errorf(codes.Internal, "failed to convert updates: %s", err)
}

if err := stream.Send(&v1.WatchResponse{
Updates: filtered,
Updates: converted,
ChangesThrough: zedtoken.MustNewFromRevision(update.Revision),
OptionalTransactionMetadata: update.Metadata,
}); err != nil {
Expand All @@ -125,14 +125,14 @@ func (ws *watchServer) rewriteError(ctx context.Context, err error) error {
return shared.RewriteError(ctx, err, &shared.ConfigForErrors{})
}

func filterUpdates(objectTypes *mapz.Set[string], filters []datastore.RelationshipsFilter, updates []*v1.RelationshipUpdate) []*v1.RelationshipUpdate {
func filterUpdates(objectTypes *mapz.Set[string], filters []datastore.RelationshipsFilter, updates []tuple.RelationshipUpdate) []tuple.RelationshipUpdate {
if objectTypes.IsEmpty() && len(filters) == 0 {
return updates
}

filtered := make([]*v1.RelationshipUpdate, 0, len(updates))
filtered := make([]tuple.RelationshipUpdate, 0, len(updates))
for _, update := range updates {
objectType := update.GetRelationship().GetResource().GetObjectType()
objectType := update.Relationship.Resource.ObjectType
if !objectTypes.IsEmpty() && !objectTypes.Has(objectType) {
continue
}
Expand All @@ -141,8 +141,7 @@ func filterUpdates(objectTypes *mapz.Set[string], filters []datastore.Relationsh
// If there are filters, we need to check if the update matches any of them.
matched := false
for _, filter := range filters {
// TODO(jschorr): Maybe we should add TestRelationship to avoid the conversion?
if filter.Test(tuple.FromV1Relationship(update.GetRelationship())) {
if filter.Test(update.Relationship) {
matched = true
break
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,19 @@ func IteratorToSlice(iter RelationshipIterator) ([]tuple.Relationship, error) {
return results, nil
}

// FirstRelationshipIn returns the first relationship found via the iterator, if any.
func FirstRelationshipIn(iter RelationshipIterator) (tuple.Relationship, bool, error) {
for rel, err := range iter {
if err != nil {
return tuple.Relationship{}, false, err
}

return rel, true, nil
}

return tuple.Relationship{}, false, nil
}

// Revision is an interface for a comparable revision type that can be different for
// each datastore implementation.
type Revision interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/datastore/pagination/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func generateMock(t *testing.T, rels []tuple.Relationship, pageSize int, order o
pageSize64, err := safecast.ToUint64(pageSize)
require.NoError(t, err)

iter := common.NewSliceRelationshipIterator(rels[i:pastLastIndex], order)
iter := common.NewSliceRelationshipIterator(rels[i:pastLastIndex])
mock.On("QueryRelationships", last, order, pageSize64).Return(iter, nil)
if relsLen > 0 {
l := rels[pastLastIndex-1]
Expand Down
6 changes: 3 additions & 3 deletions pkg/proto/core/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
// ToCoreRelationTuple converts the input to a core RelationTuple.
func ToCoreRelationTuple(tuple *v0.RelationTuple) *RelationTuple {
return &RelationTuple{
ResourceAndRelation: ToCoreONRStringToCore(tuple.ObjectAndRelation),
Subject: ToCoreONRStringToCore(tuple.User.GetUserset()),
ResourceAndRelation: ToCoreObjectAndRelation(tuple.ObjectAndRelation),
Subject: ToCoreObjectAndRelation(tuple.User.GetUserset()),
}
}

Expand All @@ -22,7 +22,7 @@ func ToCoreRelationTuples(tuples []*v0.RelationTuple) []*RelationTuple {
}

// ToCoreObjectAndRelation converts the input to a core ToCoreObjectAndRelation.
func ToCoreONRStringToCore(onr *v0.ObjectAndRelation) *ObjectAndRelation {
func ToCoreObjectAndRelation(onr *v0.ObjectAndRelation) *ObjectAndRelation {
return &ObjectAndRelation{
Namespace: onr.Namespace,
ObjectId: onr.ObjectId,
Expand Down

0 comments on commit 92e53fb

Please sign in to comment.