Skip to content

Commit

Permalink
Fix a leak when cancelling scheduled tasks (#1022)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Sep 6, 2017
1 parent 179984f commit 76e3d39
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 43 deletions.
7 changes: 7 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ jobs:
working_directory: /tmp/licode

steps:
- run:
name: Install Git client
command: |
set -x
sudo apt-get update
sudo apt-get install -y git
- checkout

- setup_remote_docker
Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/DtlsTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ static std::mutex dtls_mutex;

Resender::Resender(DtlsTransport* transport, dtls::DtlsSocketContext* ctx)
: transport_(transport), socket_context_(ctx),
resend_seconds_(kInitialSecsPerResend), max_resends_(kMaxResends) {
resend_seconds_(kInitialSecsPerResend), max_resends_(kMaxResends),
scheduled_task_{std::make_shared<ScheduledTaskReference>()} {
}

Resender::~Resender() {
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/DtlsTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Resender {
packetPtr packet_;
unsigned int resend_seconds_;
unsigned int max_resends_;
int scheduled_task_ = -1;
std::shared_ptr<ScheduledTaskReference> scheduled_task_;
};
} // namespace erizo
#endif // ERIZO_SRC_ERIZO_DTLSTRANSPORT_H_
6 changes: 3 additions & 3 deletions erizo/src/erizo/rtp/PliPacerHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ constexpr duration PliPacerHandler::kKeyframeTimeout;

PliPacerHandler::PliPacerHandler(std::shared_ptr<erizo::Clock> the_clock)
: enabled_{true}, connection_{nullptr}, clock_{the_clock}, time_last_keyframe_{clock_->now()},
waiting_for_keyframe_{false}, scheduled_pli_{-1},
waiting_for_keyframe_{false}, scheduled_pli_{std::make_shared<ScheduledTaskReference>()},
video_sink_ssrc_{0}, video_source_ssrc_{0}, fir_seq_number_{0} {}

void PliPacerHandler::enable() {
Expand All @@ -38,7 +38,7 @@ void PliPacerHandler::read(Context *ctx, std::shared_ptr<dataPacket> packet) {
time_last_keyframe_ = clock_->now();
waiting_for_keyframe_ = false;
connection_->getWorker()->unschedule(scheduled_pli_);
scheduled_pli_ = -1;
scheduled_pli_ = std::make_shared<ScheduledTaskReference>();
}
ctx->fireRead(std::move(packet));
}
Expand All @@ -54,7 +54,7 @@ void PliPacerHandler::sendFIR() {
getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++));
getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++));
waiting_for_keyframe_ = false;
scheduled_pli_ = -1;
scheduled_pli_ = std::make_shared<ScheduledTaskReference>();
}

void PliPacerHandler::scheduleNextPLI() {
Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/rtp/PliPacerHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "./logger.h"
#include "pipeline/Handler.h"
#include "thread/Worker.h"
#include "lib/Clock.h"

namespace erizo {
Expand Down Expand Up @@ -43,7 +44,7 @@ class PliPacerHandler: public Handler, public std::enable_shared_from_this<PliPa
std::shared_ptr<erizo::Clock> clock_;
time_point time_last_keyframe_;
bool waiting_for_keyframe_;
int scheduled_pli_;
std::shared_ptr<ScheduledTaskReference> scheduled_pli_;
uint32_t video_sink_ssrc_;
uint32_t video_source_ssrc_;
uint8_t fir_seq_number_;
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ RtpPaddingGeneratorHandler::RtpPaddingGeneratorHandler(std::shared_ptr<erizo::Cl
marker_rate_{std::chrono::milliseconds(100), 20, 1., clock_},
rtp_header_length_{12},
bucket_{kInitialBitrate, kPaddingBurstSize, clock_},
scheduled_task_{-1} {}
scheduled_task_{std::make_shared<ScheduledTaskReference>()} {}



Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "pipeline/Handler.h"
#include "lib/Clock.h"
#include "lib/TokenBucket.h"
#include "thread/Worker.h"
#include "rtp/SequenceNumberTranslator.h"
#include "./Stats.h"

Expand Down Expand Up @@ -64,7 +65,7 @@ class RtpPaddingGeneratorHandler: public Handler, public std::enable_shared_from
MovingIntervalRateStat marker_rate_;
uint32_t rtp_header_length_;
TokenBucket bucket_;
int scheduled_task_;
std::shared_ptr<ScheduledTaskReference> scheduled_task_;
};

} // namespace erizo
Expand Down
54 changes: 27 additions & 27 deletions erizo/src/erizo/thread/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@

using erizo::Worker;
using erizo::SimulatedWorker;
using erizo::ScheduledTaskReference;

ScheduledTaskReference::ScheduledTaskReference() : cancelled{false} {
}

bool ScheduledTaskReference::isCancelled() {
return cancelled;
}
void ScheduledTaskReference::cancel() {
cancelled = true;
}

Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock)
: scheduler_{scheduler},
Expand Down Expand Up @@ -50,21 +61,22 @@ void Worker::close() {
service_.stop();
}

