Skip to content

Commit

Permalink
Optimize document detachment in Cluster Server (#1055)
Browse files Browse the repository at this point in the history
This change significantly improves document detachment performance by optimizing
the presence clear event generation process. The execution time has been reduced
from ~150ms to ~30ms per document.

Key improvements:
- Eliminated document construction overhead by generating changes directly.
- Added index on change collection for `{doc_id, project_id, actor_id, server_seq}`.
- Implemented direct change generation based on the client's latest change info (`latestChangeInfo`).
- Streamlined the presence clear event generation process.

Note: This optimization introduces a known limitation where Lamport timestamps
and version vectors may differ from the actual client document state. This
issue is being tracked separately.

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
raararaara and hackerwins authored Nov 8, 2024
1 parent 6547bf8 commit 38526ff
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 13 deletions.
8 changes: 8 additions & 0 deletions server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ type Database interface {
docRefKey types.DocRefKey,
) error

// FindLatestChangeInfoByActor returns the latest change created by the given
// actorID in the document identified by docRefKey, considering only changes
// whose server sequence is less than or equal to serverSeq.
//
// NOTE(review): the result for "no matching change" is not uniform across
// implementations (the memory DB returns ErrChangeNotFound, the Mongo client
// returns an empty ChangeInfo with a nil error) — confirm the intended contract.
FindLatestChangeInfoByActor(
ctx context.Context,
docRefKey types.DocRefKey,
actorID types.ID,
serverSeq int64,
) (*ChangeInfo, error)

// FindChangesBetweenServerSeqs returns the changes between two server sequences.
FindChangesBetweenServerSeqs(
ctx context.Context,
Expand Down
31 changes: 31 additions & 0 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,37 @@ func (d *DB) PurgeStaleChanges(
return nil
}

// FindLatestChangeInfoByActor returns the most recent change that the given
// actor created in the document referenced by docRefKey, restricted to changes
// whose server sequence is at or below serverSeq.
//
// It returns database.ErrChangeNotFound when the actor has no such change in
// that document, and a wrapped error when the index lookup itself fails.
func (d *DB) FindLatestChangeInfoByActor(
	_ context.Context,
	docRefKey types.DocRefKey,
	actorID types.ID,
	serverSeq int64,
) (*database.ChangeInfo, error) {
	txn := d.db.Txn(false)
	defer txn.Abort()

	// Start at the greatest index entry that does not exceed
	// (docID, actorID, serverSeq) and walk towards smaller entries.
	iterator, err := txn.ReverseLowerBound(
		tblChanges,
		"doc_id_actor_id_server_seq",
		docRefKey.DocID.String(),
		actorID.String(),
		serverSeq,
	)
	if err != nil {
		return nil, fmt.Errorf("fetch changes of %s: %w", actorID, err)
	}

	for raw := iterator.Next(); raw != nil; raw = iterator.Next() {
		info := raw.(*database.ChangeInfo)
		// The reverse scan is not limited to the requested document: once the
		// entries for (docID, actorID) are exhausted it continues into other
		// actors and other documents. Matching on ActorID alone could therefore
		// return a change the same actor made in a different document, so the
		// document ID has to be checked as well.
		if info != nil && info.DocID == docRefKey.DocID && info.ActorID == actorID {
			return info, nil
		}
	}

	return nil, database.ErrChangeNotFound
}

// FindChangesBetweenServerSeqs returns the changes between two server sequences.
func (d *DB) FindChangesBetweenServerSeqs(
ctx context.Context,
Expand Down
4 changes: 4 additions & 0 deletions server/backend/database/memory/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func TestDB(t *testing.T) {
testcases.RunFindChangeInfosBetweenServerSeqsTest(t, db, projectID)
})

t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) {
testcases.RunFindLatestChangeInfoTest(t, db, projectID)
})

