Skip to content

Commit

Permalink
OnlineBagger documentation (#1070)
Browse files Browse the repository at this point in the history
* Move docstring into class

* Add missing args/returns

* Add attributes to class

* Fix indentation error

* Add OnlineBagger to docs

* Fix nested if statements

* Slight docs modifications

---------

Co-authored-by: Cameron Brown <[email protected]>
Co-authored-by: cameron brown <[email protected]>
  • Loading branch information
3 people authored Mar 3, 2024
1 parent 69be430 commit 44997b7
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 33 deletions.
9 changes: 9 additions & 0 deletions docs/reference/bagging.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Online Bagger
-------------

.. currentmodule:: mil_tools

.. attributetable:: nodes.online_bagger.OnlineBagger

.. autoclass:: nodes.online_bagger.OnlineBagger
:members:
1 change: 1 addition & 0 deletions docs/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ by MIL. These subsystems relate to a variety of processes.
poi
pneumatic
sabertooth
bagging
156 changes: 123 additions & 33 deletions mil_common/utils/mil_tools/nodes/online_bagger.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env python3

from __future__ import annotations

import argparse
import datetime
import itertools
import os
from collections import deque
from typing import TYPE_CHECKING

import rosbag
import rospy
Expand All @@ -19,20 +21,30 @@
)
from tqdm import tqdm

"""
Online Bagger is a node which subscribes to a list of ros topics,
maintaining a buffer of the most recent n seconds. Parts or all of
these buffered topics can be written to a bag file by
sending a new goal to the /online_bagger/bag action server.
When run with the -c flag, instead runs an action client which connects
to online bagger, triggering a bag write and displaying a progress bar
as it writes.
"""
if TYPE_CHECKING:
import genpy


class OnlineBagger:
"""
Node that maintains a list of bagged information relating to the specified
topics.
Subscribes to a list of ROS topics, and maintains a buffer of the most recent
n seconds. Parts or all of these buffered topics can be written to a bag
file by sending a new goal to the /online_bagger/bag action server. When
run with the -c flag, instead runs an action client which connects to online
bagger, triggering a bag write and displaying a progress bar as it writes.
Attributes:
successful_subscription_count (int): The number of successful subscriptions
to topics.
iteration_count (int): The number of iterations.
streaming (bool): Indicates whether the bagger is streaming.
subscriber_list (list[str]): The list of topics subscribed to the
OnlineBagger.
"""

BAG_TOPIC = "/online_bagger/bag"

def __init__(self):
Expand Down Expand Up @@ -67,6 +79,15 @@ def get_subscriber_list(self, status):
"""
Get string of all topics, if their subscribe status matches the input (True / False)
Outputs each topics: time_buffer(float in seconds), subscribe_statue(bool), topic(string)
Args:
status (bool): The subscription status used to search for topics with a matching
subscription status.
Returns:
string: The list of topics that match the desired subscribe status. Each
line in the list contains the buffer time (in seconds) of the topic, the subscrition
status of the topic, and the topic name.
"""
sub_list = ""
for topic in self.subscriber_list:
Expand Down Expand Up @@ -99,10 +120,23 @@ def get_params(self):
self.subscriber_list[topic[0]] = (time, False)

def add_unique_topic(topic):
"""
Adds a topic to the subscriber list if the topic is not already in the
list.
Args:
topic (str): The name of the topic to add to the subscriber list.
"""
if topic not in self.subscriber_list:
self.subscriber_list[topic] = (self.stream_time, False)

def add_env_var(var):
"""
Adds topic(s) to the subscriber list.
Args:
var (str): The topic(s) to add to the subscriber list.
"""
for topic in var.split():
add_unique_topic(topic)

Expand Down Expand Up @@ -189,7 +223,7 @@ def subscribe_loop(self):
self.subscribe,
)

def subscribe(self, time_info=None):
def subscribe(self, _: rospy.timer.TimerEvent | None = None):
"""
Subscribe to the topics defined in the yaml configuration file
Expand All @@ -200,13 +234,12 @@ def subscribe(self, time_info=None):
Each element in self.subscriber list is a list [topic, Bool]
where the Bool tells the current status of the subscriber (success/failure).
Return number of topics that failed subscription
"""
if self.successful_subscription_count == len(self.subscriber_list):
if self.resubscriber is not None:
self.resubscriber.shutdown()
rospy.loginfo("All topics subscribed too! Shutting down resubscriber")
if (self.successful_subscription_count == len(self.subscriber_list)) and (
self.resubscriber is not None
):
self.resubscriber.shutdown()
rospy.loginfo("All topics subscribed too! Shutting down resubscriber")

for topic, (time, subscribed) in self.subscriber_list.items():
if not subscribed:
Expand All @@ -221,33 +254,54 @@ def subscribe(self, time_info=None):

self.subscriber_list[topic] = (time, True)

