Skip to content

Commit

Permalink
- add "jumpOffsetSec": xxx to config.json where xx < 0 means jump back
Browse files Browse the repository at this point in the history
in time, xx > 0 forward.
- extend metadata with pts. a decoded frame pts is reset with its original value. this is because decoder would not respect pts jump baclwards
- set async=0 for audio resampler filter. this is to prevent it from adding/removing samples due to pts jump.
  • Loading branch information
igorshevach committed Sep 18, 2023
1 parent 6f81220 commit 8fa070e
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 20 deletions.
30 changes: 25 additions & 5 deletions transcoder/debug/file_streamer.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ void* thread_stream_from_file(void *vargp)
file_streamer_t* args=(file_streamer_t*)vargp;
AVFormatContext *ifmt_ctx=NULL;
int oldLevel= get_log_level(NULL);
int64_t jumpAtTimestamp = AV_NOPTS_VALUE;

log_init(AV_LOG_WARNING);
int ret = avformat_open_input(&ifmt_ctx, args->source_file_name, NULL, NULL);
Expand Down Expand Up @@ -48,6 +49,9 @@ void* thread_stream_from_file(void *vargp)
char channelId[KMP_MAX_CHANNEL_ID];
json_get_string(GetConfig(),"input.channelId","1_abcdefgh",channelId,sizeof(channelId));

int jumpOffsetSec=0;
json_get_int64(GetConfig(),"input.jumpoffsetsec",0,&jumpOffsetSec);

AVPacket packet;
av_init_packet(&packet);

Expand Down Expand Up @@ -105,6 +109,19 @@ void* thread_stream_from_file(void *vargp)
AVStream *in_stream=ifmt_ctx->streams[packet.stream_index];

av_packet_rescale_ts(&packet,in_stream->time_base, standard_timebase);
if(jumpOffsetSec != 0 ) {
const auto jumpOffset = jumpOffsetSec * standard_timebase.den;
if(AV_NOPTS_VALUE == jumpAtTimestamp) {
jumpAtTimestamp = packet.pts + abs(jumpOffset);
}

if(packet.pts > jumpAtTimestamp) {
packet.pts += jumpOffset;
packet.dts += jumpOffset;
LOGGER(CATEGORY_DEFAULT,AV_LOG_INFO,"pts %s shifted pts,dts by %s. jump ts %s",
pts2str(packet.pts),pts2str(jumpOffset),pts2str(jumpAtTimestamp));
}
}
packet.pts+=cumulativeDuration;
packet.dts+=cumulativeDuration;
packet.pos=createTime +packet.dts;
Expand All @@ -123,13 +140,13 @@ void* thread_stream_from_file(void *vargp)
}
}

if (realTime) {
if (realTime && lastDts > 0) {

int64_t timePassed=av_rescale_q(packet.dts-cumulativeDuration,standard_timebase,AV_TIME_BASE_Q);
int64_t timePassed=av_rescale_q(packet.dts-lastDts,standard_timebase,AV_TIME_BASE_Q);
//LOGGER("SENDER",AV_LOG_DEBUG,"XXXX dt=%ld dd=%ld", (av_gettime_relative() - start_time),timePassed);
while ((av_gettime_relative() - start_time) < timePassed) {

// LOGGER0("SENDER",AV_LOG_DEBUG,"XXXX Sleep 10ms");
LOGGER0("SENDER",AV_LOG_DEBUG,"XXXX Sleep 10ms");
av_usleep(10*1000);//10ms
}
}
Expand Down Expand Up @@ -159,20 +176,23 @@ void* thread_stream_from_file(void *vargp)
}


/*
LOGGER("SENDER",AV_LOG_DEBUG,"sent packet pts=%s dts=%s size=%d",
ts2str(packet.pts,true),
ts2str(packet.dts,true),
packet.dts,packet.size);*/
packet.dts,packet.size);


av_packet_unref(&packet);

}
KMP_send_eof(&kmp);

LOGGER0("SENDER",AV_LOG_DEBUG,"sent EOF");

KMP_close(&kmp);
avformat_close_input(&ifmt_ctx);

LOGGER0("SENDER",AV_LOG_DEBUG,"exiting");
return 0;
}

