Skip to content

Commit

Permalink
transmux: use stream based concatenation for clips
Browse files Browse the repository at this point in the history
Occasionally, the mp4s of very short clips might stutter as it crosses
segment boundaries. This was caused by incorrect PTS between segments
which in turn is a result of us clipping and re-encoding the first and
last segments "locally". Using a stream based concatenation instead of
file based concatenation readjusts the timestamps and prevents these
issues.

Note: Eventually, all mp4s will be generated using stream based
concatenation if this works well for clipping.
  • Loading branch information
emranemran committed Nov 9, 2023
1 parent 15bd8b2 commit 6661ab8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 18 deletions.
17 changes: 12 additions & 5 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,28 +193,35 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st

var concatFiles []string
for rendition, segments := range renditionList.RenditionSegmentTable {
// a. create folder to hold transmux-ed files in local storage temporarily
// Create folder to hold transmux-ed files in local storage temporarily
err := os.MkdirAll(TransmuxStorageDir, 0700)
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}

// b. create a single .ts file for a given rendition by concatenating all segments in order
// Create a single .ts file for a given rendition by concatenating all segments in order
if rendition == "low-bitrate" {
// skip mp4 generation for low-bitrate profile
continue
}
concatTsFileName := filepath.Join(TransmuxStorageDir, transcodeRequest.RequestID+"_"+rendition+".ts")
concatFiles = append(concatFiles, concatTsFileName)
defer os.Remove(concatTsFileName)
totalBytes, err := video.ConcatTS(concatTsFileName, segments)
// For now, use the stream based concat for clipping only and file based concat for everything else.
// Eventually, all mp4 generation can be moved to stream based concat once proven effective.
var totalBytes int64
if transcodeRequest.IsClip {
totalBytes, err = video.ConcatTS(concatTsFileName, segments, true)
} else {
totalBytes, err = video.ConcatTS(concatTsFileName, segments, false)
}
if err != nil {
log.Log(transcodeRequest.RequestID, "error concatenating .ts", "file", concatTsFileName, "err", err)
continue
}

// c. Verify the total bytes written for the single .ts file for a given rendition matches the total # of bytes we received from T
// Verify the total bytes written for the single .ts file for a given rendition matches the total # of bytes we received from T
renditionIndex := getProfileIndex(transcodeProfiles, rendition)
var rendBytesWritten int64 = -1
for _, v := range transcodedStats {
Expand All @@ -227,7 +234,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
break
}

// d. Mux the .ts file to generate either a regular MP4 (w/ faststart) or fMP4 packaged with HLS/DASH
// Mux the .ts file to generate either a regular MP4 (w/ faststart) or fMP4 packaged with HLS/DASH
if enableStandardMp4 {
// Transmux the single .ts file into an mp4 file
mp4OutputFileName := concatTsFileName[:len(concatTsFileName)-len(filepath.Ext(concatTsFileName))] + ".mp4"
Expand Down
95 changes: 82 additions & 13 deletions video/transmux.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package video

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"time"

ffmpeg "github.com/u2takey/ffmpeg-go"
Expand Down Expand Up @@ -95,21 +97,88 @@ func MuxTStoFMP4(fmp4ManifestOutputFile string, inputs ...string) error {
return nil
}

func ConcatTS(tsFileName string, segmentsList *TSegmentList) (int64, error) {
func ConcatTS(tsFileName string, segmentsList *TSegmentList, useStreamBasedConcat bool) (int64, error) {
var totalBytes int64
// 1. create a .ts file for a given rendition
tsFile, err := os.Create(tsFileName)
if err != nil {
return totalBytes, fmt.Errorf("error creating file (%s) err: %w", tsFileName, err)
}
defer tsFile.Close()
// 2. for a given rendition, write all segment indices in ascending order to the single .ts file
for _, k := range segmentsList.GetSortedSegments() {
segBytes, err := tsFile.Write(segmentsList.SegmentDataTable[k])
if !useStreamBasedConcat {
// Create a .ts file for a given rendition
tsFile, err := os.Create(tsFileName)
if err != nil {
return totalBytes, fmt.Errorf("error writing segment %d err: %w", k, err)
return totalBytes, fmt.Errorf("error creating file (%s) err: %w", tsFileName, err)
}
defer tsFile.Close()
// For a given rendition, write all segment indices in ascending order to the single .ts file
for _, k := range segmentsList.GetSortedSegments() {
segBytes, err := tsFile.Write(segmentsList.SegmentDataTable[k])
if err != nil {
return totalBytes, fmt.Errorf("error writing segment %d err: %w", k, err)
}
totalBytes = totalBytes + int64(segBytes)
}
return totalBytes, nil
} else {
// Strip the '.ts' from filename
fileBaseWithoutExt := tsFileName[:len(tsFileName)-len(".ts")]

// Create a text file containing filenames of the segments
segmentListTxtFileName := fileBaseWithoutExt + ".txt"
segmentListTxtFile, err := os.Create(segmentListTxtFileName)
if err != nil {
return totalBytes, fmt.Errorf("error creating segment text file (%s) err: %w", segmentListTxtFileName, err)
}
defer segmentListTxtFile.Close()
defer os.Remove(segmentListTxtFileName)
w := bufio.NewWriter(segmentListTxtFile)

// Write each segment to disk and add segment filename to the text file
for segName, segData := range segmentsList.GetSortedSegments() {
// Open a new file to write each segment to disk
segmentFileName := fileBaseWithoutExt + "_" + strconv.Itoa(segName) + ".ts"
segmentFile, err := os.Create(segmentFileName)
if err != nil {
return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFileName, err)
}
defer segmentFile.Close()
defer os.Remove(segmentFileName)
// Write the segment data to disk
segBytes, err := segmentFile.Write(segmentsList.SegmentDataTable[segData])
if err != nil {
return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err)
}
totalBytes = totalBytes + int64(segBytes)
// Add filename to the text file
line := fmt.Sprintf("file '%s'\n", segmentFileName)
if _, err = w.WriteString(line); err != nil {
return totalBytes, fmt.Errorf("error writing segment %d to text file err: %w", segName, err)
}
// Flush to make sure all buffered operations are applied
if err = w.Flush(); err != nil {
return totalBytes, fmt.Errorf("error flushing text file %s err: %w", segmentFileName, err)
}
}
totalBytes = totalBytes + int64(segBytes)
// Create a .ts file for a given rendition
tsFile, err := os.Create(tsFileName)
if err != nil {
return totalBytes, fmt.Errorf("error creating file (%s) err: %w", tsFileName, err)
}
defer tsFile.Close()
// transmux the individual .ts files into a combined single ts file using stream based concatenation
err = ffmpeg.Input(segmentListTxtFileName, ffmpeg.KwArgs{
"f": "concat", // Use stream based concatenation (instead of file based concatenation)
"safe": "0"}). // Must be 0 since relative paths to segments are used in segmentListTxtFileName
Output(tsFileName, ffmpeg.KwArgs{
"c": "copy", // Don't accidentally transcode
}).
OverWriteOutput().ErrorToStdOut().Run()
if err != nil {
return totalBytes, fmt.Errorf("failed to transmux multiple ts files from %s into a ts file: %w", segmentListTxtFileName, err)
}
// Verify the ts output file was created
_, err = os.Stat(tsFileName)
if err != nil {
return totalBytes, fmt.Errorf("transmux error: failed to stat .ts media file: %w", err)
}

return totalBytes, nil
}
return totalBytes, nil

}

0 comments on commit 6661ab8

Please sign in to comment.