Skip to content

Commit

Permalink
Add EgressInfo details field (#627)
Browse files Browse the repository at this point in the history
* Add EgressInfo details field

* general cleaning
  • Loading branch information
frostbyte73 authored Mar 7, 2024
1 parent cb2116b commit be776ad
Show file tree
Hide file tree
Showing 18 changed files with 61 additions and 67 deletions.
15 changes: 7 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@ require (
github.com/aws/aws-sdk-go v1.44.296
github.com/chromedp/cdproto v0.0.0-20230625224106-7fafe342e117
github.com/chromedp/chromedp v0.9.1
github.com/frostbyte73/core v0.0.9
github.com/frostbyte73/core v0.0.10
github.com/go-gst/go-glib v0.0.0-20230906175327-b2d34240bcb4
github.com/go-gst/go-gst v0.0.0-20231009181223-aa872b0f6c0c
github.com/go-logr/logr v1.4.1
github.com/googleapis/gax-go/v2 v2.12.0
github.com/gorilla/websocket v1.5.1
github.com/livekit/livekit-server v1.5.1-0.20231026153736-8b16db227070
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.9.10-0.20240226212446-6e689b5c3569
github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58
github.com/livekit/protocol v1.11.1-0.20240307194236-37d8b0515da0
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4
github.com/livekit/server-sdk-go/v2 v2.0.1
github.com/pion/rtp v1.8.3
github.com/pion/webrtc/v3 v3.2.28
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
github.com/stretchr/testify v1.8.4
github.com/prometheus/common v0.48.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.25.7
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -57,7 +57,7 @@ require (
github.com/elliotchance/orderedmap/v2 v2.2.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-jose/go-jose/v3 v3.0.1 // indirect
github.com/go-jose/go-jose/v3 v3.0.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
Expand All @@ -79,7 +79,6 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand Down
34 changes: 17 additions & 17 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/frostbyte73/core v0.0.9 h1:AmE9GjgGpPsWk9ZkmY3HsYUs2hf2tZt+/W6r49URBQI=
github.com/frostbyte73/core v0.0.9/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU=
github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJORsU=
github.com/frostbyte73/core v0.0.10/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
Expand All @@ -84,8 +84,8 @@ github.com/go-gst/go-glib v0.0.0-20230906175327-b2d34240bcb4 h1:c2y2vC6HUoWfFsnT
github.com/go-gst/go-glib v0.0.0-20230906175327-b2d34240bcb4/go.mod h1:rXuKU+tCN7pN+b/7oIyWv6MpnlGy+QWd7jRhWUNstjU=
github.com/go-gst/go-gst v0.0.0-20231009181223-aa872b0f6c0c h1:tnXVeRaPX/g10eWPqoCN46oxRcXvB2AkTBKcaxOALvA=
github.com/go-gst/go-gst v0.0.0-20231009181223-aa872b0f6c0c/go.mod h1:yPBGJ0tPALScq484ccEfiSom85fK+ccSZjY+Ua1cHi8=
github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA=
github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-jose/go-jose/v3 v3.0.2 h1:2Edjn8Nrb44UvTdp84KU0bBPs1cO7noRCybtS3eJEUQ=
github.com/go-jose/go-jose/v3 v3.0.2/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down Expand Up @@ -124,6 +124,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
Expand Down Expand Up @@ -173,10 +174,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f h1:XHrwGwLNGQB3ZqolH1YdMH/22hgXKr4vm+2M7JKMMGg=
github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
github.com/livekit/protocol v1.9.10-0.20240226212446-6e689b5c3569 h1:lVAQS1tF/NMxcmRSAgm7bE4FkAm0gSHrS2skfltZZyo=
github.com/livekit/protocol v1.9.10-0.20240226212446-6e689b5c3569/go.mod h1:Ppvb/6576ksmTTHATnG+73nk65U7CxDnFVsRQ7+zAkc=
github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58 h1:yH55rBGLRO+ict2mu6bKZ5iPwTIrIwU1i0ydgThi4+k=
github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/protocol v1.11.1-0.20240307194236-37d8b0515da0 h1:On0btT/HpWFvs1RhoRdnAaiUA0uKHSaSCvQnMqWE/qM=
github.com/livekit/protocol v1.11.1-0.20240307194236-37d8b0515da0/go.mod h1:XpJ2t2wFnnQghPpkxXAzMZhYMDnm8wWxdxYJK4fP9gM=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.0.1 h1:qwuMK7WUd30DM7IJ2sOqpQcZcHqP02tzs5Y6CRsV4Lg=
github.com/livekit/server-sdk-go/v2 v2.0.1/go.mod h1:l9mRrCvR7H2AAJjs/624duhvuKUTjtVddjqiIQ6YcZw=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand All @@ -189,8 +190,6 @@ github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqf
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
Expand Down Expand Up @@ -257,13 +256,13 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/puzpuzpuz/xsync v1.5.2 h1:yRAP4wqSOZG+/4pxJ08fPTwrfL0IzE/LKQ/cw509qGY=
Expand All @@ -279,13 +278,14 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU=
github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
Expand Down Expand Up @@ -321,7 +321,6 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -415,6 +414,7 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
10 changes: 0 additions & 10 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"fmt"
"strings"

"github.com/go-gst/go-gst/gst"

"github.com/livekit/psrpc"
)

Expand Down Expand Up @@ -120,18 +118,10 @@ func ErrUploadFailed(location string, err error) error {
return psrpc.NewErrorf(psrpc.Unknown, "%s upload failed: %v", location, err)
}

func ErrWebsocketClosed(addr string) error {
return psrpc.NewErrorf(psrpc.Internal, "websocket closed: %s", addr)
}

func ErrProcessStartFailed(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

func ErrStateChangeFailed(bin string, state gst.State) error {
return psrpc.NewErrorf(psrpc.Internal, "%s failed to change state to %s", bin, state.String())
}

type ErrArray struct {
errs []error
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.
ioClient: ioClient,
ipcHandlerServer: grpc.NewServer(),
ipcServiceClient: ipcClient,
initialized: core.NewFuse(),
kill: core.NewFuse(),
}

ipc.RegisterEgressHandlerServer(h.ipcHandlerServer, h)
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/handler_ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PPr
}

// GetMetrics implement the handler-side gathering of metrics to return over IPC
func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc.MetricsResponse, error) {
func (h *Handler) GetMetrics(ctx context.Context, _ *ipc.MetricsRequest) (*ipc.MetricsResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetMetrics")
defer span.End()

Expand All @@ -84,7 +84,7 @@ func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc
}, nil
}

func (h *Handler) GenerateMetrics(ctx context.Context) (string, error) {
func (h *Handler) GenerateMetrics(_ context.Context) (string, error) {
metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return "", err
Expand Down
21 changes: 13 additions & 8 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ func New(ctx context.Context, conf *config.PipelineConfig, ioClient rpc.IOInfoCl
ioClient: ioClient,
gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)),
monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId),
playing: core.NewFuse(),
eos: core.NewFuse(),
stopped: core.NewFuse(),
}
c.callbacks.SetOnError(c.OnError)