int Worker::scheduleFromNow(Task f, duration delta) {
std::shared_ptr<ScheduledTaskReference> Worker::scheduleFromNow(Task f, duration delta) {
auto delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta);
int uuid = next_scheduled_++;
auto id = std::make_shared<ScheduledTaskReference>();
if (auto scheduler = scheduler_.lock()) {
scheduler->scheduleFromNow(safeTask([f, uuid](std::shared_ptr<Worker> this_ptr) {
this_ptr->task(this_ptr->safeTask([f, uuid](std::shared_ptr<Worker> this_ptr) {
std::unique_lock<std::mutex> lock(this_ptr->cancel_mutex_);
if (this_ptr->isCancelled(uuid)) {
return;
scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr<Worker> this_ptr) {
this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr<Worker> this_ptr) {
{
if (id->isCancelled()) {
return;
}
}
f();
}));
}), delta_ms);
}
return uuid;
return id;
}

void Worker::scheduleEvery(ScheduledTask f, duration period) {
Expand All @@ -84,20 +96,8 @@ void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay
}), next_delay);
}

void Worker::unschedule(int uuid) {
if (uuid < 0) {
return;
}
std::unique_lock<std::mutex> lock(cancel_mutex_);
cancelled_.push_back(uuid);
}

bool Worker::isCancelled(int uuid) {
if (std::find(cancelled_.begin(), cancelled_.end(), uuid) != cancelled_.end()) {
cancelled_.erase(std::remove(cancelled_.begin(), cancelled_.end(), uuid), cancelled_.end());
return true;
}
return false;
void Worker::unschedule(std::shared_ptr<ScheduledTaskReference> id) {
id->cancel();
}

std::function<void()> Worker::safeTask(std::function<void(std::shared_ptr<Worker>)> f) {
Expand Down Expand Up @@ -128,15 +128,15 @@ void SimulatedWorker::close() {
tasks_.clear();
}

int SimulatedWorker::scheduleFromNow(Task f, duration delta) {
int uuid = next_scheduled_++;
scheduled_tasks_[clock_->now() + delta] = [this, f, uuid] {
if (isCancelled(uuid)) {
std::shared_ptr<ScheduledTaskReference> SimulatedWorker::scheduleFromNow(Task f, duration delta) {
auto id = std::make_shared<ScheduledTaskReference>();
scheduled_tasks_[clock_->now() + delta] = [this, f, id] {
if (id->isCancelled()) {
return;
}
f();
};
return uuid;
return id;
}

void SimulatedWorker::executeTasks() {
Expand Down
20 changes: 12 additions & 8 deletions erizo/src/erizo/thread/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

namespace erizo {

class ScheduledTaskReference {
public:
ScheduledTaskReference();
bool isCancelled();
void cancel();
private:
std::atomic<bool> cancelled;
};

class Worker : public std::enable_shared_from_this<Worker> {
public:
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
Expand All @@ -33,14 +42,11 @@ class Worker : public std::enable_shared_from_this<Worker> {
virtual void start(std::shared_ptr<std::promise<void>> start_promise);
virtual void close();

virtual int scheduleFromNow(Task f, duration delta);
virtual void unschedule(int uuid);
virtual std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta);
virtual void unschedule(std::shared_ptr<ScheduledTaskReference> id);

virtual void scheduleEvery(ScheduledTask f, duration period);

protected:
bool isCancelled(int uuid);

private:
void scheduleEvery(ScheduledTask f, duration period, duration next_delay);
std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f);
Expand All @@ -55,8 +61,6 @@ class Worker : public std::enable_shared_from_this<Worker> {
asio_worker service_worker_;
boost::thread_group group_;
std::atomic<bool> closed_;
std::vector<int> cancelled_;
mutable std::mutex cancel_mutex_;
};

class SimulatedWorker : public Worker {
Expand All @@ -66,7 +70,7 @@ class SimulatedWorker : public Worker {
void start() override;
void start(std::shared_ptr<std::promise<void>> start_promise) override;
void close() override;
int scheduleFromNow(Task f, duration delta) override;
std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta) override;

void executeTasks();
void executePastScheduledTasks();
Expand Down

0 comments on commit 76e3d39

Please sign in to comment.