Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node version and orch addr to transcoded metadata #3165

Merged
merged 6 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## v0.X.X

- [#3165](https://github.com/livepeer/go-livepeer/pull/3165) Add node version and orch addr to transcoded metadata

### Breaking Changes 🚨🚨

### Features ⚒
Expand All @@ -22,6 +24,8 @@

#### Broadcaster

- [#3164](https://github.com/livepeer/go-livepeer/pull/3164) Fix video compatibility check

#### Orchestrator

#### Transcoder
1 change: 1 addition & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Errorf("Error setting up orchestrator: %v", err)
return
}
n.RecipientAddr = recipientAddr.Hex()

sigVerifier := &pm.DefaultSigVerifier{}
validator := pm.NewValidator(sigVerifier, timeWatcher)
Expand Down
1 change: 1 addition & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type LivepeerNode struct {
// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Recipient pm.Recipient
RecipientAddr string
SelectionAlgorithm common.SelectionAlgorithm
OrchestratorPool common.OrchestratorPool
OrchPerfScore *common.PerfScore
Expand Down
13 changes: 13 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,17 @@ func (n *LivepeerNode) transcodeSeg(ctx context.Context, config transcodeConfig,
}
md.Fname = url

orchId := "offchain"
if n.RecipientAddr != "" {
orchId = n.RecipientAddr
}
if isRemote {
// huge hack to thread the orch id down to the transcoder
md.Metadata = map[string]string{"orchId": orchId}
} else {
md.Metadata = MakeMetadata(orchId)
}

//Do the transcoding
start := time.Now()
tData, err := transcoder.Transcode(ctx, md)
Expand Down Expand Up @@ -767,6 +778,7 @@ type RemoteTranscoder struct {
addr string
capacity int
load int
orchId string
}

// RemoteTranscoderFatalError wraps error to indicate that error is fatal
Expand Down Expand Up @@ -816,6 +828,7 @@ func (rt *RemoteTranscoder) Transcode(logCtx context.Context, md *SegTranscoding
msg := &net.NotifySegment{
Url: fname,
TaskId: taskID,
OrchId: md.Metadata["orchId"],
SegData: segData,
// Triggers failure on Os that don't know how to use SegData
Profiles: []byte("invalid"),
Expand Down
9 changes: 9 additions & 0 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SegTranscodingMetadata struct {
AuthToken *net.AuthToken
CalcPerceptualHash bool
SegmentParameters *SegmentParameters
Metadata map[string]string
}

func (md *SegTranscodingMetadata) Flatten() []byte {
Expand Down Expand Up @@ -192,3 +193,11 @@ func (id StreamID) String() string {
func RandomManifestID() ManifestID {
return ManifestID(common.RandomIDGenerator(DefaultManifestIDLength))
}

func MakeMetadata(id string) map[string]string {
s := fmt.Sprintf("Livepeer Transcoder %s (%s)", LivepeerVersion, id)
return map[string]string{
"service_provider": s, // for mpegts
"comment": "Processed by " + s, // for mp4
}
}
16 changes: 11 additions & 5 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMeta
Accel: ffmpeg.Software,
}
profiles := md.Profiles
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, profiles, md.CalcPerceptualHash, md.SegmentParameters)
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -100,7 +100,7 @@ func (nv *NetintTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -135,7 +135,7 @@ func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -429,8 +429,13 @@ func resToTranscodeData(ctx context.Context, res *ffmpeg.TranscodeResults, opts
}, nil
}

func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profiles []ffmpeg.VideoProfile, calcPHash bool,
segPar *SegmentParameters) []ffmpeg.TranscodeOptions {
func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, md *SegTranscodingMetadata) []ffmpeg.TranscodeOptions {
var (
profiles []ffmpeg.VideoProfile = md.Profiles
calcPHash bool = md.CalcPerceptualHash
segPar *SegmentParameters = md.SegmentParameters
metadata map[string]string = md.Metadata
)

opts := make([]ffmpeg.TranscodeOptions, len(profiles))
for i := range profiles {
Expand All @@ -440,6 +445,7 @@ func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profi
Accel: accel,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
CalcSign: calcPHash,
Metadata: metadata,
}
if segPar != nil && segPar.Clip != nil {
o.From = segPar.Clip.From
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/jaypipes/pcidb v1.0.0
github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85
github.com/livepeer/lpms v0.0.0-20240906035551-beda797c2cbd
github.com/livepeer/m3u8 v0.11.1
github.com/mattn/go-sqlite3 v1.14.18
github.com/olekukonko/tablewriter v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cO
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18 h1:4oH3NqV0NvcdS44Ld3zK2tO8IUiNozIggm74yobQeZg=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18/go.mod h1:Jpf4jHK+fbWioBHRDRM1WadNT1qmY27g2YicTdO0Rtc=
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85 h1:E8hJhT1nEW1jneK+Re3KyJcsITFYS9oa0vgyA6bLmKE=
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
github.com/livepeer/lpms v0.0.0-20240906035551-beda797c2cbd h1:jSuHiaWNkVEB3wCZicQ+MNFYgM1orgm8EGKEUeldfoI=
github.com/livepeer/lpms v0.0.0-20240906035551-beda797c2cbd/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
Expand Down
Loading
Loading