diff --git a/api/converter/converter_test.go b/api/converter/converter_test.go index 1d58d5340..a689960a3 100644 --- a/api/converter/converter_test.go +++ b/api/converter/converter_test.go @@ -261,8 +261,10 @@ func TestConverter(t *testing.T) { }) t.Run("empty presence converting test", func(t *testing.T) { - change, err := innerpresence.NewChangeFromJSON(`{"ChangeType":"put","Presence":{}}`) - assert.NoError(t, err) + change := &innerpresence.PresenceChange{ + ChangeType: innerpresence.Put, + Presence: innerpresence.NewPresence(), + } pbChange := converter.ToPresenceChange(change) clone := converter.FromPresenceChange(pbChange) diff --git a/cmd/yorkie/migration.go b/cmd/yorkie/migration.go index a99ea0ca7..7ab31cdd7 100644 --- a/cmd/yorkie/migration.go +++ b/cmd/yorkie/migration.go @@ -30,6 +30,7 @@ import ( "github.com/yorkie-team/yorkie/cmd/yorkie/config" v053 "github.com/yorkie-team/yorkie/migrations/v0.5.3" + v056 "github.com/yorkie-team/yorkie/migrations/v0.5.6" yorkiemongo "github.com/yorkie-team/yorkie/server/backend/database/mongo" ) @@ -43,6 +44,7 @@ var ( // migrationMap is a map of migration functions for each version. var migrationMap = map[string]func(ctx context.Context, db *mongo.Client, dbName string, batchSize int) error{ "v0.5.3": v053.RunMigration, + "v0.5.6": v056.RunMigration, } // runMigration runs the migration for the given version. diff --git a/migrations/v0.5.6/main.go b/migrations/v0.5.6/main.go new file mode 100644 index 000000000..a12c66c1b --- /dev/null +++ b/migrations/v0.5.6/main.go @@ -0,0 +1,36 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package v056 provides migration for v0.5.6 +package v056 + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/mongo" +) + +// RunMigration runs migrations for v0.5.6 +func RunMigration(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error { + if err := MigratePresenceChange(ctx, db, databaseName, batchSize); err != nil { + return err + } + + fmt.Println("v0.5.6 migration completed") + + return nil +} diff --git a/migrations/v0.5.6/migrate-presence-change.go b/migrations/v0.5.6/migrate-presence-change.go new file mode 100644 index 000000000..98813eec8 --- /dev/null +++ b/migrations/v0.5.6/migrate-presence-change.go @@ -0,0 +1,166 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v056 + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +// validatePresenceChangeMigration validates if all string presence changes are properly migrated +func validatePresenceChangeMigration(ctx context.Context, db *mongo.Client, databaseName string) error { + collection := db.Database(databaseName).Collection("changes") + + cursor, err := collection.Find(ctx, bson.M{ + "presence_change": bson.M{ + "$type": "string", + }, + }) + if err != nil { + return err + } + + for cursor.Next(ctx) { + var doc bson.M + if err := cursor.Decode(&doc); err != nil { + return fmt.Errorf("decode document: %w", err) + } + + if presenceChange, ok := doc["presence_change"]; ok { + if _, isString := presenceChange.(string); isString { + return fmt.Errorf("found presence change still stored as string") + } + } + } + + return nil +} + +// processMigrationBatchPresence processes a batch of presence change migrations +func processMigrationBatchPresence( + ctx context.Context, + collection *mongo.Collection, + docs []bson.M, +) error { + var operations []mongo.WriteModel + + for _, doc := range docs { + if presenceChange, ok := doc["presence_change"]; ok { + if presenceChangeStr, isString := presenceChange.(string); isString { + var operation *mongo.UpdateOneModel + + if presenceChangeStr == "" { + operation = mongo.NewUpdateOneModel().SetFilter(bson.M{ + "_id": doc["_id"], + }).SetUpdate(bson.M{ + "$set": bson.M{ + "presence_change": nil, + }, + }) + } else { + operation = mongo.NewUpdateOneModel().SetFilter(bson.M{ + "_id": doc["_id"], + }).SetUpdate(bson.M{ + "$set": bson.M{ + "presence_change": []byte(presenceChangeStr), + }, + }) + } + + operations = append(operations, operation) + } + } + } + + if len(operations) > 0 { + _, err := collection.BulkWrite(ctx, operations) + if err != nil { + return fmt.Errorf("execute bulk write: %w", err) + } + } + + return nil +} + +// MigratePresenceChange migrates presence changes from string to byte array format +func MigratePresenceChange(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error { + collection := db.Database(databaseName).Collection("changes") + filter := bson.M{ + "presence_change": bson.M{ + "$type": "string", + }, + } + + totalCount, err := collection.CountDocuments(ctx, filter) + if err != nil { + return err + } + if totalCount == 0 { + fmt.Println("No data found to migrate") + return nil + } + + batchCount := 1 + prevPercentage := 0 + cursor, err := collection.Find(ctx, filter) + if err != nil { + return err + } + + var docs []bson.M + + for cursor.Next(ctx) { + var doc bson.M + if err := cursor.Decode(&doc); err != nil { + return fmt.Errorf("decode document: %w", err) + } + + docs = append(docs, doc) + + if len(docs) >= batchSize { + if err := processMigrationBatchPresence(ctx, collection, docs); err != nil { + return err + } + + percentage := int(float64(batchSize*batchCount) / float64(totalCount) * 100) + + if percentage != prevPercentage { + fmt.Printf("%s.changes presence change migration %d%% completed \n", databaseName, percentage) + prevPercentage = percentage + } + + docs = docs[:0] + batchCount++ + } + } + + if len(docs) > 0 { + if err := processMigrationBatchPresence(ctx, collection, docs); err != nil { + return fmt.Errorf("process final batch: %w", err) + } + } + + if err := validatePresenceChangeMigration(ctx, db, databaseName); err != nil { + return err + } + + fmt.Printf("%s.changes presence change migration completed: %d converted \n", databaseName, totalCount) + return nil +} diff --git a/pkg/document/innerpresence/presence.go b/pkg/document/innerpresence/presence.go index fb1168a0b..d116a39fd 100644 --- a/pkg/document/innerpresence/presence.go +++ b/pkg/document/innerpresence/presence.go @@ -20,8 +20,6 @@ package innerpresence import ( - "encoding/json" - "fmt" "sync" ) @@ -100,22 +98,8 @@ const ( // PresenceChange represents the change of presence. type PresenceChange struct { - ChangeType PresenceChangeType `json:"changeType"` - Presence Presence `json:"presence"` -} - -// NewChangeFromJSON creates a new instance of PresenceChange from JSON. -func NewChangeFromJSON(encodedChange string) (*PresenceChange, error) { - if encodedChange == "" { - return nil, nil - } - - p := &PresenceChange{} - if err := json.Unmarshal([]byte(encodedChange), p); err != nil { - return nil, fmt.Errorf("unmarshal presence change: %w", err) - } - - return p, nil + ChangeType PresenceChangeType + Presence Presence } // Presence represents custom presence that can be defined by the client. @@ -132,10 +116,8 @@ func (p Presence) Set(key string, value string) { } // Clear clears the presence. -func (p Presence) Clear() { - for k := range p { - delete(p, k) - } +func (p *Presence) Clear() { + *p = make(map[string]string) } // DeepCopy copies itself deeply. @@ -143,6 +125,7 @@ func (p Presence) DeepCopy() Presence { if p == nil { return nil } + clone := make(map[string]string) for k, v := range p { clone[k] = v diff --git a/server/backend/database/change_info.go b/server/backend/database/change_info.go index 6f50bf713..6f18adce3 100644 --- a/server/backend/database/change_info.go +++ b/server/backend/database/change_info.go @@ -17,7 +17,6 @@ package database import ( - "encoding/json" "errors" "fmt" @@ -40,17 +39,17 @@ var ErrDecodeOperationFailed = errors.New("decode operations failed") // ChangeInfo is a structure representing information of a change. type ChangeInfo struct { - ID types.ID `bson:"_id"` - ProjectID types.ID `bson:"project_id"` - DocID types.ID `bson:"doc_id"` - ServerSeq int64 `bson:"server_seq"` - ClientSeq uint32 `bson:"client_seq"` - Lamport int64 `bson:"lamport"` - ActorID types.ID `bson:"actor_id"` - VersionVector time.VersionVector `bson:"version_vector"` - Message string `bson:"message"` - Operations [][]byte `bson:"operations"` - PresenceChange string `bson:"presence_change"` + ID types.ID `bson:"_id"` + ProjectID types.ID `bson:"project_id"` + DocID types.ID `bson:"doc_id"` + ServerSeq int64 `bson:"server_seq"` + ClientSeq uint32 `bson:"client_seq"` + Lamport int64 `bson:"lamport"` + ActorID types.ID `bson:"actor_id"` + VersionVector time.VersionVector `bson:"version_vector"` + Message string `bson:"message"` + Operations [][]byte `bson:"operations"` + PresenceChange *innerpresence.PresenceChange `bson:"presence_change"` } // EncodeOperations encodes the given operations into bytes array. @@ -73,20 +72,6 @@ func EncodeOperations(operations []operations.Operation) ([][]byte, error) { return encodedOps, nil } -// EncodePresenceChange encodes the given presence change into string. -func EncodePresenceChange(p *innerpresence.PresenceChange) (string, error) { - if p == nil { - return "", nil - } - - bytes, err := json.Marshal(p) - if err != nil { - return "", fmt.Errorf("marshal presence change to bytes: %w", err) - } - - return string(bytes), nil -} - // ToChange creates Change model from this ChangeInfo. func (i *ChangeInfo) ToChange() (*change.Change, error) { actorID, err := time.ActorIDFromHex(i.ActorID.String()) @@ -110,12 +95,7 @@ func (i *ChangeInfo) ToChange() (*change.Change, error) { return nil, err } - p, err := innerpresence.NewChangeFromJSON(i.PresenceChange) - if err != nil { - return nil, err - } - - c := change.New(changeID, i.Message, ops, p) + c := change.New(changeID, i.Message, ops, i.PresenceChange) c.SetServerSeq(i.ServerSeq) return c, nil @@ -132,3 +112,31 @@ func (i *ChangeInfo) DeepCopy() *ChangeInfo { return clone } + +// EncodePresenceChange encodes the given PresenceChange into bytes array. +func EncodePresenceChange(p *innerpresence.PresenceChange) ([]byte, error) { + if p == nil { + return nil, nil + } + + bytes, err := proto.Marshal(converter.ToPresenceChange(p)) + if err != nil { + return nil, fmt.Errorf("encode presence change to bytes: %w", err) + } + + return bytes, nil +} + +// PresenceChangeFromBytes decodes the given bytes array into PresenceChange. +func PresenceChangeFromBytes(bytes []byte) (*innerpresence.PresenceChange, error) { + if bytes == nil { + return nil, nil + } + + pbChange := &api.PresenceChange{} + if err := proto.Unmarshal(bytes, pbChange); err != nil { + return nil, fmt.Errorf("decode presence change: %w", err) + } + + return converter.FromPresenceChange(pbChange), nil +} diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index eda760ea6..fb6cfb066 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -901,10 +901,6 @@ func (d *DB) CreateChangeInfos( if err != nil { return err } - encodedPresence, err := database.EncodePresenceChange(cn.PresenceChange()) - if err != nil { - return err - } if err := txn.Insert(tblChanges, &database.ChangeInfo{ ID: newID(), @@ -917,7 +913,7 @@ func (d *DB) CreateChangeInfos( VersionVector: cn.ID().VersionVector(), Message: cn.Message(), Operations: encodedOperations, - PresenceChange: encodedPresence, + PresenceChange: cn.PresenceChange(), }); err != nil { return fmt.Errorf("create change: %w", err) } diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index fabb06f15..de70d1308 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -866,10 +866,6 @@ func (c *Client) CreateChangeInfos( if err != nil { return err } - encodedPresence, err := database.EncodePresenceChange(cn.PresenceChange()) - if err != nil { - return err - } models = append(models, mongo.NewUpdateOneModel().SetFilter(bson.M{ "project_id": docRefKey.ProjectID, @@ -882,7 +878,7 @@ func (c *Client) CreateChangeInfos( "version_vector": cn.ID().VersionVector(), "message": cn.Message(), "operations": encodedOperations, - "presence_change": encodedPresence, + "presence_change": cn.PresenceChange(), }}).SetUpsert(true)) } diff --git a/server/backend/database/mongo/registry.go b/server/backend/database/mongo/registry.go index a8540a57c..4d95f4c72 100644 --- a/server/backend/database/mongo/registry.go +++ b/server/backend/database/mongo/registry.go @@ -29,12 +29,15 @@ import ( "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" api "github.com/yorkie-team/yorkie/api/yorkie/v1" + "github.com/yorkie-team/yorkie/pkg/document/innerpresence" "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/server/backend/database" ) var tID = reflect.TypeOf(types.ID("")) var tActorID = reflect.TypeOf(&time.ActorID{}) var tVersionVector = reflect.TypeOf(time.VersionVector{}) +var tPresenceChange = reflect.TypeOf(&innerpresence.PresenceChange{}) // NewRegistryBuilder returns a new registry builder with the default encoder and decoder. func NewRegistryBuilder() *bsoncodec.RegistryBuilder { @@ -50,11 +53,13 @@ func NewRegistryBuilder() *bsoncodec.RegistryBuilder { bsoncodec.NewStringCodec(bsonoptions.StringCodec().SetDecodeObjectIDAsHex(true)), ) rb.RegisterTypeDecoder(tVersionVector, bsoncodec.ValueDecoderFunc(versionVectorDecoder)) + rb.RegisterTypeDecoder(tPresenceChange, bsoncodec.ValueDecoderFunc(presenceChangeDecoder)) // Register the encoders for types.ID and time.ActorID. rb.RegisterTypeEncoder(tID, bsoncodec.ValueEncoderFunc(idEncoder)) rb.RegisterTypeEncoder(tActorID, bsoncodec.ValueEncoderFunc(actorIDEncoder)) rb.RegisterTypeEncoder(tVersionVector, bsoncodec.ValueEncoderFunc(versionVectorEncoder)) + rb.RegisterTypeEncoder(tPresenceChange, bsoncodec.ValueEncoderFunc(presenceChangeEncoder)) return rb } @@ -133,3 +138,59 @@ func versionVectorDecoder(_ bsoncodec.DecodeContext, vr bsonrw.ValueReader, val return nil } + +func presenceChangeEncoder(_ bsoncodec.EncodeContext, vw bsonrw.ValueWriter, val reflect.Value) error { + if !val.IsValid() || val.Type() != tPresenceChange { + return bsoncodec.ValueEncoderError{ + Name: "presenceChangeEncoder", Types: []reflect.Type{tPresenceChange}, Received: val} + } + + presenceChange := val.Interface().(*innerpresence.PresenceChange) + if presenceChange == nil { + if err := vw.WriteNull(); err != nil { + return fmt.Errorf("encode error: %w", err) + } + return nil + } + + bytes, err := database.EncodePresenceChange(presenceChange) + if err != nil { + return fmt.Errorf("encode error: %w", err) + } + + if err := vw.WriteBinary(bytes); err != nil { + return fmt.Errorf("encode error: %w", err) + } + + return nil +} + +func presenceChangeDecoder(_ bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { + if val.Type() != tPresenceChange { + return bsoncodec.ValueDecoderError{ + Name: "presenceChangeDecoder", Types: []reflect.Type{tPresenceChange}, Received: val} + } + + switch vrType := vr.Type(); vrType { + case bson.TypeNull: + if err := vr.ReadNull(); err != nil { + return fmt.Errorf("decode error: %w", err) + } + val.Set(reflect.Zero(tPresenceChange)) + return nil + case bson.TypeBinary: + data, _, err := vr.ReadBinary() + if err != nil { + return fmt.Errorf("decode error: %w", err) + } + + presenceChange, err := database.PresenceChangeFromBytes(data) + if err != nil { + return fmt.Errorf("decode error: %w", err) + } + val.Set(reflect.ValueOf(presenceChange)) + return nil + default: + return fmt.Errorf("unsupported type: %v", vr.Type()) + } +} diff --git a/server/backend/database/mongo/registry_test.go b/server/backend/database/mongo/registry_test.go index ae4887f8b..02c0cbf85 100644 --- a/server/backend/database/mongo/registry_test.go +++ b/server/backend/database/mongo/registry_test.go @@ -28,6 +28,7 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/pkg/document/innerpresence" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend/database" ) @@ -64,6 +65,27 @@ func TestRegistry(t *testing.T) { assert.NoError(t, bson.UnmarshalWithRegistry(registry, data, &info)) assert.Equal(t, vector, info.VersionVector) }) + + t.Run("presenceChange test", func(t *testing.T) { + presence := innerpresence.NewPresence() + presence.Set("color", "orange") + presenceChange := &innerpresence.PresenceChange{ + ChangeType: innerpresence.Put, + Presence: presence, + } + + data, err := bson.MarshalWithRegistry(registry, bson.M{ + "presence_change": presenceChange, + }) + assert.NoError(t, err) + + info := struct { + PresenceChange *innerpresence.PresenceChange `bson:"presence_change"` + }{} + assert.NoError(t, bson.UnmarshalWithRegistry(registry, data, &info)) + + assert.Equal(t, presenceChange, info.PresenceChange) + }) } func TestEncoder(t *testing.T) { diff --git a/server/packs/serverpacks.go b/server/packs/serverpacks.go index 2b3d147bf..d90ec5601 100644 --- a/server/packs/serverpacks.go +++ b/server/packs/serverpacks.go @@ -22,7 +22,6 @@ import ( "github.com/yorkie-team/yorkie/api/converter" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/pkg/document/change" - "github.com/yorkie-team/yorkie/pkg/document/innerpresence" "github.com/yorkie-team/yorkie/pkg/document/key" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend/database" @@ -111,11 +110,6 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) { pbOps = append(pbOps, &pbOp) } - p, err := innerpresence.NewChangeFromJSON(info.PresenceChange) - if err != nil { - return nil, err - } - pbChangeID, err := converter.ToChangeID(changeID) if err != nil { return nil, err @@ -125,7 +119,7 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) { Id: pbChangeID, Message: info.Message, Operations: pbOps, - PresenceChange: converter.ToPresenceChange(p), + PresenceChange: converter.ToPresenceChange(info.PresenceChange), }) }