From e392d77d3442f7f5fffea033c9de334c7e6dcae9 Mon Sep 17 00:00:00 2001 From: Matjaz Debelak Date: Thu, 28 Nov 2024 10:38:48 +0100 Subject: [PATCH 1/2] Merge and import subs based on CSV spec --- activities/transcode.go | 16 ++ common/merge.go | 9 +- go.mod | 1 + go.sum | 2 + services/ffmpeg/progress.go | 2 +- services/transcode/merge.go | 82 +++++++++ services/transcode/merge_test.go | 65 +++++++ services/transcode/testdata/sub1.srt | 19 ++ .../subtitles_merge_by_offset_result.srt | 40 +++++ .../testdata/subtitles_merge_result.srt | 16 ++ workflows/misc/merge_import_subs.go | 165 ++++++++++++++++++ workflows/workflows.go | 2 + 12 files changed, 414 insertions(+), 5 deletions(-) create mode 100644 services/transcode/testdata/sub1.srt create mode 100644 services/transcode/testdata/subtitles_merge_by_offset_result.srt create mode 100644 services/transcode/testdata/subtitles_merge_result.srt create mode 100644 workflows/misc/merge_import_subs.go diff --git a/activities/transcode.go b/activities/transcode.go index 92d50861..ff51494f 100644 --- a/activities/transcode.go +++ b/activities/transcode.go @@ -204,6 +204,22 @@ func (aa AudioActivities) TranscodeMergeAudio(ctx context.Context, params common return result, nil } +func (aa AudioActivities) MergeSubtitlesByOffset(ctx context.Context, params common.MergeInput) (*common.MergeResult, error) { + log := activity.GetLogger(ctx) + activity.RecordHeartbeat(ctx, "MergeSubtitlesByOffset") + log.Info("Starting MergeSubtitlesByOffsetActivity") + + // No easy way of reporting progress, so this just triggers heartbeats + stopChan, progressCallback := registerProgressCallback(ctx) + defer close(stopChan) + + result, err := transcode.MergeSubtitlesByOffset(params, progressCallback) + if err != nil { + return nil, err + } + return result, nil +} + func (va VideoActivities) TranscodeMergeSubtitles(ctx context.Context, params common.MergeInput) (*common.MergeResult, error) { log := activity.GetLogger(ctx) activity.RecordHeartbeat(ctx, "TranscodeMergeSubtitles") diff --git a/common/merge.go b/common/merge.go index 827eca92..22bbe267 100644 --- a/common/merge.go +++ b/common/merge.go @@ -6,10 +6,11 @@ import ( ) type MergeInputItem struct { - Path paths.Path - Start float64 - End float64 - Streams []vidispine.AudioStream + Path paths.Path + Start float64 + End float64 + StartOffset float64 + Streams []vidispine.AudioStream } type MergeInput struct { diff --git a/go.mod b/go.mod index 2a0ec1e9..621823da 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/glebarez/go-sqlite v1.21.2 github.com/go-resty/resty/v2 v2.11.0 + github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 github.com/jlaffaye/ftp v0.2.0 github.com/orsinium-labs/enum v1.3.0 diff --git a/go.sum b/go.sum index 2e7f1af4..d20a6e8c 100644 --- a/go.sum +++ b/go.sum @@ -863,6 +863,8 @@ github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaC github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqxT/8= github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 h1:FWNFq4fM1wPfcK40yHE5UO3RUdSNPaBC+j3PokzA6OQ= +github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI= github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= diff --git a/services/ffmpeg/progress.go b/services/ffmpeg/progress.go index b3663191..88065c3b 100644 --- a/services/ffmpeg/progress.go +++ b/services/ffmpeg/progress.go @@ -148,7 +148,7 @@ func parseProgressCallback(command []string, info StreamInfo, cb func(Progress)) } else if parts[0] == "speed" { progress.Speed = parts[1] } - if parts[0] == "progress" { + if parts[0] == "progress" && cb != nil { cb(progress) } } diff --git a/services/transcode/merge.go b/services/transcode/merge.go index c39d0619..95bc94de 100644 --- a/services/transcode/merge.go +++ b/services/transcode/merge.go @@ -237,6 +237,88 @@ func formatDuration(seconds float64) string { return fmt.Sprintf("%02d:%02d:%02d,%03d", hours, minutes, wholeSeconds, milliseconds) } +// MergeSubtitlesByOffset merges subtitles based on a specified offset +// +// This is used for example when you have several movies played in a feast. +// this way the offset indicates the offset from the start of the event, and the subtitles will be placed there +func MergeSubtitlesByOffset(input common.MergeInput, progressCallback ffmpeg.ProgressCallback) (*common.MergeResult, error) { + var files []string + + for index, item := range input.Items { + fileOut := filepath.Join(input.WorkDir.Local(), fmt.Sprintf("%s-%d-out.srt", input.Title, index)) + path := item.Path.Local() + + cmd := exec.Command("ffmpeg", + "-hide_banner", + "-itsoffset", fmt.Sprintf("%f", item.StartOffset), + "-i", path, + "-y", fileOut, + ) + _, err := utils.ExecuteCmd(cmd, nil) + if err != nil { + return nil, err + } + + files = append(files, fileOut) + } + + // the files have to be present in a text file for ffmpeg to concatenate them. + // #subtitles.txt + // file /path/to/file/0.srt + // file /path/to/file/1.srt + var content string + for _, f := range files { + content += fmt.Sprintf("file '%s'\n", f) + } + + subtitlesFile := filepath.Join(input.WorkDir.Local(), input.Title+"-subtitles.txt") + + err := os.WriteFile(subtitlesFile, []byte(content), os.ModePerm) + if err != nil { + return nil, err + } + + for _, f := range files { + err = ensureValidSrtFile(f) + if err != nil { + return nil, err + } + } + + concatStr := fmt.Sprintf("concat:%s", strings.Join(files, "|")) + + outputFilePath := filepath.Join(input.OutputDir.Local(), filepath.Clean(input.Title)+".srt") + params := []string{ + "-hide_banner", + "-progress", "pipe:1", + "-hide_banner", + "-i", concatStr, + "-c", "copy", + "-y", + outputFilePath, + } + + _, err = ffmpeg.Do(params, ffmpeg.StreamInfo{}, progressCallback) + if err != nil { + return nil, err + } + + err = ensureValidSrtFile(outputFilePath) + if err != nil { + return nil, err + } + + outputPath, err := paths.Parse(outputFilePath) + if err != nil { + return nil, err + } + + return &common.MergeResult{ + Path: outputPath, + }, err +} + +// MergeSubtitles does the merging of subtitles for the mormal mediabanken export func MergeSubtitles(input common.MergeInput, progressCallback ffmpeg.ProgressCallback) (*common.MergeResult, error) { var files []string // for each file, extract the specified range and save the result to a file. diff --git a/services/transcode/merge_test.go b/services/transcode/merge_test.go index 31da46eb..181d5967 100644 --- a/services/transcode/merge_test.go +++ b/services/transcode/merge_test.go @@ -1,6 +1,7 @@ package transcode import ( + "os" "testing" "github.com/bcc-code/bcc-media-flows/common" @@ -101,3 +102,67 @@ func Test_mergeItemsToStereoStream_64Chan(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "[0:a]pan=stereo|c0=c5|c1=c6[a0]", str) } + +func Test_MergeSubtitles(t *testing.T) { + output := paths.MustParse("./testdata/generated/") + subPath := paths.MustParse("./testdata/sub1.srt") + + input := common.MergeInput{ + OutputDir: output, + WorkDir: output, + Title: t.Name(), + Items: []common.MergeInputItem{ + common.MergeInputItem{ + Path: subPath, + Start: 0, + End: 5, + }, + common.MergeInputItem{ + Path: subPath, + Start: 10, + End: 15, + }, + }, + } + + res, err := MergeSubtitles(input, nil) + assert.NoError(t, err) + assert.Equal(t, paths.MustParse("./testdata/generated/Test_MergeSubtitles.srt"), res.Path) + assert.FileExists(t, res.Path.Local()) + + actual, _ := os.ReadFile(res.Path.Local()) + expected, _ := os.ReadFile("./testdata/subtitles_merge_result.srt") + + assert.Equal(t, expected, actual) +} + +func Test_MergeSubtitlesByOffset(t *testing.T) { + output := paths.MustParse("./testdata/generated/") + subPath := paths.MustParse("./testdata/sub1.srt") + + input := common.MergeInput{ + OutputDir: output, + WorkDir: output, + Title: t.Name(), + Items: []common.MergeInputItem{ + common.MergeInputItem{ + Path: subPath, + StartOffset: 0, + }, + common.MergeInputItem{ + Path: subPath, + StartOffset: 100, + }, + }, + } + + res, err := MergeSubtitlesByOffset(input, nil) + assert.NoError(t, err) + assert.Equal(t, paths.MustParse("./testdata/generated/Test_MergeSubtitlesByOffset.srt"), res.Path) + assert.FileExists(t, res.Path.Local()) + + actual, _ := os.ReadFile(res.Path.Local()) + expected, _ := os.ReadFile("./testdata/subtitles_merge_by_offset_result.srt") + + assert.Equal(t, expected, actual) +} diff --git a/services/transcode/testdata/sub1.srt b/services/transcode/testdata/sub1.srt new file mode 100644 index 00000000..cf176b7e --- /dev/null +++ b/services/transcode/testdata/sub1.srt @@ -0,0 +1,19 @@ +1 +00:00:01,000 --> 00:00:04,000 +Hello, this is the first subtitle +Second line of the first subtitle + +2 +00:00:04,500 --> 00:00:07,800 +This is the second subtitle +It can also have multiple lines +And even more lines + +3 +00:00:08,000 --> 00:00:12,000 +Third subtitle entry + +4 +00:00:13,000 --> 00:00:15,500 +- Multiple speakers can be shown +- Using dashes like this diff --git a/services/transcode/testdata/subtitles_merge_by_offset_result.srt b/services/transcode/testdata/subtitles_merge_by_offset_result.srt new file mode 100644 index 00000000..db7cc774 --- /dev/null +++ b/services/transcode/testdata/subtitles_merge_by_offset_result.srt @@ -0,0 +1,40 @@ +1 +00:00:01,000 --> 00:00:04,000 +Hello, this is the first subtitle +Second line of the first subtitle + +2 +00:00:04,500 --> 00:00:07,800 +This is the second subtitle +It can also have multiple lines +And even more lines + +3 +00:00:08,000 --> 00:00:12,000 +Third subtitle entry + +4 +00:00:13,000 --> 00:00:15,500 +- Multiple speakers can be shown +- Using dashes like this + +5 +00:01:41,000 --> 00:01:44,000 +Hello, this is the first subtitle +Second line of the first subtitle + +6 +00:01:44,500 --> 00:01:47,800 +This is the second subtitle +It can also have multiple lines +And even more lines + +7 +00:01:48,000 --> 00:01:52,000 +Third subtitle entry + +8 +00:01:53,000 --> 00:01:55,500 +- Multiple speakers can be shown +- Using dashes like this + diff --git a/services/transcode/testdata/subtitles_merge_result.srt b/services/transcode/testdata/subtitles_merge_result.srt new file mode 100644 index 00000000..439e6968 --- /dev/null +++ b/services/transcode/testdata/subtitles_merge_result.srt @@ -0,0 +1,16 @@ +1 +00:00:01,000 --> 00:00:04,000 +Hello, this is the first subtitle +Second line of the first subtitle + +2 +00:00:04,500 --> 00:00:07,800 +This is the second subtitle +It can also have multiple lines +And even more lines + +3 +00:00:18,000 --> 00:00:20,500 +- Multiple speakers can be shown +- Using dashes like this + diff --git a/workflows/misc/merge_import_subs.go b/workflows/misc/merge_import_subs.go new file mode 100644 index 00000000..82b9eae5 --- /dev/null +++ b/workflows/misc/merge_import_subs.go @@ -0,0 +1,165 @@ +package miscworkflows + +import ( + "encoding/csv" + "fmt" + "io" + "strings" + "time" + + "github.com/bcc-code/bcc-media-flows/activities" + vsactivity "github.com/bcc-code/bcc-media-flows/activities/vidispine" + "github.com/bcc-code/bcc-media-flows/common" + "github.com/bcc-code/bcc-media-flows/paths" + "github.com/bcc-code/bcc-media-flows/services/telegram" + wfutils "github.com/bcc-code/bcc-media-flows/utils/workflows" + "github.com/gocarina/gocsv" + "go.temporal.io/sdk/workflow" +) + +type MergeAndImportSubtitlesFromCSVParams struct { + TargetVXID string + CSVData string + Title string + Separator rune +} + +func convertCSVTimestamp(timestamp string) (float64, error) { + parts := strings.Split(timestamp, ":") + if len(parts) != 4 { + return 0, fmt.Errorf("invalid timestamp format: %s", timestamp) + } + + // Parse the time components + t, err := time.Parse("15:04:05", strings.Join(parts[:3], ":")) + if err != nil { + return 0, err + } + + // Parse milliseconds + ms, err := time.ParseDuration(parts[3] + "ms") + if err != nil { + return 0, err + } + + return float64(t.Hour()*3600+t.Minute()*60+t.Second()) + ms.Seconds(), nil +} + +func MergeAndImportSubtitlesFromCSV(ctx workflow.Context, params MergeAndImportSubtitlesFromCSVParams) (bool, error) { + + logger := workflow.GetLogger(ctx) + + options := wfutils.GetDefaultActivityOptions() + ctx = workflow.WithActivityOptions(ctx, options) + + logger.Info("Starting sub merge and import") + wfutils.SendTelegramText(ctx, telegram.ChatOther, "🟦 Starting sub merge and import to VXID: "+params.TargetVXID) + + tempPath, _ := wfutils.GetWorkflowTempFolder(ctx) + outputPath, _ := wfutils.GetWorkflowAuxOutputFolder(ctx) + + entries, err := parseSubMergeData([]byte(params.CSVData), params.Separator) + if err != nil { + return false, err + } + + mergeData := map[string]*common.MergeInput{} + + for _, entry := range entries { + offset, err := convertCSVTimestamp(entry.TimecodeStr) + if err != nil { + return false, err + } + + res, err := wfutils.Execute(ctx, activities.Util.GetSubtitlesActivity, activities.GetSubtitlesInput{ + SubtransID: entry.SubtransID, + Format: "srt", + ApprovedOnly: false, + DestinationFolder: tempPath, + }).Result(ctx) + + for lang, sub := range res { + + if _, ok := mergeData[lang]; !ok { + mergeData[lang] = &common.MergeInput{ + Title: params.Title, + WorkDir: tempPath, + OutputDir: outputPath, + } + } + + mergeData[lang].Items = append(mergeData[lang].Items, common.MergeInputItem{ + StartOffset: offset, + Path: sub, + }) + } + + if err != nil { + return false, err + } + } + + merged := map[string]paths.Path{} + + langs, err := wfutils.GetMapKeysSafely(ctx, mergeData) + if err != nil { + return false, err + } + + for _, lang := range langs { + merge := mergeData[lang] + res, err := wfutils.Execute(ctx, activities.Audio.MergeSubtitlesByOffset, *merge).Result(ctx) + if err != nil { + return false, err + } + merged[lang] = res.Path + } + + for _, lang := range langs { + sub := merged[lang] + lang := strings.ToLower(lang) + + jobRes := &vsactivity.JobResult{} + err = wfutils.Execute(ctx, activities.Vidispine.ImportFileAsShapeActivity, vsactivity.ImportFileAsShapeParams{ + AssetID: params.TargetVXID, + FilePath: sub, + ShapeTag: fmt.Sprintf("sub_%s_%s", lang, "srt"), + Replace: true, + }).Get(ctx, jobRes) + + if jobRes.JobID == "" { + logger.Info("No job created for importing subtitle shape", "lang", lang, "file", sub) + continue + } + + langs = append(langs, lang) + + _ = wfutils.Execute(ctx, activities.Vidispine.WaitForJobCompletion, vsactivity.WaitForJobCompletionParams{ + JobID: jobRes.JobID, + SleepTime: 10, + }).Get(ctx, nil) + } + + return true, nil +} + +type SubtitleEntry struct { + SubtransID string `csv:"Subtrans ID"` + TimecodeStr string `csv:"Timecode start"` +} + +func parseSubMergeData(input []byte, separator rune) ([]SubtitleEntry, error) { + var entries []SubtitleEntry + + gocsv.SetCSVReader(func(in io.Reader) gocsv.CSVReader { + r := csv.NewReader(in) + r.Comma = separator + return r + }) + + if err := gocsv.UnmarshalBytes(input, &entries); err != nil { + return nil, err + } + + return entries, nil +} diff --git a/workflows/workflows.go b/workflows/workflows.go index 4ad7174b..2856aaa9 100644 --- a/workflows/workflows.go +++ b/workflows/workflows.go @@ -19,6 +19,7 @@ var TriggerableWorkflows = []any{ miscworkflows.HandleMultitrackFile, export.ExportTimedMetadata, miscworkflows.ImportSubtitlesFromSubtrans, + miscworkflows.MergeAndImportSubtitlesFromCSV, miscworkflows.UpdateAssetRelations, miscworkflows.NormalizeAudioLevelWorkflow, scheduled.CleanupTemp, @@ -42,6 +43,7 @@ var WorkerWorkflows = []any{ export.ExportTimedMetadata, miscworkflows.ExecuteFFmpeg, miscworkflows.ImportSubtitlesFromSubtrans, + miscworkflows.MergeAndImportSubtitlesFromCSV, miscworkflows.UpdateAssetRelations, ingestworkflows.Asset, ingestworkflows.RawMaterial, From c868a7029bd1693325fada64c29072b1e3bf0afc Mon Sep 17 00:00:00 2001 From: Matjaz Debelak Date: Thu, 28 Nov 2024 11:00:40 +0100 Subject: [PATCH 2/2] Minor fixes --- workflows/misc/merge_import_subs.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/workflows/misc/merge_import_subs.go b/workflows/misc/merge_import_subs.go index 82b9eae5..6952f3d7 100644 --- a/workflows/misc/merge_import_subs.go +++ b/workflows/misc/merge_import_subs.go @@ -137,9 +137,11 @@ func MergeAndImportSubtitlesFromCSV(ctx workflow.Context, params MergeAndImportS _ = wfutils.Execute(ctx, activities.Vidispine.WaitForJobCompletion, vsactivity.WaitForJobCompletionParams{ JobID: jobRes.JobID, SleepTime: 10, - }).Get(ctx, nil) + }).Wait(ctx) } + wfutils.SendTelegramText(ctx, telegram.ChatOther, "🟩 CSV based sub merge and import for VXID: "+params.TargetVXID+" finished") + return true, nil }