Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize document detachment in Cluster Server #1055

Merged
merged 9 commits into from
Nov 8, 2024
8 changes: 8 additions & 0 deletions server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ type Database interface {
docRefKey types.DocRefKey,
) error

// FindLatestChangeInfoByActor returns the latest change created by the given
// actorID in the document identified by docRefKey, considering only changes
// whose server sequence is less than or equal to serverSeq.
//
// NOTE(review): the not-found contract currently differs per backend — the
// memory implementation returns ErrChangeNotFound, while the mongo
// implementation returns an empty ChangeInfo with a nil error. Callers should
// be prepared for both until the backends are aligned.
FindLatestChangeInfoByActor(
ctx context.Context,
docRefKey types.DocRefKey,
actorID types.ID,
serverSeq int64,
) (*ChangeInfo, error)

// FindChangesBetweenServerSeqs returns the changes between two server sequences.
FindChangesBetweenServerSeqs(
ctx context.Context,
Expand Down
31 changes: 31 additions & 0 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,37 @@ func (d *DB) PurgeStaleChanges(
return nil
}

// FindLatestChangeInfoByActor returns the latest change created by the given
// actorID in the document identified by docRefKey, among the changes whose
// server sequence is at most serverSeq.
//
// The context parameter is unused by the in-memory implementation. It returns
// database.ErrChangeNotFound when no matching change is stored.
func (d *DB) FindLatestChangeInfoByActor(
	_ context.Context,
	docRefKey types.DocRefKey,
	actorID types.ID,
	serverSeq int64,
) (*database.ChangeInfo, error) {
	// Read-only transaction; Abort simply releases it.
	txn := d.db.Txn(false)
	defer txn.Abort()

	// Walk the compound (DocID, ActorID, ServerSeq) index backwards, starting
	// from the position of the requested key.
	iterator, err := txn.ReverseLowerBound(
		tblChanges,
		"doc_id_actor_id_server_seq",
		docRefKey.DocID.String(),
		actorID.String(),
		serverSeq,
	)
	if err != nil {
		return nil, fmt.Errorf("fetch changes of %s: %w", actorID, err)
	}

	for raw := iterator.Next(); raw != nil; raw = iterator.Next() {
		info := raw.(*database.ChangeInfo)
		if info == nil {
			continue
		}

		// The reverse iterator is not confined to the requested
		// (DocID, ActorID) prefix: once that prefix is exhausted it keeps
		// yielding entries of other documents. Matching on ActorID alone
		// could therefore return this actor's change from a different
		// document, so both fields must match.
		if info.DocID == docRefKey.DocID && info.ActorID == actorID {
			return info, nil
		}
	}

	return nil, database.ErrChangeNotFound
}

// FindChangesBetweenServerSeqs returns the changes between two server sequences.
func (d *DB) FindChangesBetweenServerSeqs(
ctx context.Context,
Expand Down
4 changes: 4 additions & 0 deletions server/backend/database/memory/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func TestDB(t *testing.T) {
testcases.RunFindChangeInfosBetweenServerSeqsTest(t, db, projectID)
})

t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) {
testcases.RunFindLatestChangeInfoTest(t, db, projectID)
})

