diff --git a/server/backend/database/database.go b/server/backend/database/database.go index fb4d0dae8..0b846e502 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -219,6 +219,14 @@ type Database interface { docRefKey types.DocRefKey, ) error + // FindLatestChangeInfoByActor returns the latest change created by given actorID. + FindLatestChangeInfoByActor( + ctx context.Context, + docRefKey types.DocRefKey, + actorID types.ID, + serverSeq int64, + ) (*ChangeInfo, error) + // FindChangesBetweenServerSeqs returns the changes between two server sequences. FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 98d17d833..eda760ea6 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1012,6 +1012,37 @@ func (d *DB) PurgeStaleChanges( return nil } +// FindLatestChangeInfoByActor returns the latest change created by given actorID. +func (d *DB) FindLatestChangeInfoByActor( + _ context.Context, + docRefKey types.DocRefKey, + actorID types.ID, + serverSeq int64, +) (*database.ChangeInfo, error) { + txn := d.db.Txn(false) + defer txn.Abort() + + iterator, err := txn.ReverseLowerBound( + tblChanges, + "doc_id_actor_id_server_seq", + docRefKey.DocID.String(), + actorID.String(), + serverSeq, + ) + if err != nil { + return nil, fmt.Errorf("fetch changes of %s: %w", actorID, err) + } + + for raw := iterator.Next(); raw != nil; raw = iterator.Next() { + info := raw.(*database.ChangeInfo) + if info != nil && info.ActorID == actorID { + return info, nil + } + } + + return nil, database.ErrChangeNotFound +} + // FindChangesBetweenServerSeqs returns the changes between two server sequences. func (d *DB) FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/backend/database/memory/database_test.go b/server/backend/database/memory/database_test.go index d308940a7..a6a59104c 100644 --- a/server/backend/database/memory/database_test.go +++ b/server/backend/database/memory/database_test.go @@ -64,6 +64,10 @@ func TestDB(t *testing.T) { testcases.RunFindChangeInfosBetweenServerSeqsTest(t, db, projectID) }) + t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) { + testcases.RunFindLatestChangeInfoTest(t, db, projectID) + }) + t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) { testcases.RunFindClosestSnapshotInfoTest(t, db, projectID) }) diff --git a/server/backend/database/memory/indexes.go b/server/backend/database/memory/indexes.go index 2a51f82e8..4c30d7041 100644 --- a/server/backend/database/memory/indexes.go +++ b/server/backend/database/memory/indexes.go @@ -167,6 +167,16 @@ var schema = &memdb.DBSchema{ }, }, }, + "doc_id_actor_id_server_seq": { + Name: "doc_id_actor_id_server_seq", + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "DocID"}, + &memdb.StringFieldIndex{Field: "ActorID"}, + &memdb.IntFieldIndex{Field: "ServerSeq"}, + }, + }, + }, }, }, tblSnapshots: { diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 29208624d..fabb06f15 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -977,6 +977,41 @@ func (c *Client) PurgeStaleChanges( return nil } +// FindLatestChangeInfoByActor returns the latest change created by given actorID. +func (c *Client) FindLatestChangeInfoByActor( + ctx context.Context, + docRefKey types.DocRefKey, + actorID types.ID, + serverSeq int64, +) (*database.ChangeInfo, error) { + option := options.FindOne().SetSort(bson.M{ + "server_seq": -1, + }) + + result := c.collection(ColChanges).FindOne(ctx, bson.M{ + "project_id": docRefKey.ProjectID, + "doc_id": docRefKey.DocID, + "actor_id": actorID, + "server_seq": bson.M{ + "$lte": serverSeq, + }, + }, option) + + changeInfo := &database.ChangeInfo{} + if result.Err() == mongo.ErrNoDocuments { + return changeInfo, nil + } + if result.Err() != nil { + return nil, fmt.Errorf("find change: %w", result.Err()) + } + + if err := result.Decode(changeInfo); err != nil { + return nil, fmt.Errorf("decode change: %w", err) + } + + return changeInfo, nil +} + // FindChangesBetweenServerSeqs returns the changes between two server sequences. func (c *Client) FindChangesBetweenServerSeqs( ctx context.Context, diff --git a/server/backend/database/mongo/client_test.go b/server/backend/database/mongo/client_test.go index 6e0803b5d..15749678d 100644 --- a/server/backend/database/mongo/client_test.go +++ b/server/backend/database/mongo/client_test.go @@ -80,6 +80,10 @@ func TestClient(t *testing.T) { testcases.RunFindChangeInfosBetweenServerSeqsTest(t, cli, dummyProjectID) }) + t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) { + testcases.RunFindLatestChangeInfoTest(t, cli, dummyProjectID) + }) + t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) { testcases.RunFindClosestSnapshotInfoTest(t, cli, dummyProjectID) }) diff --git a/server/backend/database/mongo/indexes.go b/server/backend/database/mongo/indexes.go index 93a4a75f0..578c8bf38 100644 --- a/server/backend/database/mongo/indexes.go +++ b/server/backend/database/mongo/indexes.go @@ -127,6 +127,14 @@ var collectionInfos = []collectionInfo{ {Key: "server_seq", Value: bsonx.Int32(1)}, }, Options: options.Index().SetUnique(true), + }, { + Keys: bsonx.Doc{ + {Key: "doc_id", Value: bsonx.Int32(1)}, // shard key + {Key: "project_id", Value: bsonx.Int32(1)}, + {Key: "actor_id", Value: bsonx.Int32(1)}, + {Key: "server_seq", Value: bsonx.Int32(1)}, + }, + Options: options.Index().SetUnique(true), }}, }, { name: ColSnapshots, diff --git a/server/backend/database/testcases/testcases.go b/server/backend/database/testcases/testcases.go index 36f3f00c7..8f363db2f 100644 --- a/server/backend/database/testcases/testcases.go +++ b/server/backend/database/testcases/testcases.go @@ -523,6 +523,82 @@ func RunFindChangeInfosBetweenServerSeqsTest( }) } +// RunFindLatestChangeInfoTest runs the FindLatestChangeInfoByActor test for the given db. +func RunFindLatestChangeInfoTest(t *testing.T, + db database.Database, + projectID types.ID, +) { + t.Run("store changes and find latest changeInfo test", func(t *testing.T) { + ctx := context.Background() + + docKey := key.Key(fmt.Sprintf("tests$%s", t.Name())) + + // 01. Activate client and find document info. + clientInfo, err := db.ActivateClient(ctx, projectID, t.Name()) + assert.NoError(t, err) + docInfo, err := db.FindDocInfoByKeyAndOwner(ctx, clientInfo.RefKey(), docKey, true) + assert.NoError(t, err) + docRefKey := docInfo.RefKey() + assert.NoError(t, clientInfo.AttachDocument(docInfo.ID, false)) + assert.NoError(t, db.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo)) + + initialServerSeq := docInfo.ServerSeq + + // 02. Create a document and store changes. + bytesID, err := clientInfo.ID.Bytes() + assert.NoError(t, err) + actorID, _ := time.ActorIDFromBytes(bytesID) + assert.NoError(t, err) + + doc := document.New(key.Key(t.Name())) + doc.SetActor(actorID) + assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error { + root.SetNewArray("array") + return nil + })) + for idx := 0; idx < 5; idx++ { + assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error { + root.GetArray("array").AddInteger(idx) + return nil + })) + } + pack := doc.CreateChangePack() + for _, c := range pack.Changes { + serverSeq := docInfo.IncreaseServerSeq() + c.SetServerSeq(serverSeq) + } + + assert.NoError(t, db.CreateChangeInfos( + ctx, + projectID, + docInfo, + initialServerSeq, + pack.Changes, + false, + )) + + // 03. Find all changes and determine the maximum Lamport timestamp. + changes, err := db.FindChangesBetweenServerSeqs(ctx, docRefKey, 1, 10) + assert.NoError(t, err) + maxLamport := int64(0) + for _, ch := range changes { + if maxLamport < ch.ID().Lamport() { + maxLamport = ch.ID().Lamport() + } + } + + // 04. Find the latest change info by actor before the given server sequence. + latestChangeInfo, err := db.FindLatestChangeInfoByActor( + ctx, + docRefKey, + types.ID(actorID.String()), + 10, + ) + assert.NoError(t, err) + assert.Equal(t, maxLamport, latestChangeInfo.Lamport) + }) +} + // RunFindClosestSnapshotInfoTest runs the FindClosestSnapshotInfo test for the given db. func RunFindClosestSnapshotInfoTest(t *testing.T, db database.Database, projectID types.ID) { t.Run("store and find snapshots test", func(t *testing.T) { diff --git a/server/rpc/cluster_server.go b/server/rpc/cluster_server.go index 3d86178c0..bf3e001a8 100644 --- a/server/rpc/cluster_server.go +++ b/server/rpc/cluster_server.go @@ -25,7 +25,8 @@ import ( "github.com/yorkie-team/yorkie/api/types" api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/pkg/document" - "github.com/yorkie-team/yorkie/pkg/document/json" + "github.com/yorkie-team/yorkie/pkg/document/change" + "github.com/yorkie-team/yorkie/pkg/document/innerpresence" "github.com/yorkie-team/yorkie/pkg/document/presence" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend" @@ -93,20 +94,27 @@ func (s *clusterServer) DetachDocument( return nil, err } - // TODO(hackerwins): BuildDocForCheckpoint is expensive because it reads the entire document. - // We need to optimize this by creating a ChangePack directly. - // 01. Create ChangePack with clear presence. - doc, err := packs.BuildDocForCheckpoint(ctx, s.backend, docInfo, clientInfo.Checkpoint(summary.ID), actorID) + // 01. Create changePack with presence clear change + cp := clientInfo.Checkpoint(summary.ID) + latestChangeInfo, err := s.backend.DB.FindLatestChangeInfoByActor( + ctx, + docRefKey, + types.ID(req.Msg.ClientId), + cp.ServerSeq, + ) if err != nil { return nil, err } + changeCtx := change.NewContext( + change.NewID(cp.ClientSeq, cp.ServerSeq, latestChangeInfo.Lamport, actorID, latestChangeInfo.VersionVector).Next(), + "", + nil, + ) + p := presence.New(changeCtx, innerpresence.NewPresence()) + p.Clear() - if err := doc.Update(func(root *json.Object, p *presence.Presence) error { - p.Clear() - return nil - }); err != nil { - return nil, err - } + changes := []*change.Change{changeCtx.ToChange()} + pack := change.NewPack(docInfo.Key, cp, changes, nil, nil) // 02. PushPull with the created ChangePack. if _, err := packs.PushPull( @@ -115,9 +123,9 @@ func (s *clusterServer) DetachDocument( project, clientInfo, docInfo, - doc.CreateChangePack(), + pack, packs.PushPullOptions{ - Mode: types.SyncModePushPull, + Mode: types.SyncModePushOnly, Status: document.StatusDetached, }, ); err != nil {