Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work be1 arnauds5 federation tokens exchange rumor #1975

Merged
merged 3 commits into from
Jun 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,34 @@ func New(hub Hub, subs Subscribers, sockets Sockets, conf Config,
}
}

func (h *Handler) Handle(channelPath string, msg mmessage.Message,
func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
jsonData, err := base64.URLEncoding.DecodeString(msg.Data)
if err != nil {
return errors.NewInvalidMessageFieldError("failed to decode message data: %v", err)
}

err = h.schema.VerifyJSON(jsonData, validation.Data)
if err != nil {
return err
}

object, action, err := channel.GetObjectAndAction(jsonData)
if err != nil {
return err
}

if object != channel.FederationObject {
return errors.NewInvalidMessageFieldError("invalid object %v", object)
}

if action != channel.FederationActionTokensExchange {
return errors.NewInvalidActionError("invalid action %v", action)
}

return h.handleTokensExchange(msg, channelPath)
}

func (h *Handler) HandleWithSocket(channelPath string, msg mmessage.Message,
socket socket.Socket) error {
err := msg.VerifyMessage()
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func Test_handleChannelFederation(t *testing.T) {

for _, arg := range args {
t.Run(arg.name, func(t *testing.T) {
err = federationHandler.Handle(arg.channelPath, arg.msg, &fakeSocket)
err = federationHandler.HandleWithSocket(arg.channelPath, arg.msg, &fakeSocket)
if arg.isError {
require.Error(t, err, arg.contains)
} else {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 21 additions & 10 deletions be1-go/internal/handler/method/publish/hpublish/publish.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hpublish

import (
"encoding/base64"
"encoding/json"
"github.com/rs/zerolog"
"popstellar/internal/errors"
Expand Down Expand Up @@ -28,7 +29,7 @@ type MessageHandler interface {
}

type FederationHandler interface {
Handle(channelPath string, msg mmessage.Message, socket socket.Socket) error
HandleWithSocket(channelPath string, msg mmessage.Message, socket socket.Socket) error
}

type Handler struct {
Expand Down Expand Up @@ -58,24 +59,34 @@ func (h *Handler) Handle(socket socket.Socket, msg []byte) (*int, error) {
}

// The federation handler need to have access to the socket and are not
// using rumors
// using rumors, except for tokens exchange
if strings.Contains(publish.Params.Channel, channel.Federation) {
err = h.federationHandler.Handle(publish.Params.Channel, publish.Params.Message, socket)
if err != nil {
return &publish.ID, err
}

socket.SendResult(publish.ID, nil, nil)
return nil, nil
err = h.federationHandler.HandleWithSocket(publish.Params.Channel, publish.Params.Message, socket)
} else {
err = h.messageHandler.Handle(publish.Params.Channel, publish.Params.Message, false)
}

err = h.messageHandler.Handle(publish.Params.Channel, publish.Params.Message, false)
if err != nil {
return &publish.ID, err
}

socket.SendResult(publish.ID, nil, nil)

jsonData, err := base64.URLEncoding.DecodeString(publish.Params.Message.Data)
if err != nil {
return &publish.ID, errors.NewInvalidMessageFieldError(
"failed to decode message data: %v", err)
}

object, action, err := channel.GetObjectAndAction(jsonData)
if err != nil {
return &publish.ID, err
}

if object == channel.FederationObject && action != channel.FederationActionTokensExchange {
return nil, nil
}

logger.Logger.Debug().Msgf("sender rumor need to add message %s", publish.Params.Message.MessageID)
nbMessagesInsideRumor, err := h.db.AddMessageToMyRumor(publish.Params.Message.MessageID)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func Test_Handle(t *testing.T) {
publishBuf = generator.NewPublishQuery(t, queryID, channelPath, msg)

//messageHandler.On("Handle", channelPath, msg, false).Return(nil).Once()
fHandler.On("Handle", channelPath, msg, socket).Return(nil).Once()
fHandler.On("HandleWithSocket", channelPath, msg, socket).Return(nil).Once()
db.On("AddMessageToMyRumor", msg.MessageID).Return(thresholdMessagesByRumor-1, nil).Once()

id, err = publishHandler.Handle(socket, publishBuf)
require.Nil(t, id)
Expand Down
21 changes: 11 additions & 10 deletions be1-go/internal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ func New(dbPath string, ownerPubKey kyber.Point, clientAddress, serverAddress st
return nil, err
}

// Create the greetserver handler
greetserverHandler := hgreetserver.New(conf, peers, log)

// Create the rumor state handler
rumorStateHandler := hrumorstate.New(queries, sockets, &db, log)

// Create the federation handler
federationHandler := hfederation.New(hubParams, subs, sockets, conf, &db,
rumorStateHandler, greetserverHandler, schemaValidator, log)

// Create the message channel handlers
channelHandlers := make(hmessage.ChannelHandlers)
channelHandlers[channel.RootObject] = hroot.New(conf, &db, subs, peers, schemaValidator, log)
Expand All @@ -175,23 +185,14 @@ func New(dbPath string, ownerPubKey kyber.Point, clientAddress, serverAddress st
channelHandlers[channel.ChirpObject] = hchirp.New(conf, subs, &db, schemaValidator, log)
channelHandlers[channel.ReactionObject] = hreaction.New(subs, &db, schemaValidator, log)
channelHandlers[channel.CoinObject] = hcoin.New(subs, &db, schemaValidator, log)
channelHandlers[channel.FederationObject] = federationHandler

// Create the message handler
msgHandler := hmessage.New(&db, channelHandlers, log)

// Create the greetserver handler
greetserverHandler := hgreetserver.New(conf, peers, log)

// Create the rumor handler
rumorHandler := hrumor.New(queries, sockets, &db, msgHandler, log)

// Create the rumor state handler
rumorStateHandler := hrumorstate.New(queries, sockets, &db, log)

// Create the federation handler
federationHandler := hfederation.New(hubParams, subs, sockets, conf, &db,
rumorStateHandler, greetserverHandler, schemaValidator, log)

// Create the query handler
methodHandlers := make(hquery.MethodHandlers)
methodHandlers[mquery.MethodCatchUp] = hcatchup.New(&db, log)
Expand Down
Loading