t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) {
testcases.RunFindClosestSnapshotInfoTest(t, db, projectID)
})
Expand Down
10 changes: 10 additions & 0 deletions server/backend/database/memory/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ var schema = &memdb.DBSchema{
},
},
},
// doc_id_actor_id_server_seq is a compound index over the DocID, ActorID and
// ServerSeq fields of a change. FindLatestChangeInfoByActor scans it in
// reverse to locate an actor's most recent change in a document.
"doc_id_actor_id_server_seq": {
Name: "doc_id_actor_id_server_seq",
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "DocID"},
&memdb.StringFieldIndex{Field: "ActorID"},
&memdb.IntFieldIndex{Field: "ServerSeq"},
},
},
},
},
},
tblSnapshots: {
Expand Down
35 changes: 35 additions & 0 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,41 @@ func (c *Client) PurgeStaleChanges(
return nil
}

// FindLatestChangeInfoByActor looks up the most recent change (highest
// server_seq) that the given actorID created in the document identified by
// docRefKey, restricted to changes whose server_seq is at most serverSeq.
//
// When no document matches, an empty ChangeInfo is returned together with a
// nil error. Query and decode failures are returned wrapped.
func (c *Client) FindLatestChangeInfoByActor(
	ctx context.Context,
	docRefKey types.DocRefKey,
	actorID types.ID,
	serverSeq int64,
) (*database.ChangeInfo, error) {
	filter := bson.M{
		"project_id": docRefKey.ProjectID,
		"doc_id":     docRefKey.DocID,
		"actor_id":   actorID,
		"server_seq": bson.M{
			"$lte": serverSeq,
		},
	}
	// Sort descending so the first hit is the latest change.
	newestFirst := options.FindOne().SetSort(bson.M{
		"server_seq": -1,
	})

	found := c.collection(ColChanges).FindOne(ctx, filter, newestFirst)
	switch findErr := found.Err(); {
	case findErr == mongo.ErrNoDocuments:
		// No change by this actor yet: hand back a zero-valued info.
		return &database.ChangeInfo{}, nil
	case findErr != nil:
		return nil, fmt.Errorf("find change: %w", findErr)
	}

	latest := &database.ChangeInfo{}
	if decodeErr := found.Decode(latest); decodeErr != nil {
		return nil, fmt.Errorf("decode change: %w", decodeErr)
	}

	return latest, nil
}

// FindChangesBetweenServerSeqs returns the changes between two server sequences.
func (c *Client) FindChangesBetweenServerSeqs(
ctx context.Context,
Expand Down
4 changes: 4 additions & 0 deletions server/backend/database/mongo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func TestClient(t *testing.T) {
testcases.RunFindChangeInfosBetweenServerSeqsTest(t, cli, dummyProjectID)
})

t.Run("RunFindLatestChangeInfoTest test", func(t *testing.T) {
testcases.RunFindLatestChangeInfoTest(t, cli, dummyProjectID)
})

t.Run("RunFindClosestSnapshotInfo test", func(t *testing.T) {
testcases.RunFindClosestSnapshotInfoTest(t, cli, dummyProjectID)
})
Expand Down
8 changes: 8 additions & 0 deletions server/backend/database/mongo/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ var collectionInfos = []collectionInfo{
{Key: "server_seq", Value: bsonx.Int32(1)},
},
Options: options.Index().SetUnique(true),
}, {
Keys: bsonx.Doc{
{Key: "doc_id", Value: bsonx.Int32(1)}, // shard key
{Key: "project_id", Value: bsonx.Int32(1)},
{Key: "actor_id", Value: bsonx.Int32(1)},
{Key: "server_seq", Value: bsonx.Int32(1)},
},
Options: options.Index().SetUnique(true),
}},
}, {
name: ColSnapshots,
Expand Down
76 changes: 76 additions & 0 deletions server/backend/database/testcases/testcases.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,82 @@ func RunFindChangeInfosBetweenServerSeqsTest(
})
}

