From d1c09b3d89d99c4348f011608d532e93044bf781 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 13 Nov 2024 07:58:21 -0800 Subject: [PATCH] Send analytics events (#216) --- cmd/livekit-sip/main.go | 2 +- go.mod | 6 +- go.sum | 8 +- pkg/sip/client.go | 74 ++++++++++++-- pkg/sip/inbound.go | 182 ++++++++++++++++++++++++----------- pkg/sip/outbound.go | 106 +++++++++++--------- pkg/sip/server.go | 4 +- pkg/sip/service.go | 8 +- pkg/sip/service_test.go | 2 +- pkg/sip/types.go | 39 ++++++++ test/integration/sip_test.go | 2 +- 11 files changed, 313 insertions(+), 120 deletions(-) diff --git a/cmd/livekit-sip/main.go b/cmd/livekit-sip/main.go index e0b7c50..977de9c 100644 --- a/cmd/livekit-sip/main.go +++ b/cmd/livekit-sip/main.go @@ -91,7 +91,7 @@ func runService(c *cli.Context) error { return err } - sipsrv, err := sip.NewService(conf, mon, log) + sipsrv, err := sip.NewService(conf, mon, log, func(projectID string) rpc.IOInfoClient { return psrpcClient }) if err != nil { return err } diff --git a/go.mod b/go.mod index 98d57f8..c0c9951 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/livekit/sip go 1.22.7 -toolchain go1.22.8 +toolchain go1.23.1 require ( github.com/at-wat/ebml-go v0.17.1 @@ -13,7 +13,7 @@ require ( github.com/jfreymuth/oggvorbis v1.0.5 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.27.1-0.20241025171050-32abc4d3e929 + github.com/livekit/protocol v1.27.2-0.20241112203928-f558b991de7c github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12 @@ -115,7 +115,7 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - go.uber.org/zap/exp v0.2.0 // indirect + go.uber.org/zap/exp v0.3.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.30.0 // indirect diff --git a/go.sum b/go.sum index de17767..9d3a350 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.27.1-0.20241025171050-32abc4d3e929 h1:qunGP1opJF4NfcJW5tLyV/cLE/npfnh7P3nyyLECb68= -github.com/livekit/protocol v1.27.1-0.20241025171050-32abc4d3e929/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.27.2-0.20241112203928-f558b991de7c h1:hDKTElZzPu6nFf0OdF7mTnF7ZqiqwXwavuY9u/P/9O8= +github.com/livekit/protocol v1.27.2-0.20241112203928-f558b991de7c/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o= @@ -264,8 +264,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs= -go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ= +go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U= +go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/pkg/sip/client.go b/pkg/sip/client.go index 274fe20..10a44d9 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -18,13 +18,16 @@ import ( "context" "fmt" "log/slog" + "net/netip" "sync" + "time" "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "github.com/frostbyte73/core" "golang.org/x/exp/maps" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/tracer" @@ -47,10 +50,11 @@ type Client struct { activeCalls map[LocalTag]*outboundCall byRemote map[RemoteTag]*outboundCall - handler Handler + handler Handler + getIOClient GetIOInfoClient } -func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Client { +func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor, getIOClient GetIOInfoClient) *Client { if log == nil { log = logger.GetLogger() } @@ -58,6 +62,7 @@ func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Clie conf: conf, log: log, mon: mon, + getIOClient: getIOClient, activeCalls: make(map[LocalTag]*outboundCall), byRemote: make(map[RemoteTag]*outboundCall), } @@ -120,7 +125,7 @@ func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCrea return c.createSIPParticipant(ctx, req) } -func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (*rpc.InternalCreateSIPParticipantResponse, error) { +func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (resp *rpc.InternalCreateSIPParticipantResponse, retErr error) { if !c.mon.CanAccept() { return nil, siperrors.ErrUnavailable } @@ -150,6 +155,27 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea "toHost", req.Address, "toUser", req.CallTo, ) + + ioClient := c.getIOClient(req.ProjectId) + + callInfo := c.createSIPCallInfo(req) + defer func() { + switch retErr { + case nil: + callInfo.CallStatus = livekit.SIPCallStatus_SCS_PARTICIPANT_JOINED + default: + callInfo.CallStatus = livekit.SIPCallStatus_SCS_ERROR + callInfo.DisconnectReason = livekit.DisconnectReason_UNKNOWN_REASON + callInfo.Error = retErr.Error() + } + + if ioClient != nil { + ioClient.UpdateSIPCallState(context.WithoutCancel(ctx), &rpc.UpdateSIPCallStateRequest{ + CallInfo: callInfo, + }) + } + }() + roomConf := RoomConfig{ WsUrl: req.WsUrl, Token: req.Token, @@ -178,13 +204,28 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea enabledFeatures: req.EnabledFeatures, } log.Infow("Creating SIP participant") - call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf) + call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, callInfo, ioClient) if err != nil { return nil, err } p := call.Participant() // Start actual SIP call async. - go call.Start(context.WithoutCancel(ctx)) + go func() { + call.Start(context.WithoutCancel(ctx)) + + if callInfo.Error != "" { + callInfo.CallStatus = livekit.SIPCallStatus_SCS_ERROR + } else { + callInfo.CallStatus = livekit.SIPCallStatus_SCS_DISCONNECTED + } + + if ioClient != nil { + ioClient.UpdateSIPCallState(context.WithoutCancel(ctx), &rpc.UpdateSIPCallStateRequest{ + CallInfo: callInfo, + }) + } + + }() return &rpc.InternalCreateSIPParticipantResponse{ ParticipantId: p.ID, @@ -194,6 +235,27 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea } +func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo { + toUri := CreateURIFromUserAndAddress(req.CallTo, req.Address, TransportFrom(req.Transport)) + fromiUri := URI{ + User: req.Number, + Host: req.Hostname, + Addr: netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort)), + } + + callInfo := &livekit.SIPCallInfo{ + CallId: req.SipCallId, + TrunkId: req.SipTrunkId, + RoomName: req.RoomName, + ParticipantIdentity: req.ParticipantIdentity, + ToUri: toUri.ToSIPUri(), + FromUri: fromiUri.ToSIPUri(), + CreatedAt: time.Now().UnixNano(), + } + + return callInfo +} + func (c *Client) OnRequest(req *sip.Request, tx sip.ServerTransaction) bool { switch req.Method { default: @@ -215,7 +277,7 @@ func (c *Client) onBye(req *sip.Request, tx sip.ServerTransaction) bool { } call.log.Infow("BYE") go func(call *outboundCall) { - call.CloseWithReason(CallHangup, "bye") + call.CloseWithReason(CallHangup, "bye", livekit.DisconnectReason_CLIENT_INITIATED) }(call) return true } diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 00223d5..3629c2d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -16,7 +16,6 @@ package sip import ( "context" - "errors" "fmt" "math" "net/netip" @@ -28,6 +27,7 @@ import ( "github.com/emiago/sipgo/sip" "github.com/frostbyte73/core" "github.com/icholy/digest" + "github.com/pkg/errors" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -119,13 +119,33 @@ func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.Se } func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { + callInfo, ioClient, err := s.processInvite(req, tx) + + if callInfo != nil { + if err != nil { + callInfo.CallStatus = livekit.SIPCallStatus_SCS_ERROR + callInfo.Error = err.Error() + } else { + callInfo.CallStatus = livekit.SIPCallStatus_SCS_DISCONNECTED + } + callInfo.EndedAt = time.Now().UnixNano() + + if ioClient != nil { + ioClient.UpdateSIPCallState(context.Background(), &rpc.UpdateSIPCallStateRequest{ + CallInfo: callInfo, + }) + } + } +} + +func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (*livekit.SIPCallInfo, rpc.IOInfoClient, error) { ctx := context.Background() s.mon.InviteReqRaw(stats.Inbound) src, err := netip.ParseAddrPort(req.Source()) if err != nil { tx.Terminate() s.log.Errorw("cannot parse source IP", err, "fromIP", src) - return + return nil, nil, psrpc.NewError(psrpc.MalformedRequest, errors.Wrap(err, "cannot parse source IP")) } callID := lksip.NewCallID() log := s.log.WithValues( @@ -140,13 +160,21 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { log = LoggerWithHeaders(log, cc) log.Infow("processing invite") + callInfo := &livekit.SIPCallInfo{ + CallId: string(cc.ID()), + FromUri: CreateURIFromUserAndAddress(cc.From().User, src.String(), tr).ToSIPUri(), + ToUri: CreateURIFromUserAndAddress(cc.To().User, cc.To().Host, tr).ToSIPUri(), + CallStatus: livekit.SIPCallStatus_SCS_CALL_INCOMING, + CreatedAt: time.Now().UnixNano(), + } + if err := cc.ValidateInvite(); err != nil { if s.conf.HideInboundPort { cc.Drop() } else { cc.RespondAndDrop(sip.StatusBadRequest, "Bad request") } - return + return callInfo, nil, psrpc.NewError(psrpc.InvalidArgument, errors.Wrap(err, "invite validation failed")) } ctx, span := tracer.Start(ctx, "Server.onInvite") defer span.End() @@ -167,25 +195,34 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { cmon.InviteErrorShort("auth-error") log.Warnw("Rejecting inbound, auth check failed", err) cc.RespondAndDrop(sip.StatusServiceUnavailable, "Try again later") - return + return callInfo, nil, psrpc.NewError(psrpc.PermissionDenied, errors.Wrap(err, "rejecting inbound, auth check failed")) } if r.ProjectID != "" { log = log.WithValues("projectID", r.ProjectID) } if r.TrunkID != "" { log = log.WithValues("sipTrunk", r.TrunkID) + callInfo.TrunkId = r.TrunkID + } + + ioClient := s.getIOClient(r.ProjectID) + if ioClient != nil { + ioClient.UpdateSIPCallState(context.WithoutCancel(ctx), &rpc.UpdateSIPCallStateRequest{ + CallInfo: callInfo, + }) } + switch r.Result { case AuthDrop: cmon.InviteErrorShort("flood") log.Debugw("Dropping inbound flood") cc.Drop() - return + return callInfo, ioClient, psrpc.NewErrorf(psrpc.PermissionDenied, "call was not authorized by trunk configuration") case AuthNotFound: cmon.InviteErrorShort("no-rule") log.Warnw("Rejecting inbound, doesn't match any Trunks", nil) cc.RespondAndDrop(sip.StatusNotFound, "Does not match any SIP Trunks") - return + return callInfo, ioClient, psrpc.NewErrorf(psrpc.NotFound, "no trunk configuration for call") case AuthPassword: if s.conf.HideInboundPort { // We will send password request anyway, so might as well signal that the progress is made. @@ -194,16 +231,21 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { if !s.handleInviteAuth(log, req, tx, from.User, r.Username, r.Password) { cmon.InviteErrorShort("unauthorized") // handleInviteAuth will generate the SIP Response as needed - return + return callInfo, ioClient, psrpc.NewErrorf(psrpc.PermissionDenied, "invalid crendentials were provided") } fallthrough case AuthAccept: // ok } - call := s.newInboundCall(log, cmon, cc, src, nil) + call := s.newInboundCall(log, cmon, cc, src, callInfo, ioClient, nil) call.joinDur = joinDur - call.handleInvite(call.ctx, req, r.TrunkID, s.conf) + err = call.handleInvite(call.ctx, req, r.TrunkID, s.conf) + if err != nil { + return callInfo, ioClient, err + } + + return callInfo, ioClient, nil } func (s *Server) onBye(req *sip.Request, tx sip.ServerTransaction) { @@ -267,9 +309,11 @@ type inboundCall struct { log logger.Logger cc *sipInbound mon *stats.CallMonitor + ioClient rpc.IOInfoClient extraAttrs map[string]string ctx context.Context cancel func() + callInfo *livekit.SIPCallInfo src netip.AddrPort media *MediaPort dtmf chan dtmf.Event // buffered @@ -286,6 +330,8 @@ func (s *Server) newInboundCall( mon *stats.CallMonitor, cc *sipInbound, src netip.AddrPort, + callInfo *livekit.SIPCallInfo, + ioClient rpc.IOInfoClient, extra map[string]string, ) *inboundCall { @@ -296,6 +342,8 @@ func (s *Server) newInboundCall( mon: mon, cc: cc, src: src, + callInfo: callInfo, + ioClient: ioClient, extraAttrs: extra, dtmf: make(chan dtmf.Event, 10), lkRoom: NewRoom(log), // we need it created earlier so that the audio mixer is available for pin prompts @@ -308,7 +356,7 @@ func (s *Server) newInboundCall( return c } -func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkID string, conf *config.Config) { +func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkID string, conf *config.Config) error { c.mon.InviteAccept() c.mon.CallStart() defer c.mon.CallEnd() @@ -336,23 +384,28 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI if disp.DispatchRuleID != "" { c.log = c.log.WithValues("sipRule", disp.DispatchRuleID) } + + c.callInfo.RoomName = disp.Room.RoomName + c.callInfo.ParticipantIdentity = disp.Room.Participant.Identity + var pinPrompt bool switch disp.Result { default: - c.log.Errorw("Rejecting inbound call", fmt.Errorf("unexpected dispatch result: %v", disp.Result)) + err := fmt.Errorf("unexpected dispatch result: %v", disp.Result) + c.log.Errorw("Rejecting inbound call", err) c.cc.RespondAndDrop(sip.StatusNotImplemented, "") c.close(true, callDropped, "unexpected-result") - return + return psrpc.NewError(psrpc.Unimplemented, err) case DispatchNoRuleDrop: c.log.Debugw("Rejecting inbound flood") c.cc.Drop() c.close(false, callFlood, "flood") - return + return psrpc.NewErrorf(psrpc.PermissionDenied, "call was not authorized by trunk configuration") case DispatchNoRuleReject: c.log.Infow("Rejecting inbound call, doesn't match any Dispatch Rules") c.cc.RespondAndDrop(sip.StatusNotFound, "Does not match Trunks or Dispatch Rules") c.close(false, callDropped, "no-dispatch") - return + return psrpc.NewErrorf(psrpc.NotFound, "no trunk configuration for call") case DispatchAccept: pinPrompt = false case DispatchRequestPin: @@ -365,30 +418,30 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI c.log.Errorw("Cannot start media", err) c.cc.RespondAndDrop(sip.StatusInternalServerError, "") c.close(true, callDropped, "media-failed") - return + return err } - acceptCall := func() bool { + acceptCall := func() (bool, error) { c.log.Infow("Accepting the call", "headers", disp.Headers) if err = c.cc.Accept(ctx, answerData, disp.Headers); err != nil { c.log.Errorw("Cannot respond to INVITE", err) - return false + return false, err } c.media.EnableTimeout(true) - if !c.waitMedia(ctx) { - return false + if ok, err := c.waitMedia(ctx); !ok { + return false, err } - return true + return true, nil } + ok := false if pinPrompt { // Accept the call first on the SIP side, so that we can send audio prompts. - if !acceptCall() { - return // already sent a response + if ok, err = acceptCall(); !ok { + return err // could be success if the caller hung up } - var ok bool - disp, ok = c.pinPrompt(ctx, trunkID) + disp, ok, err = c.pinPrompt(ctx, trunkID) if !ok { - return // already sent a response + return err // already sent a response. Could be success if user hung up } } if len(disp.HeadersToAttributes) != 0 { @@ -411,38 +464,52 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI } ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration) defer cancel() - if !c.joinRoom(ctx, disp.Room) { - return // already sent a response + if err := c.joinRoom(ctx, disp.Room); err != nil { + return errors.Wrap(err, "failed joining room") } // Publish our own track. if err := c.publishTrack(); err != nil { c.log.Errorw("Cannot publish track", err) c.close(true, callDropped, "publish-failed") - return + return errors.Wrap(err, "publishing track to room failed") } c.lkRoom.Subscribe() if !pinPrompt { c.log.Infow("Waiting for track subscription(s)") // For dispatches without pin, we first wait for LK participant to become available, // and also for at least one track subscription. In the meantime we keep ringing. - if !c.waitSubscribe(ctx, disp.RingingTimeout) { - return // already sent a response + if ok, err := c.waitSubscribe(ctx, disp.RingingTimeout); !ok { + return err // already sent a response. Could be success if caller hung up } - if !acceptCall() { - return // already sent a response + if ok, err := acceptCall(); !ok { + return err // already sent a response. Could be success if caller hung up } } + c.callInfo.RoomId = c.lkRoom.room.SID() + c.callInfo.StartedAt = time.Now().UnixNano() + c.callInfo.CallStatus = livekit.SIPCallStatus_SCS_ACTIVE + c.started.Break() + if c.ioClient != nil { + c.ioClient.UpdateSIPCallState(context.WithoutCancel(ctx), &rpc.UpdateSIPCallStateRequest{ + CallInfo: c.callInfo, + }) + } + // Wait for the caller to terminate the call. select { case <-ctx.Done(): c.closeWithHangup() + return nil case <-c.lkRoom.Closed(): + c.callInfo.DisconnectReason = livekit.DisconnectReason_CLIENT_INITIATED c.close(false, callDropped, "removed") + return nil case <-c.media.Timeout(): c.closeWithTimeout() + return psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout") } } @@ -488,7 +555,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config, featur return answerData, nil } -func (c *inboundCall) waitMedia(ctx context.Context) bool { +func (c *inboundCall) waitMedia(ctx context.Context) (bool, error) { // Wait for either a first RTP packet or a predefined delay. // // If the delay kicks in earlier than the caller is ready, they might miss some audio packets. @@ -503,23 +570,23 @@ func (c *inboundCall) waitMedia(ctx context.Context) bool { select { case <-c.cc.Cancelled(): c.closeWithCancelled() - return false + return false, nil // caller hung up case <-ctx.Done(): c.closeWithHangup() - return false + return false, nil // caller hung up case <-c.lkRoom.Closed(): c.closeWithHangup() - return false + return false, psrpc.NewErrorf(psrpc.Canceled, "room closed") case <-c.media.Timeout(): c.closeWithTimeout() - return false + return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timed out") case <-c.media.Received(): case <-delay.C: } - return true + return true, nil } -func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration) bool { +func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration) (bool, error) { ctx, span := tracer.Start(ctx, "inboundCall.waitSubscribe") defer span.End() timer := time.NewTimer(timeout) @@ -527,25 +594,25 @@ func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration) select { case <-c.cc.Cancelled(): c.closeWithCancelled() - return false + return false, nil case <-ctx.Done(): c.closeWithHangup() - return false + return false, nil case <-c.lkRoom.Closed(): c.closeWithHangup() - return false + return false, psrpc.NewErrorf(psrpc.Canceled, "room closed") case <-c.media.Timeout(): c.closeWithTimeout() - return false + return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timed out") case <-timer.C: c.close(false, callDropped, "cannot-subscribe") - return false + return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "room subscription timed out") case <-c.lkRoom.Subscribed(): - return true + return true, nil } } -func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallDispatch, _ bool) { +func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallDispatch, _ bool, _ error) { ctx, span := tracer.Start(ctx, "inboundCall.pinPrompt") defer span.End() c.log.Infow("Requesting Pin for SIP call") @@ -557,17 +624,18 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD select { case <-c.cc.Cancelled(): c.closeWithCancelled() - return disp, false + return disp, false, nil case <-ctx.Done(): c.closeWithHangup() - return disp, false + c.callInfo.DisconnectReason = livekit.DisconnectReason_CLIENT_INITIATED + return disp, false, nil case <-c.media.Timeout(): c.closeWithTimeout() - return disp, false + return disp, false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout") case b, ok := <-c.dtmf: if !ok { c.Close() - return disp, false + return disp, false, psrpc.NewErrorf(psrpc.Canceled, "failed reading DTMF event") } if b.Digit == 0 { continue // unrecognized @@ -600,17 +668,17 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD c.log.Infow("Rejecting call", "pin", pin, "noPin", noPin) c.playAudio(ctx, c.s.res.wrongPin) c.close(false, callDropped, "wrong-pin") - return disp, false + return disp, false, psrpc.NewErrorf(psrpc.PermissionDenied, "wrong pin") } c.playAudio(ctx, c.s.res.roomJoin) - return disp, true + return disp, true, nil } // Gather pin numbers pin += string(b.Digit) if len(pin) > pinLimit { c.playAudio(ctx, c.s.res.wrongPin) c.close(false, callDropped, "wrong-pin") - return disp, false + return disp, false, psrpc.NewErrorf(psrpc.PermissionDenied, "wrong pin") } } } @@ -652,10 +720,12 @@ func (c *inboundCall) closeWithTimeout() { } func (c *inboundCall) closeWithCancelled() { + c.callInfo.DisconnectReason = livekit.DisconnectReason_CLIENT_INITIATED c.close(false, CallHangup, "cancelled") } func (c *inboundCall) closeWithHangup() { + c.callInfo.DisconnectReason = livekit.DisconnectReason_CLIENT_INITIATED c.close(false, CallHangup, "hangup") } @@ -729,7 +799,7 @@ func (c *inboundCall) publishTrack() error { return nil } -func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig) bool { +func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig) error { if c.joinDur != nil { c.joinDur() } @@ -743,9 +813,9 @@ func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig) bool { if err := c.createLiveKitParticipant(ctx, rconf); err != nil { c.log.Errorw("Cannot create LiveKit participant", err) c.close(true, callDropped, "participant-failed") - return false + return errors.Wrap(err, "cannot create LiveKit participant") } - return true + return nil } func (c *inboundCall) playAudio(ctx context.Context, frames []media.PCM16Sample) { diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 9ddc57d..ca61c84 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -16,7 +16,6 @@ package sip import ( "context" - "errors" "fmt" "math" "sort" @@ -26,6 +25,7 @@ import ( "github.com/emiago/sipgo/sip" "github.com/frostbyte73/core" "github.com/icholy/digest" + "github.com/pkg/errors" "golang.org/x/exp/maps" "github.com/livekit/protocol/livekit" @@ -60,13 +60,15 @@ type sipOutboundConfig struct { } type outboundCall struct { - c *Client - log logger.Logger - cc *sipOutbound - media *MediaPort - started core.Fuse - stopped core.Fuse - closing core.Fuse + c *Client + log logger.Logger + ioClient rpc.IOInfoClient + cc *sipOutbound + media *MediaPort + callInfo *livekit.SIPCallInfo + started core.Fuse + stopped core.Fuse + closing core.Fuse mu sync.RWMutex mon *stats.CallMonitor @@ -75,13 +77,14 @@ type outboundCall struct { sipConf sipOutboundConfig } -func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) { +func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig, callInfo *livekit.SIPCallInfo, ioClient rpc.IOInfoClient) (*outboundCall, error) { if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration { sipConf.maxCallDuration = maxCallDuration } if sipConf.ringingTimeout <= 0 { sipConf.ringingTimeout = defaultRingingTimeout } + tr := TransportFrom(sipConf.transport) contact := c.ContactURI(tr) if sipConf.host == "" { @@ -96,10 +99,14 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo Addr: contact.Addr, Transport: tr, }, contact), - sipConf: sipConf, + sipConf: sipConf, + callInfo: callInfo, + ioClient: ioClient, } + call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address) var err error + call.media, err = NewMediaPort(call.log, call.mon, &MediaConfig{ IP: c.sconf.SignalingIP, Ports: conf.RTPPort, @@ -107,12 +114,12 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo MediaTimeout: c.conf.MediaTimeout, }, RoomSampleRate) if err != nil { - call.close(true, callDropped, "media-failed") + call.close(errors.Wrap(err, "media failed"), callDropped, "media-failed", livekit.DisconnectReason_UNKNOWN_REASON) return nil, err } call.media.SetDTMFAudio(conf.AudioDTMF) if err := call.connectToRoom(ctx, room); err != nil { - call.close(true, callDropped, "join-failed") + call.close(errors.Wrap(err, "room join failed"), callDropped, "join-failed", livekit.DisconnectReason_UNKNOWN_REASON) return nil, fmt.Errorf("update room failed: %w", err) } @@ -128,17 +135,27 @@ func (c *outboundCall) Start(ctx context.Context) { defer cancel() c.mon.CallStart() defer c.mon.CallEnd() - err := c.ConnectSIP(ctx) - if err != nil { - c.log.Infow("SIP call failed", "error", err) - c.CloseWithReason(callDropped, "connect error") - return + ok := c.ConnectSIP(ctx) + if !ok { + return // ConnectSIP updates the error code on the callInfo } + + c.callInfo.RoomId = c.lkRoom.room.SID() + c.callInfo.StartedAt = time.Now().UnixNano() + c.callInfo.CallStatus = livekit.SIPCallStatus_SCS_ACTIVE + + if c.ioClient != nil { + c.ioClient.UpdateSIPCallState(context.WithoutCancel(ctx), &rpc.UpdateSIPCallStateRequest{ + CallInfo: c.callInfo, + }) + } + select { case <-c.Disconnected(): - c.CloseWithReason(callDropped, "removed") + c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED) case <-c.media.Timeout(): c.closeWithTimeout() + c.callInfo.Error = psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout").Error() case <-c.Closed(): } } @@ -155,37 +172,39 @@ func (c *outboundCall) Close() error { c.closing.Break() c.mu.Lock() defer c.mu.Unlock() - c.close(false, callDropped, "shutdown") + c.close(nil, callDropped, "shutdown", livekit.DisconnectReason_SERVER_SHUTDOWN) return nil } -func (c *outboundCall) CloseWithReason(status CallStatus, reason string) { +func (c *outboundCall) CloseWithReason(status CallStatus, description string, reason livekit.DisconnectReason) { c.mu.Lock() defer c.mu.Unlock() - c.close(false, status, reason) + c.close(nil, status, description, reason) } func (c *outboundCall) closeWithTimeout() { c.mu.Lock() defer c.mu.Unlock() - c.close(true, callDropped, "media-timeout") + c.close(psrpc.NewErrorf(psrpc.DeadlineExceeded, "media-timeout"), callDropped, "media-timeout", livekit.DisconnectReason_UNKNOWN_REASON) } -func (c *outboundCall) close(error bool, status CallStatus, reason string) { +func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) { c.stopped.Once(func() { c.setStatus(status) - if error { - c.log.Warnw("Closing outbound call with error", nil, "reason", reason) + if err != nil { + c.log.Warnw("Closing outbound call with error", nil, "reason", description) + c.callInfo.Error = err.Error() } else { - c.log.Infow("Closing outbound call", "reason", reason) + c.log.Infow("Closing outbound call", "reason", description) } + c.callInfo.DisconnectReason = reason c.media.Close() _ = c.lkRoom.CloseOutput() _ = c.lkRoom.CloseWithReason(status.DisconnectReason()) c.lkRoomIn = nil - c.stopSIP(reason) + c.stopSIP(description) c.c.cmu.Lock() delete(c.c.activeCalls, c.cc.ID()) @@ -204,33 +223,34 @@ func (c *outboundCall) Participant() ParticipantInfo { return c.lkRoom.Participant() } -func (c *outboundCall) ConnectSIP(ctx context.Context) error { +func (c *outboundCall) ConnectSIP(ctx context.Context) bool { ctx, span := tracer.Start(ctx, "outboundCall.ConnectSIP") defer span.End() c.mu.Lock() defer c.mu.Unlock() if err := c.dialSIP(ctx); err != nil { - status, reason := callDropped, "invite-failed" - isError := true + c.log.Infow("SIP call failed", "error", err) + + status, desc, reason := callDropped, "invite-failed", livekit.DisconnectReason_UNKNOWN_REASON var e *ErrorStatus if errors.As(err, &e) { switch e.StatusCode { case int(sip.StatusTemporarilyUnavailable): - status, reason = callUnavailable, "unavailable" - isError = false + status, desc, reason = callUnavailable, "unavailable", livekit.DisconnectReason_USER_UNAVAILABLE + err = nil case int(sip.StatusBusyHere): - status, reason = callRejected, "busy" - isError = false + status, desc, reason = callRejected, "busy", livekit.DisconnectReason_USER_REJECTED + err = nil } } - c.close(isError, status, reason) - return fmt.Errorf("update SIP failed: %w", err) + c.close(err, status, desc, reason) + return false } c.connectMedia() c.started.Break() c.lkRoom.Subscribe() c.log.Infow("Outbound SIP call established") - return nil + return true } func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) error { @@ -379,11 +399,10 @@ func (c *outboundCall) sipSignal(ctx context.Context) error { joinDur := c.mon.JoinDur() c.mon.InviteReq() - sdpResp, err := c.cc.Invite(ctx, URI{ - User: c.sipConf.to, - Host: c.sipConf.address, - Transport: TransportFrom(c.sipConf.transport), - }, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer) + + toUri := CreateURIFromUserAndAddress(c.sipConf.to, c.sipConf.address, TransportFrom(c.sipConf.transport)) + + sdpResp, err := c.cc.Invite(ctx, toUri, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer) if err != nil { // TODO: should we retry? maybe new offer will work var e *ErrorStatus @@ -477,7 +496,7 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dial c.log.Infow("outbound call tranferred", "transferTo", transferTo) // Give time for the peer to hang up first, but hang up ourselves if this doesn't happen within 1 second - time.AfterFunc(referByeTimeout, func() { c.CloseWithReason(CallHangup, "call transferred") }) + time.AfterFunc(referByeTimeout, func() { c.CloseWithReason(CallHangup, "call transferred", livekit.DisconnectReason_CLIENT_INITIATED) }) return nil } @@ -563,7 +582,6 @@ func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, hea defer span.End() c.mu.Lock() defer c.mu.Unlock() - to = to.Normalize() toHeader := &sip.ToHeader{Address: *to.GetURI()} dest := to.GetDest() diff --git a/pkg/sip/server.go b/pkg/sip/server.go index b57be6c..cbb83a9 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -113,6 +113,7 @@ type Server struct { log logger.Logger mon *stats.Monitor sipSrv *sipgo.Server + getIOClient GetIOInfoClient sipListeners []io.Closer sipUnhandled RequestHandler @@ -135,7 +136,7 @@ type inProgressInvite struct { challenge digest.Challenge } -func NewServer(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Server { +func NewServer(conf *config.Config, log logger.Logger, mon *stats.Monitor, getIOClient GetIOInfoClient) *Server { if log == nil { log = logger.GetLogger() } @@ -143,6 +144,7 @@ func NewServer(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Serv log: log, conf: conf, mon: mon, + getIOClient: getIOClient, activeCalls: make(map[RemoteTag]*inboundCall), byLocal: make(map[LocalTag]*inboundCall), } diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 123b120..8d2ee77 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -58,7 +58,9 @@ type transferKey struct { TransferTo string } -func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) (*Service, error) { +type GetIOInfoClient func(projectID string) rpc.IOInfoClient + +func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger, getIOClient GetIOInfoClient) (*Service, error) { if log == nil { log = logger.GetLogger() } @@ -66,8 +68,8 @@ func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) (*Se conf: conf, log: log, mon: mon, - cli: NewClient(conf, log, mon), - srv: NewServer(conf, log, mon), + cli: NewClient(conf, log, mon, getIOClient), + srv: NewServer(conf, log, mon, getIOClient), pendingTransfers: make(map[transferKey]chan struct{}), } var err error diff --git a/pkg/sip/service_test.go b/pkg/sip/service_test.go index 7e2c9e9..abfe25b 100644 --- a/pkg/sip/service_test.go +++ b/pkg/sip/service_test.go @@ -95,7 +95,7 @@ func testInvite(t *testing.T, h Handler, hidden bool, from, to string, test func SIPPort: sipPort, SIPPortListen: sipPort, RTPPort: rtcconfig.PortRange{Start: testPortRTPMin, End: testPortRTPMax}, - }, mon, logger.GetLogger()) + }, mon, logger.GetLogger(), func(projectID string) rpc.IOInfoClient { return nil }) require.NoError(t, err) require.NotNil(t, s) t.Cleanup(s.Stop) diff --git a/pkg/sip/types.go b/pkg/sip/types.go index 6c476da..83a84fa 100644 --- a/pkg/sip/types.go +++ b/pkg/sip/types.go @@ -49,6 +49,19 @@ func TransportFrom(t livekit.SIPTransport) Transport { return "" } +func SIPTransportFrom(t Transport) livekit.SIPTransport { + switch t { + case TransportUDP: + return livekit.SIPTransport_SIP_TRANSPORT_UDP + case TransportTCP: + return livekit.SIPTransport_SIP_TRANSPORT_TCP + case TransportTLS: + return livekit.SIPTransport_SIP_TRANSPORT_TLS + } + + return livekit.SIPTransport_SIP_TRANSPORT_AUTO +} + type Transport string const ( @@ -64,6 +77,18 @@ type URI struct { Transport Transport } +func CreateURIFromUserAndAddress(user string, address string, transport Transport) URI { + uri := URI{ + User: user, + Host: address, + Transport: transport, + } + + uri = uri.Normalize() + + return uri +} + func (u URI) Normalize() URI { if addr, sport, err := net.SplitHostPort(u.Host); err == nil { if port, err := strconv.Atoi(sport); err == nil { @@ -140,6 +165,20 @@ func (u URI) GetContactURI() *sip.Uri { return su } +func (u URI) ToSIPUri() *livekit.SIPUri { + url := &livekit.SIPUri{ + User: u.User, + Host: u.GetHost(), + Port: uint32(u.GetPort()), + Transport: SIPTransportFrom(u.Transport), + } + + if u.Addr.Addr().IsValid() { + url.Ip = u.Addr.Addr().String() + } + return url +} + type LocalTag string type RemoteTag string diff --git a/test/integration/sip_test.go b/test/integration/sip_test.go index 2199d13..5a0b3c0 100644 --- a/test/integration/sip_test.go +++ b/test/integration/sip_test.go @@ -79,7 +79,7 @@ func runSIPServer(t testing.TB, lk *LiveKit) *SIPServer { if err != nil { t.Fatal(err) } - sipsrv, err := sip.NewService(conf, mon, log) + sipsrv, err := sip.NewService(conf, mon, log, func(projectID string) rpc.IOInfoClient { return psrpcCli }) if err != nil { t.Fatal(err) }