You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
It iterates through all connections for the requested topics, reading or decompressing all chunks containing those connections, then reading or decompressing all messages within each chunk, then filtering for messages with the requested connection id. If you supply a list of N topics, then assuming chunks contain messages of each connection id, the entire file is read or decompressed up to N times. For a large file (gigabytes), this is extremely inefficient. Also, the current indexing method involves a lot of non-consecutive seeking, which is very slow on a spinning disk.
Instead the loop nesting should be inverted so that only relevant chunks are read and processed, so that the file is only read once.
Relatedly, another issue with the current API is that there is no way to easily and efficiently iterate through all records out of a given topic list in a bag file, particularly if you need the bagfile timestamp (as opposed to the record header timestamp) -- see #22. Right now the only way to accomplish this is to call BagFile.generateIndexesForTopicList to produce a List<MessageIndex> (which may process the file multiple times, as described above), then for each MessageIndex (which has the required timestamp field), call BagFile.getMessageFromIndex to get a MessageType -- however this results in yet another reading and/or decompression of chunks and/or messages.
Here is a solution. These are all static methods, so that they can be used right now by anyone needing this functionality, without needing to modify the bag-reader-java library -- but they should probably be turned into non-static methods of the BagFile class (removing the BagFile file parameter and replacing with this).
In particular this code provides a significantly more efficient version of BagFile.generateIndexesForTopicList, and also a new method processMessagesForTopicList that iterates through all topics on a given topic list in an efficient way, loading/processing data only a single time.
/** Process a bag file record. */privatestaticinterfaceMessageProcessorInternal {
publicvoidprocess(Connectionconn, Timestamptimestamp, longchunkPos,
longposition, ByteBufferdata) throwsBagReaderException;
}
/** * Efficiently iterate through all messages in a bag file for a given topic * list. */privatestaticvoidprocessAllMessagesForTopic(BagFilefile,
List<String> topics, ProgressMonitorprogressMonitor,
MessageProcessorInternalmessageProcessorInternal)
throwsBagReaderException, InterruptedException {
// Find connections corresponding to requested topicsSet<String> topicsSet = newHashSet<>();
for (Stringtopic : topics) {
topicsSet.add(topic);
}
Map<Integer, Connection> connIdToConnForRequestedTopics = newHashMap<>();
for (Connectionconn : file.getConnections()) {
if (topicsSet.contains(conn.getTopic())) {
connIdToConnForRequestedTopics.put(conn.getConnectionId(),
conn);
}
}
List<ChunkInfo> chunkInfos = file.getChunkInfos().stream()
.sorted(newComparator<ChunkInfo>() {
// Sort chunkInfos into ascending order, to ensure chunks// are accessed sequentially, to save on seek time@Overridepublicintcompare(ChunkInfoci0, ChunkInfoci1) {
returnLong.compare(ci0.getChunkPos(),
ci1.getChunkPos());
}
}).filter(newPredicate<ChunkInfo>() {
// Filter for chunkInfos that contain requested topics@Overridepublicbooleantest(ChunkInfochunkInfo) {
for (ChunkConnectionchunkConn : chunkInfo
.getConnections()) {
if (connIdToConnForRequestedTopics
.containsKey(chunkConn.getConnectionId())) {
returntrue;
}
}
returnfalse;
}
}).collect(Collectors.toList());
try (FileChannelchannel = file.getChannel()) {
intchunkNum = 0;
intnumChunks = chunkInfos.size();
if (progressMonitor != null) {
progressMonitor.setMinimum(0);
progressMonitor.setMaximum(numChunks);
if (progressMonitor.isCanceled()) {
thrownewInterruptedException();
}
}
// Iterate through all chunks in bag filefor (ChunkInfochunkInfo : chunkInfos) {
longchunkPos = chunkInfo.getChunkPos();
if (chunkNum % 500 == 0) {
myLogger.info("\t\tChunk " + chunkNum + "/" + numChunks);
}
if (progressMonitor != null) {
if (progressMonitor.isCanceled()) {
progressMonitor.setNote("canceling");
thrownewInterruptedException("canceled indexing");
}
progressMonitor.setProgress(chunkNum);
}
chunkNum++;
// Decompress chunk if necessaryRecordchunk = BagFile.recordAt(channel, chunkPos);
chunk.readData();
try (ByteBufferChannelchunkChannel = newByteBufferChannel(
chunk.getData())) {
while (chunkChannel.position() < chunkChannel.size()) {
// Decompress message if necessarylongposition = chunkChannel.position();
Recordmsg = newRecord(chunkChannel);
ByteBufferdata = msg.getData()
.order(ByteOrder.LITTLE_ENDIAN);
Headerheader = msg.getHeader();
if (header.getType() == Record.RecordType.MESSAGE_DATA) {
// Check if message has one of the requested topicsintconnId = header.getInt("conn");
Connectionconn = connIdToConnForRequestedTopics
.get(connId);
if (conn != null) {
// Message has requested topic, process itTimestamptimestamp = header.getTimestamp("time");
messageProcessorInternal.process(conn,
timestamp, chunkPos, position, data);
}
}
}
}
}
} catch (IOExceptione) {
thrownewBagReaderException(e);
}
}
/** * More efficient version of {#link * {@link BagFile#generateIndexesForTopicList(List, ProgressMonitor)}}. */publicstaticList<MessageIndex> generateIndexesForTopicList(BagFilefile,
List<String> topics, ProgressMonitorprogressMonitor)
throwsBagReaderException, InterruptedException {
List<MessageIndex> messageIndexList = newArrayList<>();
processAllMessagesForTopic(file, topics, progressMonitor,
newMessageProcessorInternal() {
@Overridepublicvoidprocess(Connectionconn, Timestamptimestamp,
longchunkPos, longposition, ByteBufferdata) {
messageIndexList.add(newMessageIndex(chunkPos,
position, conn.getTopic(), timestamp));
}
});
returnmessageIndexList;
}
/** Process a message from a bag file. */publicstaticinterfaceMessageProcessor {
publicvoidprocess(Stringtopic, Timestamptimestamp,
MessageTypemessageType) throwsUninitializedFieldException,
BagReaderException;
}
/** Process all messages from a given topic list. */publicstaticvoidprocessMessagesForTopicList(BagFilefile,
List<String> topics, ProgressMonitorprogressMonitor,
MessageProcessormessageProcessor)
throwsBagReaderException, InterruptedException {
processAllMessagesForTopic(file, topics, progressMonitor,
newMessageProcessorInternal() {
@Overridepublicvoidprocess(Connectionconn, Timestamptimestamp,
longchunkPos, longposition, ByteBufferdata)
throwsBagReaderException {
// Deserialize and process messageMessageTypemessageType;
try {
messageType = conn.getMessageCollection()
.getMessageType();
} catch (UnknownMessageExceptione) {
thrownewBagReaderException(e);
}
messageType.readMessage(data);
try {
messageProcessor.process(conn.getTopic(),
timestamp, messageType);
} catch (UninitializedFieldExceptione) {
thrownewBagReaderException(e);
}
}
});
}
The text was updated successfully, but these errors were encountered:
The
BagFile.generateIndexesForTopicList
method is very inefficient:https://github.com/swri-robotics/bag-reader-java/blob/master/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java#L784
It iterates through all connections for the requested topics, reading or decompressing all chunks containing those connections, then reading or decompressing all messages within each chunk, then filtering for messages with the requested connection id. If you supply a list of N topics, then assuming chunks contain messages of each connection id, the entire file is read or decompressed up to N times. For a large file (gigabytes), this is extremely inefficient. Also, the current indexing method involves a lot of non-consecutive seeking, which is very slow on a spinning disk.
Instead the loop nesting should be inverted so that only relevant chunks are read and processed, so that the file is only read once.
Relatedly, another issue with the current API is that there is no way to easily and efficiently iterate through all records out of a given topic list in a bag file, particularly if you need the bagfile timestamp (as opposed to the record header timestamp) -- see #22. Right now the only way to accomplish this is to call
BagFile.generateIndexesForTopicList
to produce aList<MessageIndex>
(which may process the file multiple times, as described above), then for eachMessageIndex
(which has the requiredtimestamp
field), callBagFile.getMessageFromIndex
to get aMessageType
-- however this results in yet another reading and/or decompression of chunks and/or messages.Here is a solution. These are all static methods, so that they can be used right now by anyone needing this functionality, without needing to modify the
bag-reader-java
library -- but they should probably be turned into non-static methods of theBagFile
class (removing theBagFile file
parameter and replacing withthis
).In particular this code provides a significantly more efficient version of
BagFile.generateIndexesForTopicList
, and also a new methodprocessMessagesForTopicList
that iterates through all topics on a given topic list in an efficient way, loading/processing data only a single time.The text was updated successfully, but these errors were encountered: