diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSingleTopicSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSingleTopicSubscriberService.java index aa7f9db6dfb..0858d32e98c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSingleTopicSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSingleTopicSubscriberService.java @@ -41,9 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import javax.annotation.Nullable; /** diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java index 20ff78f3ed6..d18d0bc2d55 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java @@ -20,23 +20,17 @@ import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.Service; import com.google.inject.Inject; -import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.messaging.DefaultTopicMetadata; import io.cdap.cdap.messaging.spi.MessagingService; -import io.cdap.cdap.proto.id.NamespaceId; -import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.authorization.AccessControllerInstantiator; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import org.apache.twill.internal.CompositeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.stream.IntStream; @@ -86,8 +80,6 @@ protected void shutDown() throws Exception { private AuditLogSingleTopicSubscriberService createChildService(String topicName) { - createTopicIfNeeded(NamespaceId.SYSTEM.topic(topicName)); - return new AuditLogSingleTopicSubscriberService( this.cConf, this.messagingService, @@ -97,16 +89,4 @@ private AuditLogSingleTopicSubscriberService createChildService(String topicName topicName ); } - - private void createTopicIfNeeded(TopicId topicId) { - try { - messagingService.createTopic(new DefaultTopicMetadata(topicId, Collections.emptyMap())); - LOG.info("Created topic {}", topicId.getTopic()); - } catch (TopicAlreadyExistsException ex) { - // no-op - } catch (IOException e) { - //Converting to unchecked exception - throw new RuntimeException(String.format("Failed to create topic %s.", topicId.getTopic(), e)); - } - } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java index 6e07b11aa23..92256a6a43f 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java @@ -19,7 +19,6 @@ import com.google.gson.Gson; import com.google.inject.Inject; import io.cdap.cdap.api.auditlogging.AuditLogWriter; -import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; import io.cdap.cdap.api.retry.RetryableException; import io.cdap.cdap.common.conf.CConfiguration; @@ -27,7 +26,6 @@ import io.cdap.cdap.common.service.Retries; import io.cdap.cdap.common.service.RetryStrategies; import io.cdap.cdap.common.service.RetryStrategy; -import io.cdap.cdap.messaging.DefaultTopicMetadata; import io.cdap.cdap.messaging.client.StoreRequestBuilder; import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.spi.StoreRequest; @@ -35,12 +33,8 @@ import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.spi.authorization.AuditLogContext; import io.cdap.cdap.security.spi.authorization.AuditLogRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collections; -import java.util.Queue; import java.util.Random; import javax.annotation.Nullable; @@ -50,7 +44,6 @@ */ public class MessagingAuditLogWriter implements AuditLogWriter { - private static final Logger LOG = LoggerFactory.getLogger(MessagingAuditLogWriter.class); private static final Gson GSON = new Gson(); private final String topicPrefix; @@ -102,7 +95,8 @@ public void publish(@Nullable AuditLogRequest auditLogRequest) throws IOExceptio try { messagingService.publish(storeRequest); } catch (TopicNotFoundException e) { - createTopicIfNeeded(topic); + // Core Messaging service should create the required topics for audit log. + // Refer to property `messaging.system.topics`. throw new RetryableException(e); } }, retryStrategy, Retries.ALWAYS_TRUE); @@ -111,15 +105,6 @@ public void publish(@Nullable AuditLogRequest auditLogRequest) throws IOExceptio } } - private void createTopicIfNeeded(TopicId topic) throws IOException { - try { - messagingService.createTopic(new DefaultTopicMetadata(topic, Collections.emptyMap())); - LOG.info("Created topic {}", topic.getTopic()); - } catch (TopicAlreadyExistsException ex) { - // no-op - } - } - /** * Every operation / event for a Call should be published to a particular topic. * The Topic name is determined from which POD (using thread name) + the writer class ( using a random number for diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/security/auth/MessagingAuditLogWriterTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/security/auth/MessagingAuditLogWriterTest.java index b4cf4100872..ee876a5a078 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/security/auth/MessagingAuditLogWriterTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/security/auth/MessagingAuditLogWriterTest.java @@ -37,8 +37,6 @@ public void testPublish() throws Exception { .thenThrow(new TopicNotFoundException("namespace", "topic")) .thenReturn(null); - Mockito.doNothing().when(mockMsgService).createTopic(Mockito.any()); - MessagingAuditLogWriter messagingAuditLogWriter = new MessagingAuditLogWriter(CConfiguration.create(), mockMsgService); Queue auditLogContexts = new ArrayDeque<>(); @@ -59,9 +57,6 @@ public void testPublish() throws Exception { 1000002L); messagingAuditLogWriter.publish(auditLogRequest); - //There should be at least 2 invocations. 1st will be a TopicNotFoundException , then 1 audit log request. - Mockito.verify(mockMsgService, Mockito.atLeast(2)).publish(Mockito.any()); - //Single invocation to create a topic. - Mockito.verify(mockMsgService, Mockito.times(1)).createTopic(Mockito.any()); + Mockito.verify(mockMsgService, Mockito.atLeast(1)).publish(Mockito.any()); } } diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 192bb82129f..0702ad438ae 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -2621,7 +2621,7 @@ messaging.system.topics - ${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${operation.status.event.topic}:${operation.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0 + ${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${operation.status.event.topic}:${operation.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0,${auditlog.event.topic}:${auditlog.event.topic.num.partitions} A comma-separated list of topics that are always available in the