Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Version Vector Handling for Legacy SDK and Snapshots #1096

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pkg/document/change/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,16 @@ func (id ID) SyncClocks(other ID) ID {
lamport = other.lamport + 1
}

newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(other.versionVector))
// NOTE(chacha912): For changes created by legacy SDKs prior to v0.5.2 that lack version
// vectors, the document's version vector was not being properly accumulated. To address this,
// we generate a version vector using the lamport timestamp when no version vector exists.
otherVV := other.versionVector
if len(otherVV) == 0 {
otherVV = otherVV.DeepCopy()
otherVV.Set(other.actorID, other.lamport)
}

newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(otherVV))
newID.versionVector.Set(id.actorID, lamport)
return newID
}
Expand All @@ -117,6 +126,15 @@ func (id ID) SetClocks(otherLamport int64, vector time.VersionVector) ID {
lamport = otherLamport + 1
}

// NOTE(chacha912): Documents created by server may have an InitialActorID
// in their version vector. Although server is not an actual client, it
// generates document snapshots from changes by participating with an
// InitialActorID during document instance creation and accumulating stored
// changes in DB.
// Semantically, including a non-client actor in version vector is
// problematic. To address this, we remove the InitialActorID from snapshots.
vector.Unset(time.InitialActorID)

newID := NewID(id.clientSeq, id.serverSeq, lamport, id.actorID, id.versionVector.Max(vector))
newID.versionVector.Set(id.actorID, lamport)

Expand Down
2 changes: 1 addition & 1 deletion pkg/document/crdt/rga_tree_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func (s *RGATreeSplit[V]) deleteNodes(
if versionVector == nil && maxCreatedAtMapByActor == nil {
// Local edit - use version vector comparison
clientLamportAtChange = time.MaxLamport
} else if versionVector != nil {
} else if len(versionVector) > 0 {
lamport, ok := versionVector.Get(actorID)
if ok {
clientLamportAtChange = lamport
Expand Down
2 changes: 1 addition & 1 deletion pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
if hasSnapshot {
d.cloneRoot = nil
d.clonePresences = nil
if err := d.doc.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
if err := d.doc.applySnapshot(pack.Snapshot, pack.VersionVector); err != nil {
return err
}
} else {
Expand Down
14 changes: 10 additions & 4 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) er

// 01. Apply remote changes to both the cloneRoot and the document.
if hasSnapshot {
if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
if err := d.applySnapshot(pack.Snapshot, pack.VersionVector); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -260,7 +260,7 @@ func (d *InternalDocument) RootObject() *crdt.Object {
return d.root.Object()
}

func (d *InternalDocument) applySnapshot(snapshot []byte, serverSeq int64, vector time.VersionVector) error {
func (d *InternalDocument) applySnapshot(snapshot []byte, vector time.VersionVector) error {
rootObj, presences, err := converter.BytesToSnapshot(snapshot)
if err != nil {
return err
Expand All @@ -269,8 +269,14 @@ func (d *InternalDocument) applySnapshot(snapshot []byte, serverSeq int64, vecto
d.root = crdt.NewRoot(rootObj)
d.presences = presences

// TODO(hackerwins): We need to check we can use serverSeq as lamport timestamp.
d.changeID = d.changeID.SetClocks(serverSeq, vector)
// NOTE(chacha912): Documents created from snapshots were experiencing edit
// restrictions due to low lamport values.
// Previously, the code attempted to generate document lamport from ServerSeq.
// However, after aligning lamport logic with the original research paper,
// ServerSeq could potentially become smaller than the lamport value.
// To resolve this, we initialize document's lamport by using the highest
// lamport value stored in version vector as the starting point.
d.changeID = d.changeID.SetClocks(vector.MaxLamport(), vector)

return nil
}
Expand Down
93 changes: 93 additions & 0 deletions test/integration/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,4 +1447,97 @@ func TestGarbageCollection(t *testing.T) {
assert.Equal(t, `{"text":[{"val":"x"},{"val":"a"}]}`, d2.Marshal())
assert.Equal(t, `{"text":[{"val":"x"},{"val":"a"}]}`, d1.Marshal())
})

t.Run("snapshot version vector test", func(t *testing.T) {
clients := activeClients(t, 3)
c1, c2, c3 := clients[0], clients[1], clients[2]
defer deactivateAndCloseClients(t, clients)

ctx := context.Background()

d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
assert.NoError(t, err)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewText("text").Edit(0, 0, "a")
return nil
})
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
assert.NoError(t, err)

d3 := document.New(helper.TestDocKey(t))
err = c3.Attach(ctx, d3)
assert.NoError(t, err)

err = c1.Sync(ctx)
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)
assert.Equal(t, `{"text":[{"val":"a"}]}`, d1.Marshal())
assert.Equal(t, `{"text":[{"val":"a"}]}`, d2.Marshal())
assert.Equal(t, `{"text":[{"val":"a"}]}`, d3.Marshal())
assert.Equal(t, true, checkVV(d1.VersionVector(), versionOf(d1.ActorID(), 4), versionOf(d2.ActorID(), 1), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d2.VersionVector(), versionOf(d1.ActorID(), 2), versionOf(d2.ActorID(), 4), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d3.VersionVector(), versionOf(d1.ActorID(), 2), versionOf(d2.ActorID(), 1), versionOf(d3.ActorID(), 4)))

// 01. Update changes over snapshot threshold.
for i := 0; i <= int(helper.SnapshotThreshold)/2; i++ {
err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(0, 0, strconv.Itoa(i))
return nil
})
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)

err = d2.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(0, 0, strconv.Itoa(i))
return nil
})
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)
}
assert.Equal(t, true, checkVV(d1.VersionVector(), versionOf(d1.ActorID(), 28), versionOf(d2.ActorID(), 27), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d2.VersionVector(), versionOf(d1.ActorID(), 25), versionOf(d2.ActorID(), 27), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d3.VersionVector(), versionOf(d1.ActorID(), 2), versionOf(d2.ActorID(), 1), versionOf(d3.ActorID(), 4)))

// 02. Make local changes, then pull a snapshot from the server.
err = d3.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(0, 0, "c")
return nil
})
assert.NoError(t, err)
err = c3.Sync(ctx)
assert.NoError(t, err)
assert.Equal(t, true, checkVV(d3.VersionVector(), versionOf(d1.ActorID(), 25), versionOf(d2.ActorID(), 27), versionOf(d3.ActorID(), 30)))
assert.Equal(t, `{"text":[{"val":"5"},{"val":"5"},{"val":"4"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d3.Marshal())

// 03. Delete text after receiving the snapshot.
err = d3.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(1, 3, "")
return nil
})
assert.NoError(t, err)
assert.Equal(t, `{"text":[{"val":"5"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d3.Marshal())

err = c3.Sync(ctx)
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)
assert.Equal(t, `{"text":[{"val":"5"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d2.Marshal())
assert.Equal(t, `{"text":[{"val":"5"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d1.Marshal())
})
}
Loading