t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) {
testcases.RunFindClosestSnapshotInfoTest(t, db, projectID)
})
Expand Down
10 changes: 10 additions & 0 deletions server/backend/database/memory/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ var schema = &memdb.DBSchema{
},
},
},
"doc_id_actor_id_server_seq": {
Name: "doc_id_actor_id_server_seq",
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "DocID"},
&memdb.StringFieldIndex{Field: "ActorID"},
&memdb.IntFieldIndex{Field: "ServerSeq"},
},
},
},
},
},
tblSnapshots: {
Expand Down
35 changes: 35 additions & 0 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,41 @@ func (c *Client) PurgeStaleChanges(
return nil
}

// FindLatestChangeInfoByActor looks up, for the given document and actor, the
// change with the highest server_seq that is less than or equal to serverSeq.
//
// When no change matches, an empty ChangeInfo is returned together with a nil
// error. Lookup and decoding failures are returned wrapped.
//
// NOTE(review): the in-memory implementation reports the "no match" case with
// database.ErrChangeNotFound instead — confirm which contract callers expect.
func (c *Client) FindLatestChangeInfoByActor(
	ctx context.Context,
	docRefKey types.DocRefKey,
	actorID types.ID,
	serverSeq int64,
) (*database.ChangeInfo, error) {
	filter := bson.M{
		"project_id": docRefKey.ProjectID,
		"doc_id":     docRefKey.DocID,
		"actor_id":   actorID,
		"server_seq": bson.M{"$lte": serverSeq},
	}
	// Sorting by server_seq in descending order makes FindOne pick the latest
	// change among the matches.
	latestFirst := options.FindOne().SetSort(bson.M{"server_seq": -1})

	found := c.collection(ColChanges).FindOne(ctx, filter, latestFirst)

	info := &database.ChangeInfo{}
	switch err := found.Err(); {
	case err == mongo.ErrNoDocuments:
		// No change by this actor yet: hand back the zero-value info.
		return info, nil
	case err != nil:
		return nil, fmt.Errorf("find change: %w", err)
	}

	if err := found.Decode(info); err != nil {
		return nil, fmt.Errorf("decode change: %w", err)
	}

	return info, nil
}

// FindChangesBetweenServerSeqs returns the changes between two server sequences.
func (c *Client) FindChangesBetweenServerSeqs(
ctx context.Context,
Expand Down
4 changes: 4 additions & 0 deletions server/backend/database/mongo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func TestClient(t *testing.T) {
testcases.RunFindChangeInfosBetweenServerSeqsTest(t, cli, dummyProjectID)
})

t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) {
testcases.RunFindLatestChangeInfoTest(t, cli, dummyProjectID)
})

t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) {
testcases.RunFindClosestSnapshotInfoTest(t, cli, dummyProjectID)
})
Expand Down
8 changes: 8 additions & 0 deletions server/backend/database/mongo/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ var collectionInfos = []collectionInfo{
{Key: "server_seq", Value: bsonx.Int32(1)},
},
Options: options.Index().SetUnique(true),
}, {
Keys: bsonx.Doc{
{Key: "doc_id", Value: bsonx.Int32(1)}, // shard key
{Key: "project_id", Value: bsonx.Int32(1)},
{Key: "actor_id", Value: bsonx.Int32(1)},
{Key: "server_seq", Value: bsonx.Int32(1)},
},
Options: options.Index().SetUnique(true),
}},
}, {
name: ColSnapshots,
Expand Down
76 changes: 76 additions & 0 deletions server/backend/database/testcases/testcases.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,82 @@ func RunFindChangeInfosBetweenServerSeqsTest(
})
}

