Skip to content

Commit

Permalink
Send analytics events (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben authored Nov 13, 2024
1 parent 3950d6c commit d1c09b3
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 120 deletions.
2 changes: 1 addition & 1 deletion cmd/livekit-sip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func runService(c *cli.Context) error {
return err
}

sipsrv, err := sip.NewService(conf, mon, log)
sipsrv, err := sip.NewService(conf, mon, log, func(projectID string) rpc.IOInfoClient { return psrpcClient })
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/livekit/sip

go 1.22.7

toolchain go1.22.8
toolchain go1.23.1

require (
github.com/at-wat/ebml-go v0.17.1
Expand All @@ -13,7 +13,7 @@ require (
github.com/jfreymuth/oggvorbis v1.0.5
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.27.1-0.20241025171050-32abc4d3e929
github.com/livekit/protocol v1.27.2-0.20241112203928-f558b991de7c
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
Expand Down Expand Up @@ -115,7 +115,7 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.27.1-0.20241025171050-32abc4d3e929 h1:qunGP1opJF4NfcJW5tLyV/cLE/npfnh7P3nyyLECb68=
github.com/livekit/protocol v1.27.1-0.20241025171050-32abc4d3e929/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.27.2-0.20241112203928-f558b991de7c h1:hDKTElZzPu6nFf0OdF7mTnF7ZqiqwXwavuY9u/P/9O8=
github.com/livekit/protocol v1.27.2-0.20241112203928-f558b991de7c/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down Expand Up @@ -264,8 +264,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs=
go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ=
go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U=
go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
74 changes: 68 additions & 6 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
"context"
"fmt"
"log/slog"
"net/netip"
"sync"
"time"

"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/frostbyte73/core"
"golang.org/x/exp/maps"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
Expand All @@ -47,17 +50,19 @@ type Client struct {
activeCalls map[LocalTag]*outboundCall
byRemote map[RemoteTag]*outboundCall

handler Handler
handler Handler
getIOClient GetIOInfoClient
}

func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Client {
func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor, getIOClient GetIOInfoClient) *Client {
if log == nil {
log = logger.GetLogger()
}
c := &Client{
conf: conf,
log: log,
mon: mon,
getIOClient: getIOClient,
activeCalls: make(map[LocalTag]*outboundCall),
byRemote: make(map[RemoteTag]*outboundCall),
}
Expand Down Expand Up @@ -120,7 +125,7 @@ func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCrea
return c.createSIPParticipant(ctx, req)
}

func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (*rpc.InternalCreateSIPParticipantResponse, error) {
func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (resp *rpc.InternalCreateSIPParticipantResponse, retErr error) {
if !c.mon.CanAccept() {
return nil, siperrors.ErrUnavailable
}
Expand Down Expand Up @@ -150,6 +155,27 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
"toHost", req.Address,
"toUser", req.CallTo,
)

ioClient := c.getIOClient(req.ProjectId)

callInfo := c.createSIPCallInfo(req)
defer func() {
switch retErr {
case nil:
callInfo.CallStatus = livekit.SIPCallStatus_SCS_PARTICIPANT_JOINED
default:
callInfo.CallStatus = livekit.SIPCallStatus_SCS_ERROR
callInfo.DisconnectReason = livekit.DisconnectReason_UNKNOWN_REASON
callInfo.Error = retErr.Error()
}

if ioClient != nil {
ioClient.UpdateSIPCallState(context.WithoutCancel(ctx), &rpc.UpdateSIPCallStateRequest{
CallInfo: callInfo,
})
}
}()

roomConf := RoomConfig{
WsUrl: req.WsUrl,
Token: req.Token,
Expand Down Expand Up @@ -178,13 +204,28 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
enabledFeatures: req.EnabledFeatures,
}
log.Infow("Creating SIP participant")
call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf)
call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, callInfo, ioClient)
if err != nil {
return nil, err
}
p := call.Participant()
// Start actual SIP call async.
go call.Start(context.WithoutCancel(ctx))
go func() {
call.Start(context.WithoutCancel(ctx))

if callInfo.Error != "" {
callInfo.CallStatus = livekit.SIPCallStatus_SCS_ERROR
} else {
callInfo.CallStatus = livekit.SIPCallStatus_SCS_DISCONNECTED
}

if ioClient != nil {
ioClient.UpdateSIPCallState(context.WithoutCancel(ctx), &rpc.UpdateSIPCallStateRequest{
CallInfo: callInfo,
})
}

}()

return &rpc.InternalCreateSIPParticipantResponse{
ParticipantId: p.ID,
Expand All @@ -194,6 +235,27 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea

}

func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo {
toUri := CreateURIFromUserAndAddress(req.CallTo, req.Address, TransportFrom(req.Transport))
fromiUri := URI{
User: req.Number,
Host: req.Hostname,
Addr: netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort)),
}

callInfo := &livekit.SIPCallInfo{
CallId: req.SipCallId,
TrunkId: req.SipTrunkId,
RoomName: req.RoomName,
ParticipantIdentity: req.ParticipantIdentity,
ToUri: toUri.ToSIPUri(),
FromUri: fromiUri.ToSIPUri(),
CreatedAt: time.Now().UnixNano(),
}

return callInfo
}

func (c *Client) OnRequest(req *sip.Request, tx sip.ServerTransaction) bool {
switch req.Method {
default:
Expand All @@ -215,7 +277,7 @@ func (c *Client) onBye(req *sip.Request, tx sip.ServerTransaction) bool {
}
call.log.Infow("BYE")
go func(call *outboundCall) {
call.CloseWithReason(CallHangup, "bye")
call.CloseWithReason(CallHangup, "bye", livekit.DisconnectReason_CLIENT_INITIATED)
}(call)
return true
}
Expand Down
Loading

0 comments on commit d1c09b3

Please sign in to comment.