Skip to content

Commit

Permalink
Support redis LB for large scale use scenarios.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 29, 2024
1 parent 7252ea1 commit b4b65e4
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 4 deletions.
11 changes: 7 additions & 4 deletions proxy/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (v *rtmpServer) serve(ctx context.Context, conn *net.TCPConn) error {
logger.Df(ctx, "RTMP connect app %v", tcUrl)

// Expect RTMP command to identify the client, a publisher or viewer.
var currentStreamID int
var currentStreamID, nextStreamID int
var streamName string
var clientType RTMPClientType
for clientType == "" {
Expand All @@ -170,8 +170,8 @@ func (v *rtmpServer) serve(ctx context.Context, conn *net.TCPConn) error {
identifyRes := rtmp.NewCreateStreamResPacket(pkt.TransactionID)
response = identifyRes

currentStreamID = 1
identifyRes.StreamID = *rtmp.NewAmf0Number(float64(currentStreamID))
nextStreamID = 1
identifyRes.StreamID = *rtmp.NewAmf0Number(float64(nextStreamID))
} else {
// For releaseStream, FCPublish, etc.
identifyRes := rtmp.NewCallPacket()
Expand All @@ -180,7 +180,7 @@ func (v *rtmpServer) serve(ctx context.Context, conn *net.TCPConn) error {
identifyRes.TransactionID = pkt.TransactionID
identifyRes.CommandName = "_result"
identifyRes.CommandObject = rtmp.NewAmf0Null()
identifyRes.Args = rtmp.NewAmf0Null()
identifyRes.Args = rtmp.NewAmf0Undefined()
}
case *rtmp.PublishPacket:
identifyRes := rtmp.NewCallPacket()
Expand All @@ -203,6 +203,9 @@ func (v *rtmpServer) serve(ctx context.Context, conn *net.TCPConn) error {
return errors.Wrapf(err, "write identify res for req=%v, stream=%v",
identifyReq, currentStreamID)
}

// Update the stream ID for next request.
currentStreamID = nextStreamID
}
}
logger.Df(ctx, "RTMP identify tcUrl=%v, stream=%v, id=%v, type=%v",
Expand Down
14 changes: 14 additions & 0 deletions proxy/srs.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,20 @@ func (v *srsMemoryLoadBalancer) Initialize(ctx context.Context) error {
if err := v.Update(ctx, server); err != nil {
return errors.Wrapf(err, "update default SRS %+v", server)
}

// Keep alive.
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(30 * time.Second):
if err := v.Update(ctx, server); err != nil {
logger.Wf(ctx, "update default SRS %+v failed, %+v", server, err)
}
}
}
}()
logger.Df(ctx, "MemoryLB: Initialize default SRS media server, %+v", server)
}
return nil
Expand Down
24 changes: 24 additions & 0 deletions trunk/conf/origin1-for-proxy.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

listen 19351;
max_connections 1000;
pid objs/origin1.pid;
daemon off;
srs_log_tank console;
http_server {
enabled on;
listen 8081;
dir ./objs/nginx/html;
}
http_api {
enabled on;
listen 19851;
}
heartbeat {
enabled on;
interval 3;
url http://127.0.0.1:12025/api/v1/srs/register;
device_id origin2;
ports on;
}
vhost __defaultVhost__ {
}
24 changes: 24 additions & 0 deletions trunk/conf/origin2-for-proxy.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

listen 19352;
max_connections 1000;
pid objs/origin2.pid;
daemon off;
srs_log_tank console;
http_server {
enabled on;
listen 8082;
dir ./objs/nginx/html;
}
http_api {
enabled on;
listen 19853;
}
heartbeat {
enabled on;
interval 3;
url http://127.0.0.1:12025/api/v1/srs/register;
device_id origin2;
ports on;
}
vhost __defaultVhost__ {
}
24 changes: 24 additions & 0 deletions trunk/conf/origin3-for-proxy.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

listen 19353;
max_connections 1000;
pid objs/origin3.pid;
daemon off;
srs_log_tank console;
http_server {
enabled on;
listen 8083;
dir ./objs/nginx/html;
}
http_api {
enabled on;
listen 19852;
}
heartbeat {
enabled on;
interval 3;
url http://127.0.0.1:12025/api/v1/srs/register;
device_id origin3;
ports on;
}
vhost __defaultVhost__ {
}

0 comments on commit b4b65e4

Please sign in to comment.