Skip to content

Commit

Permalink
修复缓存溢出问题
Browse files Browse the repository at this point in the history
  • Loading branch information
lam2003 committed May 15, 2019
1 parent b8ac736 commit f001128
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 92 deletions.
4 changes: 2 additions & 2 deletions monitor/common/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ struct default_block_allocator_malloc_free
}
};

typedef default_block_allocator_malloc_free<256 * 1024> default_allocator;
typedef default_block_allocator_malloc_free<BUFFER_LEN> default_allocator;

template <typename BlockAllocator = default_allocator> //max 1MB
template <typename BlockAllocator = default_allocator> //max 512K
class Buffer
{
public:
Expand Down
1 change: 1 addition & 0 deletions monitor/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
#define DETECT_MEM_BLK_NUM 1 //检测模块内存块数
#define RECORD_DIR_FORMAT "%Y_%m_%d" //录制目录名称(日期格式)
#define RECORD_FILE_FORMAT "%H_%M_%S" //录制文件名称(日期格式)
#define BUFFER_LEN 524288 //缓存大小

#define NVR_ISP_DEV 0 //ISP设备
#define NVR_VI_DEV 0 //VI设备
Expand Down
50 changes: 31 additions & 19 deletions monitor/live/rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ int32_t RtmpLiveImpl::Initialize(const Params &params)

bool init = false;

while (run_)
uint8_t *temp_buf = (uint8_t *)malloc(BUFFER_LEN);
if (!temp_buf)
{
log_e("malloc buffer failed");
return;
}

while (run_)
{
if (!init)
{
wait_sps = true;
Expand All @@ -52,36 +58,42 @@ int32_t RtmpLiveImpl::Initialize(const Params &params)
}
init = true;
}

std::unique_lock<std::mutex> lock(mux_);
while (buffer_.Get((uint8_t *)&frame, sizeof(frame)))
{
if (frame.type == H264Frame::NaluType::SPS)
wait_sps = false;

frame.data = buffer_.GetCurrentPos();
if (init && !wait_sps)
std::unique_lock<std::mutex> lock(mux_);
if (buffer_.Get((uint8_t *)&frame, sizeof(frame)))
{
code = static_cast<err_code>(rtmp_streamer.WriteVideoFrame(frame));
if (KSuccess != code)
memcpy(temp_buf, buffer_.GetCurrentPos(), frame.len);
frame.data = temp_buf;
if (!buffer_.Consume(frame.len))
{
log_w("rtmp connection break,try to reconnect...");
rtmp_streamer.Close();
init = false;
log_e("consme data from buffer failed,rest data not enough");
return;
}
}
if (!buffer_.Consume(frame.len))
else if (run_)
{
log_e("consme data from buffer failed,rest data not enough");
return;
cond_.wait(lock);
continue;
}
}

if (run_ && init)
cond_.wait(lock);
if (frame.type == H264Frame::NaluType::SPS)
wait_sps = false;

if (init && !wait_sps)
{
code = static_cast<err_code>(rtmp_streamer.WriteVideoFrame(frame));
if (KSuccess != code)
{
log_w("rtmp connection break,try to reconnect...");
rtmp_streamer.Close();
init = false;
}
}
}

rtmp_streamer.Close();
free(temp_buf);
}));

init_ = true;
Expand Down
153 changes: 82 additions & 71 deletions monitor/record/mp4_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,55 +49,89 @@ void MP4RecordImpl::OnTrigger(int32_t num)

