Skip to content

Commit

Permalink
Add lock guards
Browse files Browse the repository at this point in the history
  • Loading branch information
raultapia committed Apr 16, 2024
1 parent 3048b92 commit d92d05c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 53 deletions.
34 changes: 16 additions & 18 deletions src/main_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,23 @@ void Asap::run() {
ROS_INFO("ASAP: Running.");

while(ros::ok()) {
dvsMutex_.lock();
apsMutex_.lock();
imuMutex_.lock();

camera_.getData(dvsBuffer_, apsBuffer_, imuBuffer_);

if(dvsBuffer_.size() > DVS_BUFFER_WARNING_SIZE) {
ROS_WARN("DVS buffer size exceeds recommended limit");
}
if(apsBuffer_.size() > APS_BUFFER_WARNING_SIZE) {
ROS_WARN("APS buffer size exceeds recommended limit");
{
const std::lock_guard<std::mutex> lock_dvs(dvsMutex_);
const std::lock_guard<std::mutex> lock_aps(apsMutex_);
const std::lock_guard<std::mutex> lock_imu(imuMutex_);

camera_.getData(dvsBuffer_, apsBuffer_, imuBuffer_);

if(dvsBuffer_.size() > DVS_BUFFER_WARNING_SIZE) {
ROS_WARN("DVS buffer size exceeds recommended limit");
}
if(apsBuffer_.size() > APS_BUFFER_WARNING_SIZE) {
ROS_WARN("APS buffer size exceeds recommended limit");
}
if(imuBuffer_.size() > IMU_BUFFER_WARNING_SIZE) {
ROS_WARN("IMU buffer size exceeds recommended limit");
}
}
if(imuBuffer_.size() > IMU_BUFFER_WARNING_SIZE) {
ROS_WARN("IMU buffer size exceeds recommended limit");
}

dvsMutex_.unlock();
apsMutex_.unlock();
imuMutex_.unlock();

ros::spinOnce();
std::this_thread::sleep_for(std::chrono::nanoseconds(THREAD_SLEEP_TIME_NSEC));
Expand Down
66 changes: 31 additions & 35 deletions src/publishers.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
#include "asap/asap.hpp"

template <typename T>
inline void clear_buffer(std::queue<T>& q, std::mutex& m){
const std::lock_guard<std::mutex> lock(m);
while(!q.empty()){
q.pop();
}
}

template <typename T>
inline void get_data_from_buffer(std::queue<T>& q, std::vector<T>& v, std::mutex& m){
const std::lock_guard<std::mutex> lock(m);
while(!q.empty()) {
v.emplace_back(q.front());
q.pop();
}
}

void Asap::initPublishers() {
if(config_.dvs.enabled) {
dvsPub_ = n_.advertise<dvs_msgs::EventArray>("/asap/events", 100);
Expand Down Expand Up @@ -40,19 +57,16 @@ void Asap::dvsPublishFunction() {
std::this_thread::sleep_for(std::chrono::nanoseconds(THREAD_SLEEP_TIME_NSEC));
continue;
}
if(dvsPub_.getNumSubscribers() < 1){
dvsMutex_.lock();
while(!dvsBuffer_.empty()){
dvsBuffer_.pop();
}
dvsMutex_.unlock();
if(dvsPub_.getNumSubscribers() < 1) {
clear_buffer(dvsBuffer_, dvsMutex_);
continue;
}

switch(config_.dvs.mode) {
case Mode::SIZE:
case Mode::AUTO:
dvsMutex_.lock();
{
const std::lock_guard<std::mutex> lock(dvsMutex_);
while(!dvsBuffer_.empty() && amsg.events.size() < static_cast<std::size_t>(config_.dvs.size)) {
if(config_.dvs.gamma == 1 || distribution(generator) < config_.dvs.gamma) {
e = dvsBuffer_.front();
Expand All @@ -64,10 +78,11 @@ void Asap::dvsPublishFunction() {
}
dvsBuffer_.pop();
}
dvsMutex_.unlock();
}
break;
case Mode::TIME:
dvsMutex_.lock();
{
const std::lock_guard<std::mutex> lock(dvsMutex_);
while(!dvsBuffer_.empty() && timediff(amsg.events) < hz2sec(config_.dvs.rate)) {
if(config_.dvs.gamma == 1 || distribution(generator) < config_.dvs.gamma) {
e = dvsBuffer_.front();
Expand All @@ -79,7 +94,7 @@ void Asap::dvsPublishFunction() {
}
dvsBuffer_.pop();
}
dvsMutex_.unlock();
}
break;
}

Expand Down Expand Up @@ -127,21 +142,12 @@ void Asap::apsPublishFunction() {
std::this_thread::sleep_for(std::chrono::nanoseconds(THREAD_SLEEP_TIME_NSEC));
continue;
}
if(apsPub_.getNumSubscribers() < 1){
apsMutex_.lock();
while(!apsBuffer_.empty()){
apsBuffer_.pop();
}
apsMutex_.unlock();
if(apsPub_.getNumSubscribers() < 1) {
clear_buffer(apsBuffer_, apsMutex_);
continue;
}

apsMutex_.lock();
while(!apsBuffer_.empty()) {
aps.emplace_back(apsBuffer_.front());
apsBuffer_.pop();
}
apsMutex_.unlock();
get_data_from_buffer(apsBuffer_, aps, apsMutex_);
for(const ev::StampedMat &a : aps) {
header.stamp.fromSec(a.t);
cv_bridge::CvImage br = cv_bridge::CvImage(header, sensor_msgs::image_encodings::MONO8, a);
Expand All @@ -167,22 +173,12 @@ void Asap::imuPublishFunction() {
std::this_thread::sleep_for(std::chrono::nanoseconds(THREAD_SLEEP_TIME_NSEC));
continue;
}
if(imuPub_.getNumSubscribers() < 1){
imuMutex_.lock();
while(!imuBuffer_.empty()){
imuBuffer_.pop();
}
imuMutex_.unlock();
if(imuPub_.getNumSubscribers() < 1) {
clear_buffer(imuBuffer_, imuMutex_);
continue;
}

imuMutex_.lock();
while(!imuBuffer_.empty()) {
imu.emplace_back(imuBuffer_.front());
imuBuffer_.pop();
}
imuMutex_.unlock();

get_data_from_buffer(imuBuffer_, imu, imuMutex_);
for(const ev::Imu &i : imu) {
msg.header.stamp.fromSec(i.t);
msg.linear_acceleration.x = i.linear_acceleration.x;
Expand Down

0 comments on commit d92d05c

Please sign in to comment.