Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PSVAMB-69845: [Dreamforce 2024][ESTT] Buffering and Errors on Live Streams During Extended Streaming-Time Test #215

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5d690b4
handle media_info change.
igorshevach Sep 8, 2024
d541c54
check for media info self assignment
igorshevach Sep 9, 2024
ec4a5dd
simulate codec change every X sec
igorshevach Sep 9, 2024
5a2946a
KMP_PACKET_MEDIA_INFO: dump media info (including extradata in hex fo…
igorshevach Sep 10, 2024
c5f12ec
kbps = 1024 bps
igorshevach Sep 10, 2024
81e9e37
pass correct context to KMP_log_mediainfo
igorshevach Sep 10, 2024
5a35aea
pass correct context to KMP_log_mediainfo
igorshevach Sep 12, 2024
fbb7a5d
transcoder - undo promiscuous handle media info
igorshevach Sep 15, 2024
49235cb
transcoder - undo promiscuous handle media info
igorshevach Sep 15, 2024
19c5a6c
fix order of events called; we must have rtmp_codec first
igorshevach Sep 15, 2024
37b1f5d
attempt to force rtmp not to send old media info
igorshevach Sep 15, 2024
abe566d
attempt to force rtmp not to send old media info
igorshevach Sep 15, 2024
c0d3848
undo
igorshevach Sep 15, 2024
d34bebe
undo
igorshevach Sep 15, 2024
a55e641
send ack for media info (?)
igorshevach Sep 16, 2024
bab813b
undo
igorshevach Sep 16, 2024
8a11b5c
- fix warnings
igorshevach Sep 16, 2024
1cdd266
fix
igorshevach Sep 17, 2024
ec212e9
add \n
igorshevach Sep 17, 2024
1a94715
remove static from .h file
igorshevach Sep 17, 2024
55c3225
fix _S macro
igorshevach Sep 17, 2024
4a2950b
build test failed
igorshevach Sep 17, 2024
12731f2
code style
igorshevach Sep 17, 2024
e911997
code style
igorshevach Sep 17, 2024
f27d3a7
code style
igorshevach Sep 17, 2024
200f8c5
fix log line
igorshevach Sep 18, 2024
a798d11
- fix logs
igorshevach Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions nginx-kmp-out-module/src/ngx_kmp_out_track.c
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,61 @@ ngx_kmp_out_track_write(ngx_kmp_out_track_t *track, u_char *data,
return ngx_kmp_out_track_write_chain(track, &in, data);
}

static ngx_int_t
ngx_kmp_out_track_log_media_info(ngx_kmp_out_track_t *track)
{
u_char *buf = NULL,
*p;

if(track->extra_data.len > 0)
{
buf = ngx_pnalloc(track->pool, 2 * track->extra_data.len + 1);
if(!buf)
{
ngx_log_error(NGX_LOG_ERR, &track->log, 0,
"ngx_kmp_out_track_log_media_info:"
"failed to allocate buffer for extra data size %uD",
2 * track->extra_data.len + 1);
return NGX_ERROR;
}

p = ngx_hex_dump(buf, track->extra_data.data, track->extra_data.len);
*p = 0;
}

ngx_log_error(NGX_LOG_INFO, &track->log, 0,
"ngx_kmp_out_track_log_media_info:"
" media_type: %uD codec_id: %uD"
" timescale: %uD bitrate: %uD"
" extra_data: %s",
track->media_info.media_type, track->media_info.codec_id,
track->media_info.timescale, track->media_info.bitrate, buf);
switch(track->media_info.media_type)
{
case KMP_MEDIA_VIDEO:
ngx_log_error(NGX_LOG_INFO, &track->log, 0,
"ngx_kmp_out_track_log_media_info: video"
" width: %uD height: %uD frame_rate: %uD/%uD cea_captions: %s",
track->media_info.u.video.width, track->media_info.u.video.height,
track->media_info.u.video.frame_rate.num, track->media_info.u.video.frame_rate.denom,
track->media_info.u.video.cea_captions ? "true" : "false");
break;
case KMP_MEDIA_AUDIO:
ngx_log_error(NGX_LOG_INFO, &track->log, 0,
"ngx_kmp_out_track_write_media_info: audio"
" channels: %uD bits_per_sample: %uD sample_rate: %uD channel_layout: %uD",
track->media_info.u.audio.channels, track->media_info.u.audio.bits_per_sample,
track->media_info.u.audio.sample_rate, track->media_info.u.audio.channel_layout);
break;
};

if(buf)
{
ngx_pfree(track->pool, buf);
}

return NGX_OK;
}

ngx_int_t
ngx_kmp_out_track_write_media_info(ngx_kmp_out_track_t *track)
Expand Down Expand Up @@ -1238,6 +1293,8 @@ ngx_kmp_out_track_write_media_info(ngx_kmp_out_track_t *track)
return NGX_ERROR;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to log before the other calls in the function in case one fails?

ngx_kmp_out_track_log_media_info(track);

return NGX_OK;
}

