Skip to content

Commit

Permalink
Fix init order for inbound calls and improve logging. (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc authored Nov 30, 2023
1 parent e1f155d commit 074c99d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 30 deletions.
6 changes: 6 additions & 0 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/emiago/sipgo"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"golang.org/x/exp/maps"

Expand Down Expand Up @@ -79,12 +80,17 @@ func (c *Client) Stop() error {

func (c *Client) UpdateSIPParticipant(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) (*rpc.InternalUpdateSIPParticipantResponse, error) {
if req.CallTo == "" {
logger.Infow("Disconnect SIP participant",
"roomName", req.RoomName, "participant", req.ParticipantId)
// Disconnect participant
if call := c.getCall(req.ParticipantId); call != nil {
call.Close()
}
return &rpc.InternalUpdateSIPParticipantResponse{}, nil
}
logger.Infow("Updating SIP participant",
"roomName", req.RoomName, "participant", req.ParticipantId,
"from", req.Number, "to", req.CallTo, "address", req.Address)
err := c.getOrCreateCall(req.ParticipantId).Update(ctx, sipOutboundConfig{
address: req.Address,
from: req.Number,
Expand Down
41 changes: 21 additions & 20 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,20 @@ type inboundCall struct {
src string
rtpConn *MediaConn
audioHandler atomic.Pointer[rtp.Handler]
dtmf chan byte // buffered; DTMF digits as characters
lkRoom atomic.Pointer[Room] // LiveKit room; only populated after correct pin is entered
dtmf chan byte // buffered; DTMF digits as characters
lkRoom *Room // LiveKit room; only active after correct pin is entered
done atomic.Bool
}

func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall {
c := &inboundCall{
s: s,
tag: tag,
from: from,
to: to,
src: src,
dtmf: make(chan byte, 10),
s: s,
tag: tag,
from: from,
to: to,
src: src,
dtmf: make(chan byte, 10),
lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts
}
return c
}
Expand Down Expand Up @@ -195,6 +196,12 @@ func (c *inboundCall) runMediaConn(offerData []byte) (answerData []byte, _ error
if err := offer.Unmarshal(offerData); err != nil {
return nil, err
}

// Encoding pipeline (LK -> SIP)
// Need to be created earlier to send the pin prompts.
s := rtp.NewMediaStreamOut[ulaw.Sample](conn, rtpPacketDur)
c.lkRoom.SetOutput(ulaw.Decode(s))

return sdpGenerateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port)
}

Expand Down Expand Up @@ -253,9 +260,9 @@ func (c *inboundCall) Close() error {

func (c *inboundCall) closeMedia() {
c.audioHandler.Store(nil)
if p := c.lkRoom.Load(); p != nil {
if p := c.lkRoom; p != nil {
p.Close()
c.lkRoom.Store(nil)
c.lkRoom = nil
}
c.rtpConn.Close()
close(c.dtmf)
Expand All @@ -274,27 +281,22 @@ func (c *inboundCall) HandleRTP(p *rtp.Packet) error {
}

func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error {
room, err := ConnectToRoom(c.s.conf, roomName, participantIdentity)
err := c.lkRoom.Connect(c.s.conf, roomName, participantIdentity)
if err != nil {
return err
}
local, err := room.NewParticipant()
local, err := c.lkRoom.NewParticipant()
if err != nil {
_ = room.Close()
_ = c.lkRoom.Close()
return err
}
c.lkRoom.Store(room)

// Decoding pipeline (SIP -> LK)
lpcm := media.DecodePCM(local)
law := ulaw.Encode(lpcm)
var h rtp.Handler = rtp.NewMediaStreamIn(law)
c.audioHandler.Store(&h)

// Encoding pipeline (LK -> SIP)
s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur)
room.SetOutput(ulaw.Decode(s))

return nil
}

Expand All @@ -307,8 +309,7 @@ func (c *inboundCall) joinRoom(roomName, identity string) {
}

func (c *inboundCall) playAudio(frames []media.PCM16Sample) {
r := c.lkRoom.Load()
t := r.NewTrack()
t := c.lkRoom.NewTrack()
defer t.Close()
t.PlayAudio(frames)
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,28 @@ func (c *outboundCall) Update(ctx context.Context, sipNew sipOutboundConfig, lkN
if c.sipCur == sipNew && c.lkCur == lkNew {
return nil
}
if c.sipCur.address == "" || c.sipCur.to == "" {
if sipNew.address == "" || sipNew.to == "" {
logger.Infow("Shutdown of outbound SIP call",
"roomName", lkNew.roomName, "from", sipNew.from, "to", sipNew.to, "address", sipNew.address)
// shutdown the call
c.close()
return nil
}
if err := c.startMedia(); err != nil {
c.close()
return err
return fmt.Errorf("start media failed: %w", err)
}
if err := c.updateRoom(lkNew); err != nil {
c.close()
return err
return fmt.Errorf("update room failed: %w", err)
}
if err := c.updateSIP(sipNew); err != nil {
c.close()
return err
return fmt.Errorf("update SIP failed: %w", err)
}
c.relinkMedia()
logger.Infow("Outbound SIP update complete",
"roomName", lkNew.roomName, "from", sipNew.from, "to", sipNew.to, "address", sipNew.address)
return nil
}

Expand Down
23 changes: 17 additions & 6 deletions pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,19 @@ type lkRoomConfig struct {
identity string
}

func ConnectToRoom(conf *config.Config, roomName string, identity string) (*Room, error) {
r := &Room{
identity: identity,
}
func NewRoom() *Room {
r := &Room{}
r.mix = mixer.NewMixer(func(data []byte) {
sample := media.LPCM16Sample(data)
if out := r.Output(); out == nil {
if out := r.Output(); out != nil {
_ = out.WriteSample(sample)
}
}, sampleRate)
return r
}

func (r *Room) Connect(conf *config.Config, roomName string, identity string) error {
r.identity = identity

room, err := lksdk.ConnectToRoom(conf.WsUrl,
lksdk.ConnectInfo{
Expand Down Expand Up @@ -82,9 +85,17 @@ func ConnectToRoom(conf *config.Config, roomName string, identity string) (*Room
},
)
if err != nil {
return nil, err
return err
}
r.room = room
return nil
}

func ConnectToRoom(conf *config.Config, roomName string, identity string) (*Room, error) {
r := NewRoom()
if err := r.Connect(conf, roomName, identity); err != nil {
return nil, err
}
return r, nil
}

Expand Down

0 comments on commit 074c99d

Please sign in to comment.