Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for handler to register #555

Merged
merged 5 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/handler"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/version"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -202,7 +203,7 @@ func runHandler(c *cli.Context) error {
if err != nil {
return err
}
handler, err := service.NewHandler(conf, bus, ioClient)
h, err := handler.NewHandler(conf, bus, ioClient)
if err != nil {
if errors.IsFatal(err) {
// service will send info update and shut down
Expand All @@ -217,8 +218,8 @@ func runHandler(c *cli.Context) error {
go func() {
sig := <-killChan
logger.Infow("exit requested, stopping recording and shutting down", "signal", sig)
handler.Kill()
h.Kill()
}()

return handler.Run()
return h.Run()
}
13 changes: 6 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/livekit/livekit-server v1.5.1-0.20231026153736-8b16db227070
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.9.4-0.20231205184613-4bd1ed6a0258
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876
github.com/livekit/psrpc v0.5.2
github.com/livekit/server-sdk-go v1.1.1
github.com/pion/rtp v1.8.3
Expand All @@ -32,7 +32,7 @@ require (
github.com/urfave/cli/v2 v2.25.7
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/exp v0.0.0-20231127185646-65229373498e
google.golang.org/api v0.130.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
Expand Down Expand Up @@ -79,7 +79,6 @@ require (
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand All @@ -98,7 +97,7 @@ require (
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/turn/v2 v2.1.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/redis/go-redis/v9 v9.3.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/thoas/go-funk v0.9.3 // indirect
Expand All @@ -107,12 +106,12 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
26 changes: 12 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.4-0.20231205184613-4bd1ed6a0258 h1:LiuPyPbV1WVBn7RBhrwSicPgDgkts/xjL06JR2ELriM=
github.com/livekit/protocol v1.9.4-0.20231205184613-4bd1ed6a0258/go.mod h1:Lzra2vdFeXkKy5oFb7oXRKVqCZ+uT8w9hbZO4mEy0+Q=
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876 h1:NnbpPgxDHOcSdgW0JzBkc4QzzLVAe4sOaiYqUUH0/K4=
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876/go.mod h1:SzrmeWw8sbf99laJJNMwp+5izlvh/ynlMbVOX0JUoes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/server-sdk-go v1.1.1 h1:TkDD/Ecyh7XNuxgxhpsDQ1uzbTlDWwwJrbkyUjQmcbY=
Expand All @@ -198,8 +198,6 @@ github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
Expand Down Expand Up @@ -272,8 +270,8 @@ github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLq
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -332,11 +330,11 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -371,8 +369,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
Expand Down Expand Up @@ -412,8 +410,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
132 changes: 132 additions & 0 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"context"
"path"
"strings"
"time"

"github.com/frostbyte73/core"
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/psrpc"
)

type Handler struct {
ipc.UnimplementedEgressHandlerServer

conf *config.PipelineConfig
pipeline *pipeline.Controller
rpcServer rpc.EgressHandlerServer
ipcHandlerServer *grpc.Server
ipcServiceClient ipc.EgressServiceClient
ioClient rpc.IOInfoClient
kill core.Fuse
}

func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Handler, error) {
ipcClient, err := ipc.NewServiceClient(path.Join(conf.TmpDir[:strings.LastIndex(conf.TmpDir, "/")], conf.NodeID))
if err != nil {
return nil, err
}

h := &Handler{
conf: conf,
ioClient: ioClient,
ipcHandlerServer: grpc.NewServer(),
ipcServiceClient: ipcClient,
kill: core.NewFuse(),
}

ipc.RegisterEgressHandlerServer(h.ipcHandlerServer, h)
if err = ipc.StartHandlerListener(h.ipcHandlerServer, conf.TmpDir); err != nil {
return nil, err
}

rpcServer, err := rpc.NewEgressHandlerServer(h, bus)
if err != nil {
return nil, errors.Fatal(err)
}
if err = rpcServer.RegisterUpdateStreamTopic(conf.Info.EgressId); err != nil {
return nil, errors.Fatal(err)
}
if err = rpcServer.RegisterStopEgressTopic(conf.Info.EgressId); err != nil {
return nil, errors.Fatal(err)
}
h.rpcServer = rpcServer

_, err = h.ipcServiceClient.HandlerReady(context.Background(), &ipc.HandlerReadyRequest{EgressId: conf.Info.EgressId})
if err != nil {
logger.Errorw("failed to notify service", err)
return nil, err
}

h.pipeline, err = pipeline.New(context.Background(), conf, h.ioClient)
if err != nil {
if !errors.IsFatal(err) {
// user error, send update
now := time.Now().UnixNano()
conf.Info.UpdatedAt = now
conf.Info.EndedAt = now
conf.Info.Status = livekit.EgressStatus_EGRESS_FAILED
conf.Info.Error = err.Error()
_, _ = h.ioClient.UpdateEgress(context.Background(), conf.Info)
}
return nil, err
}

return h, nil
}

func (h *Handler) Run() error {
ctx, span := tracer.Start(context.Background(), "Handler.Run")
defer span.End()

// start egress
result := make(chan *livekit.EgressInfo, 1)
go func() {
result <- h.pipeline.Run(ctx)
}()

kill := h.kill.Watch()
for {
select {
case <-kill:
// kill signal received
h.pipeline.SendEOS(ctx)

case res := <-result:
// recording finished
_, _ = h.ioClient.UpdateEgress(ctx, res)
h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
return nil
}
}
}

func (h *Handler) Kill() {
h.kill.Break()
}
113 changes: 113 additions & 0 deletions pkg/handler/handler_ipc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"context"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/pprof"
"github.com/livekit/protocol/tracer"
)

func (h *Handler) GetPipelineDot(ctx context.Context, _ *ipc.GstPipelineDebugDotRequest) (*ipc.GstPipelineDebugDotResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetPipelineDot")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}

res := make(chan string, 1)
go func() {
res <- h.pipeline.GetGstPipelineDebugDot()
}()

select {
case r := <-res:
return &ipc.GstPipelineDebugDotResponse{
DotFile: r,
}, nil

case <-time.After(2 * time.Second):
return nil, status.New(codes.DeadlineExceeded, "timed out requesting pipeline debug info").Err()
}
}

func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PProfResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetPProf")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}

b, err := pprof.GetProfileData(ctx, req.ProfileName, int(req.Timeout), int(req.Debug))
if err != nil {
return nil, err
}

return &ipc.PProfResponse{
PprofFile: b,
}, nil
}

// GetMetrics implement the handler-side gathering of metrics to return over IPC
func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc.MetricsResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetMetrics")
defer span.End()

metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return nil, err
}

metricsAsString, err := renderMetrics(metrics)
if err != nil {
return nil, err
}

return &ipc.MetricsResponse{
Metrics: metricsAsString,
}, nil
}

func renderMetrics(metrics []*dto.MetricFamily) (string, error) {
// Create a StringWriter to render the metrics into text format
writer := &strings.Builder{}
totalCnt := 0
for _, metric := range metrics {
// Write each metric family to text
cnt, err := expfmt.MetricFamilyToText(writer, metric)
if err != nil {
logger.Errorw("error writing metric family", err)
return "", err
}
totalCnt += cnt
}

// Get the rendered metrics as a string from the StringWriter
return writer.String(), nil
}
Loading