Skip to content

Commit

Permalink
Wait for handler to register (#555)
Browse files Browse the repository at this point in the history
* wait for handler to register

* actually wait

* fix socket address

* refactor process launch

* update HandlerReady
  • Loading branch information
frostbyte73 authored Dec 7, 2023
1 parent 0995239 commit 8a54fdb
Show file tree
Hide file tree
Showing 17 changed files with 1,095 additions and 673 deletions.
7 changes: 4 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/handler"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/version"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -202,7 +203,7 @@ func runHandler(c *cli.Context) error {
if err != nil {
return err
}
handler, err := service.NewHandler(conf, bus, ioClient)
h, err := handler.NewHandler(conf, bus, ioClient)
if err != nil {
if errors.IsFatal(err) {
// service will send info update and shut down
Expand All @@ -217,8 +218,8 @@ func runHandler(c *cli.Context) error {
go func() {
sig := <-killChan
logger.Infow("exit requested, stopping recording and shutting down", "signal", sig)
handler.Kill()
h.Kill()
}()

return handler.Run()
return h.Run()
}
13 changes: 6 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/livekit/livekit-server v1.5.1-0.20231026153736-8b16db227070
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.9.4-0.20231205184613-4bd1ed6a0258
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876
github.com/livekit/psrpc v0.5.2
github.com/livekit/server-sdk-go v1.1.1
github.com/pion/rtp v1.8.3
Expand All @@ -32,7 +32,7 @@ require (
github.com/urfave/cli/v2 v2.25.7
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/exp v0.0.0-20231127185646-65229373498e
google.golang.org/api v0.130.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
Expand Down Expand Up @@ -79,7 +79,6 @@ require (
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand All @@ -98,7 +97,7 @@ require (
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/turn/v2 v2.1.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/redis/go-redis/v9 v9.3.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/thoas/go-funk v0.9.3 // indirect
Expand All @@ -107,12 +106,12 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
26 changes: 12 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.4-0.20231205184613-4bd1ed6a0258 h1:LiuPyPbV1WVBn7RBhrwSicPgDgkts/xjL06JR2ELriM=
github.com/livekit/protocol v1.9.4-0.20231205184613-4bd1ed6a0258/go.mod h1:Lzra2vdFeXkKy5oFb7oXRKVqCZ+uT8w9hbZO4mEy0+Q=
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876 h1:NnbpPgxDHOcSdgW0JzBkc4QzzLVAe4sOaiYqUUH0/K4=
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876/go.mod h1:SzrmeWw8sbf99laJJNMwp+5izlvh/ynlMbVOX0JUoes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/server-sdk-go v1.1.1 h1:TkDD/Ecyh7XNuxgxhpsDQ1uzbTlDWwwJrbkyUjQmcbY=
Expand All @@ -198,8 +198,6 @@ github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
Expand Down Expand Up @@ -272,8 +270,8 @@ github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLq
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -332,11 +330,11 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -371,8 +369,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
Expand Down Expand Up @@ -412,8 +410,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
132 changes: 132 additions & 0 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"context"
"path"
"strings"
"time"

"github.com/frostbyte73/core"
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/psrpc"
)

type Handler struct {
ipc.UnimplementedEgressHandlerServer

conf *config.PipelineConfig
pipeline *pipeline.Controller
rpcServer rpc.EgressHandlerServer
ipcHandlerServer *grpc.Server
ipcServiceClient ipc.EgressServiceClient
ioClient rpc.IOInfoClient
kill core.Fuse
}

func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Handler, error) {
ipcClient, err := ipc.NewServiceClient(path.Join(conf.TmpDir[:strings.LastIndex(conf.TmpDir, "/")], conf.NodeID))
if err != nil {
return nil, err
}

h := &Handler{
conf: conf,
ioClient: ioClient,
ipcHandlerServer: grpc.NewServer(),
ipcServiceClient: ipcClient,
kill: core.NewFuse(),
}

ipc.RegisterEgressHandlerServer(h.ipcHandlerServer, h)
if err = ipc.StartHandlerListener(h.ipcHandlerServer, conf.TmpDir); err != nil {
return nil, err
}

rpcServer, err := rpc.NewEgressHandlerServer(h, bus)
if err != nil {
return nil, errors.Fatal(err)
}
if err = rpcServer.RegisterUpdateStreamTopic(conf.Info.EgressId); err != nil {
return nil, errors.Fatal(err)
}
if err = rpcServer.RegisterStopEgressTopic(conf.Info.EgressId); err != nil {
return nil, errors.Fatal(err)
}
h.rpcServer = rpcServer

_, err = h.ipcServiceClient.HandlerReady(context.Background(), &ipc.HandlerReadyRequest{EgressId: conf.Info.EgressId})
if err != nil {
logger.Errorw("failed to notify service", err)
return nil, err
}

h.pipeline, err = pipeline.New(context.Background(), conf, h.ioClient)
if err != nil {
if !errors.IsFatal(err) {
// user error, send update
now := time.Now().UnixNano()
conf.Info.UpdatedAt = now
conf.Info.EndedAt = now
conf.Info.Status = livekit.EgressStatus_EGRESS_FAILED
conf.Info.Error = err.Error()
_, _ = h.ioClient.UpdateEgress(context.Background(), conf.Info)
}
return nil, err
}

return h, nil
}

func (h *Handler) Run() error {
ctx, span := tracer.Start(context.Background(), "Handler.Run")
defer span.End()

// start egress
result := make(chan *livekit.EgressInfo, 1)
go func() {
result <- h.pipeline.Run(ctx)
}()

kill := h.kill.Watch()
for {
select {
case <-kill:
// kill signal received
h.pipeline.SendEOS(ctx)

case res := <-result:
// recording finished
_, _ = h.ioClient.UpdateEgress(ctx, res)
h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
return nil
}
}
}

func (h *Handler) Kill() {
h.kill.Break()
}
113 changes: 113 additions & 0 deletions pkg/handler/handler_ipc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"context"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/pprof"
"github.com/livekit/protocol/tracer"
)

func (h *Handler) GetPipelineDot(ctx context.Context, _ *ipc.GstPipelineDebugDotRequest) (*ipc.GstPipelineDebugDotResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetPipelineDot")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}

res := make(chan string, 1)
go func() {
res <- h.pipeline.GetGstPipelineDebugDot()
}()

select {
case r := <-res:
return &ipc.GstPipelineDebugDotResponse{
DotFile: r,
}, nil

case <-time.After(2 * time.Second):
return nil, status.New(codes.DeadlineExceeded, "timed out requesting pipeline debug info").Err()
}
}

func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PProfResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetPProf")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}

b, err := pprof.GetProfileData(ctx, req.ProfileName, int(req.Timeout), int(req.Debug))
if err != nil {
return nil, err
}

return &ipc.PProfResponse{
PprofFile: b,
}, nil
}

// GetMetrics implement the handler-side gathering of metrics to return over IPC
func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc.MetricsResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetMetrics")
defer span.End()

metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return nil, err
}

metricsAsString, err := renderMetrics(metrics)
if err != nil {
return nil, err
}

return &ipc.MetricsResponse{
Metrics: metricsAsString,
}, nil
}

func renderMetrics(metrics []*dto.MetricFamily) (string, error) {
// Create a StringWriter to render the metrics into text format
writer := &strings.Builder{}
totalCnt := 0
for _, metric := range metrics {
// Write each metric family to text
cnt, err := expfmt.MetricFamilyToText(writer, metric)
if err != nil {
logger.Errorw("error writing metric family", err)
return "", err
}
totalCnt += cnt
}

// Get the rendered metrics as a string from the StringWriter
return writer.String(), nil
}
Loading

0 comments on commit 8a54fdb

Please sign in to comment.