Expand Down Expand Up @@ -215,7 +212,9 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
select {
case <-c.stopped.Watch():
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED
c.Info.Error = "Start signal not received"
if c.Info.Details == "" {
c.Info.Details = "Start signal not received"
}
return c.Info
case <-start:
// continue
Expand Down Expand Up @@ -395,7 +394,9 @@ func (c *Controller) SendEOS(ctx context.Context) {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED
c.Info.Error = "Stop called before pipeline could start"
if c.Info.Details == "" {
c.Info.Details = "Stop called before pipeline could start"
}
fallthrough

case livekit.EgressStatus_EGRESS_ABORTED,
Expand Down Expand Up @@ -463,7 +464,9 @@ func (c *Controller) Close() {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED
c.Info.Error = "Stop called before pipeline could start"
if c.Info.Details == "" {
c.Info.Details = "Stop called before pipeline could start"
}

case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
Expand Down Expand Up @@ -502,10 +505,12 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED
c.Info.Error = "Session limit reached before start signal"
if c.Info.Details == "" {
c.Info.Details = "Session limit reached before start signal"
}
case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED
c.Info.Error = "Session limit reached"
c.Info.Details = "Session limit reached"
}
if c.playing.IsBroken() {
c.SendEOS(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/frostbyte73/core"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/pipeline/sink/uploader"
Expand Down Expand Up @@ -61,7 +62,6 @@ func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Image

manifest: createImageManifest(p),
createdImages: make(chan *imageUpdate, maxPendingUploads),
done: core.NewFuse(),
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg
openSegmentsStartTime: make(map[string]uint64),
closedSegments: make(chan SegmentUpdate, maxPendingUploads),
playlistUpdates: make(chan SegmentUpdate, maxPendingUploads),
done: core.NewFuse(),
}

// Register gauges that track the number of segments and playlist updates pending upload
Expand Down
2 changes: 0 additions & 2 deletions pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstr
sync: synchronizer.NewSynchronizer(func() {
close(startRecording)
}),
initialized: core.NewFuse(),
filenameReplacements: make(map[string]string),
writers: make(map[string]*sdk.AppWriter),
closed: core.NewFuse(),
startRecording: startRecording,
endRecording: make(chan struct{}),
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/pipeline/source/sdk/appwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ func NewAppWriter(
callbacks: callbacks,
sync: sync,
TrackSynchronizer: sync.AddTrack(track, rp.Identity()),
playing: core.NewFuse(),
draining: core.NewFuse(),
endStream: core.NewFuse(),
finished: core.NewFuse(),
}

if logFilename != "" {
Expand Down
11 changes: 11 additions & 0 deletions pkg/pipeline/source/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/tracer"
)
Expand All @@ -47,6 +48,8 @@ type WebSource struct {

startRecording chan struct{}
endRecording chan struct{}

info *livekit.EgressInfo
}

func init() {
Expand All @@ -61,6 +64,7 @@ func NewWebSource(ctx context.Context, p *config.PipelineConfig) (*WebSource, er

s := &WebSource{
endRecording: make(chan struct{}),
info: p.Info,
}
if p.AwaitStartSignal {
s.startRecording = make(chan struct{})
Expand Down Expand Up @@ -292,6 +296,13 @@ func (s *WebSource) launchChrome(ctx context.Context, p *config.PipelineConfig,

case *runtime.EventExceptionThrown:
logChrome("exception", ev)
if s.info.Details == "" {
if exceptionDetails := ev.ExceptionDetails; exceptionDetails != nil {
if exception := exceptionDetails.Exception; exception != nil {
s.info.Details = fmt.Sprintf("Uncaught chrome exception: %s", exception.Description)
}
}
}
}
})

Expand Down
8 changes: 4 additions & 4 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const (
msgAggregateSubclass = "Subclass should call gst_aggregator_selected_samples() from its aggregate implementation."
)

func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line int, obj *glib.Object, message string) {
func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line int, _ *glib.Object, message string) {
var lvl string
switch level {
case gst.LevelNone:
Expand Down Expand Up @@ -291,12 +291,12 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
}
logger.Debugw("received GstMultiFileSink message", "location", location, "timestamp", ts, "source", msg.Source())

sink := c.getImageSink(msg.Source())
if sink == nil {
imageSink := c.getImageSink(msg.Source())
if imageSink == nil {
return errors.ErrSinkNotFound
}

err = sink.NewImage(location, ts)
err = imageSink.NewImage(location, ts)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func NewProcess(
cmd: cmd,
ipcHandlerClient: ipcClient,
ready: make(chan struct{}),
closed: core.NewFuse(),
}

return p, nil
Expand Down
Loading

0 comments on commit be776ad

Please sign in to comment.