From 141c5f545030e45b5e145a0ef37c4275289e01ca Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 14 Sep 2023 12:08:54 -0700 Subject: [PATCH 1/3] fix stream resetting --- pkg/pipeline/watch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 0e2166c8..c1bfc0b0 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -166,7 +166,7 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { case element == elementGstRtmp2Sink: if strings.HasPrefix(gErr.Error(), "Connection error") && !c.eos.IsBroken() { // try reconnecting - ok, err := c.streamBin.ResetStream(name, gErr) + ok, err := c.streamBin.ResetStream(name[10:], gErr) if err != nil { logger.Errorw("failed to reset stream", err) } else if ok { From 8d6f6287aaa841e101ce322d8c31360b9edd894c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 14 Sep 2023 12:19:55 -0700 Subject: [PATCH 2/3] split name first --- pkg/pipeline/builder/stream.go | 5 +---- pkg/pipeline/watch.go | 4 +++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index be6af961..f129ad74 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -16,7 +16,6 @@ package builder import ( "fmt" - "strings" "sync" "github.com/tinyzimmer/go-gst/gst" @@ -98,9 +97,7 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St return sb, b, nil } -func (sb *StreamBin) GetStreamUrl(element string) (string, error) { - name := strings.Split(element, "_")[1] - +func (sb *StreamBin) GetStreamUrl(name string) (string, error) { sb.mu.RLock() url, ok := sb.urls[name] sb.mu.RUnlock() diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index c1bfc0b0..4d7d20a6 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -164,9 +164,11 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { switch { case element == elementGstRtmp2Sink: + name = strings.Split(name, "_")[1] + if strings.HasPrefix(gErr.Error(), "Connection error") && !c.eos.IsBroken() { // try reconnecting - ok, err := c.streamBin.ResetStream(name[10:], gErr) + ok, err := c.streamBin.ResetStream(name, gErr) if err != nil { logger.Errorw("failed to reset stream", err) } else if ok { From d9e36814297eab5fbc4095c70c4c8fa4bc497f78 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 14 Sep 2023 12:41:42 -0700 Subject: [PATCH 3/3] add logging, close src on build failure --- pkg/pipeline/controller.go | 2 ++ pkg/pipeline/source/web.go | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index eb93574b..d0cdff01 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -94,12 +94,14 @@ func New(ctx context.Context, conf *config.PipelineConfig) (*Controller, error) // create sinks c.sinks, err = sink.CreateSinks(conf, c.callbacks) if err != nil { + c.src.Close() return nil, err } // create pipeline <-c.callbacks.GstReady if err = c.BuildPipeline(); err != nil { + c.src.Close() return nil, err } diff --git a/pkg/pipeline/source/web.go b/pkg/pipeline/source/web.go index 5e868379..ae0ac3ce 100644 --- a/pkg/pipeline/source/web.go +++ b/pkg/pipeline/source/web.go @@ -105,11 +105,13 @@ func (s *WebSource) GetEndedAt() int64 { func (s *WebSource) Close() { if s.chromeCancel != nil { + logger.Debugw("closing chrome") s.chromeCancel() s.chromeCancel = nil } if s.xvfb != nil { + logger.Debugw("closing X display") err := s.xvfb.Process.Signal(os.Interrupt) if err != nil { logger.Errorw("failed to kill xvfb", err) @@ -118,6 +120,7 @@ func (s *WebSource) Close() { } if s.pulseSink != "" { + logger.Debugw("unloading pulse module") err := exec.Command("pactl", "unload-module", s.pulseSink).Run() if err != nil { logger.Errorw("failed to unload pulse sink", err) @@ -139,6 +142,7 @@ func (s *WebSource) createPulseSink(ctx context.Context, p *config.PipelineConfi ctx, span := tracer.Start(ctx, "WebInput.createPulseSink") defer span.End() + logger.Debugw("creating pulse sink") cmd := exec.Command("pactl", "load-module", "module-null-sink", fmt.Sprintf("sink_name=\"%s\"", p.Info.EgressId), @@ -162,7 +166,7 @@ func (s *WebSource) launchXvfb(ctx context.Context, p *config.PipelineConfig) er defer span.End() dims := fmt.Sprintf("%dx%dx%d", p.Width, p.Height, p.Depth) - logger.Debugw("launching xvfb", "display", p.Display, "dims", dims) + logger.Debugw("creating X display", "display", p.Display, "dims", dims) xvfb := exec.Command("Xvfb", p.Display, "-screen", "0", dims, "-ac", "-nolisten", "tcp", "-nolisten", "unix") xvfb.Stderr = &errorLogger{cmd: "xvfb"} if err := xvfb.Start(); err != nil { @@ -193,7 +197,7 @@ func (s *WebSource) launchChrome(ctx context.Context, p *config.PipelineConfig, webUrl = inputUrl.String() } - logger.Debugw("launching chrome", "url", webUrl, "enableChomeSandbox", p.EnableChromeSandbox, "insecure", p.Insecure) + logger.Debugw("launching chrome", "url", webUrl, "sandbox", p.EnableChromeSandbox, "insecure", p.Insecure) opts := []chromedp.ExecAllocatorOption{ chromedp.NoFirstRun,