Skip to content

Commit

Permalink
Add migration for converting string presence change to binary format
Browse files Browse the repository at this point in the history
  • Loading branch information
chacha912 committed Nov 14, 2024
1 parent 49657d0 commit b9b1c0c
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/yorkie/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/yorkie-team/yorkie/cmd/yorkie/config"
v053 "github.com/yorkie-team/yorkie/migrations/v0.5.3"
v056 "github.com/yorkie-team/yorkie/migrations/v0.5.6"
yorkiemongo "github.com/yorkie-team/yorkie/server/backend/database/mongo"
)

Expand All @@ -43,6 +44,7 @@ var (
// migrationMap is a map of migration functions for each version.
var migrationMap = map[string]func(ctx context.Context, db *mongo.Client, dbName string, batchSize int) error{
"v0.5.3": v053.RunMigration,
"v0.5.6": v056.RunMigration,
}

// runMigration runs the migration for the given version.
Expand Down
36 changes: 36 additions & 0 deletions migrations/v0.5.6/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package v056 provides migration for v0.5.6
package v056

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/mongo"
)

// RunMigration runs migrations for v0.5.6
func RunMigration(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error {
if err := MigratePresenceChange(ctx, db, databaseName, batchSize); err != nil {
return err
}

fmt.Println("v0.5.6 migration completed")

return nil
}
166 changes: 166 additions & 0 deletions migrations/v0.5.6/migrate-presence-change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v056

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

// validatePresenceChangeMigration validates if all string presence changes are properly migrated
func validatePresenceChangeMigration(ctx context.Context, db *mongo.Client, databaseName string) error {
collection := db.Database(databaseName).Collection("changes")

cursor, err := collection.Find(ctx, bson.M{
"presence_change": bson.M{
"$type": "string",
},
})
if err != nil {
return err
}

for cursor.Next(ctx) {
var doc bson.M
if err := cursor.Decode(&doc); err != nil {
return fmt.Errorf("decode document: %w", err)
}

if presenceChange, ok := doc["presence_change"]; ok {
if _, isString := presenceChange.(string); isString {
return fmt.Errorf("found presence change still stored as string")
}
}
}

return nil
}

// processMigrationBatchPresence processes a batch of presence change migrations
func processMigrationBatchPresence(
ctx context.Context,
collection *mongo.Collection,
docs []bson.M,
) error {
var operations []mongo.WriteModel

for _, doc := range docs {
if presenceChange, ok := doc["presence_change"]; ok {
if presenceChangeStr, isString := presenceChange.(string); isString {
var operation *mongo.UpdateOneModel

if presenceChangeStr == "" {
operation = mongo.NewUpdateOneModel().SetFilter(bson.M{
"_id": doc["_id"],
}).SetUpdate(bson.M{
"$set": bson.M{
"presence_change": nil,
},
})
} else {
operation = mongo.NewUpdateOneModel().SetFilter(bson.M{
"_id": doc["_id"],
}).SetUpdate(bson.M{
"$set": bson.M{
"presence_change": []byte(presenceChangeStr),
},
})
}

operations = append(operations, operation)
}
}
}

if len(operations) > 0 {
_, err := collection.BulkWrite(ctx, operations)
if err != nil {
return fmt.Errorf("execute bulk write: %w", err)
}
}

return nil
}

// MigratePresenceChange migrates presence changes from string to byte array format
func MigratePresenceChange(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error {
collection := db.Database(databaseName).Collection("changes")
filter := bson.M{
"presence_change": bson.M{
"$type": "string",
},
}

totalCount, err := collection.CountDocuments(ctx, filter)
if err != nil {
return err
}
if totalCount == 0 {
fmt.Println("No data found to migrate")
return nil
}

batchCount := 1
prevPercentage := 0
cursor, err := collection.Find(ctx, filter)
if err != nil {
return err
}

var docs []bson.M

for cursor.Next(ctx) {
var doc bson.M
if err := cursor.Decode(&doc); err != nil {
return fmt.Errorf("decode document: %w", err)
}

docs = append(docs, doc)

if len(docs) >= batchSize {
if err := processMigrationBatchPresence(ctx, collection, docs); err != nil {
return err
}

percentage := int(float64(batchSize*batchCount) / float64(totalCount) * 100)

if percentage != prevPercentage {
fmt.Printf("%s.changes presence change migration %d%% completed \n", databaseName, percentage)
prevPercentage = percentage
}

docs = docs[:0]
batchCount++
}
}

if len(docs) > 0 {
if err := processMigrationBatchPresence(ctx, collection, docs); err != nil {
return fmt.Errorf("process final batch: %w", err)
}
}

if err := validatePresenceChangeMigration(ctx, db, databaseName); err != nil {
return err
}

fmt.Printf("%s.changes presence change migration completed: %d converted \n", databaseName, totalCount)
return nil
}

0 comments on commit b9b1c0c

Please sign in to comment.