Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy up: pass the URL object as we are already parsing it #860

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions middleware/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestItErrorsWhenNoJobCapacityAvailable(t *testing.T) {

pipeFfmpeg, release := pipeline.NewBlockingStubHandler()
defer release()
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil, "")
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil)
coordinator.InputCopy = &clients.StubInputCopy{}

// Create a lot of in-flight jobs
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestItTakesIntoAccountInFlightHTTPRequests(t *testing.T) {

pipeFfmpeg, release := pipeline.NewBlockingStubHandler()
defer release()
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil, "")
coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil)
coordinator.InputCopy = &clients.StubInputCopy{}

// Set up the HTTP handler
Expand Down
8 changes: 4 additions & 4 deletions pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string,
strategy: strategy,
statusClient: statusClient,
pipeFfmpeg: &ffmpeg{
SourceOutputUrl: sourceOutputURL,
SourceOutputURL: sourceOutput,
Broadcaster: broadcaster,
probe: video.Probe{},
sourcePlaybackHosts: sourcePlaybackHosts,
Expand All @@ -203,18 +203,18 @@ func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string,
}

func NewStubCoordinator() *Coordinator {
return NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, nil, nil, "")
return NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, nil, nil)
}

func NewStubCoordinatorOpts(strategy Strategy, statusClient clients.TranscodeStatusClient, pipeFfmpeg, pipeExternal Handler, sourceOutputUrl string) *Coordinator {
func NewStubCoordinatorOpts(strategy Strategy, statusClient clients.TranscodeStatusClient, pipeFfmpeg, pipeExternal Handler) *Coordinator {
if strategy == "" {
strategy = StrategyCatalystFfmpegDominance
}
if statusClient == nil {
statusClient = clients.TranscodeStatusFunc(func(tsm clients.TranscodeStatusMessage) error { return nil })
}
if pipeFfmpeg == nil {
pipeFfmpeg = &ffmpeg{SourceOutputUrl: sourceOutputUrl, probe: video.Probe{}}
pipeFfmpeg = &ffmpeg{SourceOutputURL: &url.URL{}, probe: video.Probe{}}
}
if pipeExternal == nil {
pipeExternal = &external{}
Expand Down
22 changes: 11 additions & 11 deletions pipeline/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestCoordinatorDoesNotBlock(t *testing.T) {
return nil, errors.New("test error")
},
}
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler, "")
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler)
inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
job := testJob
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestCoordinatorResistsPanics(t *testing.T) {
panic("oh no!")
},
}
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler, "")
coord := NewStubCoordinatorOpts("", callbackHandler, blockHandler, blockHandler)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand All @@ -136,7 +136,7 @@ func TestCoordinatorCatalystDominance(t *testing.T) {

ffmpeg, calls := recordingHandler(nil)
external := allFailingHandler(t)
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand All @@ -156,7 +156,7 @@ func TestCoordinatorSourceCopy(t *testing.T) {

ffmpeg, calls := recordingHandler(nil)
external := allFailingHandler(t)
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand All @@ -180,7 +180,7 @@ func TestCoordinatorFallbackStrategySuccess(t *testing.T) {
ffmpeg, ffmpegCalls := recordingHandler(nil)
external, externalCalls := recordingHandler(nil)

coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external)

// Start a job that will complete successfully on ffmpeg, which should not
// trigger the external pipeline
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestCoordinatorFallbackStrategyFailure(t *testing.T) {
},
}

coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external)

// Start a job which ffmpeg will fail and only then call the external one
inputFile, _, cleanup := setupTransferDir(t, coord)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestAllowsOverridingStrategyOnRequest(t *testing.T) {
external, externalCalls := recordingHandler(nil)

// create coordinator with strategy catalyst dominance (external should never be called)
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, nil, ffmpeg, external)

inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestPipelineCollectedMetrics(t *testing.T) {

db, dbMock, err := sqlmock.New()
require.NoError(err)
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external, "")
coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external)
coord.MetricsDB = db

inputFile, transferDir, cleanup := setupTransferDir(t, coord)
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestPipelineCollectedMetrics(t *testing.T) {

func Test_EmptyFile(t *testing.T) {
callbackHandler, callbacks := callbacksRecorder()
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil, "")
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil)
inputFile, _, cleanup := setupTransferDir(t, coord)
defer cleanup()

Expand Down Expand Up @@ -422,7 +422,7 @@ func Test_ProbeErrors(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
callbackHandler, callbacks := callbacksRecorder()
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil, "")
coord := NewStubCoordinatorOpts("", callbackHandler, nil, nil)
inputFile, transferDir, cleanup := setupTransferDir(t, coord)
defer cleanup()
coord.InputCopy = &clients.InputCopy{
Expand Down Expand Up @@ -456,7 +456,7 @@ func Test_InputCopiedToTransferLocation(t *testing.T) {
return testHandlerResult, nil
},
}
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, callbackHandler, ffmpeg, nil, "")
coord := NewStubCoordinatorOpts(StrategyCatalystFfmpegDominance, callbackHandler, ffmpeg, nil)
f, transferDir, cleanup := setupTransferDir(t, coord)
defer cleanup()

Expand Down
8 changes: 2 additions & 6 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const LocalSourceFilePattern = "sourcevideo*"

type ffmpeg struct {
// The base of where to output source segments to
SourceOutputUrl string
SourceOutputURL *url.URL
// Broadcaster for local transcoding
Broadcaster clients.BroadcasterClient
probe video.Prober
Expand All @@ -45,11 +45,7 @@ func (f *ffmpeg) Name() string {
func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
log.Log(job.RequestID, "Handling job via FFMPEG/Livepeer pipeline")

sourceOutputBaseURL, err := url.Parse(f.SourceOutputUrl)
if err != nil {
return nil, fmt.Errorf("cannot create sourceOutputUrl: %w", err)
}
sourceOutputURL := sourceOutputBaseURL.JoinPath(job.RequestID)
sourceOutputURL := f.SourceOutputURL.JoinPath(job.RequestID)
segmentingTargetURL := sourceOutputURL.JoinPath(config.SEGMENTING_SUBDIR, config.SEGMENTING_TARGET_MANIFEST)

job.SegmentingTargetURL = segmentingTargetURL.String()
Expand Down