Skip to content

Commit

Permalink
Register custom encoder to store presence change as binary format
Browse files Browse the repository at this point in the history
  • Loading branch information
chacha912 committed Nov 14, 2024
1 parent f8ebba2 commit 49657d0
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 56 deletions.
6 changes: 4 additions & 2 deletions api/converter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,10 @@ func TestConverter(t *testing.T) {
})

t.Run("empty presence converting test", func(t *testing.T) {
change, err := innerpresence.NewChangeFromJSON(`{"ChangeType":"put","Presence":{}}`)
assert.NoError(t, err)
change := &innerpresence.PresenceChange{
ChangeType: innerpresence.Put,
Presence: innerpresence.NewPresence(),
}

pbChange := converter.ToPresenceChange(change)
clone := converter.FromPresenceChange(pbChange)
Expand Down
22 changes: 18 additions & 4 deletions pkg/document/innerpresence/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,28 @@ type PresenceChange struct {
Presence Presence `json:"presence"`
}

// NewChangeFromJSON creates a new instance of PresenceChange from JSON.
func NewChangeFromJSON(encodedChange string) (*PresenceChange, error) {
if encodedChange == "" {
// EncodeToBytes encodes the given presence change into bytes array.
func EncodeToBytes(p *PresenceChange) ([]byte, error) {
if p == nil {
return nil, nil
}

bytes, err := json.Marshal(p)
if err != nil {
return nil, fmt.Errorf("marshal presence change to bytes: %w", err)
}

return bytes, nil
}

// PresenceChangeFromBytes unmarshals the given bytes array into PresenceChange.
func PresenceChangeFromBytes(bytes []byte) (*PresenceChange, error) {
if bytes == nil {
return nil, nil
}

p := &PresenceChange{}
if err := json.Unmarshal([]byte(encodedChange), p); err != nil {
if err := json.Unmarshal(bytes, p); err != nil {
return nil, fmt.Errorf("unmarshal presence change: %w", err)
}

Expand Down
45 changes: 12 additions & 33 deletions server/backend/database/change_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package database

import (
"encoding/json"
"errors"
"fmt"

"google.golang.org/protobuf/proto"

Expand All @@ -40,17 +38,17 @@ var ErrDecodeOperationFailed = errors.New("decode operations failed")

// ChangeInfo is a structure representing information of a change.
type ChangeInfo struct {
ID types.ID `bson:"_id"`
ProjectID types.ID `bson:"project_id"`
DocID types.ID `bson:"doc_id"`
ServerSeq int64 `bson:"server_seq"`
ClientSeq uint32 `bson:"client_seq"`
Lamport int64 `bson:"lamport"`
ActorID types.ID `bson:"actor_id"`
VersionVector time.VersionVector `bson:"version_vector"`
Message string `bson:"message"`
Operations [][]byte `bson:"operations"`
PresenceChange string `bson:"presence_change"`
ID types.ID `bson:"_id"`
ProjectID types.ID `bson:"project_id"`
DocID types.ID `bson:"doc_id"`
ServerSeq int64 `bson:"server_seq"`
ClientSeq uint32 `bson:"client_seq"`
Lamport int64 `bson:"lamport"`
ActorID types.ID `bson:"actor_id"`
VersionVector time.VersionVector `bson:"version_vector"`
Message string `bson:"message"`
Operations [][]byte `bson:"operations"`
PresenceChange *innerpresence.PresenceChange `bson:"presence_change"`
}

// EncodeOperations encodes the given operations into bytes array.
Expand All @@ -73,20 +71,6 @@ func EncodeOperations(operations []operations.Operation) ([][]byte, error) {
return encodedOps, nil
}

// EncodePresenceChange encodes the given presence change into string.
func EncodePresenceChange(p *innerpresence.PresenceChange) (string, error) {
if p == nil {
return "", nil
}

bytes, err := json.Marshal(p)
if err != nil {
return "", fmt.Errorf("marshal presence change to bytes: %w", err)
}

return string(bytes), nil
}

// ToChange creates Change model from this ChangeInfo.
func (i *ChangeInfo) ToChange() (*change.Change, error) {
actorID, err := time.ActorIDFromHex(i.ActorID.String())
Expand All @@ -110,12 +94,7 @@ func (i *ChangeInfo) ToChange() (*change.Change, error) {
return nil, err
}

p, err := innerpresence.NewChangeFromJSON(i.PresenceChange)
if err != nil {
return nil, err
}

c := change.New(changeID, i.Message, ops, p)
c := change.New(changeID, i.Message, ops, i.PresenceChange)
c.SetServerSeq(i.ServerSeq)

return c, nil
Expand Down
6 changes: 1 addition & 5 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,10 +901,6 @@ func (d *DB) CreateChangeInfos(
if err != nil {
return err
}
encodedPresence, err := database.EncodePresenceChange(cn.PresenceChange())
if err != nil {
return err
}

if err := txn.Insert(tblChanges, &database.ChangeInfo{
ID: newID(),
Expand All @@ -917,7 +913,7 @@ func (d *DB) CreateChangeInfos(
VersionVector: cn.ID().VersionVector(),
Message: cn.Message(),
Operations: encodedOperations,
PresenceChange: encodedPresence,
PresenceChange: cn.PresenceChange(),
}); err != nil {
return fmt.Errorf("create change: %w", err)
}
Expand Down
6 changes: 1 addition & 5 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,10 +866,6 @@ func (c *Client) CreateChangeInfos(
if err != nil {
return err
}
encodedPresence, err := database.EncodePresenceChange(cn.PresenceChange())
if err != nil {
return err
}

models = append(models, mongo.NewUpdateOneModel().SetFilter(bson.M{
"project_id": docRefKey.ProjectID,
Expand All @@ -882,7 +878,7 @@ func (c *Client) CreateChangeInfos(
"version_vector": cn.ID().VersionVector(),
"message": cn.Message(),
"operations": encodedOperations,
"presence_change": encodedPresence,
"presence_change": cn.PresenceChange(),
}}).SetUpsert(true))
}

Expand Down
60 changes: 60 additions & 0 deletions server/backend/database/mongo/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document/innerpresence"
"github.com/yorkie-team/yorkie/pkg/document/time"
)

var tID = reflect.TypeOf(types.ID(""))
var tActorID = reflect.TypeOf(&time.ActorID{})
var tVersionVector = reflect.TypeOf(time.VersionVector{})
var tPresenceChange = reflect.TypeOf(&innerpresence.PresenceChange{})

// NewRegistryBuilder returns a new registry builder with the default encoder and decoder.
func NewRegistryBuilder() *bsoncodec.RegistryBuilder {
Expand All @@ -50,11 +52,13 @@ func NewRegistryBuilder() *bsoncodec.RegistryBuilder {
bsoncodec.NewStringCodec(bsonoptions.StringCodec().SetDecodeObjectIDAsHex(true)),
)
rb.RegisterTypeDecoder(tVersionVector, bsoncodec.ValueDecoderFunc(versionVectorDecoder))
rb.RegisterTypeDecoder(tPresenceChange, bsoncodec.ValueDecoderFunc(presenceChangeDecoder))

// Register the encoders for types.ID and time.ActorID.
rb.RegisterTypeEncoder(tID, bsoncodec.ValueEncoderFunc(idEncoder))
rb.RegisterTypeEncoder(tActorID, bsoncodec.ValueEncoderFunc(actorIDEncoder))
rb.RegisterTypeEncoder(tVersionVector, bsoncodec.ValueEncoderFunc(versionVectorEncoder))
rb.RegisterTypeEncoder(tPresenceChange, bsoncodec.ValueEncoderFunc(presenceChangeEncoder))

return rb
}
Expand Down Expand Up @@ -133,3 +137,59 @@ func versionVectorDecoder(_ bsoncodec.DecodeContext, vr bsonrw.ValueReader, val

return nil
}

func presenceChangeEncoder(_ bsoncodec.EncodeContext, vw bsonrw.ValueWriter, val reflect.Value) error {
if !val.IsValid() || val.Type() != tPresenceChange {
return bsoncodec.ValueEncoderError{
Name: "presenceChangeEncoder", Types: []reflect.Type{tPresenceChange}, Received: val}
}

presenceChange := val.Interface().(*innerpresence.PresenceChange)
if presenceChange == nil {
if err := vw.WriteNull(); err != nil {
return fmt.Errorf("encode error: %w", err)
}
return nil
}

bytes, err := innerpresence.EncodeToBytes(presenceChange)
if err != nil {
return fmt.Errorf("encode error: %w", err)
}

if err := vw.WriteBinary(bytes); err != nil {
return fmt.Errorf("encode error: %w", err)
}

return nil
}

func presenceChangeDecoder(_ bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error {
if val.Type() != tPresenceChange {
return bsoncodec.ValueDecoderError{
Name: "presenceChangeDecoder", Types: []reflect.Type{tPresenceChange}, Received: val}
}

switch vrType := vr.Type(); vrType {
case bson.TypeNull:
if err := vr.ReadNull(); err != nil {
return fmt.Errorf("decode error: %w", err)
}
val.Set(reflect.Zero(tPresenceChange))
return nil
case bson.TypeBinary:
data, _, err := vr.ReadBinary()
if err != nil {
return fmt.Errorf("decode error: %w", err)
}

presenceChange, err := innerpresence.PresenceChangeFromBytes(data)
if err != nil {
return fmt.Errorf("decode error: %w", err)
}
val.Set(reflect.ValueOf(presenceChange))
return nil
default:
return fmt.Errorf("unsupported type: %v", vr.Type())
}
}
22 changes: 22 additions & 0 deletions server/backend/database/mongo/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/document/innerpresence"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend/database"
)
Expand Down Expand Up @@ -64,6 +65,27 @@ func TestRegistry(t *testing.T) {
assert.NoError(t, bson.UnmarshalWithRegistry(registry, data, &info))
assert.Equal(t, vector, info.VersionVector)
})

t.Run("presenceChange test", func(t *testing.T) {
presence := innerpresence.NewPresence()
presence.Set("color", "orange")
presenceChange := &innerpresence.PresenceChange{
ChangeType: innerpresence.Put,
Presence: presence,
}

data, err := bson.MarshalWithRegistry(registry, bson.M{
"presence_change": presenceChange,
})
assert.NoError(t, err)

info := struct {
PresenceChange *innerpresence.PresenceChange `bson:"presence_change"`
}{}
assert.NoError(t, bson.UnmarshalWithRegistry(registry, data, &info))

assert.Equal(t, presenceChange, info.PresenceChange)
})
}

func TestEncoder(t *testing.T) {
Expand Down
8 changes: 1 addition & 7 deletions server/packs/serverpacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/yorkie-team/yorkie/api/converter"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/innerpresence"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend/database"
Expand Down Expand Up @@ -111,11 +110,6 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) {
pbOps = append(pbOps, &pbOp)
}

p, err := innerpresence.NewChangeFromJSON(info.PresenceChange)
if err != nil {
return nil, err
}

pbChangeID, err := converter.ToChangeID(changeID)
if err != nil {
return nil, err
Expand All @@ -125,7 +119,7 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) {
Id: pbChangeID,
Message: info.Message,
Operations: pbOps,
PresenceChange: converter.ToPresenceChange(p),
PresenceChange: converter.ToPresenceChange(info.PresenceChange),
})
}

Expand Down

0 comments on commit 49657d0

Please sign in to comment.