Expand Down
11 changes: 11 additions & 0 deletions nginx-rtmp-module/src/ngx_rtmp_codec_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,7 @@ ngx_rtmp_codec_postconfiguration(ngx_conf_t *cf)
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_handler_pt *h;
ngx_rtmp_amf_handler_t *ch;
ngx_uint_t nelts;

cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);

Expand All @@ -1464,13 +1465,23 @@ ngx_rtmp_codec_postconfiguration(ngx_conf_t *cf)
return NGX_ERROR;
}

nelts = cmcf->events[NGX_RTMP_MSG_AUDIO].nelts;
for(;nelts > 1;nelts--, h--) {
*h = h[-1];
}

*h = ngx_rtmp_codec_av;

h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);
if (h == NULL) {
return NGX_ERROR;
}

nelts = cmcf->events[NGX_RTMP_MSG_VIDEO].nelts;
for(;nelts > 1;nelts--, h--) {
*h = h[-1];
}

*h = ngx_rtmp_codec_av;

h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]);
Expand Down
64 changes: 64 additions & 0 deletions transcoder/KMP/KMP.c
Original file line number Diff line number Diff line change
Expand Up @@ -781,3 +781,67 @@ bool KMP_read_ack(KMP_session_t *context,uint64_t* frame_id)
}
return false;
}

static const char *hex = "0123456789ABCDEF";

int KMP_log_mediainfo(KMP_session_t *context,
const char *category, int level,
transcode_mediaInfo_t* transcodeMediaInfo) {

AVCodecParameters* params = transcodeMediaInfo->codecParams;

if(!params){
return -1;
}

char *ex = NULL;
if(params->extradata_size > 0 ){
ex = av_malloc(2 * params->extradata_size + 1);
char *walk = ex;
for(int i = 0; i < params->extradata_size; i++) {
*walk++ = hex[params->extradata[i] >> 4];
*walk++ = hex[params->extradata[i] & 0x0f];
}
*walk = '\0';
}

if (params->codec_type == AVMEDIA_TYPE_AUDIO) {

LOGGER(category,level,"[%s] KMP_PACKET_MEDIA_INFO type=audio"
" codec_id=%s (0x%x) samplerate=%d bps=%d channels=%d channel_layout=%d"
" bitrate=%.3f kbps timescale=%d:%d"
" extra_data=%s",
context->sessionName,
avcodec_get_name(params->codec_id),
params->codec_tag,
params->sample_rate,
params->bits_per_coded_sample,
params->channels,
params->channel_layout,
params->bit_rate / 1024.0, transcodeMediaInfo->timeScale.num, transcodeMediaInfo->timeScale.den,
ex);
}
else if (params->codec_type == AVMEDIA_TYPE_VIDEO) {

LOGGER(category,level,"[%s] KMP_PACKET_MEDIA_INFO type=video"
" codec_id=%s (0x%x) width=%d height=%d frame_rate=%d:%d"
" bitrate=%.3f kbps timescale=%d:%d"
" cc=%s extra_data=%s",
context->sessionName,
avcodec_get_name(params->codec_id),
params->codec_tag,
params->width,
params->height,
transcodeMediaInfo->frameRate.num, transcodeMediaInfo->frameRate.den,
params->bit_rate / 1024.0, transcodeMediaInfo->timeScale.num, transcodeMediaInfo->timeScale.den,
transcodeMediaInfo->closed_captions ? "yes" : "no",
ex);
}

if(ex) {
av_free(ex);
}

return 0;
}

2 changes: 2 additions & 0 deletions transcoder/KMP/KMP.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ int KMP_send_ack( KMP_session_t *context,kmp_frame_position_t *cur_pos);

int KMP_close( KMP_session_t *context);

int KMP_log_mediainfo(KMP_session_t *context, const char *category,int level, transcode_mediaInfo_t* mediaInfo);


