Skip to content

Commit

Permalink
Merge pull request #15748 from cdapio/CDAP-21084
Browse files Browse the repository at this point in the history
[CDAP-21084] Adding audit log topics to Messaging system topics list in cdap-default.xml
  • Loading branch information
sahusanket authored Nov 28, 2024
2 parents b936e21 + b423ce0 commit 4f5fbc2
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,17 @@
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import io.cdap.cdap.api.messaging.TopicAlreadyExistsException;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.messaging.DefaultTopicMetadata;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
import io.cdap.cdap.security.authorization.AccessControllerInstantiator;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import org.apache.twill.internal.CompositeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -86,8 +80,6 @@ protected void shutDown() throws Exception {

private AuditLogSingleTopicSubscriberService createChildService(String topicName) {

createTopicIfNeeded(NamespaceId.SYSTEM.topic(topicName));

return new AuditLogSingleTopicSubscriberService(
this.cConf,
this.messagingService,
Expand All @@ -97,16 +89,4 @@ private AuditLogSingleTopicSubscriberService createChildService(String topicName
topicName
);
}

private void createTopicIfNeeded(TopicId topicId) {
try {
messagingService.createTopic(new DefaultTopicMetadata(topicId, Collections.emptyMap()));
LOG.info("Created topic {}", topicId.getTopic());
} catch (TopicAlreadyExistsException ex) {
// no-op
} catch (IOException e) {
//Converting to unchecked exception
throw new RuntimeException(String.format("Failed to create topic %s.", topicId.getTopic(), e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,22 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import io.cdap.cdap.api.auditlogging.AuditLogWriter;
import io.cdap.cdap.api.messaging.TopicAlreadyExistsException;
import io.cdap.cdap.api.messaging.TopicNotFoundException;
import io.cdap.cdap.api.retry.RetryableException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
import io.cdap.cdap.messaging.DefaultTopicMetadata;
import io.cdap.cdap.messaging.client.StoreRequestBuilder;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.StoreRequest;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
import io.cdap.cdap.security.spi.authorization.AuditLogContext;
import io.cdap.cdap.security.spi.authorization.AuditLogRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Queue;
import java.util.Random;
import javax.annotation.Nullable;

Expand All @@ -50,7 +44,6 @@
*/
public class MessagingAuditLogWriter implements AuditLogWriter {

private static final Logger LOG = LoggerFactory.getLogger(MessagingAuditLogWriter.class);
private static final Gson GSON = new Gson();

private final String topicPrefix;
Expand Down Expand Up @@ -102,7 +95,8 @@ public void publish(@Nullable AuditLogRequest auditLogRequest) throws IOExceptio
try {
messagingService.publish(storeRequest);
} catch (TopicNotFoundException e) {
createTopicIfNeeded(topic);
// Core Messaging service should create the required topics for audit log.
// Refer to property `messaging.system.topics`.
throw new RetryableException(e);
}
}, retryStrategy, Retries.ALWAYS_TRUE);
Expand All @@ -111,15 +105,6 @@ public void publish(@Nullable AuditLogRequest auditLogRequest) throws IOExceptio
}
}

private void createTopicIfNeeded(TopicId topic) throws IOException {
try {
messagingService.createTopic(new DefaultTopicMetadata(topic, Collections.emptyMap()));
LOG.info("Created topic {}", topic.getTopic());
} catch (TopicAlreadyExistsException ex) {
// no-op
}
}

/**
* Every operation / event for a Call should be published to a particular topic.
* The Topic name is determined from which POD (using thread name) + the writer class ( using a random number for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public void testPublish() throws Exception {
.thenThrow(new TopicNotFoundException("namespace", "topic"))
.thenReturn(null);

Mockito.doNothing().when(mockMsgService).createTopic(Mockito.any());

MessagingAuditLogWriter messagingAuditLogWriter = new MessagingAuditLogWriter(CConfiguration.create(),
mockMsgService);
Queue<AuditLogContext> auditLogContexts = new ArrayDeque<>();
Expand All @@ -59,9 +57,6 @@ public void testPublish() throws Exception {
1000002L);
messagingAuditLogWriter.publish(auditLogRequest);

//There should be at least 2 invocations. 1st will be a TopicNotFoundException , then 1 audit log request.
Mockito.verify(mockMsgService, Mockito.atLeast(2)).publish(Mockito.any());
//Single invocation to create a topic.
Mockito.verify(mockMsgService, Mockito.times(1)).createTopic(Mockito.any());
Mockito.verify(mockMsgService, Mockito.atLeast(1)).publish(Mockito.any());
}
}
2 changes: 1 addition & 1 deletion cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2621,7 +2621,7 @@
<property>
<name>messaging.system.topics</name>
<value>
${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${operation.status.event.topic}:${operation.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0
${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${operation.status.event.topic}:${operation.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0,${auditlog.event.topic}:${auditlog.event.topic.num.partitions}
</value>
<description>
A comma-separated list of topics that are always available in the
Expand Down

0 comments on commit 4f5fbc2

Please sign in to comment.