Skip to content

Commit

Permalink
Fixed bug in register, and bump version for gosip.
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwebrtc committed Mar 24, 2021
1 parent cb97c36 commit 9e39ed6
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 27 deletions.
6 changes: 0 additions & 6 deletions examples/b2bua/b2bua/b2bua.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func NewB2BUA() *B2BUA {

// Could not found any records
sess.Reject(404, fmt.Sprintf("%v Not found", called))
break

// Handle re-INVITE or UPDATE.
case session.ReInviteReceived:
Expand All @@ -176,7 +175,6 @@ func NewB2BUA() *B2BUA {
case session.Outgoing:
//TODO: Need to provide correct answer.
}
break

// Handle 1XX
case session.EarlyMedia:
Expand All @@ -188,7 +186,6 @@ func NewB2BUA() *B2BUA {
call.src.ProvideAnswer(answer)
call.src.Provisional((*resp).StatusCode(), (*resp).Reason())
}
break

// Handle 200OK or ACK
case session.Confirmed:
Expand All @@ -199,7 +196,6 @@ func NewB2BUA() *B2BUA {
call.src.ProvideAnswer(answer)
call.src.Accept(200)
}
break

// Handle 4XX+
case session.Failure:
Expand All @@ -217,7 +213,6 @@ func NewB2BUA() *B2BUA {
}
}
b.removeCall(sess)
break

}
}
Expand Down Expand Up @@ -339,7 +334,6 @@ func (b *B2BUA) handleRegister(request sip.Request, tx sip.ServerTransaction) {
resp := sip.NewResponseFromRequest(request.MessageID(), request, 200, reason, "")
sip.CopyHeaders("Expires", request, resp)
util.BuildContactHeader("Contact", request, resp, &expires)
sip.CopyHeaders("Content-Length", request, resp)
tx.Respond(resp)

}
Expand Down
183 changes: 183 additions & 0 deletions examples/b2bua/registry/expire.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package registry

import (
"log"
"sync"
"sync/atomic"
"time"
)

type val struct {
data interface{}
expiredTime int64
}

const delChannelCap = 100

type ExpiredMap struct {
m map[interface{}]*val
timeMap map[int64][]interface{}
lck *sync.Mutex
stop chan struct{}
needStop int32
}

func NewExpiredMap() *ExpiredMap {
e := ExpiredMap{
m: make(map[interface{}]*val),
lck: new(sync.Mutex),
timeMap: make(map[int64][]interface{}),
stop: make(chan struct{}),
}
atomic.StoreInt32(&e.needStop, 0)
go e.run(time.Now().Unix())
return &e
}

type delMsg struct {
keys []interface{}
t int64
}

func (e *ExpiredMap) run(now int64) {
t := time.NewTicker(time.Second * 1)
delCh := make(chan *delMsg, delChannelCap)
go func() {
for v := range delCh {
if atomic.LoadInt32(&e.needStop) == 1 {
log.Printf("---del stop---")
return
}
e.multiDelete(v.keys, v.t)
}
}()
for {
select {
case <-t.C:
now++
if keys, found := e.timeMap[now]; found {
delCh <- &delMsg{keys: keys, t: now}
}
case <-e.stop:
log.Printf("=== STOP ===")
atomic.StoreInt32(&e.needStop, 1)
delCh <- &delMsg{keys: []interface{}{}, t: 0}
return
}
}
}

func (e *ExpiredMap) Set(key, value interface{}, expireSeconds int64) {
if expireSeconds <= 0 {
return
}
log.Printf("ExpiredMap: Set %s ttl[%d] => %v", key, expireSeconds, value)
e.lck.Lock()
defer e.lck.Unlock()
expiredTime := time.Now().Unix() + expireSeconds
e.m[key] = &val{
data: value,
expiredTime: expiredTime,
}
e.timeMap[expiredTime] = append(e.timeMap[expiredTime], key)
}

func (e *ExpiredMap) Get(key interface{}) (found bool, value interface{}) {
e.lck.Lock()
defer e.lck.Unlock()
if found = e.checkDeleteKey(key); !found {
return
}
value = e.m[key].data
return
}

func (e *ExpiredMap) Delete(key interface{}) {
e.lck.Lock()
delete(e.m, key)
e.lck.Unlock()
}

func (e *ExpiredMap) Remove(key interface{}) {
e.Delete(key)
}

func (e *ExpiredMap) multiDelete(keys []interface{}, t int64) {
e.lck.Lock()
defer e.lck.Unlock()
delete(e.timeMap, t)
for _, key := range keys {
delete(e.m, key)
}
}

func (e *ExpiredMap) Length() int {
e.lck.Lock()
defer e.lck.Unlock()
return len(e.m)
}

func (e *ExpiredMap) Size() int {
return e.Length()
}

func (e *ExpiredMap) TTL(key interface{}) int64 {
e.lck.Lock()
defer e.lck.Unlock()
if !e.checkDeleteKey(key) {
return -1
}
return e.m[key].expiredTime - time.Now().Unix()
}

func (e *ExpiredMap) Clear() {
e.lck.Lock()
defer e.lck.Unlock()
e.m = make(map[interface{}]*val)
e.timeMap = make(map[int64][]interface{})
}

func (e *ExpiredMap) Close() {
e.lck.Lock()
defer e.lck.Unlock()
e.stop <- struct{}{}
}

func (e *ExpiredMap) Stop() {
e.Close()
}

func (e *ExpiredMap) DoForEach(handler func(interface{}, interface{})) {
e.lck.Lock()
defer e.lck.Unlock()
for k, v := range e.m {
if !e.checkDeleteKey(k) {
continue
}
handler(k, v)
}
}

