Skip to content

Commit

Permalink
Add status change event
Browse files Browse the repository at this point in the history
Will propagated to client via SSE in ith own channel, to inform clients about CRC status change.
Status modified in "status_change_stream.go", as current status implementation has no single point of change, so tracking transition between states is challenging.

Signed-off-by: Yevhen Vydolob <[email protected]>
  • Loading branch information
evidolob committed Aug 7, 2024
1 parent 6d61cbf commit 1029bb7
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/crc/api/events/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewEventServer(machine machine.Client) *EventServer {

sseServer.CreateStream(Logs)
sseServer.CreateStream(ClusterLoad)
sseServer.CreateStream(StatusChange)
return eventServer
}

Expand All @@ -69,6 +70,8 @@ func createEventStream(server *EventServer, streamID string) EventStream {
return newLogsStream(server)
case ClusterLoad:
return newClusterLoadStream(server)
case StatusChange:
return newStatusChangeStream(server)
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/crc/api/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package events
import "github.com/r3labs/sse/v2"

const (
Logs = "logs" // Logs event channel, contains daemon logs
ClusterLoad = "cluster_load" // status event channel, contains VM load info
Logs = "logs" // Logs event channel, contains daemon logs
ClusterLoad = "cluster_load" // status event channel, contains VM load info
StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc
)

type EventPublisher interface {
Expand Down
69 changes: 69 additions & 0 deletions pkg/crc/api/events/status_change_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package events

import (
"encoding/json"

"github.com/crc-org/crc/v2/pkg/crc/logging"
"github.com/crc-org/crc/v2/pkg/crc/machine"
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
"github.com/crc-org/crc/v2/pkg/events"
"github.com/r3labs/sse/v2"
)

type serializableEvent struct {
Status *types.ClusterStatusResult `json:"status"`
Error string `json:"error,omitempty"`
}

type statusChangeListener struct {
machineClient machine.Client
publisher EventPublisher
}

func newStatusChangeStream(server *EventServer) EventStream {
return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer))
}

func newStatusChangeListener(client machine.Client) EventProducer {
return &statusChangeListener{
machineClient: client,
}
}

func (st *statusChangeListener) Notify(changedEvent events.StatusChangedEvent) {
logging.Debugf("State Changed Event %s", changedEvent)
var event serializableEvent
status, err := st.machineClient.Status()
// if we cannot receive actual state, send error state with error description
if err != nil {
event = serializableEvent{Status: &types.ClusterStatusResult{
CrcStatus: state.Error,
}, Error: err.Error()}
} else {
// event could be fired, before actual code, which change state is called
// so status could contain 'old' state, replace it with state received in event
status.CrcStatus = changedEvent.State // override with actual reported state
event = serializableEvent{Status: status}
if changedEvent.Error != nil {
event.Error = changedEvent.Error.Error()
}

}
data, err := json.Marshal(event)
if err != nil {
logging.Errorf("Could not serealize status changed event in to JSON: %s", err)
return
}
st.publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data})
}

func (st *statusChangeListener) Start(publisher EventPublisher) {
st.publisher = publisher
events.StatusChanged.AddListener(st)

}

func (st *statusChangeListener) Stop() {
events.StatusChanged.RemoveListener(st)
}
28 changes: 27 additions & 1 deletion pkg/crc/machine/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset"
"github.com/crc-org/crc/v2/pkg/events"
)

const startCancelTimeout = 15 * time.Second
Expand Down Expand Up @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error {

err := s.underlying.Delete()
s.syncOperationDone <- Deleting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM})
}
return err
}

Expand All @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error {
}
s.startCancel = startCancel
s.currentState = Starting
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting})

return nil
}
Expand All @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig)

startResult, err := s.underlying.Start(ctx, startConfig)
s.syncOperationDone <- Starting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}

return startResult, err
}

Expand Down Expand Up @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) {
if err := s.prepareStopDelete(Stopping); err != nil {
return state.Error, err
}
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping})

st, err := s.underlying.Stop()
s.syncOperationDone <- Stopping

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: st})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}
return st, err
}

Expand All @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) {
}

func (s *Synchronized) PowerOff() error {
return s.underlying.PowerOff()
err := s.underlying.PowerOff()
if err != nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}

return err
}

func (s *Synchronized) Status() (*types.ClusterStatusResult, error) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

import (
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
)

type StatusChangedEvent struct {
State state.State
Error error
}

var (
StatusChanged = NewEvent[StatusChangedEvent]()
)

0 comments on commit 1029bb7

Please sign in to comment.