int KMP_listen( KMP_session_t *context);
int KMP_accept( KMP_session_t *context, KMP_session_t *client);
Expand Down
4 changes: 2 additions & 2 deletions transcoder/common/json_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ typedef struct {
#define JSON_SERIALIZE_ARRAY_END() JSON_WRITE("]"); js->shouldAddComma=true;

#define JSON_SERIALIZE_STRING(key,value) ADD_COMMA() JSON_WRITE("\"%s\": \"%s\"",key,value); js->shouldAddComma=true;
#define JSON_SERIALIZE_INT(key,value) ADD_COMMA() JSON_WRITE("\"%s\": %d",key,value); js->shouldAddComma=true;
#define JSON_SERIALIZE_INT64(key,value) ADD_COMMA() JSON_WRITE("\"%s\": %lld",key,value); js->shouldAddComma=true;
#define JSON_SERIALIZE_INT(key,value) ADD_COMMA() JSON_WRITE("\"%s\": %d",key,(int)value); js->shouldAddComma=true;
#define JSON_SERIALIZE_INT64(key,value) ADD_COMMA() JSON_WRITE("\"%s\": %ld",key,(int64_t)value); js->shouldAddComma=true;
#define JSON_SERIALIZE_BOOL(key,value) ADD_COMMA() JSON_WRITE("\"%s\": %s",key,value ? "true" : "false"); js->shouldAddComma=true;
#define JSON_SERIALIZE_DOUBLE(key,value) ADD_COMMA() JSON_WRITE("\"%s\": %.2lf",key,value); js->shouldAddComma=true;

Expand Down
34 changes: 30 additions & 4 deletions transcoder/debug/file_streamer.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ void* thread_stream_from_file(void *vargp)

json_get_int64(GetConfig(),"input.hiccupIntervalSec",0,&hiccupIntervalSec);

int64_t resendMediaInfoIntervalSec;
json_get_int64(GetConfig(),"input.resendMediaInfoIntervalSec",0,&resendMediaInfoIntervalSec);


AVPacket packet;
av_init_packet(&packet);

Expand Down Expand Up @@ -94,7 +98,11 @@ void* thread_stream_from_file(void *vargp)
int64_t start_time=av_gettime_relative(),
hiccup_duration = hiccupDurationSec * 1000 * 1000,
hiccup_interval = hiccupIntervalSec * 1000 * 1000,
next_hiccup = start_time + hiccup_interval;
next_hiccup = start_time + hiccup_interval,
resendMediaInfoInterval = resendMediaInfoIntervalSec * 1000 * 1000,
nextSentMediaInfoTime = av_gettime_relative() + resendMediaInfoInterval;

uint64_t frame_id_ack = createTime - 1;

samples_stats_t stats;
sample_stats_init(&stats,standard_timebase);
Expand Down Expand Up @@ -173,6 +181,23 @@ void* thread_stream_from_file(void *vargp)
}
}

if(resendMediaInfoIntervalSec > 0
&& (packet.flags & AV_PKT_FLAG_KEY)
&& nextSentMediaInfoTime < av_gettime_relative())
{
LOGGER0(CATEGORY_RECEIVER,AV_LOG_WARNING,"sending mediainfo");
if (KMP_send_mediainfo(&kmp,&extra)<0) {
LOGGER0(CATEGORY_RECEIVER,AV_LOG_FATAL,"couldn't send mediainfo!");
break;
}

if (KMP_read_ack(&kmp,&frame_id_ack)) {
LOGGER(CATEGORY_RECEIVER,AV_LOG_DEBUG,"received ack for packet id %lld",frame_id_ack);
}

nextSentMediaInfoTime = av_gettime_relative() + resendMediaInfoInterval;
}

lastDts=packet.dts;

samples_stats_add(&stats,packet.dts,packet.pos,packet.size);
Expand All @@ -188,20 +213,21 @@ void* thread_stream_from_file(void *vargp)
rate)*/


uint64_t frame_id_ack;

if (KMP_send_packet(&kmp,&packet)<0) {
LOGGER0(CATEGORY_RECEIVER,AV_LOG_FATAL,"couldn't send packet!");
break;
}
frame_id++;
if (KMP_read_ack(&kmp,&frame_id_ack)) {
LOGGER(CATEGORY_RECEIVER,AV_LOG_DEBUG,"received ack for packet id %lld",frame_id_ack);
}


LOGGER("SENDER",AV_LOG_DEBUG,"sent packet pts=%s dts=%s size=%d",
LOGGER("SENDER",AV_LOG_DEBUG,"sent packet pts=%s dts=%s size=%d frame_id=%lld frame_id_ack=%lld",
ts2str(packet.pts,true),
ts2str(packet.dts,true),
packet.size);
packet.size, frame_id, frame_id_ack);


av_packet_unref(&packet);
Expand Down
22 changes: 19 additions & 3 deletions transcoder/receiver_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,26 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
LOGGER(CATEGORY_RECEIVER,AV_LOG_FATAL,"[%s] Invalid mediainfo",session->stream_name);
break;
}
LOGGER(CATEGORY_RECEIVER,AV_LOG_INFO,"[%s] received packet KMP_PACKET_MEDIA_INFO",session->stream_name);

transcode_session_async_set_mediaInfo(transcode_session, newParams);
}
KMP_log_mediainfo(&session->kmpClient, CATEGORY_RECEIVER, AV_LOG_INFO, newParams);

if( (retVal = transcode_session_async_set_mediaInfo(transcode_session, newParams)) < 0) {
LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] transcode_session_async_set_mediaInfo failed",session->stream_name);

LOGGER(CATEGORY_RECEIVER,AV_LOG_INFO,"[%s] flushing",session->stream_name);
_S(transcode_session_async_send_packet(transcode_session, NULL));

if(!autoAckMode) {
transcode_session_get_ack_frame_id(transcode_session,&current_position);
// TODO: we cheat here, received_frame_id may have not been persisted yet
// so we should make sure all of the frames are consumed ny upstream
current_position.frame_id = current_position.transcoded_frame_id = received_frame_id;
LOGGER(CATEGORY_RECEIVER,AV_LOG_INFO,"[%s] sending ack for packet # : %lld",session->stream_name,current_position.frame_id);
_S(KMP_send_ack(&session->kmpClient,&current_position));
}
break;
}
}
if (header.packet_type==KMP_PACKET_FRAME)
{
AVPacket* packet=av_packet_alloc();
Expand Down
Loading
Loading