func (e *ExpiredMap) DoForEachWithBreak(handler func(interface{}, interface{}) bool) {
e.lck.Lock()
defer e.lck.Unlock()
for k, v := range e.m {
if !e.checkDeleteKey(k) {
continue
}
if handler(k, v) {
break
}
}
}

func (e *ExpiredMap) checkDeleteKey(key interface{}) bool {
if val, found := e.m[key]; found {
if val.expiredTime <= time.Now().Unix() {
delete(e.m, key)
return false
}
return true
}
return false
}
7 changes: 3 additions & 4 deletions examples/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func main() {
logger.Error(err)
}

go ua.SendRegister(profile, recipient, profile.Expires, nil)
register, _ := ua.SendRegister(profile, recipient, profile.Expires, nil)
time.Sleep(time.Second * 3)

udp = createUdp()
Expand All @@ -126,10 +126,9 @@ func main() {

go ua.Invite(profile, called, recipient, &sdp)

time.Sleep(time.Second * 3)
go ua.SendRegister(profile, recipient, 0, nil)

<-stop

register.SendRegister(0)

ua.Shutdown()
}
4 changes: 4 additions & 0 deletions examples/register/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func main() {

time.Sleep(time.Second * 5)

register.SendRegister(0)

time.Sleep(time.Second * 5)

register.SendRegister(300)

time.Sleep(time.Second * 5)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
cloud.google.com/go/firestore v1.4.0 // indirect
firebase.google.com/go v3.13.0+incompatible
github.com/c-bata/go-prompt v0.2.5
github.com/ghettovoice/gosip v0.0.0-20210220085524-cafc4013be1d
github.com/ghettovoice/gosip v0.0.0-20210322152317-7858ea172631
github.com/google/uuid v1.2.0
github.com/kr/pretty v0.2.0 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghettovoice/gosip v0.0.0-20210220085524-cafc4013be1d h1:WyXK6/eezdyMDFjmtCRW3GksfCg/Dykl86uslEwWiFU=
github.com/ghettovoice/gosip v0.0.0-20210220085524-cafc4013be1d/go.mod h1:yTr3BEYSFe9As6XM7ldyrVgqsPwlnw8Ahc4N28VFM2g=
github.com/ghettovoice/gosip v0.0.0-20210322152317-7858ea172631 h1:U4vU1rDtPKYCXl183LGxsXpoFZU3or2wq5S4qDKVusM=
github.com/ghettovoice/gosip v0.0.0-20210322152317-7858ea172631/go.mod h1:yTr3BEYSFe9As6XM7ldyrVgqsPwlnw8Ahc4N28VFM2g=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down
12 changes: 11 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package session
import (
"context"
"fmt"
"sync"

"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
Expand All @@ -12,6 +13,7 @@ import (
type RequestCallback func(ctx context.Context, request sip.Request, authorizer sip.Authorizer, waitForResult bool) (sip.Response, error)

type Session struct {
lock sync.Mutex
requestCallbck RequestCallback
status Status
callID sip.CallID
Expand Down Expand Up @@ -180,17 +182,25 @@ func (s *Session) StoreTransaction(tx sip.Transaction) {
}

func (s *Session) SetState(status Status) {
s.lock.Lock()
defer s.lock.Unlock()
s.status = status
}

func (s *Session) Status() Status {
s.lock.Lock()
defer s.lock.Unlock()
return s.status
}

func (s *Session) Direction() Direction {
return s.direction
}

func (s *Session) GetUserData() interface{} {
return s.userData
}

// GetEarlyMedia Get sdp for early media.
func (s *Session) GetEarlyMedia() string {
return s.answer
Expand Down Expand Up @@ -251,7 +261,7 @@ func (s *Session) Reject(statusCode sip.StatusCode, reason string) {
func (s *Session) End() error {

if s.status == Terminated {
err := fmt.Errorf("Invalid status: %v", s.status)
err := fmt.Errorf("invalid status: %v", s.status)
s.Log().Errorf("Session::End() %v", err)
return err
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/ua/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
)

type Register struct {
ua *UserAgent
timer *time.Timer
profile *account.Profile
recipient sip.SipUri
request *sip.Request
ctx context.Context
cancel context.CancelFunc
data interface{}
ua *UserAgent
timer *time.Timer
profile *account.Profile
authorizer *auth.ClientAuthorizer
recipient sip.SipUri
request *sip.Request
ctx context.Context
cancel context.CancelFunc
data interface{}
}

func NewRegister(ua *UserAgent, profile *account.Profile, recipient sip.SipUri, data interface{}) *Register {
Expand Down Expand Up @@ -70,12 +71,11 @@ func (r *Register) SendRegister(expires uint32) error {
(*r.request).AppendHeader(&expiresHeader)
}

var authorizer *auth.ClientAuthorizer = nil
if profile.AuthInfo != nil {
authorizer = auth.NewClientAuthorizer(profile.AuthInfo.AuthUser, profile.AuthInfo.Password)
if profile.AuthInfo != nil && r.authorizer == nil {
r.authorizer = auth.NewClientAuthorizer(profile.AuthInfo.AuthUser, profile.AuthInfo.Password)
}

resp, err := ua.RequestWithContext(r.ctx, *r.request, authorizer, true)
resp, err := ua.RequestWithContext(r.ctx, *r.request, r.authorizer, true)

if err != nil {
ua.Log().Errorf("Request [%s] failed, err => %v", sip.REGISTER, err)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (r *Register) SendRegister(expires uint32) error {
r.timer.Stop()
r.timer = nil
}
r.cancel()
r.request = nil
}
ua.RegisterStateHandler(state)
}
Expand Down

0 comments on commit 9e39ed6

Please sign in to comment.