def get_topic_duration(self, topic):
"""
Return current time duration of topic
def get_topic_duration(self, topic: str):
"""
Returns the current time duration of topic
Args:
topic (str): The topic for which the duration will be calculated.
Returns:
Duration: The time duration of the topic.
"""
return self.topic_messages[topic][-1][0] - self.topic_messages[topic][0][0]

def get_header_time(self, msg):
def get_header_time(self, msg: genpy.Message):
"""
Retrieve header time if available
"""
Args:
msg (genpy.Message): The ROS message from which to extract the time.
Returns:
rospy.Time: The timestamp of the topic's header if the topic
has a header. Otherwise, the current time is returned.
"""
if hasattr(msg, "header"):
return msg.header.stamp
else:
return rospy.get_rostime()

def get_time_index(self, topic, requested_seconds):
"""
Return the index for the time index for a topic at 'n' seconds from the end of the dequeue
Returns the index for the time index for a topic at 'n' seconds from the end of the dequeue.
For example, to bag the last 10 seconds of data, the index for 10 seconds back from the most
recent message can be obtained with this function.
The number of requested seconds should be the number of seoncds desired from
the end of deque. (ie. requested_seconds = 10 )
If the desired time length of the bag is greater than the available messages it will output a
message and return how ever many seconds of data are available at the moment.
Seconds is of a number type (not a rospy.Time type) (ie. int, float)
Args:
topic (str): The name of the topic for which to get the time index.
requested_seconds (int/float): The number of seconds from the end of the dequeue to search
for the topic.
Returns:
int: The index for the time index of the topic at requested_seconds seconds from the
end of the dequeue.
"""

topic_duration = self.get_topic_duration(topic).to_sec()
Expand All @@ -259,12 +313,17 @@ def get_time_index(self, topic, requested_seconds):
index = int(self.get_topic_message_count(topic) * (1 - min(ratio, 1)))
return index

def bagger_callback(self, msg, topic):
def bagger_callback(self, msg: genpy.Message, topic: str):
"""
Streaming callback function, stops streaming during bagging process
also pops off msgs from dequeue if stream size is greater than specified stream_time
Adds incoming messages to the appropriate topic and removes older messages if necessary.
Streaming callback function, stops streaming during bagging process and pops off msgs
from dequeue if stream size is greater than specified stream_time. Stream, callback
function does nothing if streaming is not active.
Stream, callback function does nothing if streaming is not active
Args:
msg (genpy.Message): The incoming message.
topic (str): The topic to which the incoming message will be added.
"""

if not self.streaming:
Expand Down Expand Up @@ -294,18 +353,27 @@ def bagger_callback(self, msg, topic):
self.topic_messages[topic].popleft()
time_diff = self.get_topic_duration(topic)

def get_topic_message_count(self, topic):
def get_topic_message_count(self, topic: str):
"""
Return number of messages available in a topic
Args:
topic (str): The name of the topic for which to calculate the number
of messages.
Returns:
int: The number of messages available in the specified topic.
"""

return len(self.topic_messages[topic])

def get_total_message_count(self):
"""
Returns total number of messages across all topics
"""
Returns:
int: The total number of messages available in all topics.
"""
total_message_count = 0
for topic in self.topic_messages:
total_message_count = total_message_count + self.get_topic_message_count(
Expand All @@ -315,14 +383,28 @@ def get_total_message_count(self):
return total_message_count

def _get_default_filename(self):
"""
Uses the current date and time to create a default bag name.
Returns:
str: The default bag name constructed using format date-time.
"""
return (
str(datetime.date.today()) + "-" + str(datetime.datetime.now().time())[0:8]
)

def get_bag_name(self, filename=""):
def get_bag_name(self, filename: str = ""):
"""
Create ros bag save directory
If no bag name is provided, the current date/time is used as default.
Args:
filename (str): The save directory for the ros bag. The default value
is an empty string, which will result in the default filename being
used.
Returns:
str: A string representing the path of the ros bag file.
"""
# If directory param is not set, default to $HOME/bags/<date>
default_dir = self.dir
Expand All @@ -347,11 +429,19 @@ def get_bag_name(self, filename=""):
bag_name = bag_name + ".bag"
return os.path.join(bag_dir, bag_name)

def start_bagging(self, req):
def start_bagging(self, req: BagOnlineGoal):
"""
Writes collected data to a bag file.
Dump all data in dictionary to bags, temporarily stops streaming
during the bagging process, resumes streaming when over.
If bagging is already false because of an active call to this service
If bagging is already false because of an active call to this service.
Args:
req (BagOnlineGoal): The bagging request information.
Raises:
IOError: A problem occurs when opening or closing the bag file.
"""
result = BagOnlineResult()
if self.streaming is False:
Expand Down

0 comments on commit 44997b7

Please sign in to comment.