Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert presence change from string to binary #1069

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions api/converter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,10 @@ func TestConverter(t *testing.T) {
})

t.Run("empty presence converting test", func(t *testing.T) {
change, err := innerpresence.NewChangeFromJSON(`{"ChangeType":"put","Presence":{}}`)
assert.NoError(t, err)
change := &innerpresence.PresenceChange{
ChangeType: innerpresence.Put,
Presence: innerpresence.NewPresence(),
}

pbChange := converter.ToPresenceChange(change)
clone := converter.FromPresenceChange(pbChange)
Expand Down
2 changes: 2 additions & 0 deletions cmd/yorkie/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/yorkie-team/yorkie/cmd/yorkie/config"
v053 "github.com/yorkie-team/yorkie/migrations/v0.5.3"
v056 "github.com/yorkie-team/yorkie/migrations/v0.5.6"
yorkiemongo "github.com/yorkie-team/yorkie/server/backend/database/mongo"
)

Expand All @@ -43,6 +44,7 @@ var (
// migrationMap is a map of migration functions for each version.
var migrationMap = map[string]func(ctx context.Context, db *mongo.Client, dbName string, batchSize int) error{
"v0.5.3": v053.RunMigration,
"v0.5.6": v056.RunMigration,
}

// runMigration runs the migration for the given version.
Expand Down
36 changes: 36 additions & 0 deletions migrations/v0.5.6/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package v056 provides migration for v0.5.6
package v056

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/mongo"
)

// RunMigration runs migrations for v0.5.6
func RunMigration(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error {
if err := MigratePresenceChange(ctx, db, databaseName, batchSize); err != nil {
return err
}

fmt.Println("v0.5.6 migration completed")

return nil
}
chacha912 marked this conversation as resolved.
Show resolved Hide resolved
166 changes: 166 additions & 0 deletions migrations/v0.5.6/migrate-presence-change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v056

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

// validatePresenceChangeMigration validates if all string presence changes are properly migrated
func validatePresenceChangeMigration(ctx context.Context, db *mongo.Client, databaseName string) error {
collection := db.Database(databaseName).Collection("changes")

cursor, err := collection.Find(ctx, bson.M{
"presence_change": bson.M{
"$type": "string",
},
})
if err != nil {
return err
}

for cursor.Next(ctx) {
var doc bson.M
if err := cursor.Decode(&doc); err != nil {
return fmt.Errorf("decode document: %w", err)
}

if presenceChange, ok := doc["presence_change"]; ok {
if _, isString := presenceChange.(string); isString {
return fmt.Errorf("found presence change still stored as string")
}
}
}

return nil
}

// processMigrationBatchPresence processes a batch of presence change migrations
func processMigrationBatchPresence(
ctx context.Context,
collection *mongo.Collection,
docs []bson.M,
) error {
var operations []mongo.WriteModel

for _, doc := range docs {
if presenceChange, ok := doc["presence_change"]; ok {
if presenceChangeStr, isString := presenceChange.(string); isString {
var operation *mongo.UpdateOneModel

if presenceChangeStr == "" {
operation = mongo.NewUpdateOneModel().SetFilter(bson.M{
"_id": doc["_id"],
}).SetUpdate(bson.M{
"$set": bson.M{
"presence_change": nil,
},
})
} else {
operation = mongo.NewUpdateOneModel().SetFilter(bson.M{
"_id": doc["_id"],
}).SetUpdate(bson.M{
"$set": bson.M{
"presence_change": []byte(presenceChangeStr),
},
})
}

operations = append(operations, operation)
}
}
}

if len(operations) > 0 {
_, err := collection.BulkWrite(ctx, operations)
if err != nil {
return fmt.Errorf("execute bulk write: %w", err)
}
}

return nil
}

// MigratePresenceChange migrates presence changes from string to byte array format
func MigratePresenceChange(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error {
collection := db.Database(databaseName).Collection("changes")
filter := bson.M{
"presence_change": bson.M{
"$type": "string",
},
}

totalCount, err := collection.CountDocuments(ctx, filter)
if err != nil {
return err
}
if totalCount == 0 {
fmt.Println("No data found to migrate")
return nil
}

batchCount := 1
prevPercentage := 0
cursor, err := collection.Find(ctx, filter)
if err != nil {
return err
}

var docs []bson.M

for cursor.Next(ctx) {
var doc bson.M
if err := cursor.Decode(&doc); err != nil {
return fmt.Errorf("decode document: %w", err)
}

docs = append(docs, doc)

if len(docs) >= batchSize {
if err := processMigrationBatchPresence(ctx, collection, docs); err != nil {
return err
}

percentage := int(float64(batchSize*batchCount) / float64(totalCount) * 100)

if percentage != prevPercentage {
fmt.Printf("%s.changes presence change migration %d%% completed \n", databaseName, percentage)
prevPercentage = percentage
}
chacha912 marked this conversation as resolved.
Show resolved Hide resolved

docs = docs[:0]
batchCount++
}
}

if len(docs) > 0 {
if err := processMigrationBatchPresence(ctx, collection, docs); err != nil {
return fmt.Errorf("process final batch: %w", err)
}
}

if err := validatePresenceChangeMigration(ctx, db, databaseName); err != nil {
return err
}

fmt.Printf("%s.changes presence change migration completed: %d converted \n", databaseName, totalCount)
return nil
}
27 changes: 5 additions & 22 deletions pkg/document/innerpresence/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package innerpresence

import (
"encoding/json"
"fmt"
"sync"
)

Expand Down Expand Up @@ -100,22 +98,8 @@ const (

// PresenceChange represents the change of presence.
type PresenceChange struct {
ChangeType PresenceChangeType `json:"changeType"`
Presence Presence `json:"presence"`
}

// NewChangeFromJSON creates a new instance of PresenceChange from JSON.
func NewChangeFromJSON(encodedChange string) (*PresenceChange, error) {
if encodedChange == "" {
return nil, nil
}

p := &PresenceChange{}
if err := json.Unmarshal([]byte(encodedChange), p); err != nil {
return nil, fmt.Errorf("unmarshal presence change: %w", err)
}

return p, nil
ChangeType PresenceChangeType
Presence Presence
}

// Presence represents custom presence that can be defined by the client.
Expand All @@ -132,17 +116,16 @@ func (p Presence) Set(key string, value string) {
}

// Clear clears the presence.
func (p Presence) Clear() {
for k := range p {
delete(p, k)
}
func (p *Presence) Clear() {
*p = make(map[string]string)
}

// DeepCopy copies itself deeply.
func (p Presence) DeepCopy() Presence {
if p == nil {
return nil
}

clone := make(map[string]string)
for k, v := range p {
clone[k] = v
Expand Down
72 changes: 40 additions & 32 deletions server/backend/database/change_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package database

import (
"encoding/json"
"errors"
"fmt"

Expand All @@ -40,17 +39,17 @@ var ErrDecodeOperationFailed = errors.New("decode operations failed")

// ChangeInfo is a structure representing information of a change.
type ChangeInfo struct {
ID types.ID `bson:"_id"`
ProjectID types.ID `bson:"project_id"`
DocID types.ID `bson:"doc_id"`
ServerSeq int64 `bson:"server_seq"`
ClientSeq uint32 `bson:"client_seq"`
Lamport int64 `bson:"lamport"`
ActorID types.ID `bson:"actor_id"`
VersionVector time.VersionVector `bson:"version_vector"`
Message string `bson:"message"`
Operations [][]byte `bson:"operations"`
PresenceChange string `bson:"presence_change"`
ID types.ID `bson:"_id"`
ProjectID types.ID `bson:"project_id"`
DocID types.ID `bson:"doc_id"`
ServerSeq int64 `bson:"server_seq"`
ClientSeq uint32 `bson:"client_seq"`
Lamport int64 `bson:"lamport"`
ActorID types.ID `bson:"actor_id"`
VersionVector time.VersionVector `bson:"version_vector"`
Message string `bson:"message"`
Operations [][]byte `bson:"operations"`
PresenceChange *innerpresence.PresenceChange `bson:"presence_change"`
chacha912 marked this conversation as resolved.
Show resolved Hide resolved
}

// EncodeOperations encodes the given operations into bytes array.
Expand All @@ -73,20 +72,6 @@ func EncodeOperations(operations []operations.Operation) ([][]byte, error) {
return encodedOps, nil
}

// EncodePresenceChange encodes the given presence change into string.
func EncodePresenceChange(p *innerpresence.PresenceChange) (string, error) {
if p == nil {
return "", nil
}

bytes, err := json.Marshal(p)
if err != nil {
return "", fmt.Errorf("marshal presence change to bytes: %w", err)
}

return string(bytes), nil
}

// ToChange creates Change model from this ChangeInfo.
func (i *ChangeInfo) ToChange() (*change.Change, error) {
actorID, err := time.ActorIDFromHex(i.ActorID.String())
Expand All @@ -110,12 +95,7 @@ func (i *ChangeInfo) ToChange() (*change.Change, error) {
return nil, err
}

p, err := innerpresence.NewChangeFromJSON(i.PresenceChange)
if err != nil {
return nil, err
}

c := change.New(changeID, i.Message, ops, p)
c := change.New(changeID, i.Message, ops, i.PresenceChange)
chacha912 marked this conversation as resolved.
Show resolved Hide resolved
c.SetServerSeq(i.ServerSeq)

return c, nil
Expand All @@ -132,3 +112,31 @@ func (i *ChangeInfo) DeepCopy() *ChangeInfo {

return clone
}

// EncodePresenceChange encodes the given PresenceChange into bytes array.
func EncodePresenceChange(p *innerpresence.PresenceChange) ([]byte, error) {
if p == nil {
return nil, nil
}

bytes, err := proto.Marshal(converter.ToPresenceChange(p))
if err != nil {
return nil, fmt.Errorf("encode presence change to bytes: %w", err)
}

return bytes, nil
}

// PresenceChangeFromBytes decodes the given bytes array into PresenceChange.
func PresenceChangeFromBytes(bytes []byte) (*innerpresence.PresenceChange, error) {
if bytes == nil {
return nil, nil
}

pbChange := &api.PresenceChange{}
if err := proto.Unmarshal(bytes, pbChange); err != nil {
return nil, fmt.Errorf("decode presence change: %w", err)
}

return converter.FromPresenceChange(pbChange), nil
}
Loading
Loading