Expand Down
4 changes: 2 additions & 2 deletions transcoder/receiver_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
pthread_mutex_lock(&server->diagnostics_locker); // lock the critical section
samples_stats_add(&server->receiverStats,packet->dts,packet->pos,packet->size);
pthread_mutex_unlock(&server->diagnostics_locker); // lock the critical section
LOGGER(CATEGORY_RECEIVER,AV_LOG_DEBUG,"[%s] received packet %s (%p) #: %lld",session->stream_name,getPacketDesc(packet),transcode_session,received_frame_id);
if(add_packet_frame_id(packet,received_frame_id)){
if(add_packet_frame_id(packet,received_frame_id,packet->pts)){
LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] failed to set frame id %lld on packet",session->stream_name,received_frame_id);
}
LOGGER(CATEGORY_RECEIVER,AV_LOG_DEBUG,"[%s] received packet %s (%p) #: %lld",session->stream_name,getPacketDesc(packet),transcode_session,received_frame_id);
_S(transcode_session_async_send_packet(transcode_session, packet));
av_packet_free(&packet);
received_frame_id++;
Expand Down
14 changes: 9 additions & 5 deletions transcoder/transcode/transcode_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void get_filter_config(transcode_session_t *pSession,char *filterConfig, transc
char buf[64];
av_get_channel_layout_string(buf,sizeof(buf),
codec->ctx->channels,codec->ctx->channel_layout);
sprintf(filterConfig,"aresample=async=1000:out_sample_rate=%d:out_channel_layout=%s",
sprintf(filterConfig,"aresample=async=0:out_sample_rate=%d:out_channel_layout=%s",
codec->ctx->sample_rate,buf);
}
}
Expand Down Expand Up @@ -383,13 +383,12 @@ int encodeFrame(transcode_session_t *pContext,int encoderId,int outputId,AVFrame
goto encoder_error;
}

output_frame_id = pContext->transcoded_frame_first_id+pOutput->stats.totalFrames;
add_packet_frame_id(pOutPacket,output_frame_id);

LOGGER(CATEGORY_TRANSCODING_SESSION,AV_LOG_DEBUG,"[%s] received encoded frame %s from encoder Id %d",
pOutput->track_id,
getPacketDesc(pOutPacket),
encoderId);
output_frame_id = pContext->transcoded_frame_first_id+pOutput->stats.totalFrames;
add_packet_frame_id(pOutPacket,output_frame_id,pOutPacket->pts);

pOutPacket->pos=clock_estimator_get_clock(&pContext->clock_estimator,pOutPacket->dts);

