Skip to content

Commit

Permalink
Merge pull request #15393 from cdapio/bugfixes/cdap-20869-2
Browse files Browse the repository at this point in the history
[CDAP-20869] Fix closing message tables
  • Loading branch information
masoud-io authored Oct 31, 2023
2 parents 0203ec5 + ae33526 commit 7e9946b
Showing 1 changed file with 16 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,7 @@ public void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOExcept
@Override
public Map<String, String> getTopicMetadataProperties(TopicId topicId)
throws TopicNotFoundException, IOException {
try {
return topicCache.get(topicId).getProperties();
} catch (ExecutionException e) {
Throwable cause = Objects.firstNonNull(e.getCause(), e);
Throwables.propagateIfPossible(cause, TopicNotFoundException.class, IOException.class);
throw Throwables.propagate(e.getCause());
}
return getTopic(topicId).getProperties();
}

@Override
Expand Down Expand Up @@ -438,20 +432,25 @@ private Map<String, String> createDefaultProperties() {
return properties;
}

private TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException {
try {
return topicCache.get(topicId);
} catch (ExecutionException e) {
Throwable cause = Objects.firstNonNull(e.getCause(), e);
Throwables.propagateIfPossible(cause, TopicNotFoundException.class, IOException.class);
throw Throwables.propagate(e.getCause());
}
}

@Override
public CloseableIterator<RawMessage> fetch(MessageFetchRequest messageFetchRequest)
throws TopicNotFoundException, IOException {
final TopicMetadata metadata =
new DefaultTopicMetadata(
messageFetchRequest.getTopicId(),
getTopicMetadataProperties(messageFetchRequest.getTopicId()));
TopicMetadata metadata = getTopic(messageFetchRequest.getTopicId());

MessageTable messageTable = createMessageTable(metadata);
try {
return new MessageCloseableIterator(
metadata,
() -> createMessageTable(metadata),
() -> createPayloadTable(metadata),
messageFetchRequest);
metadata, messageTable, () -> createPayloadTable(metadata), messageFetchRequest);
} catch (Throwable t) {
closeQuietly(messageTable);
throw t;
Expand Down Expand Up @@ -547,13 +546,13 @@ private final class MessageCloseableIterator implements CloseableIterator<RawMes

MessageCloseableIterator(
TopicMetadata topicMetadata,
TableProvider<MessageTable> messageTableProvider,
MessageTable messageTable,
TableProvider<PayloadTable> payloadTableProvider,
MessageFetchRequest messageFetchRequest)
throws IOException {
this.topicMetadata = topicMetadata;
this.topicId = topicMetadata.getTopicId();
this.messageTable = messageTableProvider.get();
this.messageTable = messageTable;
this.payloadTableProvider = payloadTableProvider;
this.inclusive = messageFetchRequest.isIncludeStart();
this.messageLimit = messageFetchRequest.getLimit();
Expand Down

0 comments on commit 7e9946b

Please sign in to comment.