Skip to content

Commit

Permalink
Pass the URL object as we are already parsing it (#860)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 authored Sep 13, 2023
1 parent b8dc66e commit 0794951
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 23 deletions.
4 changes: 2 additions & 2 deletions middleware/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestItErrorsWhenNoJobCapacityAvailable(t *testing.T) {

pipeFfmpeg, release := pipeline.NewBlockingStubHandler()
defer release()
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil, "")
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil)
coordinator.InputCopy = &clients.StubInputCopy{}

// Create a lot of in-flight jobs
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestItTakesIntoAccountInFlightHTTPRequests(t *testing.T) {

pipeFfmpeg, release := pipeline.NewBlockingStubHandler()
defer release()
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil, "")
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil)
coordinator.InputCopy = &clients.StubInputCopy{}

// Set up the HTTP handler
Expand Down
8 changes: 4 additions & 4 deletions pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string,
strategy: strategy,
statusClient: statusClient,
pipeFfmpeg: &ffmpeg{
SourceOutputUrl: sourceOutputURL,
SourceOutputURL: sourceOutput,
Broadcaster: broadcaster,
probe: video.Probe{},
sourcePlaybackHosts: sourcePlaybackHosts,
Expand All @@ -204,18 +204,18 @@ func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string,
}

func NewStubCoordinator() *Coordinator {
return NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, nil, nil, "")
return NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, nil, nil)
}

func NewStubCoordinatorOpts(strategy Strategy, statusClient clients.TranscodeStatusClient, pipeFfmpeg, pipeExternal Handler, sourceOutputUrl string) *Coordinator {
func NewStubCoordinatorOpts(strategy Strategy, statusClient clients.TranscodeStatusClient, pipeFfmpeg, pipeExternal Handler) *Coordinator {
if strategy == "" {
strategy = StrategyCatalystFfmpegDominance
}
if statusClient == nil {
statusClient = clients.TranscodeStatusFunc(func(tsm clients.TranscodeStatusMessage) error { return nil })
}
if pipeFfmpeg == nil {
pipeFfmpeg = &ffmpeg{SourceOutputUrl: sourceOutputUrl, probe: video.Probe{}}
pipeFfmpeg = &ffmpeg{SourceOutputURL: &url.URL{}, probe: video.Probe{}}
}
if pipeExternal == nil {
pipeExternal = &external{}
Expand Down
22 changes: 11 additions & 11 deletions pipeline/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestCoordinatorDoesNotBlock(t *testing.T) {
return nil, errors.New("test error")
},
}
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler, "")
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler)
inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
job := testJob
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestCoordinatorResistsPanics(t *testing.T) {
panic("oh no!")
},
}
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler, "")
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand All @@ -136,7 +136,7 @@ func TestCoordinatorCatalystDominance(t *testing.T) {

ffmpeg, calls := recordingHandler(nil)
external := allFailingHandler(t)
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand All @@ -156,7 +156,7 @@ func TestCoordinatorSourceCopy(t *testing.T) {

ffmpeg, calls := recordingHandler(nil)
external := allFailingHandler(t)
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand All @@ -180,7 +180,7 @@ func TestCoordinatorFallbackStrategySuccess(t *testing.T) {
ffmpeg, ffmpegCalls := recordingHandler(nil)
external, externalCalls := recordingHandler(nil)

coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external)

// Start a job that will complete successfully on ffmpeg, which should not
// trigger the external pipeline
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestCoordinatorFallbackStrategyFailure(t *testing.T) {
},
}

coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external)

// Start a job which ffmpeg will fail and only then call the external one
inputFile, _, cleanup := setupTransferDir(t, coord)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestAllowsOverridingStrategyOnRequest(t *testing.T) {
external, externalCalls := recordingHandler(nil)

// create coordinator with strategy catalyst dominance (external should never be called)
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestPipelineCollectedMetrics(t *testing.T) {

db, dbMock, err := sqlmock.New()
require.NoError(err)
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external)
coord.MetricsDB = db

inputFile, transferDir, cleanup := setupTransferDir(t, coord)
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestPipelineCollectedMetrics(t *testing.T) {

func Test_EmptyFile(t *testing.T) {
callbackHandler, callbacks := callbacksRecorder()
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil, "")
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil)
inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()

Expand Down Expand Up @@ -422,7 +422,7 @@ func Test_ProbeErrors(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
callbackHandler, callbacks := callbacksRecorder()
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil, "")
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil)
inputFile, transferDir, cleanup := setupTransferDir(t, coord)
defer cleanup()
coord.InputCopy = &clients.InputCopy{
Expand Down Expand Up @@ -456,7 +456,7 @@ func Test_InputCopiedToTransferLocation(t *testing.T) {
return testHandlerResult, nil
},
}
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, callbackHandler, ffmpeg, nil, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, callbackHandler, ffmpeg, nil)
f, transferDir, cleanup := setupTransferDir(t, coord)
defer cleanup()

Expand Down
8 changes: 2 additions & 6 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const LocalSourceFilePattern = "sourcevideo*"

type ffmpeg struct {
// The base of where to output source segments to
SourceOutputUrl string
SourceOutputURL *url.URL
// Broadcaster for local transcoding
Broadcaster clients.BroadcasterClient
probe video.Prober
Expand All @@ -45,11 +45,7 @@ func (f *ffmpeg) Name() string {
func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
log.Log(job.RequestID, "Handling job via FFMPEG/Livepeer pipeline")

sourceOutputBaseURL, err := url.Parse(f.SourceOutputUrl)
if err != nil {
return nil, fmt.Errorf("cannot create sourceOutputUrl: %w", err)
}
sourceOutputURL := sourceOutputBaseURL.JoinPath(job.RequestID)
sourceOutputURL := f.SourceOutputURL.JoinPath(job.RequestID)
segmentingTargetURL := sourceOutputURL.JoinPath(config.SEGMENTING_SUBDIR, config.SEGMENTING_TARGET_MANIFEST)

job.SegmentingTargetURL = segmentingTargetURL.String()
Expand Down

0 comments on commit 0794951

Please sign in to comment.