void MP4RecordImpl::RecordThread()
{
err_code code;
MP4Muxer muxer;
VideoFrame frame;
uint64_t now;
uint64_t start_time;
bool wait_sps;
bool init = false;

now = System::GetSteadyMilliSeconds();
while (run_ && RecordNeedToQuit())
usleep(500000); //500ms

while (run_)
{
if (!init)
}

int32_t MP4RecordImpl::Initialize(const Params &params)
{
if (init_)
return static_cast<int>(KDupInitialize);

params_ = params;
run_ = true;
thread_ = std::unique_ptr<std::thread>(new std::thread([this]() {
err_code code;
MP4Muxer muxer;
VideoFrame frame;
uint64_t now;
uint64_t start_time;
bool wait_sps;

bool init = false;
uint8_t *temp_buf = (uint8_t *)malloc(BUFFER_LEN);
if (!temp_buf)
{
log_e("malloc buffer failed");
return;
}

now = System::GetSteadyMilliSeconds();
while (run_ && RecordNeedToQuit())
usleep(500000); //500ms

while (run_)
{
std::string path;
if (!init)
{
//创建文件夹,按日期创建
std::string path;
{
//创建文件夹,按日期创建
std::ostringstream oss;
oss << params_.path << '/' << System::GetLocalTime(RECORD_DIR_FORMAT);
path = oss.str();
code = static_cast<err_code>(System::CreateDir(path));
if (KSuccess != code)
return;
}
std::ostringstream oss;
oss << params_.path << '/' << System::GetLocalTime(RECORD_DIR_FORMAT);
path = oss.str();
code = static_cast<err_code>(System::CreateDir(path));
oss << path << '/' << "record_" << System::GetLocalTime(RECORD_FILE_FORMAT) << ".mp4";
code = static_cast<err_code>(muxer.Initialize(oss.str().c_str(), params_.width, params_.height, params_.frame_rate));
if (KSuccess != code)
{
log_e("error:%s", make_error_code(code).message().c_str());
return;
}

mux_.lock();
buffer_.Clear();
mux_.unlock();

start_time = System::GetSteadyMilliSeconds();
wait_sps = true;
init = true;
}

std::ostringstream oss;
oss << path << '/' << "record_" << System::GetLocalTime(RECORD_FILE_FORMAT) << ".mp4";
code = static_cast<err_code>(muxer.Initialize(oss.str().c_str(), params_.width, params_.height, params_.frame_rate));
if (KSuccess != code)
{
log_e("error:%s", make_error_code(code).message().c_str());
return;
std::unique_lock<std::mutex> lock(mux_);
if (buffer_.Get((uint8_t *)&frame, sizeof(frame)))
{
memcpy(temp_buf, buffer_.GetCurrentPos(), frame.len);
frame.data = temp_buf;
if (!buffer_.Consume(frame.len))
{
log_e("consme data from buffer failed,rest data not enough");
return;
}
}
else if (run_)
{
cond_.wait(lock);
continue;
}
}

start_time = System::GetSteadyMilliSeconds();
wait_sps = true;
init = true;
}

std::unique_lock<std::mutex> lock(mux_);
while (buffer_.Get((uint8_t *)&frame, sizeof(frame)))
{
if (frame.type == H264Frame::NaluType::SPS)
wait_sps = false;

frame.data = buffer_.GetCurrentPos();

if (!wait_sps)
{
code = static_cast<err_code>(muxer.WriteVideoFrame(frame));
Expand All @@ -108,44 +142,21 @@ void MP4RecordImpl::RecordThread()
}
}

if (!buffer_.Consume(frame.len))
if (RecordNeedToQuit())
{
log_e("buffer rest data not enough");
return;
muxer.Close();
init = false;
while (run_ && RecordNeedToQuit())
usleep(500000); //500ms
}
else if (RecordNeedToSegment(start_time))
{
muxer.Close();
init = false;
}
}

if (RecordNeedToQuit())
{
lock.unlock();
muxer.Close();
init = false;
while (run_ && RecordNeedToQuit())
usleep(500000); //500ms
}
else if (RecordNeedToSegment(start_time))
{
lock.unlock();
muxer.Close();
init = false;
}
else if (run_)
{
cond_.wait(lock);
}
}
muxer.Close();
}

int32_t MP4RecordImpl::Initialize(const Params &params)
{
if (init_)
return static_cast<int>(KDupInitialize);

params_ = params;
run_ = true;
thread_ = std::unique_ptr<std::thread>(new std::thread([this]() {
RecordThread();
muxer.Close();
free(temp_buf);
}));

init_ = true;
Expand Down

0 comments on commit f001128

Please sign in to comment.