Expand Down Expand Up @@ -572,6 +571,7 @@ static void shift_audio_samples(AVFrame *frame,int shift_by) {

int OnDecodedFrame(transcode_session_t *ctx,AVCodecContext* decoderCtx, AVFrame *frame)
{
uint64_t pts;
for (int outputId=0;outputId<ctx->outputs;outputId++) {
transcode_session_output_t *pOutput=&ctx->output[outputId];
if (!pOutput->passthrough && pOutput->encoderId==-1) {
Expand All @@ -591,7 +591,11 @@ int OnDecodedFrame(transcode_session_t *ctx,AVCodecContext* decoderCtx, AVFrame
return 0;
}


// we do not rely on decoder timestamps since it does not
// take into account arbitrary jumps.
if(!get_frame_pts(frame,&pts)) {
frame->pts = frame->pkt_dts = pts;
}

if(ctx->offset > 0){
if(decoderCtx->codec_type == AVMEDIA_TYPE_AUDIO) {
Expand Down
54 changes: 47 additions & 7 deletions transcoder/utils/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,14 @@ const char* pict_type_to_string(int pt) {
char *av_get_frame_desc(char* buf, int size,const AVFrame * pFrame)
{
uint64_t frame_id;
int64_t pts = AV_NOPTS_VALUE;
if (pFrame==NULL) {
return "<NULL>";
}
get_frame_id(pFrame,&frame_id);
get_frame_pts(pFrame,&pts);
if (pFrame->width>0) {
snprintf(buf,size,"pts=%s;samples=%d;clock=%s;key=%s;data=%p;hwctx=%p;format=%s;pictype=%s;width=%d;height=%d;ar=%d/%d;has_53cc=%d;frame_id=%ld",
snprintf(buf,size,"pts=%s;samples=%d;clock=%s;key=%s;data=%p;hwctx=%p;format=%s;pictype=%s;width=%d;height=%d;ar=%d/%d;has_53cc=%d;frame_id=%ld;pts=%s",
pts2str(pFrame->pts),
pFrame->nb_samples,
pFrame->pkt_pos != 0 ? ts2str(pFrame->pkt_pos,false) : "N/A",
Expand All @@ -156,23 +158,26 @@ char *av_get_frame_desc(char* buf, int size,const AVFrame * pFrame)
pFrame->height,
pFrame->sample_aspect_ratio.num,
pFrame->sample_aspect_ratio.den,
av_frame_get_side_data(pFrame,AV_FRAME_DATA_A53_CC) != NULL, frame_id);
av_frame_get_side_data(pFrame,AV_FRAME_DATA_A53_CC) != NULL, frame_id, pts2str(pts));
} else {
snprintf(buf,size,"pts=%s;channels=%d;sampleRate=%d;format=%d;size=%d;channel_layout=%ld;frame_id=%ld",
snprintf(buf,size,"pts=%s;channels=%d;sampleRate=%d;format=%d;size=%d;channel_layout=%ld;frame_id=%ld;pts=%s",
pts2str(pFrame->pts),
pFrame->channels,pFrame->sample_rate,pFrame->format,pFrame->nb_samples,pFrame->channel_layout,frame_id);
pFrame->channels,pFrame->sample_rate,pFrame->format,pFrame->nb_samples,pFrame->channel_layout,frame_id,pts2str(pts));
}
return buf;
}

char *av_get_packet_desc(char *buf,int len,const AVPacket * packet)
{
int64_t frame_id;
int64_t pts = AV_NOPTS_VALUE;
if (packet==NULL) {
return "<NULL>";
}
get_packet_frame_id(packet,&frame_id);
snprintf(buf,len,"mem=%p;data=%p;pts=%s;dts=%s;dur=%s;clock=%s;key=%s;size=%d;flags=%d;frame_id=%ld",
get_packet_pts(packet,&pts);

snprintf(buf,len,"mem=%p;data=%p;pts=%s;dts=%s;dur=%s;clock=%s;key=%s;size=%d;flags=%d;frame_id=%ld;pts=%s",
packet,
packet->data,
pts2str(packet->pts),
Expand All @@ -182,7 +187,8 @@ char *av_get_packet_desc(char *buf,int len,const AVPacket * packet)
(packet->flags & AV_PKT_FLAG_KEY)==AV_PKT_FLAG_KEY ? "Yes" : "No",
packet->size,
packet->flags,
frame_id);
frame_id,
pts2str(pts));
return buf;
}

Expand Down Expand Up @@ -252,13 +258,15 @@ void log_frame_side_data(const char* category,const AVFrame *pFrame)
}
}

int add_packet_frame_id(AVPacket *packet,int64_t frame_id) {
int add_packet_frame_id(AVPacket *packet,int64_t frame_id,int64_t pts) {
AVDictionary * frameDict = NULL;
int frameDictSize = 0;
char buf[sizeof("9223372036854775807")];
uint8_t *frameDictData = NULL;
sprintf(buf,"%lld",frame_id);
_S(av_dict_set(&frameDict, "frame_id", buf, 0));
sprintf(buf,"%lld",pts);
_S(av_dict_set(&frameDict, "pts", buf, 0));
// Pack dictionary to be able to use it as a side data in AVPacket
frameDictData = av_packet_pack_dictionary(frameDict, &frameDictSize);
if(!frameDictData)
Expand Down Expand Up @@ -287,6 +295,25 @@ int get_packet_frame_id(const AVPacket *packet,int64_t *frame_id_ptr)
return 0;
}

int get_packet_pts(const AVPacket *packet,int64_t *pts_ptr)
{
const char *pts_str;
AVDictionary * frameDict = NULL;
int frameDictSize = 0;
uint8_t *frameDictData = av_packet_get_side_data(packet, AV_PKT_DATA_STRINGS_METADATA, &frameDictSize);
*pts_ptr = AV_NOPTS_VALUE;
if (!frameDictData)
return AVERROR(EINVAL);
_S(av_packet_unpack_dictionary(frameDictData,frameDictSize,&frameDict));
pts_str = av_dict_get(frameDict, "pts", NULL, 0)->value;
if(!pts_str)
return AVERROR(EINVAL);
*pts_ptr = strtoull(pts_str,NULL,10);
av_dict_free(&frameDict);
return 0;
}


int get_frame_id(const AVFrame *frame,uint64_t *frame_id_ptr)
{
*frame_id_ptr = AV_NOPTS_VALUE;
Expand All @@ -298,4 +325,17 @@ int get_frame_id(const AVFrame *frame,uint64_t *frame_id_ptr)
return 0;
}
return AVERROR(EINVAL);
}

int get_frame_pts(const AVFrame *frame,int64_t *pts_ptr)
{
*pts_ptr = AV_NOPTS_VALUE;
if(frame->metadata) {
const char *pts_str = av_dict_get(frame->metadata, "pts", NULL, 0)->value;
if(!pts_str)
return AVERROR(EINVAL);
*pts_ptr = strtoull(pts_str,NULL,10);
return 0;
}
return AVERROR(EINVAL);
}
5 changes: 4 additions & 1 deletion transcoder/utils/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ char *av_get_frame_desc(char *buf,int len, const AVFrame * frame);
char *av_get_packet_desc(char *buf,int len, const AVPacket * packet);
char* av_socket_info(char* buf,int len,const struct sockaddr_in* sa);
void log_frame_side_data(const char* category,const AVFrame *pFrame);
int add_packet_frame_id(AVPacket *packet,int64_t frame_id);
int add_packet_frame_id(AVPacket *packet,int64_t frame_id,int64_t pts);
int get_frame_id(const AVFrame *frame,uint64_t *frame_id_ptr);
int get_packet_frame_id(const AVPacket *packet,int64_t *frame_id_ptr);
int add_packet_pts(AVPacket *packet,uint64_t pts);
int get_packet_pts(const AVPacket *packet,int64_t *pts_ptr);
int get_frame_pts(const AVFrame *frame,int64_t *pts_ptr);
/**
* Convenience macro, the return value should be used only directly in
* function arguments but never stand-alone.
Expand Down

0 comments on commit 8fa070e

Please sign in to comment.