// RunFindLatestChangeInfoTest runs the FindLatestChangeInfoByActor test for the given db.
//
// It activates a client, stores a series of changes authored by that client
// for a fresh document, and then checks that the change returned by
// FindLatestChangeInfoByActor carries the highest Lamport timestamp among the
// stored changes.
func RunFindLatestChangeInfoTest(t *testing.T,
	db database.Database,
	projectID types.ID,
) {
	t.Run("store changes and find latest changeInfo test", func(t *testing.T) {
		ctx := context.Background()

		docKey := key.Key(fmt.Sprintf("tests$%s", t.Name()))

		// 01. Activate client and find document info.
		clientInfo, err := db.ActivateClient(ctx, projectID, t.Name())
		assert.NoError(t, err)
		docInfo, err := db.FindDocInfoByKeyAndOwner(ctx, clientInfo.RefKey(), docKey, true)
		assert.NoError(t, err)
		docRefKey := docInfo.RefKey()
		assert.NoError(t, clientInfo.AttachDocument(docInfo.ID, false))
		assert.NoError(t, db.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo))

		// Remember the sequence before any change is stored; CreateChangeInfos
		// receives it alongside the new changes.
		initialServerSeq := docInfo.ServerSeq

		// 02. Create a document and store changes.
		bytesID, err := clientInfo.ID.Bytes()
		assert.NoError(t, err)
		// Capture the conversion error instead of discarding it, so that the
		// assertion below checks this call rather than the previous one.
		actorID, err := time.ActorIDFromBytes(bytesID)
		assert.NoError(t, err)

		doc := document.New(key.Key(t.Name()))
		doc.SetActor(actorID)
		assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
			root.SetNewArray("array")
			return nil
		}))
		for idx := 0; idx < 5; idx++ {
			assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
				root.GetArray("array").AddInteger(idx)
				return nil
			}))
		}
		pack := doc.CreateChangePack()
		// Assign consecutive server sequences to the local changes.
		for _, c := range pack.Changes {
			serverSeq := docInfo.IncreaseServerSeq()
			c.SetServerSeq(serverSeq)
		}

		assert.NoError(t, db.CreateChangeInfos(
			ctx,
			projectID,
			docInfo,
			initialServerSeq,
			pack.Changes,
			false,
		))

		// 03. Find all changes and determine the maximum Lamport timestamp.
		changes, err := db.FindChangesBetweenServerSeqs(ctx, docRefKey, 1, 10)
		assert.NoError(t, err)
		maxLamport := int64(0)
		for _, ch := range changes {
			if maxLamport < ch.ID().Lamport() {
				maxLamport = ch.ID().Lamport()
			}
		}

		// 04. Find the latest change info by actor up to the given server sequence.
		latestChangeInfo, err := db.FindLatestChangeInfoByActor(
			ctx,
			docRefKey,
			types.ID(actorID.String()),
			10,
		)
		// Only inspect the result when the lookup succeeded, so a failure is
		// reported as a test failure rather than a nil-pointer panic.
		if assert.NoError(t, err) {
			assert.Equal(t, maxLamport, latestChangeInfo.Lamport)
		}
	})
}

// RunFindClosestSnapshotInfoTest runs the FindClosestSnapshotInfo test for the given db.
func RunFindClosestSnapshotInfoTest(t *testing.T, db database.Database, projectID types.ID) {
t.Run("store and find snapshots test", func(t *testing.T) {
Expand Down
34 changes: 21 additions & 13 deletions server/rpc/cluster_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/json"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/innerpresence"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
Expand Down Expand Up @@ -93,20 +94,27 @@ func (s *clusterServer) DetachDocument(
return nil, err
}

// TODO(hackerwins): BuildDocForCheckpoint is expensive because it reads the entire document.
// We need to optimize this by creating a ChangePack directly.
// 01. Create ChangePack with clear presence.
doc, err := packs.BuildDocForCheckpoint(ctx, s.backend, docInfo, clientInfo.Checkpoint(summary.ID), actorID)
// 01. Create changePack with presence clear change
cp := clientInfo.Checkpoint(summary.ID)
latestChangeInfo, err := s.backend.DB.FindLatestChangeInfoByActor(
ctx,
docRefKey,
types.ID(req.Msg.ClientId),
cp.ServerSeq,
)
if err != nil {
return nil, err
}
changeCtx := change.NewContext(
change.NewID(cp.ClientSeq, cp.ServerSeq, latestChangeInfo.Lamport, actorID, latestChangeInfo.VersionVector).Next(),
"",
nil,
)
p := presence.New(changeCtx, innerpresence.NewPresence())
p.Clear()

if err := doc.Update(func(root *json.Object, p *presence.Presence) error {
p.Clear()
return nil
}); err != nil {
return nil, err
}
changes := []*change.Change{changeCtx.ToChange()}
pack := change.NewPack(docInfo.Key, cp, changes, nil, nil)

// 02. PushPull with the created ChangePack.
if _, err := packs.PushPull(
Expand All @@ -115,9 +123,9 @@ func (s *clusterServer) DetachDocument(
project,
clientInfo,
docInfo,
doc.CreateChangePack(),
pack,
packs.PushPullOptions{
Mode: types.SyncModePushPull,
Mode: types.SyncModePushOnly,
Status: document.StatusDetached,
},
); err != nil {
Expand Down
Loading