// RunFindLatestChangeInfoTest runs the FindLatestChangeInfoByActor test for the given db.
//
// It stores a series of changes authored by a single client and verifies that
// the change info returned by FindLatestChangeInfoByActor carries the largest
// Lamport timestamp among the stored changes.
func RunFindLatestChangeInfoTest(t *testing.T,
	db database.Database,
	projectID types.ID,
) {
	t.Run("store changes and find latest changeInfo test", func(t *testing.T) {
		ctx := context.Background()

		docKey := key.Key(fmt.Sprintf("tests$%s", t.Name()))

		// 01. Activate client and find document info.
		clientInfo, err := db.ActivateClient(ctx, projectID, t.Name())
		assert.NoError(t, err)
		docInfo, err := db.FindDocInfoByKeyAndOwner(ctx, clientInfo.RefKey(), docKey, true)
		assert.NoError(t, err)
		docRefKey := docInfo.RefKey()
		assert.NoError(t, clientInfo.AttachDocument(docInfo.ID, false))
		assert.NoError(t, db.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo))

		// Remember the sequence before any change is stored; CreateChangeInfos
		// receives it alongside the updated docInfo.
		initialServerSeq := docInfo.ServerSeq

		// 02. Create a document and store changes.
		bytesID, err := clientInfo.ID.Bytes()
		assert.NoError(t, err)
		// Capture the conversion error instead of discarding it; previously the
		// following assertion re-checked the stale error from Bytes().
		actorID, err := time.ActorIDFromBytes(bytesID)
		assert.NoError(t, err)

		doc := document.New(key.Key(t.Name()))
		doc.SetActor(actorID)
		assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
			root.SetNewArray("array")
			return nil
		}))
		for idx := 0; idx < 5; idx++ {
			assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
				root.GetArray("array").AddInteger(idx)
				return nil
			}))
		}
		pack := doc.CreateChangePack()
		// Assign consecutive server sequences to the local changes before
		// handing them to the database.
		for _, c := range pack.Changes {
			serverSeq := docInfo.IncreaseServerSeq()
			c.SetServerSeq(serverSeq)
		}

		assert.NoError(t, db.CreateChangeInfos(
			ctx,
			projectID,
			docInfo,
			initialServerSeq,
			pack.Changes,
			false,
		))

		// 03. Find all changes and determine the maximum Lamport timestamp.
		changes, err := db.FindChangesBetweenServerSeqs(ctx, docRefKey, 1, 10)
		assert.NoError(t, err)
		maxLamport := int64(0)
		for _, ch := range changes {
			if maxLamport < ch.ID().Lamport() {
				maxLamport = ch.ID().Lamport()
			}
		}

		// 04. Find the latest change info by actor before the given server sequence.
		latestChangeInfo, err := db.FindLatestChangeInfoByActor(
			ctx,
			docRefKey,
			types.ID(actorID.String()),
			10,
		)
		assert.NoError(t, err)
		assert.Equal(t, maxLamport, latestChangeInfo.Lamport)
	})
}

// RunFindClosestSnapshotInfoTest runs the FindClosestSnapshotInfo test for the given db.
func RunFindClosestSnapshotInfoTest(t *testing.T, db database.Database, projectID types.ID) {
t.Run("store and find snapshots test", func(t *testing.T) {
Expand Down
34 changes: 21 additions & 13 deletions server/rpc/cluster_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/json"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/innerpresence"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
Expand Down Expand Up @@ -93,20 +94,27 @@ func (s *clusterServer) DetachDocument(
return nil, err
}

// TODO(hackerwins): BuildDocForCheckpoint is expensive because it reads the entire document.
// We need to optimize this by creating a ChangePack directly.
// 01. Create ChangePack with clear presence.
doc, err := packs.BuildDocForCheckpoint(ctx, s.backend, docInfo, clientInfo.Checkpoint(summary.ID), actorID)
// 01. Create changePack with presence clear change
cp := clientInfo.Checkpoint(summary.ID)
latestChangeInfo, err := s.backend.DB.FindLatestChangeInfoByActor(
ctx,
docRefKey,
types.ID(req.Msg.ClientId),
cp.ServerSeq,
)
if err != nil {
return nil, err
}
changeCtx := change.NewContext(
change.NewID(cp.ClientSeq, cp.ServerSeq, latestChangeInfo.Lamport, actorID, latestChangeInfo.VersionVector).Next(),
"",
nil,
)
p := presence.New(changeCtx, innerpresence.NewPresence())
p.Clear()

if err := doc.Update(func(root *json.Object, p *presence.Presence) error {
p.Clear()
return nil
}); err != nil {
return nil, err
}
changes := []*change.Change{changeCtx.ToChange()}
pack := change.NewPack(docInfo.Key, cp, changes, nil, nil)

// 02. PushPull with the created ChangePack.
if _, err := packs.PushPull(
Expand All @@ -115,9 +123,9 @@ func (s *clusterServer) DetachDocument(
project,
clientInfo,
docInfo,
doc.CreateChangePack(),
pack,
packs.PushPullOptions{
Mode: types.SyncModePushPull,
Mode: types.SyncModePushOnly,
Status: document.StatusDetached,
},
); err != nil {
Expand Down

0 comments on commit 38526ff

Please sign in to comment.