From 1029bb760c0dcd7a80a804b1b8212dd0732f6ead Mon Sep 17 00:00:00 2001 From: Yevhen Vydolob Date: Thu, 7 Dec 2023 12:46:35 +0200 Subject: [PATCH] Add status change event Will propagated to client via SSE in ith own channel, to inform clients about CRC status change. Status modified in "status_change_stream.go", as current status implementation has no single point of change, so tracking transition between states is challenging. Signed-off-by: Yevhen Vydolob --- pkg/crc/api/events/event_server.go | 3 + pkg/crc/api/events/events.go | 5 +- pkg/crc/api/events/status_change_stream.go | 69 ++++++++++++++++++++++ pkg/crc/machine/sync.go | 28 ++++++++- pkg/events/events.go | 14 +++++ 5 files changed, 116 insertions(+), 3 deletions(-) create mode 100644 pkg/crc/api/events/status_change_stream.go create mode 100644 pkg/events/events.go diff --git a/pkg/crc/api/events/event_server.go b/pkg/crc/api/events/event_server.go index 4943a37558..1a7e5155c8 100644 --- a/pkg/crc/api/events/event_server.go +++ b/pkg/crc/api/events/event_server.go @@ -56,6 +56,7 @@ func NewEventServer(machine machine.Client) *EventServer { sseServer.CreateStream(Logs) sseServer.CreateStream(ClusterLoad) + sseServer.CreateStream(StatusChange) return eventServer } @@ -69,6 +70,8 @@ func createEventStream(server *EventServer, streamID string) EventStream { return newLogsStream(server) case ClusterLoad: return newClusterLoadStream(server) + case StatusChange: + return newStatusChangeStream(server) } return nil } diff --git a/pkg/crc/api/events/events.go b/pkg/crc/api/events/events.go index 0fe954fc3b..027d35d7da 100644 --- a/pkg/crc/api/events/events.go +++ b/pkg/crc/api/events/events.go @@ -3,8 +3,9 @@ package events import "github.com/r3labs/sse/v2" const ( - Logs = "logs" // Logs event channel, contains daemon logs - ClusterLoad = "cluster_load" // status event channel, contains VM load info + Logs = "logs" // Logs event channel, contains daemon logs + ClusterLoad = "cluster_load" // status event channel, contains VM load info + StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc ) type EventPublisher interface { diff --git a/pkg/crc/api/events/status_change_stream.go b/pkg/crc/api/events/status_change_stream.go new file mode 100644 index 0000000000..42d3daeb2f --- /dev/null +++ b/pkg/crc/api/events/status_change_stream.go @@ -0,0 +1,69 @@ +package events + +import ( + "encoding/json" + + "github.com/crc-org/crc/v2/pkg/crc/logging" + "github.com/crc-org/crc/v2/pkg/crc/machine" + "github.com/crc-org/crc/v2/pkg/crc/machine/state" + "github.com/crc-org/crc/v2/pkg/crc/machine/types" + "github.com/crc-org/crc/v2/pkg/events" + "github.com/r3labs/sse/v2" +) + +type serializableEvent struct { + Status *types.ClusterStatusResult `json:"status"` + Error string `json:"error,omitempty"` +} + +type statusChangeListener struct { + machineClient machine.Client + publisher EventPublisher +} + +func newStatusChangeStream(server *EventServer) EventStream { + return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer)) +} + +func newStatusChangeListener(client machine.Client) EventProducer { + return &statusChangeListener{ + machineClient: client, + } +} + +func (st *statusChangeListener) Notify(changedEvent events.StatusChangedEvent) { + logging.Debugf("State Changed Event %s", changedEvent) + var event serializableEvent + status, err := st.machineClient.Status() + // if we cannot receive actual state, send error state with error description + if err != nil { + event = serializableEvent{Status: &types.ClusterStatusResult{ + CrcStatus: state.Error, + }, Error: err.Error()} + } else { + // event could be fired, before actual code, which change state is called + // so status could contain 'old' state, replace it with state received in event + status.CrcStatus = changedEvent.State // override with actual reported state + event = serializableEvent{Status: status} + if changedEvent.Error != nil { + event.Error = changedEvent.Error.Error() + } + + } + data, err := json.Marshal(event) + if err != nil { + logging.Errorf("Could not serealize status changed event in to JSON: %s", err) + return + } + st.publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data}) +} + +func (st *statusChangeListener) Start(publisher EventPublisher) { + st.publisher = publisher + events.StatusChanged.AddListener(st) + +} + +func (st *statusChangeListener) Stop() { + events.StatusChanged.RemoveListener(st) +} diff --git a/pkg/crc/machine/sync.go b/pkg/crc/machine/sync.go index 6da384b415..fe53086d1b 100644 --- a/pkg/crc/machine/sync.go +++ b/pkg/crc/machine/sync.go @@ -10,6 +10,7 @@ import ( "github.com/crc-org/crc/v2/pkg/crc/machine/state" "github.com/crc-org/crc/v2/pkg/crc/machine/types" crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset" + "github.com/crc-org/crc/v2/pkg/events" ) const startCancelTimeout = 15 * time.Second @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error { err := s.underlying.Delete() s.syncOperationDone <- Deleting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM}) + } return err } @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error { } s.startCancel = startCancel s.currentState = Starting + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting}) return nil } @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig) startResult, err := s.underlying.Start(ctx, startConfig) s.syncOperationDone <- Starting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + return startResult, err } @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) { if err := s.prepareStopDelete(Stopping); err != nil { return state.Error, err } + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping}) st, err := s.underlying.Stop() s.syncOperationDone <- Stopping + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: st}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } return st, err } @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) { } func (s *Synchronized) PowerOff() error { - return s.underlying.PowerOff() + err := s.underlying.PowerOff() + if err != nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + + return err } func (s *Synchronized) Status() (*types.ClusterStatusResult, error) { diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000000..42a9865c7b --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,14 @@ +package events + +import ( + "github.com/crc-org/crc/v2/pkg/crc/machine/state" +) + +type StatusChangedEvent struct { + State state.State + Error error +} + +var ( + StatusChanged = NewEvent[StatusChangedEvent]() +)