Skip to content

Commit

Permalink
Apply latest change info when generating change during clusterServer.…
Browse files Browse the repository at this point in the history
…detach
  • Loading branch information
raararaara committed Nov 6, 2024
1 parent 6e69b63 commit 267b9fc
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 4 deletions.
7 changes: 7 additions & 0 deletions server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ type Database interface {
docRefKey types.DocRefKey,
) error

// FindLatestChangeInfoByActor returns the latest change created by given actorID.
FindLatestChangeInfoByActor(
ctx context.Context,
docRefKey types.DocRefKey,
actorID types.ID,
) (*ChangeInfo, error)

// FindChangeInfoByServerSeq returns the change by the given server sequence.
FindChangeInfoByServerSeq(
ctx context.Context,
Expand Down
9 changes: 9 additions & 0 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,15 @@ func (d *DB) PurgeStaleChanges(
return nil
}

// FindLatestChangeInfoByActor returns the latest change created by given actorID.
func (d *DB) FindLatestChangeInfoByActor(
_ context.Context,
_ types.DocRefKey,
_ types.ID,
) (*database.ChangeInfo, error) {
return nil, nil
}

// FindChangeInfoByServerSeq returns the change by the given server sequence.
func (d *DB) FindChangeInfoByServerSeq(
_ context.Context,
Expand Down
29 changes: 29 additions & 0 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,35 @@ func (c *Client) PurgeStaleChanges(
return nil
}

// FindLatestChangeInfoByActor returns the latest change created by given actorID.
func (c *Client) FindLatestChangeInfoByActor(
ctx context.Context,
docRefKey types.DocRefKey,
actorID types.ID,
) (*database.ChangeInfo, error) {
result := c.collection(ColChanges).FindOne(ctx, bson.M{
"project_id": docRefKey.ProjectID,
"doc_id": docRefKey.DocID,
"actor_id": actorID,
}, options.FindOne().SetSort(bson.M{
"lamport": -1,
}))

changeInfo := &database.ChangeInfo{}
if result.Err() == mongo.ErrNoDocuments {
return changeInfo, nil
}
if result.Err() != nil {
return nil, fmt.Errorf("find change: %w", result.Err())
}

if err := result.Decode(changeInfo); err != nil {
return nil, fmt.Errorf("decode change: %w", err)
}

return changeInfo, nil
}

// FindChangeInfoByServerSeq returns the change by the given server sequence.
func (c *Client) FindChangeInfoByServerSeq(
ctx context.Context,
Expand Down
6 changes: 6 additions & 0 deletions server/backend/database/mongo/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ var collectionInfos = []collectionInfo{
{Key: "server_seq", Value: bsonx.Int32(1)},
},
Options: options.Index().SetUnique(true),
}, {
Keys: bsonx.Doc{
{Key: "doc_id", Value: bsonx.Int32(1)}, // shard key
{Key: "project_id", Value: bsonx.Int32(1)},
{Key: "actor_id", Value: bsonx.Int32(1)},
},
}},
}, {
name: ColSnapshots,
Expand Down
9 changes: 5 additions & 4 deletions server/rpc/cluster_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package rpc

import (
"connectrpc.com/connect"
"context"

"connectrpc.com/connect"

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
Expand Down Expand Up @@ -94,7 +96,7 @@ func (s *clusterServer) DetachDocument(

// 01. Create changePack with presence clear change
cp := clientInfo.Checkpoint(summary.ID)
latestChange, err := s.backend.DB.FindChangeInfoByServerSeq(ctx, docRefKey, cp.ServerSeq)
latestChange, err := s.backend.DB.FindLatestChangeInfoByActor(ctx, docRefKey, types.ID(req.Msg.ClientId))
if err != nil {
return nil, err
}
Expand All @@ -114,14 +116,13 @@ func (s *clusterServer) DetachDocument(
changes := []*change.Change{changeCtx.ToChange()}
pack := change.NewPack(docInfo.Key, cp, changes, nil, nil)

// 03. PushPull with the created ChangePack.
// 02. PushPull with the created ChangePack.
if _, err := packs.PushPull(
ctx,
s.backend,
project,
clientInfo,
docInfo,
//doc.CreateChangePack(),
pack,
packs.PushPullOptions{
Mode: types.SyncModePushPull,
Expand Down

0 comments on commit 267b9fc

Please sign in to comment.