Skip to content

Commit

Permalink
clip/handlers/coordinator: add scaffolding to accept clipping requests
Browse files Browse the repository at this point in the history
This adds additional parameters to the vod endpoint to accept clipping
start/end times. If the fields are set with valid values, the input
manifest will be clipped and then a new clipped manifest will be used in
the VOD pipeline to generate assets as usual. The logic for clipping the
actual manifest will be done as a follow-on PR.
  • Loading branch information
emranemran committed Sep 12, 2023
1 parent c81f282 commit a65b02a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 2 deletions.
10 changes: 10 additions & 0 deletions handlers/schemas/UploadVOD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ properties:
required:
- "encrypted_key"
additionalProperties: false
clip_strategy:
type: "object"
properties:
start_time:
type: "integer"
end_time:
type: "integer"
required:
- "start_time"
additionalProperties: false
pipeline_strategy:
type: string
description:
Expand Down
31 changes: 29 additions & 2 deletions handlers/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type UploadVODRequest struct {
TargetSegmentSizeSecs int64 `json:"target_segment_size_secs"`
Profiles []video.EncodedProfile `json:"profiles"`
PipelineStrategy pipeline.Strategy `json:"pipeline_strategy"`

// Forwarded to clipping stage:
ClipStrategy video.ClipStrategy `json:"clip_strategy"`
}

type UploadVODResponse struct {
Expand Down Expand Up @@ -101,6 +104,24 @@ func (r UploadVODRequest) IsProfileValid() bool {
return true
}

func (r UploadVODRequest) IsClipValid() bool {
startTime := r.ClipStrategy.StartTime
endTime := r.ClipStrategy.EndTime

if startTime < 0 || endTime <= 0 || startTime == endTime {
return false
}

epoch := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
start := epoch.Add(time.Duration(startTime) * time.Second)
end := epoch.Add(time.Duration(endTime) * time.Second)

if start.After(end) {
return false
}
return true
}

func (r UploadVODRequest) getTargetHlsOutput() UploadVODRequestOutputLocation {
for _, o := range r.OutputLocations {
if o.Outputs.HLS == "enabled" {
Expand Down Expand Up @@ -196,6 +217,12 @@ func (d *CatalystAPIHandlersCollection) handleUploadVOD(w http.ResponseWriter, r
}
log.AddContext(requestID, "target_segment_size_secs", uploadVODRequest.TargetSegmentSizeSecs)

// Check if this is a clipping request
if uploadVODRequest.IsClipValid() {
uploadVODRequest.ClipStrategy.Enabled = true
}

// Get target locatons for HLS, MP4, FMP4 outputs
hlsTargetOutput := uploadVODRequest.getTargetHlsOutput()
hlsTargetURL, err := toTargetURL(hlsTargetOutput, requestID)
if err != nil {
Expand All @@ -211,11 +238,11 @@ func (d *CatalystAPIHandlersCollection) handleUploadVOD(w http.ResponseWriter, r
if err != nil {
return false, errors.WriteHTTPBadRequest(w, "Invalid request payload", err)
}

if hlsTargetURL == nil && mp4TargetURL == nil && fragMp4TargetURL == nil {
return false, errors.WriteHTTPBadRequest(w, "Invalid request payload", errors2.New("none of output enabled: hls or mp4 or f-mp4"))
}

// Verify pipeline strategy
if strat := uploadVODRequest.PipelineStrategy; strat != "" && !strat.IsValid() {
return false, errors.WriteHTTPBadRequest(w, "Invalid request payload", fmt.Errorf("invalid value provided for pipeline strategy: %q", uploadVODRequest.PipelineStrategy))
}
Expand All @@ -224,7 +251,6 @@ func (d *CatalystAPIHandlersCollection) handleUploadVOD(w http.ResponseWriter, r

// Once we're happy with the request, do the rest of the Segmenting stage asynchronously to allow us to
// from the API call and free up the HTTP connection

d.VODEngine.StartUploadJob(pipeline.UploadJobPayload{
SourceFile: uploadVODRequest.Url,
CallbackURL: uploadVODRequest.CallbackUrl,
Expand All @@ -241,6 +267,7 @@ func (d *CatalystAPIHandlersCollection) handleUploadVOD(w http.ResponseWriter, r
TargetSegmentSizeSecs: uploadVODRequest.TargetSegmentSizeSecs,
Encryption: uploadVODRequest.Encryption,
SourceCopy: uploadVODRequest.getSourceCopyEnabled(),
ClipStrategy: uploadVODRequest.ClipStrategy,
})

respBytes, err := json.Marshal(UploadVODResponse{RequestID: requestID})
Expand Down
10 changes: 10 additions & 0 deletions pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type UploadJobPayload struct {
Encryption *EncryptionPayload
InputFileInfo video.InputVideo
SourceCopy bool
ClipStrategy video.ClipStrategy
}

type EncryptionPayload struct {
Expand Down Expand Up @@ -272,6 +273,15 @@ func (c *Coordinator) StartUploadJob(p UploadJobPayload) {

osTransferURL := c.SourceOutputURL.JoinPath(p.RequestID, "transfer", path.Base(sourceURL.Path))
if clients.IsHLSInput(sourceURL) {
// Currently we only clip an HLS source (e.g recordings or transcoded asset)
if p.ClipStrategy.Enabled {
log.Log(p.RequestID, "clippity clipping the input")
// Use new clipped manifest as the source URL
sourceURL, err = video.ClipInput(p.RequestID, sourceURL)
if err != nil {
return nil, fmt.Errorf("error clipping input: %w", err)
}
}
osTransferURL = sourceURL
} else if p.SourceCopy {
log.Log(p.RequestID, "source copy enabled")
Expand Down
14 changes: 14 additions & 0 deletions video/clip.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@ import (
"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/log"
ffmpeg "github.com/u2takey/ffmpeg-go"
"net/url"
"time"
)

type ClipStrategy struct {
Enabled bool
StartTime float64 `json:"start_time,omitempty"`
EndTime float64 `json:"end_time,omitempty"`
}

// format time in secs to be copatible with ffmpeg's expected time syntax
func formatTime(seconds float64) string {
duration := time.Duration(seconds * float64(time.Second))
Expand Down Expand Up @@ -55,6 +62,13 @@ func getRelevantSegment(allSegments []*m3u8.MediaSegment, playHeadTime float64,
return 0, fmt.Errorf("error clipping: did not find a segment that falls within %v seconds", playHeadTime)
}

// Function that will take a source URL manifest and return a new URL
// pointing to the clipped manifest
func ClipInput(requestID string, srcUrl *url.URL) (*url.URL, error) {
// TODO:*actually* do the clipping
return srcUrl, nil
}

// Function to find relevant segments that span from the clipping start and end times
func ClipManifest(requestID string, manifest *m3u8.MediaPlaylist, startTime, endTime float64) ([]*m3u8.MediaSegment, error) {
var startSegIdx, endSegIdx uint64
Expand Down

0 comments on commit a65b02a

Please sign in to comment.