From b3390d5179c6303c65a54bc9a10a32fdb9085d26 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 11 Mar 2024 10:59:40 +0000 Subject: [PATCH] Add support for stop sessions mist call (#1169) --- clients/mist_client.go | 19 +++++++++++++++++++ clients/mist_client_test.go | 4 ++++ events/events.go | 13 +++++++++++++ handlers/schemas/Event.yaml | 1 + main.go | 4 ++++ mapic/mistapiconnector_app.go | 14 ++++++++++++++ 6 files changed, 55 insertions(+) diff --git a/clients/mist_client.go b/clients/mist_client.go index 7af841492..f7b9e816b 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -26,6 +26,7 @@ type MistAPIClient interface { InvalidateSessions(streamName string) error DeleteStream(streamName string) error NukeStream(streamName string) error + StopSessions(streamName string) error AddTrigger(streamName []string, triggerName string, sync bool) error DeleteTrigger(streamName []string, triggerName string) error GetStreamInfo(streamName string) (MistStreamInfo, error) @@ -271,6 +272,14 @@ func (mc *MistClient) NukeStream(streamName string) error { return nil } +func (mc *MistClient) StopSessions(streamName string) error { + c := commandStopSessions(streamName) + if err := validateAuth(mc.sendCommand(c)); err != nil { + return err + } + return nil +} + // AddTrigger adds a trigger `triggerName` for the stream `streamName`. // Note that Mist API supports only overriding the whole trigger configuration, therefore this function needs to: // 1. Acquire a lock @@ -469,6 +478,16 @@ func commandNukeStream(name string) nukeStreamCommand { } } +type stopSessionsCommand struct { + StopSessions string `json:"stop_sessions"` +} + +func commandStopSessions(name string) stopSessionsCommand { + return stopSessionsCommand{ + StopSessions: name, + } +} + type pushAutoAddCommand struct { PushAutoAdd PushAutoAdd `json:"push_auto_add"` } diff --git a/clients/mist_client_test.go b/clients/mist_client_test.go index ede1d4d63..c63815c4f 100644 --- a/clients/mist_client_test.go +++ b/clients/mist_client_test.go @@ -43,6 +43,10 @@ func TestRequestPayload(t *testing.T) { "command=%7B%22nuke_stream%22%3A%22somestream%22%7D", commandNukeStream("somestream"), }, + { + "command=%7B%22stop_sessions%22%3A%22somestream%22%7D", + commandStopSessions("somestream"), + }, { "command=%7B%22config%22%3A%7B%22triggers%22%3A%7B%22PUSH_END%22%3A%5B%7B%22handler%22%3A%22http%3A%2F%2Flocalhost%2Fapi%22%2C%22streams%22%3A%5B%22somestream%22%5D%2C%22sync%22%3Afalse%7D%5D%7D%7D%7D", commandAddTrigger([]string{"somestream"}, "PUSH_END", "http://localhost/api", Triggers{}, false), diff --git a/events/events.go b/events/events.go index c6706fde6..cf469fe36 100644 --- a/events/events.go +++ b/events/events.go @@ -15,6 +15,7 @@ import ( const streamEventResource = "stream" const nukeEventResource = "nuke" +const stopSessionsEventResource = "stopSessions" const nodeUpdateEventResource = "nodeUpdate" type Event interface{} @@ -33,6 +34,11 @@ type NukeEvent struct { PlaybackID string `json:"playback_id"` } +type StopSessionsEvent struct { + Resource string `json:"resource"` + PlaybackID string `json:"playback_id"` +} + // JSON representation is deliberately truncated to keep the message size small type NodeUpdateEvent struct { Resource string `json:"resource,omitempty"` @@ -82,6 +88,13 @@ func Unmarshal(payload []byte) (Event, error) { return nil, err } return event, nil + case stopSessionsEventResource: + event := &StopSessionsEvent{} + err := json.Unmarshal(payload, event) + if err != nil { + return nil, err + } + return event, nil case nodeUpdateEventResource: event := &NodeUpdateEvent{} err := json.Unmarshal(payload, event) diff --git a/handlers/schemas/Event.yaml b/handlers/schemas/Event.yaml index 4419bd5c6..87bbc661c 100644 --- a/handlers/schemas/Event.yaml +++ b/handlers/schemas/Event.yaml @@ -5,6 +5,7 @@ properties: enum: - stream - nuke + - stopSessions playback_id: type: "string" required: diff --git a/main.go b/main.go index 232fb7245..aea091afc 100644 --- a/main.go +++ b/main.go @@ -382,6 +382,10 @@ func processClusterEvent(mapic mistapiconnector.IMac, bal balancer.Balancer, use case *events.NukeEvent: mapic.NukeStream(event.PlaybackID) return + case *events.StopSessionsEvent: + glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID) + mapic.StopSessions(event.PlaybackID) + return case *events.NodeUpdateEvent: if glog.V(5) { glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEvent.Payload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ",")) diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index 56a21ee74..91fd24501 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -44,6 +44,7 @@ type ( RefreshStreamIfNeeded(playbackID string) NukeStream(playbackID string) InvalidateAllSessions(playbackID string) + StopSessions(playbackID string) IStreamCache } @@ -194,6 +195,19 @@ func (mc *mac) NukeStream(playbackID string) { mc.nukeAllStreamNames(playbackID) } +func (mc *mac) StopSessions(playbackID string) { + streamNames := []string{ + "video+" + playbackID, + } + + for _, streamName := range streamNames { + err := mc.mist.StopSessions(streamName) + if err != nil { + glog.Errorf("error stopping sessions playbackId=%s streamName=%s err=%q", playbackID, streamName, err) + } + } +} + func (mc *mac) InvalidateAllSessions(playbackID string) { mc.invalidateAllSessions(playbackID) }