From a8605ef830778ee67fbc9d7f3b82bc952811ff3d Mon Sep 17 00:00:00 2001 From: raararaara Date: Thu, 31 Oct 2024 14:19:55 +0900 Subject: [PATCH 1/9] Make change pack directly when document detach --- server/rpc/cluster_server.go | 40 ++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index 3d86178c0..cdf804886 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -25,7 +25,8 @@ import ( "github.com/yorkie-team/yorkie/api/types" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/pkg/document" - "github.com/yorkie-team/yorkie/pkg/document/json" + "github.com/yorkie-team/yorkie/pkg/document/change" + "github.com/yorkie-team/yorkie/pkg/document/innerpresence" "github.com/yorkie-team/yorkie/pkg/document/presence" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend" @@ -93,20 +94,32 @@ func (s *clusterServer) DetachDocument( return nil, err } + cp := clientInfo.Checkpoint(summary.ID) + changeCtx := change.NewContext( + change.NewID(cp.ClientSeq, cp.ServerSeq, 0, actorID, nil).Next(), + "", + nil, + ) + p := presence.New(changeCtx, innerpresence.NewPresence()) + p.Clear() + + changes := []*change.Change{changeCtx.ToChange()} + pack := change.NewPack(docInfo.Key, cp, changes, nil, nil) + // TODO(hackerwins): BuildDocForCheckpoint is expensive because it reads the entire document. // We need to optimize this by creating a ChangePack directly. // 01. Create ChangePack with clear presence. - doc, err := packs.BuildDocForCheckpoint(ctx, s.backend, docInfo, clientInfo.Checkpoint(summary.ID), actorID) - if err != nil { - return nil, err - } - - if err := doc.Update(func(root *json.Object, p *presence.Presence) error { - p.Clear() - return nil - }); err != nil { - return nil, err - } + //doc, err := packs.BuildDocForCheckpoint(ctx, s.backend, docInfo, clientInfo.Checkpoint(summary.ID), actorID) + //if err != nil { + // return nil, err + //} + // + //if err := doc.Update(func(root *json.Object, p *presence.Presence) error { + // p.Clear() + // return nil + //}); err != nil { + // return nil, err + //} // 02. PushPull with the created ChangePack. if _, err := packs.PushPull( @@ -115,7 +128,8 @@ func (s *clusterServer) DetachDocument( project, clientInfo, docInfo, - doc.CreateChangePack(), + //doc.CreateChangePack(), + pack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusDetached, From 5aacd18bc9c139d357cd2783154b757a16b022c4 Mon Sep 17 00:00:00 2001 From: raararaara Date: Thu, 31 Oct 2024 16:54:33 +0900 Subject: [PATCH 2/9] Add latest change info when creating change pack --- server/backend/database/database.go | 7 ++++++ server/backend/database/memory/database.go | 22 ++++++++++++++++++ server/backend/database/mongo/client.go | 27 ++++++++++++++++++++++ server/rpc/cluster_server.go | 24 ++++++------------- 4 files changed, 63 insertions(+), 17 deletions(-) diff --git a/server/backend/database/database.go b/server/backend/database/database.go index fb4d0dae8..89fe14047 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -219,6 +219,13 @@ type Database interface { docRefKey types.DocRefKey, ) error + // FindChangeInfoByServerSeq returns the change by the given server sequence. + FindChangeInfoByServerSeq( + ctx context.Context, + docRefKey types.DocRefKey, + serverSeq int64, + ) (*ChangeInfo, error) + // FindChangesBetweenServerSeqs returns the changes between two server sequences. FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 98d17d833..fcb54fabd 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1012,6 +1012,28 @@ func (d *DB) PurgeStaleChanges( return nil } +// FindChangeInfoByServerSeq returns the change by the given server sequence. +func (d *DB) FindChangeInfoByServerSeq( + _ context.Context, + docRefKey types.DocRefKey, + serverSeq int64, +) (*database.ChangeInfo, error) { + txn := d.db.Txn(false) + defer txn.Abort() + raw, err := txn.First(tblSnapshots, "doc_id_server_seq", + docRefKey.DocID.String(), + serverSeq, + ) + if err != nil { + return nil, fmt.Errorf("find snapshot by serverSeq: %w", err) + } + if raw == nil { + return nil, fmt.Errorf("%s: %w", docRefKey, database.ErrChangeNotFound) + } + + return raw.(*database.ChangeInfo).DeepCopy(), nil +} + // FindChangesBetweenServerSeqs returns the changes between two server sequences. func (d *DB) FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 29208624d..f830fcbf0 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -977,6 +977,33 @@ func (c *Client) PurgeStaleChanges( return nil } +// FindChangeInfoByServerSeq returns the change by the given server sequence. +func (c *Client) FindChangeInfoByServerSeq( + ctx context.Context, + docRefKey types.DocRefKey, + serverSeq int64, +) (*database.ChangeInfo, error) { + result := c.collection(ColChanges).FindOne(ctx, bson.M{ + "project_id": docRefKey.ProjectID, + "doc_id": docRefKey.DocID, + "server_seq": serverSeq, + }) + + changeInfo := &database.ChangeInfo{} + if result.Err() == mongo.ErrNoDocuments { + return changeInfo, nil + } + if result.Err() != nil { + return nil, fmt.Errorf("find change: %w", result.Err()) + } + + if err := result.Decode(changeInfo); err != nil { + return nil, fmt.Errorf("decode change: %w", err) + } + + return changeInfo, nil +} + // FindChangesBetweenServerSeqs returns the changes between two server sequences. func (c *Client) FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index cdf804886..a92e560bb 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -94,9 +94,14 @@ func (s *clusterServer) DetachDocument( return nil, err } + // 02. Create changePack with presence clear change cp := clientInfo.Checkpoint(summary.ID) + latestChange, err := s.backend.DB.FindChangeInfoByServerSeq(ctx, docRefKey, cp.ServerSeq) + if err != nil { + return nil, err + } changeCtx := change.NewContext( - change.NewID(cp.ClientSeq, cp.ServerSeq, 0, actorID, nil).Next(), + change.NewID(cp.ClientSeq, cp.ServerSeq, latestChange.Lamport, actorID, latestChange.VersionVector).Next(), "", nil, ) @@ -106,22 +111,7 @@ func (s *clusterServer) DetachDocument( changes := []*change.Change{changeCtx.ToChange()} pack := change.NewPack(docInfo.Key, cp, changes, nil, nil) - // TODO(hackerwins): BuildDocForCheckpoint is expensive because it reads the entire document. - // We need to optimize this by creating a ChangePack directly. - // 01. Create ChangePack with clear presence. - //doc, err := packs.BuildDocForCheckpoint(ctx, s.backend, docInfo, clientInfo.Checkpoint(summary.ID), actorID) - //if err != nil { - // return nil, err - //} - // - //if err := doc.Update(func(root *json.Object, p *presence.Presence) error { - // p.Clear() - // return nil - //}); err != nil { - // return nil, err - //} - - // 02. PushPull with the created ChangePack. + // 03. PushPull with the created ChangePack. if _, err := packs.PushPull( ctx, s.backend, From 6e69b6351e53c5755ef9a561ca40382ce2076c41 Mon Sep 17 00:00:00 2001 From: raararaara Date: Mon, 4 Nov 2024 22:16:57 +0900 Subject: [PATCH 3/9] Fix lamport and version vector when detach --- server/rpc/cluster_server.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index a92e560bb..24d90decb 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -17,10 +17,8 @@ package rpc import ( - "context" - "connectrpc.com/connect" - + "context" "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" api "github.com/yorkie-team/yorkie/api/yorkie/v1" @@ -94,14 +92,19 @@ func (s *clusterServer) DetachDocument( return nil, err } - // 02. Create changePack with presence clear change + // 01. Create changePack with presence clear change cp := clientInfo.Checkpoint(summary.ID) latestChange, err := s.backend.DB.FindChangeInfoByServerSeq(ctx, docRefKey, cp.ServerSeq) if err != nil { return nil, err } + maxLamport := latestChange.Lamport + if cp.ServerSeq > maxLamport { + maxLamport = cp.ServerSeq + } + latestChange.VersionVector.Set(actorID, maxLamport) changeCtx := change.NewContext( - change.NewID(cp.ClientSeq, cp.ServerSeq, latestChange.Lamport, actorID, latestChange.VersionVector).Next(), + change.NewID(cp.ClientSeq, cp.ServerSeq, maxLamport, actorID, latestChange.VersionVector).Next(), "", nil, ) From 267b9fcb1397e07bdf5dc31a5cfc7f1f920b10a7 Mon Sep 17 00:00:00 2001 From: raararaara Date: Wed, 6 Nov 2024 20:58:50 +0900 Subject: [PATCH 4/9] Apply latest change info when generating change during clusterServer.detach --- server/backend/database/database.go | 7 ++++++ server/backend/database/memory/database.go | 9 +++++++ server/backend/database/mongo/client.go | 29 ++++++++++++++++++++++ server/backend/database/mongo/indexes.go | 6 +++++ server/rpc/cluster_server.go | 9 ++++--- 5 files changed, 56 insertions(+), 4 deletions(-) diff --git a/server/backend/database/database.go b/server/backend/database/database.go index 89fe14047..814a5b661 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -219,6 +219,13 @@ type Database interface { docRefKey types.DocRefKey, ) error + // FindLatestChangeInfoByActor returns the latest change created by given actorID. + FindLatestChangeInfoByActor( + ctx context.Context, + docRefKey types.DocRefKey, + actorID types.ID, + ) (*ChangeInfo, error) + // FindChangeInfoByServerSeq returns the change by the given server sequence. FindChangeInfoByServerSeq( ctx context.Context, diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index fcb54fabd..21e9fdde4 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1012,6 +1012,15 @@ func (d *DB) PurgeStaleChanges( return nil } +// FindLatestChangeInfoByActor returns the latest change created by given actorID. +func (d *DB) FindLatestChangeInfoByActor( + _ context.Context, + _ types.DocRefKey, + _ types.ID, +) (*database.ChangeInfo, error) { + return nil, nil +} + // FindChangeInfoByServerSeq returns the change by the given server sequence. func (d *DB) FindChangeInfoByServerSeq( _ context.Context, diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index f830fcbf0..af343988a 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -977,6 +977,35 @@ func (c *Client) PurgeStaleChanges( return nil } +// FindLatestChangeInfoByActor returns the latest change created by given actorID. +func (c *Client) FindLatestChangeInfoByActor( + ctx context.Context, + docRefKey types.DocRefKey, + actorID types.ID, +) (*database.ChangeInfo, error) { + result := c.collection(ColChanges).FindOne(ctx, bson.M{ + "project_id": docRefKey.ProjectID, + "doc_id": docRefKey.DocID, + "actor_id": actorID, + }, options.FindOne().SetSort(bson.M{ + "lamport": -1, + })) + + changeInfo := &database.ChangeInfo{} + if result.Err() == mongo.ErrNoDocuments { + return changeInfo, nil + } + if result.Err() != nil { + return nil, fmt.Errorf("find change: %w", result.Err()) + } + + if err := result.Decode(changeInfo); err != nil { + return nil, fmt.Errorf("decode change: %w", err) + } + + return changeInfo, nil +} + // FindChangeInfoByServerSeq returns the change by the given server sequence. func (c *Client) FindChangeInfoByServerSeq( ctx context.Context, diff --git a/server/backend/database/mongo/indexes.go b/server/backend/database/mongo/indexes.go index 93a4a75f0..fef4a28bc 100644 --- a/server/backend/database/mongo/indexes.go +++ b/server/backend/database/mongo/indexes.go @@ -127,6 +127,12 @@ var collectionInfos = []collectionInfo{ {Key: "server_seq", Value: bsonx.Int32(1)}, }, Options: options.Index().SetUnique(true), + }, { + Keys: bsonx.Doc{ + {Key: "doc_id", Value: bsonx.Int32(1)}, // shard key + {Key: "project_id", Value: bsonx.Int32(1)}, + {Key: "actor_id", Value: bsonx.Int32(1)}, + }, }}, }, { name: ColSnapshots, diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index 24d90decb..ec658ad01 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -17,8 +17,10 @@ package rpc import ( - "connectrpc.com/connect" "context" + + "connectrpc.com/connect" + "github.com/yorkie-team/yorkie/api/converter" "github.com/yorkie-team/yorkie/api/types" api "github.com/yorkie-team/yorkie/api/yorkie/v1" @@ -94,7 +96,7 @@ func (s *clusterServer) DetachDocument( // 01. Create changePack with presence clear change cp := clientInfo.Checkpoint(summary.ID) - latestChange, err := s.backend.DB.FindChangeInfoByServerSeq(ctx, docRefKey, cp.ServerSeq) + latestChange, err := s.backend.DB.FindLatestChangeInfoByActor(ctx, docRefKey, types.ID(req.Msg.ClientId)) if err != nil { return nil, err } @@ -114,14 +116,13 @@ func (s *clusterServer) DetachDocument( changes := []*change.Change{changeCtx.ToChange()} pack := change.NewPack(docInfo.Key, cp, changes, nil, nil) - // 03. PushPull with the created ChangePack. + // 02. PushPull with the created ChangePack. if _, err := packs.PushPull( ctx, s.backend, project, clientInfo, docInfo, - //doc.CreateChangePack(), pack, packs.PushPullOptions{ Mode: types.SyncModePushPull, From f1baf051c5d70245e74e0c5f88a6aa72711c12f0 Mon Sep 17 00:00:00 2001 From: raararaara Date: Wed, 6 Nov 2024 22:21:42 +0900 Subject: [PATCH 5/9] Add tests for `FindLatestChangeInfoByActor` --- server/backend/database/database.go | 7 --- server/backend/database/memory/database.go | 30 +++++----- .../backend/database/memory/database_test.go | 4 ++ server/backend/database/memory/indexes.go | 9 +++ server/backend/database/mongo/client.go | 27 --------- server/backend/database/mongo/client_test.go | 4 ++ .../backend/database/testcases/testcases.go | 60 +++++++++++++++++++ 7 files changed, 91 insertions(+), 50 deletions(-) diff --git a/server/backend/database/database.go b/server/backend/database/database.go index 814a5b661..85b50ca83 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -226,13 +226,6 @@ type Database interface { actorID types.ID, ) (*ChangeInfo, error) - // FindChangeInfoByServerSeq returns the change by the given server sequence. - FindChangeInfoByServerSeq( - ctx context.Context, - docRefKey types.DocRefKey, - serverSeq int64, - ) (*ChangeInfo, error) - // FindChangesBetweenServerSeqs returns the changes between two server sequences. FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 21e9fdde4..527575698 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1014,33 +1014,31 @@ func (d *DB) PurgeStaleChanges( // FindLatestChangeInfoByActor returns the latest change created by given actorID. func (d *DB) FindLatestChangeInfoByActor( - _ context.Context, - _ types.DocRefKey, - _ types.ID, -) (*database.ChangeInfo, error) { - return nil, nil -} - -// FindChangeInfoByServerSeq returns the change by the given server sequence. -func (d *DB) FindChangeInfoByServerSeq( _ context.Context, docRefKey types.DocRefKey, - serverSeq int64, + actorID types.ID, ) (*database.ChangeInfo, error) { txn := d.db.Txn(false) defer txn.Abort() - raw, err := txn.First(tblSnapshots, "doc_id_server_seq", + + iterator, err := txn.GetReverse( + tblChanges, + "doc_id_actor_id", docRefKey.DocID.String(), - serverSeq, + actorID.String(), ) if err != nil { - return nil, fmt.Errorf("find snapshot by serverSeq: %w", err) + return nil, fmt.Errorf("fetch changes of %s: %w", actorID, err) } - if raw == nil { - return nil, fmt.Errorf("%s: %w", docRefKey, database.ErrChangeNotFound) + + for raw := iterator.Next(); raw != nil; raw = iterator.Next() { + info := raw.(*database.ChangeInfo) + if info != nil && info.ActorID == actorID { + return info, nil + } } - return raw.(*database.ChangeInfo).DeepCopy(), nil + return nil, database.ErrChangeNotFound } // FindChangesBetweenServerSeqs returns the changes between two server sequences. diff --git a/server/backend/database/memory/database_test.go b/server/backend/database/memory/database_test.go index d308940a7..a6a59104c 100644 --- a/server/backend/database/memory/database_test.go +++ b/server/backend/database/memory/database_test.go @@ -64,6 +64,10 @@ func TestDB(t *testing.T) { testcases.RunFindChangeInfosBetweenServerSeqsTest(t, db, projectID) }) + t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) { + testcases.RunFindLatestChangeInfoTest(t, db, projectID) + }) + t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) { testcases.RunFindClosestSnapshotInfoTest(t, db, projectID) }) diff --git a/server/backend/database/memory/indexes.go b/server/backend/database/memory/indexes.go index 2a51f82e8..01c71a1f3 100644 --- a/server/backend/database/memory/indexes.go +++ b/server/backend/database/memory/indexes.go @@ -167,6 +167,15 @@ var schema = &memdb.DBSchema{ }, }, }, + "doc_id_actor_id": { + Name: "doc_id_actor_id", + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "DocID"}, + &memdb.StringFieldIndex{Field: "ActorID"}, + }, + }, + }, }, }, tblSnapshots: { diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index af343988a..9ef06dc69 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -1006,33 +1006,6 @@ func (c *Client) FindLatestChangeInfoByActor( return changeInfo, nil } -// FindChangeInfoByServerSeq returns the change by the given server sequence. -func (c *Client) FindChangeInfoByServerSeq( - ctx context.Context, - docRefKey types.DocRefKey, - serverSeq int64, -) (*database.ChangeInfo, error) { - result := c.collection(ColChanges).FindOne(ctx, bson.M{ - "project_id": docRefKey.ProjectID, - "doc_id": docRefKey.DocID, - "server_seq": serverSeq, - }) - - changeInfo := &database.ChangeInfo{} - if result.Err() == mongo.ErrNoDocuments { - return changeInfo, nil - } - if result.Err() != nil { - return nil, fmt.Errorf("find change: %w", result.Err()) - } - - if err := result.Decode(changeInfo); err != nil { - return nil, fmt.Errorf("decode change: %w", err) - } - - return changeInfo, nil -} - // FindChangesBetweenServerSeqs returns the changes between two server sequences. func (c *Client) FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/backend/database/mongo/client_test.go b/server/backend/database/mongo/client_test.go index 6e0803b5d..15749678d 100644 --- a/server/backend/database/mongo/client_test.go +++ b/server/backend/database/mongo/client_test.go @@ -80,6 +80,10 @@ func TestClient(t *testing.T) { testcases.RunFindChangeInfosBetweenServerSeqsTest(t, cli, dummyProjectID) }) + t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) { + testcases.RunFindLatestChangeInfoTest(t, cli, dummyProjectID) + }) + t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) { testcases.RunFindClosestSnapshotInfoTest(t, cli, dummyProjectID) }) diff --git a/server/backend/database/testcases/testcases.go b/server/backend/database/testcases/testcases.go index 36f3f00c7..9f7ce58f1 100644 --- a/server/backend/database/testcases/testcases.go +++ b/server/backend/database/testcases/testcases.go @@ -523,6 +523,66 @@ func RunFindChangeInfosBetweenServerSeqsTest( }) } +// RunFindLatestChangeInfoTest runs the FindLatestChangeInfoByActor test for the given db. +func RunFindLatestChangeInfoTest(t *testing.T, + db database.Database, + projectID types.ID, +) { + t.Run("store changes and find latest changeInfo test", func(t *testing.T) { + ctx := context.Background() + + docKey := key.Key(fmt.Sprintf("tests$%s", t.Name())) + + clientInfo, _ := db.ActivateClient(ctx, projectID, t.Name()) + docInfo, _ := db.FindDocInfoByKeyAndOwner(ctx, clientInfo.RefKey(), docKey, true) + docRefKey := docInfo.RefKey() + assert.NoError(t, clientInfo.AttachDocument(docInfo.ID, false)) + assert.NoError(t, db.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo)) + + initialServerSeq := docInfo.ServerSeq + + // 01. Create a document and store changes + bytesID, _ := clientInfo.ID.Bytes() + actorID, _ := time.ActorIDFromBytes(bytesID) + doc := document.New(key.Key(t.Name())) + doc.SetActor(actorID) + assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error { + root.SetNewArray("array") + return nil + })) + for idx := 0; idx < 5; idx++ { + assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error { + root.GetArray("array").AddInteger(idx) + return nil + })) + } + pack := doc.CreateChangePack() + for _, c := range pack.Changes { + serverSeq := docInfo.IncreaseServerSeq() + c.SetServerSeq(serverSeq) + } + + err := db.CreateChangeInfos( + ctx, + projectID, + docInfo, + initialServerSeq, + pack.Changes, + false, + ) + assert.NoError(t, err) + + // 02. Find latest changeInfo + latestChangeInfo, err := db.FindLatestChangeInfoByActor( + ctx, + docRefKey, + types.ID(actorID.String()), + ) + assert.NoError(t, err) + assert.Equal(t, latestChangeInfo.Lamport, int64(6)) + }) +} + // RunFindClosestSnapshotInfoTest runs the FindClosestSnapshotInfo test for the given db. func RunFindClosestSnapshotInfoTest(t *testing.T, db database.Database, projectID types.ID) { t.Run("store and find snapshots test", func(t *testing.T) { From 9f93a91c8337d19fe8852d13cfabf4badc71b08a Mon Sep 17 00:00:00 2001 From: raararaara Date: Wed, 6 Nov 2024 23:58:32 +0900 Subject: [PATCH 6/9] Remove unnecessary lamport adjustment logic --- server/rpc/cluster_server.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index ec658ad01..494e26e9e 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -100,13 +100,8 @@ func (s *clusterServer) DetachDocument( if err != nil { return nil, err } - maxLamport := latestChange.Lamport - if cp.ServerSeq > maxLamport { - maxLamport = cp.ServerSeq - } - latestChange.VersionVector.Set(actorID, maxLamport) changeCtx := change.NewContext( - change.NewID(cp.ClientSeq, cp.ServerSeq, maxLamport, actorID, latestChange.VersionVector).Next(), + change.NewID(cp.ClientSeq, cp.ServerSeq, latestChange.Lamport, actorID, latestChange.VersionVector).Next(), "", nil, ) @@ -125,7 +120,7 @@ func (s *clusterServer) DetachDocument( docInfo, pack, packs.PushPullOptions{ - Mode: types.SyncModePushPull, + Mode: types.SyncModePushOnly, Status: document.StatusDetached, }, ); err != nil { From e56e9f87f74e400e43b40ca48c7f3074412de58b Mon Sep 17 00:00:00 2001 From: raararaara Date: Fri, 8 Nov 2024 15:17:41 +0900 Subject: [PATCH 7/9] Add `actor_id` field to Change collection index in MongoDB for improved query efficiency --- server/backend/database/database.go | 1 + server/backend/database/memory/database.go | 6 ++++-- server/backend/database/memory/indexes.go | 5 +++-- server/backend/database/mongo/client.go | 12 +++++++++--- server/backend/database/mongo/indexes.go | 7 +------ server/backend/database/testcases/testcases.go | 14 ++++++++++++-- server/rpc/cluster_server.go | 9 +++++++-- 7 files changed, 37 insertions(+), 17 deletions(-) diff --git a/server/backend/database/database.go b/server/backend/database/database.go index 85b50ca83..0b846e502 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -224,6 +224,7 @@ type Database interface { ctx context.Context, docRefKey types.DocRefKey, actorID types.ID, + serverSeq int64, ) (*ChangeInfo, error) // FindChangesBetweenServerSeqs returns the changes between two server sequences. diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 527575698..eda760ea6 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1017,15 +1017,17 @@ func (d *DB) FindLatestChangeInfoByActor( _ context.Context, docRefKey types.DocRefKey, actorID types.ID, + serverSeq int64, ) (*database.ChangeInfo, error) { txn := d.db.Txn(false) defer txn.Abort() - iterator, err := txn.GetReverse( + iterator, err := txn.ReverseLowerBound( tblChanges, - "doc_id_actor_id", + "doc_id_actor_id_server_seq", docRefKey.DocID.String(), actorID.String(), + serverSeq, ) if err != nil { return nil, fmt.Errorf("fetch changes of %s: %w", actorID, err) diff --git a/server/backend/database/memory/indexes.go b/server/backend/database/memory/indexes.go index 01c71a1f3..4c30d7041 100644 --- a/server/backend/database/memory/indexes.go +++ b/server/backend/database/memory/indexes.go @@ -167,12 +167,13 @@ var schema = &memdb.DBSchema{ }, }, }, - "doc_id_actor_id": { - Name: "doc_id_actor_id", + "doc_id_actor_id_server_seq": { + Name: "doc_id_actor_id_server_seq", Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "DocID"}, &memdb.StringFieldIndex{Field: "ActorID"}, + &memdb.IntFieldIndex{Field: "ServerSeq"}, }, }, }, diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 9ef06dc69..fabb06f15 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -982,14 +982,20 @@ func (c *Client) FindLatestChangeInfoByActor( ctx context.Context, docRefKey types.DocRefKey, actorID types.ID, + serverSeq int64, ) (*database.ChangeInfo, error) { + option := options.FindOne().SetSort(bson.M{ + "server_seq": -1, + }) + result := c.collection(ColChanges).FindOne(ctx, bson.M{ "project_id": docRefKey.ProjectID, "doc_id": docRefKey.DocID, "actor_id": actorID, - }, options.FindOne().SetSort(bson.M{ - "lamport": -1, - })) + "server_seq": bson.M{ + "$lte": serverSeq, + }, + }, option) changeInfo := &database.ChangeInfo{} if result.Err() == mongo.ErrNoDocuments { diff --git a/server/backend/database/mongo/indexes.go b/server/backend/database/mongo/indexes.go index fef4a28bc..d8bbd4c8a 100644 --- a/server/backend/database/mongo/indexes.go +++ b/server/backend/database/mongo/indexes.go @@ -124,15 +124,10 @@ var collectionInfos = []collectionInfo{ Keys: bsonx.Doc{ {Key: "doc_id", Value: bsonx.Int32(1)}, // shard key {Key: "project_id", Value: bsonx.Int32(1)}, + {Key: "actor_id", Value: bsonx.Int32(1)}, {Key: "server_seq", Value: bsonx.Int32(1)}, }, Options: options.Index().SetUnique(true), - }, { - Keys: bsonx.Doc{ - {Key: "doc_id", Value: bsonx.Int32(1)}, // shard key - {Key: "project_id", Value: bsonx.Int32(1)}, - {Key: "actor_id", Value: bsonx.Int32(1)}, - }, }}, }, { name: ColSnapshots, diff --git a/server/backend/database/testcases/testcases.go b/server/backend/database/testcases/testcases.go index 9f7ce58f1..ecd362133 100644 --- a/server/backend/database/testcases/testcases.go +++ b/server/backend/database/testcases/testcases.go @@ -572,14 +572,24 @@ func RunFindLatestChangeInfoTest(t *testing.T, ) assert.NoError(t, err) - // 02. Find latest changeInfo + // 02-1. Find all changes + changes, err := db.FindChangesBetweenServerSeqs(ctx, docRefKey, 1, 10) + assert.NoError(t, err) + maxLamport := int64(0) + for _, ch := range changes { + if maxLamport < ch.ID().Lamport() { + maxLamport = ch.ID().Lamport() + } + } + // 02-2. Find latest changeInfo and compare results. latestChangeInfo, err := db.FindLatestChangeInfoByActor( ctx, docRefKey, types.ID(actorID.String()), + 10, ) assert.NoError(t, err) - assert.Equal(t, latestChangeInfo.Lamport, int64(6)) + assert.Equal(t, maxLamport, latestChangeInfo.Lamport) }) } diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index 494e26e9e..bf3e001a8 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -96,12 +96,17 @@ func (s *clusterServer) DetachDocument( // 01. Create changePack with presence clear change cp := clientInfo.Checkpoint(summary.ID) - latestChange, err := s.backend.DB.FindLatestChangeInfoByActor(ctx, docRefKey, types.ID(req.Msg.ClientId)) + latestChangeInfo, err := s.backend.DB.FindLatestChangeInfoByActor( + ctx, + docRefKey, + types.ID(req.Msg.ClientId), + cp.ServerSeq, + ) if err != nil { return nil, err } changeCtx := change.NewContext( - change.NewID(cp.ClientSeq, cp.ServerSeq, latestChange.Lamport, actorID, latestChange.VersionVector).Next(), + change.NewID(cp.ClientSeq, cp.ServerSeq, latestChangeInfo.Lamport, actorID, latestChangeInfo.VersionVector).Next(), "", nil, ) From 0a770cddb2e31e0d3a58ce982b4c0a66a3a4c2fb Mon Sep 17 00:00:00 2001 From: raararaara Date: Fri, 8 Nov 2024 17:09:59 +0900 Subject: [PATCH 8/9] Revise change collection index --- server/backend/database/mongo/indexes.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/backend/database/mongo/indexes.go b/server/backend/database/mongo/indexes.go index d8bbd4c8a..578c8bf38 100644 --- a/server/backend/database/mongo/indexes.go +++ b/server/backend/database/mongo/indexes.go @@ -121,6 +121,13 @@ var collectionInfos = []collectionInfo{ }, { name: ColChanges, indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{ + {Key: "doc_id", Value: bsonx.Int32(1)}, // shard key + {Key: "project_id", Value: bsonx.Int32(1)}, + {Key: "server_seq", Value: bsonx.Int32(1)}, + }, + Options: options.Index().SetUnique(true), + }, { Keys: bsonx.Doc{ {Key: "doc_id", Value: bsonx.Int32(1)}, // shard key {Key: "project_id", Value: bsonx.Int32(1)}, From c3f9542add83249fb5b63162ee67a3dc2ca36864 Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Fri, 8 Nov 2024 18:59:08 +0900 Subject: [PATCH 9/9] Clean up codes --- .../backend/database/testcases/testcases.go | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/server/backend/database/testcases/testcases.go b/server/backend/database/testcases/testcases.go index ecd362133..8f363db2f 100644 --- a/server/backend/database/testcases/testcases.go +++ b/server/backend/database/testcases/testcases.go @@ -533,17 +533,23 @@ func RunFindLatestChangeInfoTest(t *testing.T, docKey := key.Key(fmt.Sprintf("tests$%s", t.Name())) - clientInfo, _ := db.ActivateClient(ctx, projectID, t.Name()) - docInfo, _ := db.FindDocInfoByKeyAndOwner(ctx, clientInfo.RefKey(), docKey, true) + // 01. Activate client and find document info. + clientInfo, err := db.ActivateClient(ctx, projectID, t.Name()) + assert.NoError(t, err) + docInfo, err := db.FindDocInfoByKeyAndOwner(ctx, clientInfo.RefKey(), docKey, true) + assert.NoError(t, err) docRefKey := docInfo.RefKey() assert.NoError(t, clientInfo.AttachDocument(docInfo.ID, false)) assert.NoError(t, db.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo)) initialServerSeq := docInfo.ServerSeq - // 01. Create a document and store changes - bytesID, _ := clientInfo.ID.Bytes() + // 02. Create a document and store changes. + bytesID, err := clientInfo.ID.Bytes() + assert.NoError(t, err) actorID, _ := time.ActorIDFromBytes(bytesID) + assert.NoError(t, err) + doc := document.New(key.Key(t.Name())) doc.SetActor(actorID) assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error { @@ -562,17 +568,16 @@ func RunFindLatestChangeInfoTest(t *testing.T, c.SetServerSeq(serverSeq) } - err := db.CreateChangeInfos( + assert.NoError(t, db.CreateChangeInfos( ctx, projectID, docInfo, initialServerSeq, pack.Changes, false, - ) - assert.NoError(t, err) + )) - // 02-1. Find all changes + // 03. Find all changes and determine the maximum Lamport timestamp. changes, err := db.FindChangesBetweenServerSeqs(ctx, docRefKey, 1, 10) assert.NoError(t, err) maxLamport := int64(0) @@ -581,7 +586,8 @@ func RunFindLatestChangeInfoTest(t *testing.T, maxLamport = ch.ID().Lamport() } } - // 02-2. Find latest changeInfo and compare results. + + // 04. Find the latest change info by actor before the given server sequence. latestChangeInfo, err := db.FindLatestChangeInfoByActor( ctx, docRefKey,