Skip to content

Commit

Permalink
Multi-threaded video decoding
Browse files Browse the repository at this point in the history
Summary:
Until this diff, vrsplayer was always decoding video frames right when reading the frame, because a video codec frame may need to trigger reading past records and past frames in order to decode a p-frame. This slows down decoding of video streams and limits our ability to decode multiple in real time, since they end up all doing the video decoding, probably in SW, in a single thread.
However, we typically use i-frame only in VRS files, because using p-frames isn't helping too much with file size, allowing us to decode video codec frames in the background thread we use from jpg/png/jxl decompression, and pixel format normalization.
With this diff, we now track if we found any p-frame, and only then switch to decoding video codec frames for that stream in the onImageRead callback, otherwise, we delegate the video decoding to the stream's background thread.

Reviewed By: kiminoue7

Differential Revision: D51789223

fbshipit-source-id: 01abf8d4b1c6095744e6e7d8dfec7c934effb96d
  • Loading branch information
Georges Berenger authored and facebook-github-bot committed Dec 5, 2023
1 parent d651352 commit 798e3b3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 45 deletions.
71 changes: 35 additions & 36 deletions tools/vrsplayer/FramePlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,32 @@ bool FramePlayer::onImageRead(
const auto& spec = contentBlock.image();
shared_ptr<PixelFrame> frame = getFrame(true);
bool frameValid = false;
unique_ptr<ImageJob> job;
imageFormat_ = spec.getImageFormat();
if (imageFormat_ == vrs::ImageFormat::RAW) {
// RAW images are read from disk directly, but pixel format conversion is asynchronous
frameValid = PixelFrame::readRawFrame(frame, record.reader, spec);
if (!firstImage_ && frameValid) {
job = make_unique<ImageJob>(vrs::ImageFormat::RAW);
}
} else if (imageFormat_ == vrs::ImageFormat::VIDEO) {
if (imageFormat_ == vrs::ImageFormat::VIDEO) {
// Video codec decompression happens here, but pixel format conversion is asynchronous
PixelFrame::init(frame, contentBlock.image());
frameValid = (tryToDecodeFrame(*frame, record, contentBlock) == 0);
if (!firstImage_ && frameValid) {
job = make_unique<ImageJob>(vrs::ImageFormat::VIDEO);
if (spec.getKeyFrameIndex() > 0) {
iframesOnly_.store(false, memory_order_relaxed);
}
if (firstImage_ || !iframesOnly_.load(memory_order_relaxed)) {
PixelFrame::init(frame, contentBlock.image());
unique_lock<mutex> lock(videoDecodingMutex_);
frameValid = (tryToDecodeFrame(*frame, record, contentBlock) == 0);
} else {
frameValid = PixelFrame::readDiskImageData(frame, record.reader, contentBlock);
}
} else {
if (firstImage_) {
frameValid = PixelFrame::readFrame(frame, record.reader, contentBlock);
} else {
// decoding & pixel format conversion happen asynchronously
job = make_unique<ImageJob>(imageFormat_);
job->buffer.resize(contentBlock.getBlockSize());
frameValid = (record.reader->read(job->buffer) == 0);
frameValid = PixelFrame::readDiskImageData(frame, record.reader, contentBlock);
}
}
if (frameValid && job) {
job->frame = std::move(frame);
if (frameValid && !firstImage_) {
imageJobs_.startThreadIfNeeded(&FramePlayer::imageJobsThreadActivity, this);
imageJobs_.sendJob(std::move(job));
imageJobs_.sendJob(make_unique<ImageJob>(std::move(frame)));
return true;
}
// Processing was not sent in the background, complete here!
if (firstImage_) {
fmt::print(
"Found '{} - {}': {}, {}",
Expand All @@ -111,10 +106,8 @@ bool FramePlayer::onImageRead(
fmt::print(" - {}", frame->getSpec().asString());
}
blankMode_ = false;
}
if (frameValid) {
convertFrame(frame);
if (firstImage_) {
if (frameValid) {
convertFrame(frame);
if (needsConvertedFrame_) {
fmt::print(" -> {}", frame->getSpec().asString());
}
Expand All @@ -123,14 +116,18 @@ bool FramePlayer::onImageRead(
}
frame->blankFrame();
blankMode_ = true;
widget_->swapImage(frame);
}
widget_->swapImage(frame);
}
recycle(frame, !needsConvertedFrame_);
if (firstImage_) {
fmt::print("\n");
firstImage_ = false;
} else {
// !firstImage_
if (frameValid) {
convertFrame(frame);
widget_->swapImage(frame);
}
}
recycle(frame, !needsConvertedFrame_);
return true; // read next blocks, if any
}

Expand Down Expand Up @@ -257,26 +254,28 @@ bool FramePlayer::saveFrame(
void FramePlayer::imageJobsThreadActivity() {
unique_ptr<ImageJob> job;
while (imageJobs_.waitForJob(job)) {
shared_ptr<PixelFrame> frame = std::move(job->frame);
// if we're behind, we just drop images except the last one!
while (imageJobs_.getJob(job)) {
recycle(frame, true);
frame = std::move(job->frame);
; // just skip!
}
shared_ptr<PixelFrame>& frame = *job;
bool frameValid = false;
if (job->imageFormat == vrs::ImageFormat::RAW || job->imageFormat == vrs::ImageFormat::VIDEO) {
vrs::ImageFormat imageFormat = frame->getSpec().getImageFormat();
if (imageFormat == vrs::ImageFormat::RAW) {
frameValid = (frame != nullptr);
} else if (imageFormat == vrs::ImageFormat::VIDEO) {
unique_lock<mutex> lock(videoDecodingMutex_);
frameValid = frame->decompressImage(&getVideoFrameHandler(id_));
} else {
if (!frame) {
frame = make_shared<PixelFrame>();
}
frameValid = frame->readCompressedFrame(job->buffer, job->imageFormat);
frameValid = frame->decompressImage();
}
if (frameValid) {
convertFrame(frame);
widget_->swapImage(frame);
}
recycle(frame, !frameValid || !needsConvertedFrame_);
if (imageFormat != vrs::ImageFormat::VIDEO) {
recycle(frame, !frameValid || !needsConvertedFrame_);
}
}
}

Expand Down
11 changes: 4 additions & 7 deletions tools/vrsplayer/FramePlayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

#pragma once

#include <atomic>
#include <memory>
#include <mutex>
#include <thread>

#include <QtCore/qglobal.h>
#include <qobject.h>
Expand Down Expand Up @@ -51,12 +51,7 @@ using ::vrs::StreamPlayer;
using ::vrs::utils::PixelFrame;
using ::vrs::utils::VideoRecordFormatStreamPlayer;

struct ImageJob {
ImageJob(vrs::ImageFormat imageFormat) : imageFormat{imageFormat} {}
vrs::ImageFormat imageFormat;
shared_ptr<PixelFrame> frame;
vector<uint8_t> buffer;
};
using ImageJob = shared_ptr<PixelFrame>;

class FramePlayer : public QObject, public VideoRecordFormatStreamPlayer {
Q_OBJECT
Expand Down Expand Up @@ -94,6 +89,7 @@ class FramePlayer : public QObject, public VideoRecordFormatStreamPlayer {
void mediaStateChanged(FileReaderState state);

private:
std::mutex videoDecodingMutex_;
std::mutex frameMutex_;
vector<shared_ptr<PixelFrame>> inputFrames_;
vector<shared_ptr<PixelFrame>> convertedframes_;
Expand All @@ -105,6 +101,7 @@ class FramePlayer : public QObject, public VideoRecordFormatStreamPlayer {
bool visible_{true};
bool blankMode_{true};
bool firstImage_{true};
std::atomic<bool> iframesOnly_{true};
string saveNextFramePath_;
int estimatedFps_;
Fps dataFps_;
Expand Down
6 changes: 5 additions & 1 deletion vrs/utils/VideoRecordFormatStreamPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ int VideoRecordFormatStreamPlayer::readMissingFrames(
return result;
}

VideoFrameHandler& VideoRecordFormatStreamPlayer::getVideoFrameHandler(StreamId streamId) {
return handlers_[streamId];
}

int VideoRecordFormatStreamPlayer::tryToDecodeFrame(
void* outBuffer,
const CurrentRecord& record,
Expand Down Expand Up @@ -75,7 +79,7 @@ bool VideoRecordFormatStreamPlayer::readFrame(
return outFrame.readFrame(record.reader, cb);
}

void VideoRecordFormatStreamPlayer::resetVideoFrameHandler(const StreamId& streamId) {
void VideoRecordFormatStreamPlayer::resetVideoFrameHandler(StreamId streamId) {
if (streamId.isValid()) {
handlers_[streamId].reset();
} else {
Expand Down
5 changes: 4 additions & 1 deletion vrs/utils/VideoRecordFormatStreamPlayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ class VideoRecordFormatStreamPlayer : public RecordFormatStreamPlayer {
return whileReadingMissingFrames_;
}

/// Get the video frame handler for a particular stream
VideoFrameHandler& getVideoFrameHandler(StreamId streamId);

/// Reset videoFrameHandler's internal state to force reading from the key frame.
/// @param streamId: StreamID for the handler you want to reset, if you don't specify anything
/// reset all the handlers.
void resetVideoFrameHandler(const StreamId& streamId = {});
void resetVideoFrameHandler(StreamId streamId = {});

/// Reference implementation to systematically & transparently read missing I-frame & P-frames.
/// Note that callbacks when reading previous frames will be happen, as if the record was really
Expand Down

0 comments on commit 798e3b3

Please sign in to comment.