Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an error return to makeStreamID to make issues explicit #400

Merged
merged 2 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func main() {

lpms.HandleRTMPPublish(
//makeStreamID (give the stream an ID)
func(url *url.URL) stream.AppData {
func(url *url.URL) (stream.AppData, error) {
s := exampleStream(randString(10))
return &s
return &s, nil
},

//gotStream
Expand Down
4 changes: 2 additions & 2 deletions cmd/example/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func main() {
lpms := core.New(&core.LPMSOpts{WorkDir: fmt.Sprintf("%v/.tmp", dir)})

lpms.HandleRTMPPublish(
func(url *url.URL) stream.AppData {
func(url *url.URL) (stream.AppData, error) {
glog.Infof("Stream has been started!: %v", url)
return exampleStream(randString(10))
return exampleStream(randString(10)), nil
},

func(url *url.URL, rs stream.RTMPVideoStream) (err error) {
Expand Down
20 changes: 10 additions & 10 deletions core/lpms.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//The RTMP server. This will put up a RTMP endpoint when starting up Swarm.
//To integrate with LPMS means your code will become the source / destination of the media server.
//This RTMP endpoint is mainly used for video upload. The expected url is rtmp://localhost:port/livepeer/stream
// The RTMP server. This will put up a RTMP endpoint when starting up Swarm.
// To integrate with LPMS means your code will become the source / destination of the media server.
// This RTMP endpoint is mainly used for video upload. The expected url is rtmp://localhost:port/livepeer/stream
package core

import (
Expand Down Expand Up @@ -67,7 +67,7 @@ func defaultLPMSOpts(opts *LPMSOpts) {
}
}

//New creates a new LPMS server object. It really just brokers everything to the components.
// New creates a new LPMS server object. It really just brokers everything to the components.
func New(opts *LPMSOpts) *LPMS {
defaultLPMSOpts(opts)
var rtmpServer *joy4rtmp.Server
Expand All @@ -83,7 +83,7 @@ func New(opts *LPMSOpts) *LPMS {
return &LPMS{vidPlayer: player, vidListener: listener, workDir: opts.WorkDir, rtmpAddr: opts.RtmpAddr, httpAddr: httpAddr}
}

//Start starts the rtmp and http servers, and initializes ffmpeg
// Start starts the rtmp and http servers, and initializes ffmpeg
func (l *LPMS) Start(ctx context.Context) error {
ec := make(chan error, 1)
ffmpeg.InitFFmpeg()
Expand Down Expand Up @@ -114,21 +114,21 @@ func (l *LPMS) Start(ctx context.Context) error {
return nil
}

//HandleRTMPPublish offload to the video listener. To understand how it works, look at videoListener.HandleRTMPPublish.
// HandleRTMPPublish offload to the video listener. To understand how it works, look at videoListener.HandleRTMPPublish.
func (l *LPMS) HandleRTMPPublish(
makeStreamID func(url *url.URL) (strmID stream.AppData),
makeStreamID func(url *url.URL) (strmID stream.AppData, err error),
gotStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error),
endStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error) {

l.vidListener.HandleRTMPPublish(makeStreamID, gotStream, endStream)
}

//HandleRTMPPlay offload to the video player
// HandleRTMPPlay offload to the video player
func (l *LPMS) HandleRTMPPlay(getStream func(url *url.URL) (stream.RTMPVideoStream, error)) error {
return l.vidPlayer.HandleRTMPPlay(getStream)
}

//HandleHLSPlay offload to the video player
// HandleHLSPlay offload to the video player
func (l *LPMS) HandleHLSPlay(
getMasterPlaylist func(url *url.URL) (*m3u8.MasterPlaylist, error),
getMediaPlaylist func(url *url.URL) (*m3u8.MediaPlaylist, error),
Expand All @@ -137,7 +137,7 @@ func (l *LPMS) HandleHLSPlay(
l.vidPlayer.HandleHLSPlay(getMasterPlaylist, getMediaPlaylist, getSegment)
}

//SegmentRTMPToHLS takes a rtmp stream and re-packages it into a HLS stream with the specified segmenter options
// SegmentRTMPToHLS takes a rtmp stream and re-packages it into a HLS stream with the specified segmenter options
func (l *LPMS) SegmentRTMPToHLS(ctx context.Context, rs stream.RTMPVideoStream, hs stream.HLSVideoStream, segOptions segmenter.SegmenterOptions) error {
// set localhost if necessary. Check more problematic addrs? [::] ?
rtmpAddr := l.rtmpAddr
Expand Down
14 changes: 7 additions & 7 deletions vidlistener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ type VidListener struct {
RtmpServer *joy4rtmp.Server
}

//HandleRTMPPublish takes 3 parameters - makeStreamID, gotStream, and endStream.
//makeStreamID is called when the stream starts. It should return a streamID from the requestURL.
//gotStream is called when the stream starts. It gives you access to the stream.
//endStream is called when the stream ends. It gives you access to the stream.
// HandleRTMPPublish takes 3 parameters - makeStreamID, gotStream, and endStream.
// makeStreamID is called when the stream starts. It should return a streamID from the requestURL.
// gotStream is called when the stream starts. It gives you access to the stream.
// endStream is called when the stream ends. It gives you access to the stream.
func (self *VidListener) HandleRTMPPublish(
makeStreamID func(url *url.URL) (strmID stream.AppData),
makeStreamID func(url *url.URL) (strmID stream.AppData, err error),
gotStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error,
endStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error) {

if self.RtmpServer != nil {
self.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
glog.V(2).Infof("RTMP server got upstream: %v", conn.URL)

strmID := makeStreamID(conn.URL)
if strmID == nil || strmID.StreamID() == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this makeStreamID() actually implemented? In go-livepeer? I wonder if changing this condition here won't bring any regression.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, in go-livepeer. I think that we can probably keep the original check as well though if we'd like to be safe

strmID, err := makeStreamID(conn.URL)
if err != nil || strmID == nil || strmID.StreamID() == "" {
conn.Close()
return
}
Expand Down
12 changes: 6 additions & 6 deletions vidlistener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestListener(t *testing.T) {

listener.HandleRTMPPublish(
//makeStreamID
func(url *url.URL) stream.AppData {
return newTestStream()
func(url *url.URL) (stream.AppData, error) {
return newTestStream(), nil
},
//gotStream
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {
Expand Down Expand Up @@ -82,8 +82,8 @@ func TestListenerError(t *testing.T) {
failures := 0
badListener.HandleRTMPPublish(
//makeStreamID
func(url *url.URL) stream.AppData {
return newTestStream()
func(url *url.URL) (stream.AppData, error) {
return newTestStream(), nil
},
//gotStream
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
Expand Down Expand Up @@ -123,9 +123,9 @@ func TestListenerEmptyStreamID(t *testing.T) {

badListener.HandleRTMPPublish(
//makeStreamID
func(url *url.URL) stream.AppData {
func(url *url.URL) (stream.AppData, error) {
// On returning empty stream id connection should be closed
return newTestStream()
return newTestStream(), nil
},
//gotStream
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
Expand Down
Loading