From be776ad7e49cd7fdc6d895b226a20c53ac2dd7c8 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 7 Mar 2024 14:12:20 -0800 Subject: [PATCH] Add EgressInfo details field (#627) * Add EgressInfo details field * general cleaning --- go.mod | 15 ++++++------ go.sum | 34 ++++++++++++++-------------- pkg/errors/errors.go | 10 -------- pkg/handler/handler.go | 2 -- pkg/handler/handler_ipc.go | 4 ++-- pkg/pipeline/controller.go | 21 ++++++++++------- pkg/pipeline/sink/image.go | 2 +- pkg/pipeline/sink/segments.go | 1 - pkg/pipeline/source/sdk.go | 2 -- pkg/pipeline/source/sdk/appwriter.go | 4 ---- pkg/pipeline/source/web.go | 11 +++++++++ pkg/pipeline/watch.go | 8 +++---- pkg/service/process.go | 1 - pkg/service/service.go | 3 +-- pkg/service/service_ipc.go | 4 ++-- pkg/service/service_rpc.go | 2 +- test/ioserver.go | 2 +- test/segments.go | 2 +- 18 files changed, 61 insertions(+), 67 deletions(-) diff --git a/go.mod b/go.mod index 1123143f..5a1de426 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/aws/aws-sdk-go v1.44.296 github.com/chromedp/cdproto v0.0.0-20230625224106-7fafe342e117 github.com/chromedp/chromedp v0.9.1 - github.com/frostbyte73/core v0.0.9 + github.com/frostbyte73/core v0.0.10 github.com/go-gst/go-glib v0.0.0-20230906175327-b2d34240bcb4 github.com/go-gst/go-gst v0.0.0-20231009181223-aa872b0f6c0c github.com/go-logr/logr v1.4.1 @@ -19,16 +19,16 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/livekit/livekit-server v1.5.1-0.20231026153736-8b16db227070 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.9.10-0.20240226212446-6e689b5c3569 - github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58 + github.com/livekit/protocol v1.11.1-0.20240307194236-37d8b0515da0 + github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 github.com/livekit/server-sdk-go/v2 v2.0.1 github.com/pion/rtp v1.8.3 github.com/pion/webrtc/v3 v3.2.28 github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.18.0 + github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.5.0 - github.com/prometheus/common v0.45.0 - github.com/stretchr/testify v1.8.4 + github.com/prometheus/common v0.48.0 + github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.25.7 go.uber.org/atomic v1.11.0 go.uber.org/zap v1.27.0 @@ -57,7 +57,7 @@ require ( github.com/elliotchance/orderedmap/v2 v2.2.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gammazero/deque v0.2.1 // indirect - github.com/go-jose/go-jose/v3 v3.0.1 // indirect + github.com/go-jose/go-jose/v3 v3.0.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect @@ -79,7 +79,6 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-pointer v0.0.1 // indirect - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/nats-io/nats.go v1.31.0 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect diff --git a/go.sum b/go.sum index 15033baf..53fe3a94 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= -github.com/frostbyte73/core v0.0.9 h1:AmE9GjgGpPsWk9ZkmY3HsYUs2hf2tZt+/W6r49URBQI= -github.com/frostbyte73/core v0.0.9/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU= +github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJORsU= +github.com/frostbyte73/core v0.0.10/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= @@ -84,8 +84,8 @@ github.com/go-gst/go-glib v0.0.0-20230906175327-b2d34240bcb4 h1:c2y2vC6HUoWfFsnT github.com/go-gst/go-glib v0.0.0-20230906175327-b2d34240bcb4/go.mod h1:rXuKU+tCN7pN+b/7oIyWv6MpnlGy+QWd7jRhWUNstjU= github.com/go-gst/go-gst v0.0.0-20231009181223-aa872b0f6c0c h1:tnXVeRaPX/g10eWPqoCN46oxRcXvB2AkTBKcaxOALvA= github.com/go-gst/go-gst v0.0.0-20231009181223-aa872b0f6c0c/go.mod h1:yPBGJ0tPALScq484ccEfiSom85fK+ccSZjY+Ua1cHi8= -github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA= -github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= +github.com/go-jose/go-jose/v3 v3.0.2 h1:2Edjn8Nrb44UvTdp84KU0bBPs1cO7noRCybtS3eJEUQ= +github.com/go-jose/go-jose/v3 v3.0.2/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -124,6 +124,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= @@ -173,10 +174,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f h1:XHrwGwLNGQB3ZqolH1YdMH/22hgXKr4vm+2M7JKMMGg= github.com/livekit/mediatransportutil v0.0.0-20231213075826-cccbf2b93d3f/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= -github.com/livekit/protocol v1.9.10-0.20240226212446-6e689b5c3569 h1:lVAQS1tF/NMxcmRSAgm7bE4FkAm0gSHrS2skfltZZyo= -github.com/livekit/protocol v1.9.10-0.20240226212446-6e689b5c3569/go.mod h1:Ppvb/6576ksmTTHATnG+73nk65U7CxDnFVsRQ7+zAkc= -github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58 h1:yH55rBGLRO+ict2mu6bKZ5iPwTIrIwU1i0ydgThi4+k= -github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= +github.com/livekit/protocol v1.11.1-0.20240307194236-37d8b0515da0 h1:On0btT/HpWFvs1RhoRdnAaiUA0uKHSaSCvQnMqWE/qM= +github.com/livekit/protocol v1.11.1-0.20240307194236-37d8b0515da0/go.mod h1:XpJ2t2wFnnQghPpkxXAzMZhYMDnm8wWxdxYJK4fP9gM= +github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0= +github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.0.1 h1:qwuMK7WUd30DM7IJ2sOqpQcZcHqP02tzs5Y6CRsV4Lg= github.com/livekit/server-sdk-go/v2 v2.0.1/go.mod h1:l9mRrCvR7H2AAJjs/624duhvuKUTjtVddjqiIQ6YcZw= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -189,8 +190,6 @@ github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqf github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= @@ -257,13 +256,13 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= -github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= -github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/puzpuzpuz/xsync v1.5.2 h1:yRAP4wqSOZG+/4pxJ08fPTwrfL0IzE/LKQ/cw509qGY= @@ -279,13 +278,14 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU= github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= @@ -321,7 +321,6 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -415,6 +414,7 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index f427f6dc..01c51f87 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -19,8 +19,6 @@ import ( "fmt" "strings" - "github.com/go-gst/go-gst/gst" - "github.com/livekit/psrpc" ) @@ -120,18 +118,10 @@ func ErrUploadFailed(location string, err error) error { return psrpc.NewErrorf(psrpc.Unknown, "%s upload failed: %v", location, err) } -func ErrWebsocketClosed(addr string) error { - return psrpc.NewErrorf(psrpc.Internal, "websocket closed: %s", addr) -} - func ErrProcessStartFailed(err error) error { return psrpc.NewError(psrpc.Internal, err) } -func ErrStateChangeFailed(bin string, state gst.State) error { - return psrpc.NewErrorf(psrpc.Internal, "%s failed to change state to %s", bin, state.String()) -} - type ErrArray struct { errs []error } diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index f36284a1..fca4add4 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -58,8 +58,6 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc. ioClient: ioClient, ipcHandlerServer: grpc.NewServer(), ipcServiceClient: ipcClient, - initialized: core.NewFuse(), - kill: core.NewFuse(), } ipc.RegisterEgressHandlerServer(h.ipcHandlerServer, h) diff --git a/pkg/handler/handler_ipc.go b/pkg/handler/handler_ipc.go index d13b0b76..522cb7c2 100644 --- a/pkg/handler/handler_ipc.go +++ b/pkg/handler/handler_ipc.go @@ -70,7 +70,7 @@ func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PPr } // GetMetrics implement the handler-side gathering of metrics to return over IPC -func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc.MetricsResponse, error) { +func (h *Handler) GetMetrics(ctx context.Context, _ *ipc.MetricsRequest) (*ipc.MetricsResponse, error) { ctx, span := tracer.Start(ctx, "Handler.GetMetrics") defer span.End() @@ -84,7 +84,7 @@ func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc }, nil } -func (h *Handler) GenerateMetrics(ctx context.Context) (string, error) { +func (h *Handler) GenerateMetrics(_ context.Context) (string, error) { metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { return "", err diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 3f7eb773..6cd69f4d 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -77,9 +77,6 @@ func New(ctx context.Context, conf *config.PipelineConfig, ioClient rpc.IOInfoCl ioClient: ioClient, gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId), - playing: core.NewFuse(), - eos: core.NewFuse(), - stopped: core.NewFuse(), } c.callbacks.SetOnError(c.OnError) @@ -215,7 +212,9 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { select { case <-c.stopped.Watch(): c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - c.Info.Error = "Start signal not received" + if c.Info.Details == "" { + c.Info.Details = "Start signal not received" + } return c.Info case <-start: // continue @@ -395,7 +394,9 @@ func (c *Controller) SendEOS(ctx context.Context) { switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - c.Info.Error = "Stop called before pipeline could start" + if c.Info.Details == "" { + c.Info.Details = "Stop called before pipeline could start" + } fallthrough case livekit.EgressStatus_EGRESS_ABORTED, @@ -463,7 +464,9 @@ func (c *Controller) Close() { switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - c.Info.Error = "Stop called before pipeline could start" + if c.Info.Details == "" { + c.Info.Details = "Stop called before pipeline could start" + } case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING: @@ -502,10 +505,12 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - c.Info.Error = "Session limit reached before start signal" + if c.Info.Details == "" { + c.Info.Details = "Session limit reached before start signal" + } case livekit.EgressStatus_EGRESS_ACTIVE: c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED - c.Info.Error = "Session limit reached" + c.Info.Details = "Session limit reached" } if c.playing.IsBroken() { c.SendEOS(ctx) diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go index 33a62ba6..4b10ad91 100644 --- a/pkg/pipeline/sink/image.go +++ b/pkg/pipeline/sink/image.go @@ -22,6 +22,7 @@ import ( "time" "github.com/frostbyte73/core" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/pipeline/sink/uploader" @@ -61,7 +62,6 @@ func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Image manifest: createImageManifest(p), createdImages: make(chan *imageUpdate, maxPendingUploads), - done: core.NewFuse(), }, nil } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 6a4939b1..784a704b 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -102,7 +102,6 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg openSegmentsStartTime: make(map[string]uint64), closedSegments: make(chan SegmentUpdate, maxPendingUploads), playlistUpdates: make(chan SegmentUpdate, maxPendingUploads), - done: core.NewFuse(), } // Register gauges that track the number of segments and playlist updates pending upload diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index ddc2f7c9..0ad1a31f 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -75,10 +75,8 @@ func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstr sync: synchronizer.NewSynchronizer(func() { close(startRecording) }), - initialized: core.NewFuse(), filenameReplacements: make(map[string]string), writers: make(map[string]*sdk.AppWriter), - closed: core.NewFuse(), startRecording: startRecording, endRecording: make(chan struct{}), } diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index 571334f3..800a37ba 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -103,10 +103,6 @@ func NewAppWriter( callbacks: callbacks, sync: sync, TrackSynchronizer: sync.AddTrack(track, rp.Identity()), - playing: core.NewFuse(), - draining: core.NewFuse(), - endStream: core.NewFuse(), - finished: core.NewFuse(), } if logFilename != "" { diff --git a/pkg/pipeline/source/web.go b/pkg/pipeline/source/web.go index 3586d398..7ba54622 100644 --- a/pkg/pipeline/source/web.go +++ b/pkg/pipeline/source/web.go @@ -31,6 +31,7 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" ) @@ -47,6 +48,8 @@ type WebSource struct { startRecording chan struct{} endRecording chan struct{} + + info *livekit.EgressInfo } func init() { @@ -61,6 +64,7 @@ func NewWebSource(ctx context.Context, p *config.PipelineConfig) (*WebSource, er s := &WebSource{ endRecording: make(chan struct{}), + info: p.Info, } if p.AwaitStartSignal { s.startRecording = make(chan struct{}) @@ -292,6 +296,13 @@ func (s *WebSource) launchChrome(ctx context.Context, p *config.PipelineConfig, case *runtime.EventExceptionThrown: logChrome("exception", ev) + if s.info.Details == "" { + if exceptionDetails := ev.ExceptionDetails; exceptionDetails != nil { + if exception := exceptionDetails.Exception; exception != nil { + s.info.Details = fmt.Sprintf("Uncaught chrome exception: %s", exception.Description) + } + } + } } }) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 8294efab..108b934d 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -69,7 +69,7 @@ const ( msgAggregateSubclass = "Subclass should call gst_aggregator_selected_samples() from its aggregate implementation." ) -func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line int, obj *glib.Object, message string) { +func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line int, _ *glib.Object, message string) { var lvl string switch level { case gst.LevelNone: @@ -291,12 +291,12 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { } logger.Debugw("received GstMultiFileSink message", "location", location, "timestamp", ts, "source", msg.Source()) - sink := c.getImageSink(msg.Source()) - if sink == nil { + imageSink := c.getImageSink(msg.Source()) + if imageSink == nil { return errors.ErrSinkNotFound } - err = sink.NewImage(location, ts) + err = imageSink.NewImage(location, ts) if err != nil { return err } diff --git a/pkg/service/process.go b/pkg/service/process.go index 49e98b21..59c793b3 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -60,7 +60,6 @@ func NewProcess( cmd: cmd, ipcHandlerClient: ipcClient, ready: make(chan struct{}), - closed: core.NewFuse(), } return p, nil diff --git a/pkg/service/service.go b/pkg/service/service.go index 1907765a..8f9b84c6 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -63,7 +63,6 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service conf: conf, ipcServiceServer: grpc.NewServer(), ioClient: ioClient, - shutdown: core.NewFuse(), activeHandlers: make(map[string]*Process), } @@ -153,7 +152,7 @@ func (s *Service) Reset() { s.Stop(false) } - s.shutdown = core.NewFuse() + s.shutdown = core.Fuse{} } func (s *Service) Status() ([]byte, error) { diff --git a/pkg/service/service_ipc.go b/pkg/service/service_ipc.go index 0421430e..a0f5dc4b 100644 --- a/pkg/service/service_ipc.go +++ b/pkg/service/service_ipc.go @@ -23,7 +23,7 @@ import ( "github.com/livekit/egress/pkg/ipc" ) -func (s *Service) HandlerReady(ctx context.Context, req *ipc.HandlerReadyRequest) (*emptypb.Empty, error) { +func (s *Service) HandlerReady(_ context.Context, req *ipc.HandlerReadyRequest) (*emptypb.Empty, error) { s.mu.RLock() p, ok := s.activeHandlers[req.EgressId] s.mu.RUnlock() @@ -36,7 +36,7 @@ func (s *Service) HandlerReady(ctx context.Context, req *ipc.HandlerReadyRequest return &emptypb.Empty{}, nil } -func (s *Service) HandlerShuttingDown(ctx context.Context, req *ipc.HandlerShuttingDownRequest) (*emptypb.Empty, error) { +func (s *Service) HandlerShuttingDown(_ context.Context, req *ipc.HandlerShuttingDownRequest) (*emptypb.Empty, error) { err := s.storeProcessEndedMetrics(req.EgressId, req.Metrics) if err != nil { return nil, err diff --git a/pkg/service/service_rpc.go b/pkg/service/service_rpc.go index 05cefff6..576375d7 100644 --- a/pkg/service/service_rpc.go +++ b/pkg/service/service_rpc.go @@ -74,7 +74,7 @@ func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) return p.Info, nil } -func (s *Service) StartEgressAffinity(ctx context.Context, req *rpc.StartEgressRequest) float32 { +func (s *Service) StartEgressAffinity(_ context.Context, req *rpc.StartEgressRequest) float32 { if !s.CanAcceptRequest(req) { // cannot accept return -1 diff --git a/test/ioserver.go b/test/ioserver.go index 102a65be..73247967 100644 --- a/test/ioserver.go +++ b/test/ioserver.go @@ -45,7 +45,7 @@ func newIOTestServer(bus psrpc.MessageBus, updates chan *livekit.EgressInfo) (*i return s, nil } -func (s *ioTestServer) CreateEgress(_ context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { +func (s *ioTestServer) CreateEgress(_ context.Context, _ *livekit.EgressInfo) (*emptypb.Empty, error) { return &emptypb.Empty{}, nil } diff --git a/test/segments.go b/test/segments.go index 8086d8d0..048e097a 100644 --- a/test/segments.go +++ b/test/segments.go @@ -97,7 +97,7 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil if plType == m3u8.PlaylistTypeEvent { // Only download segments once base := storedPlaylistPath[:len(storedPlaylistPath)-5] - for i := 0; i < int(segmentCount); i++ { + for i := 0; i < segmentCount; i++ { cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath) download(t, uploadConfig, localPath, cloudPath)