Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some delay before servers are sync #1970

Merged
merged 4 commits into from
Jun 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 73 additions & 28 deletions be1-go/internal/handler/channel/federation/hfederation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Hub interface {
type Subscribers interface {
BroadcastToAllClients(msg mmessage.Message, channel string) error
AddChannel(channel string) error
HasChannel(channel string) bool
Subscribe(channel string, socket socket.Socket) error
SendToAll(buf []byte, channel string) error
}
Expand Down Expand Up @@ -366,42 +367,58 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string,
return err
}

err = h.db.StoreMessageAndData(channelPath, resultMsg)
if err != nil {
return err
}

remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
if h.isOnSameServer(federationExpect.ServerAddress) || socket == nil {
// In the edge case where the two LAOs are on the same server, the
// result message would already be stored and handleResult will not be
// called => broadcast the result to both federation channels directly.
_ = h.db.StoreMessageAndData(channelPath, resultMsg)
_ = h.db.StoreMessageAndData(remoteChannel, resultMsg)
_ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel)
_ = h.subs.BroadcastToAllClients(resultMsg, channelPath)

h.log.Info().Msgf("A federation was created with the local LAO %s",
federationExpect.LaoId)
} else {
// Add the socket to the list of server sockets
h.sockets.Upsert(socket)

// Send the rumor state directly to avoid delay while syncing
err = h.rumors.SendRumorStateTo(socket)
if err != nil {
return err
}

// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}
h.log.Info().Msgf("A federation was created with the LAO %s from: %s",
federationExpect.LaoId, federationExpect.ServerAddress)
return nil
}

// Add the socket to the list of server sockets
h.sockets.Upsert(socket)

// Send the rumor state directly to avoid delay while syncing
err = h.rumors.SendRumorStateTo(socket)
if err != nil {
return err
}

// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}
h.log.Info().Msgf("A federation was created with the LAO %s from: %s",
federationExpect.LaoId, federationExpect.ServerAddress)

if h.subs.HasChannel(remoteChannel) {
_ = h.db.StoreMessageAndData(channelPath, resultMsg)

// If the server was already sync, no need to add a goroutine
return h.subs.BroadcastToAllClients(msg, channelPath)
}

// broadcast the FederationResult to the local organizer
return h.subs.BroadcastToAllClients(resultMsg, channelPath)
go func() {
remoteLaoChannel := fmt.Sprintf("/root/%s", federationExpect.LaoId)

// wait until the remote channel is available to be subscribed on
h.waitSyncOrTimeout(remoteLaoChannel, time.Second*30)

_ = h.db.StoreMessageAndData(channelPath, resultMsg)

// broadcast the FederationResult to the local organizer
_ = h.subs.BroadcastToAllClients(resultMsg, channelPath)
}()
return nil
}

func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error {
Expand Down Expand Up @@ -445,17 +462,29 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error {

// try to get a matching FederationInit, if found then we know that
// the local organizer was waiting this result
_, err = h.db.GetFederationInit(organizerPk, result.ChallengeMsg.Sender, federationChallenge, channelPath)
federationInit, err := h.db.GetFederationInit(organizerPk, result.ChallengeMsg.Sender, federationChallenge, channelPath)
if err != nil {
return err
}

err = h.db.StoreMessageAndData(channelPath, msg)
if err != nil {
return err
remoteLaoChannel := fmt.Sprintf("/root/%s", federationInit.LaoId)
if h.subs.HasChannel(remoteLaoChannel) {
_ = h.db.StoreMessageAndData(channelPath, msg)

// If the server was already sync, no need to add a goroutine
return h.subs.BroadcastToAllClients(msg, channelPath)
}

return h.subs.BroadcastToAllClients(msg, channelPath)
go func() {
// wait until the remote channel is available to be subscribed on
h.waitSyncOrTimeout(remoteLaoChannel, time.Second*30)

_ = h.db.StoreMessageAndData(channelPath, msg)

_ = h.subs.BroadcastToAllClients(msg, channelPath)
}()

return nil
}

func (h *Handler) handleTokensExchange(msg mmessage.Message, channelPath string) error {
Expand Down Expand Up @@ -618,3 +647,19 @@ func (h *Handler) publishTo(msg mmessage.Message, channelPath string,
socket.Send(publishBytes)
return nil
}

// waitSyncOrTimeout will wait at most maxTime or until the channel is created
func (h *Handler) waitSyncOrTimeout(channelPath string, maxTime time.Duration) {
timeout := time.NewTimer(maxTime)
for {
select {
case <-timeout.C:
return
case <-time.After(time.Second):
if h.subs.HasChannel(channelPath) {
h.log.Info().Msgf("channel %s exists", channelPath)
return
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,15 @@ func Test_handleFederationChallenge(t *testing.T) {
laoID := "lsWUv1bKBQ0t1DqWZTFwb0nhLsP_EtfGoXHny4hsrwA="
laoID2 := "OWY4NmQwODE4ODRjN2Q2NTlhMmZlYWEwYzU1YWQwMQ=="
laoPath := fmt.Sprintf("/root/%s", laoID)
laoPath2 := fmt.Sprintf("/root/%s", laoID2)
channelPath := fmt.Sprintf("/root/%s/federation", laoID)
channelPath2 := fmt.Sprintf("/root/%s/federation", laoID2)

err = subs.AddChannel(laoPath)
require.NoError(t, err)
err = subs.AddChannel(laoPath2)
require.NoError(t, err)

err = subs.AddChannel(channelPath)
require.NoError(t, err)
err = subs.AddChannel(channelPath2)
Expand All @@ -715,8 +721,6 @@ func Test_handleFederationChallenge(t *testing.T) {
require.NoError(t, err)

fakeSocket2 := mock2.FakeSocket{Id: "2"}
err = subs.Subscribe(channelPath2, &fakeSocket2)
require.NoError(t, err)

rumors.On("SendRumorStateTo", &fakeSocket2).Return(nil)

Expand Down Expand Up @@ -770,10 +774,9 @@ func Test_handleFederationChallenge(t *testing.T) {
err = json.Unmarshal(fakeSocket2.Msg, &publishMsg)
require.NoError(t, err)
require.Equal(t, mquery.MethodPublish, publishMsg.Method)
require.Equal(t, broadcastMsg.Params.Message, publishMsg.Params.Message)

var resultMsg mfederation.FederationResult
err = broadcastMsg.Params.Message.UnmarshalData(&resultMsg)
err = publishMsg.Params.Message.UnmarshalData(&resultMsg)
require.NoError(t, err)

// it should contain the challenge from organizer, not organizer2
Expand Down Expand Up @@ -815,6 +818,9 @@ func Test_handleFederationResult(t *testing.T) {
laoPath := fmt.Sprintf("/root/%s", laoID)
channelPath := fmt.Sprintf("/root/%s/federation", laoID)

err = subs.AddChannel(laoPath)
require.NoError(t, err)

err = subs.AddChannel(channelPath)
require.NoError(t, err)

Expand Down
Loading