Skip to content

Commit

Permalink
Add support for stop sessions mist call (#1169)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 authored Mar 11, 2024
1 parent 9816ec5 commit b3390d5
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 0 deletions.
19 changes: 19 additions & 0 deletions clients/mist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type MistAPIClient interface {
InvalidateSessions(streamName string) error
DeleteStream(streamName string) error
NukeStream(streamName string) error
StopSessions(streamName string) error
AddTrigger(streamName []string, triggerName string, sync bool) error
DeleteTrigger(streamName []string, triggerName string) error
GetStreamInfo(streamName string) (MistStreamInfo, error)
Expand Down Expand Up @@ -271,6 +272,14 @@ func (mc *MistClient) NukeStream(streamName string) error {
return nil
}

func (mc *MistClient) StopSessions(streamName string) error {
c := commandStopSessions(streamName)
if err := validateAuth(mc.sendCommand(c)); err != nil {
return err
}
return nil
}

// AddTrigger adds a trigger `triggerName` for the stream `streamName`.
// Note that Mist API supports only overriding the whole trigger configuration, therefore this function needs to:
// 1. Acquire a lock
Expand Down Expand Up @@ -469,6 +478,16 @@ func commandNukeStream(name string) nukeStreamCommand {
}
}

type stopSessionsCommand struct {
StopSessions string `json:"stop_sessions"`
}

func commandStopSessions(name string) stopSessionsCommand {
return stopSessionsCommand{
StopSessions: name,
}
}

type pushAutoAddCommand struct {
PushAutoAdd PushAutoAdd `json:"push_auto_add"`
}
Expand Down
4 changes: 4 additions & 0 deletions clients/mist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func TestRequestPayload(t *testing.T) {
"command=%7B%22nuke_stream%22%3A%22somestream%22%7D",
commandNukeStream("somestream"),
},
{
"command=%7B%22stop_sessions%22%3A%22somestream%22%7D",
commandStopSessions("somestream"),
},
{
"command=%7B%22config%22%3A%7B%22triggers%22%3A%7B%22PUSH_END%22%3A%5B%7B%22handler%22%3A%22http%3A%2F%2Flocalhost%2Fapi%22%2C%22streams%22%3A%5B%22somestream%22%5D%2C%22sync%22%3Afalse%7D%5D%7D%7D%7D",
commandAddTrigger([]string{"somestream"}, "PUSH_END", "http://localhost/api", Triggers{}, false),
Expand Down
13 changes: 13 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

const streamEventResource = "stream"
const nukeEventResource = "nuke"
const stopSessionsEventResource = "stopSessions"
const nodeUpdateEventResource = "nodeUpdate"

type Event interface{}
Expand All @@ -33,6 +34,11 @@ type NukeEvent struct {
PlaybackID string `json:"playback_id"`
}

type StopSessionsEvent struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
}

// JSON representation is deliberately truncated to keep the message size small
type NodeUpdateEvent struct {
Resource string `json:"resource,omitempty"`
Expand Down Expand Up @@ -82,6 +88,13 @@ func Unmarshal(payload []byte) (Event, error) {
return nil, err
}
return event, nil
case stopSessionsEventResource:
event := &StopSessionsEvent{}
err := json.Unmarshal(payload, event)
if err != nil {
return nil, err
}
return event, nil
case nodeUpdateEventResource:
event := &NodeUpdateEvent{}
err := json.Unmarshal(payload, event)
Expand Down
1 change: 1 addition & 0 deletions handlers/schemas/Event.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ properties:
enum:
- stream
- nuke
- stopSessions
playback_id:
type: "string"
required:
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ func processClusterEvent(mapic mistapiconnector.IMac, bal balancer.Balancer, use
case *events.NukeEvent:
mapic.NukeStream(event.PlaybackID)
return
case *events.StopSessionsEvent:
glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID)
mapic.StopSessions(event.PlaybackID)
return
case *events.NodeUpdateEvent:
if glog.V(5) {
glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEvent.Payload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ","))
Expand Down
14 changes: 14 additions & 0 deletions mapic/mistapiconnector_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
RefreshStreamIfNeeded(playbackID string)
NukeStream(playbackID string)
InvalidateAllSessions(playbackID string)
StopSessions(playbackID string)
IStreamCache
}

Expand Down Expand Up @@ -194,6 +195,19 @@ func (mc *mac) NukeStream(playbackID string) {
mc.nukeAllStreamNames(playbackID)
}

func (mc *mac) StopSessions(playbackID string) {
streamNames := []string{
"video+" + playbackID,
}

for _, streamName := range streamNames {
err := mc.mist.StopSessions(streamName)
if err != nil {
glog.Errorf("error stopping sessions playbackId=%s streamName=%s err=%q", playbackID, streamName, err)
}
}
}

func (mc *mac) InvalidateAllSessions(playbackID string) {
mc.invalidateAllSessions(playbackID)
}
Expand Down

0 comments on commit b3390d5

Please sign in to comment.