diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index c32d44323d0..24c028a0471 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -67,6 +67,7 @@ import io.cdap.cdap.gateway.handlers.ImpersonationHandler; import io.cdap.cdap.gateway.handlers.InstanceOperationHttpHandler; import io.cdap.cdap.gateway.handlers.NamespaceHttpHandler; +import io.cdap.cdap.gateway.handlers.OperationHttpHandler; import io.cdap.cdap.gateway.handlers.OperationalStatsHttpHandler; import io.cdap.cdap.gateway.handlers.OperationsDashboardHttpHandler; import io.cdap.cdap.gateway.handlers.PreferencesHttpHandler; @@ -471,6 +472,7 @@ protected void configure() { handlerBinder.addBinding().to(AppStateHandler.class); handlerBinder.addBinding().to(CredentialProviderHttpHandler.class); handlerBinder.addBinding().to(CredentialProviderHttpHandlerInternal.class); + handlerBinder.addBinding().to(OperationHttpHandler.class); FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java index 81a187cd042..40cda348e4c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java @@ -41,6 +41,7 @@ import io.cdap.cdap.internal.bootstrap.BootstrapService; import io.cdap.cdap.internal.credential.CredentialProviderService; import io.cdap.cdap.internal.namespace.credential.NamespaceCredentialProviderService; +import io.cdap.cdap.internal.operation.OperationNotificationSubscriberService; import io.cdap.cdap.internal.provision.ProvisioningService; import io.cdap.cdap.internal.sysapp.SystemAppManagementService; import io.cdap.cdap.proto.id.NamespaceId; @@ -93,6 +94,7 @@ public class AppFabricServer extends AbstractIdleService { private final SystemAppManagementService systemAppManagementService; private final SourceControlOperationRunner sourceControlOperationRunner; private final RepositoryCleanupService repositoryCleanupService; + private final OperationNotificationSubscriberService operationNotificationSubscriberService; private final CConfiguration cConf; private final SConfiguration sConf; private final boolean sslEnabled; @@ -131,7 +133,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, CommonNettyHttpServiceFactory commonNettyHttpServiceFactory, RunRecordTimeToLiveService runRecordTimeToLiveService, SourceControlOperationRunner sourceControlOperationRunner, - RepositoryCleanupService repositoryCleanupService) { + RepositoryCleanupService repositoryCleanupService, + OperationNotificationSubscriberService operationNotificationSubscriberService) { this.hostname = hostname; this.discoveryService = discoveryService; this.handlers = handlers; @@ -159,6 +162,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory; this.sourceControlOperationRunner = sourceControlOperationRunner; this.repositoryCleanupService = repositoryCleanupService; + this.operationNotificationSubscriberService = operationNotificationSubscriberService; } /** @@ -189,7 +193,8 @@ protected void startUp() throws Exception { runRecordCounterService.start(), runRecordTimeToLiveService.start(), sourceControlOperationRunner.start(), - repositoryCleanupService.start() + repositoryCleanupService.start(), + operationNotificationSubscriberService.start() )); Futures.allAsList(futuresList).get(); @@ -250,6 +255,7 @@ protected void shutDown() throws Exception { repositoryCleanupService.stopAndWait(); credentialProviderService.stopAndWait(); namespaceCredentialProviderService.stopAndWait(); + operationNotificationSubscriberService.stopAndWait(); } private Cancellable startHttpService(NettyHttpService httpService) throws Exception { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/PushAppsOperationFactory.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/PushAppsOperationFactory.java index 6a3ce0f3e53..0f3add565a0 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/PushAppsOperationFactory.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/PushAppsOperationFactory.java @@ -16,7 +16,6 @@ package io.cdap.cdap.internal.app.sourcecontrol; - /** * Factory interface for creating {@link PushAppsOperation}. * This interface is for Guice assisted binding, hence there will be no concrete implementation of it. @@ -27,7 +26,7 @@ public interface PushAppsOperationFactory { * Returns an implementation of {@link PushAppsOperation} that operates on the given {@link * PushAppsRequest}. * - * @param request contains list of apps to pull + * @param request contains list of apps to push * @return a new instance of {@link PushAppsOperation}. */ PushAppsOperation create(PushAppsRequest request); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractOperationRunner.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractOperationRunner.java index 5970f4f432e..57d996333d3 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractOperationRunner.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractOperationRunner.java @@ -18,6 +18,8 @@ import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; /** * Abstract runner implementation with common functionality. @@ -25,9 +27,12 @@ public abstract class AbstractOperationRunner implements OperationRunner { private final PullAppsOperationFactory pullOperationFactory; + private final PushAppsOperationFactory pushAppsOperationFactory; - AbstractOperationRunner(PullAppsOperationFactory pullOperationFactory) { + AbstractOperationRunner(PullAppsOperationFactory pullOperationFactory, + PushAppsOperationFactory pushAppsOperationFactory) { this.pullOperationFactory = pullOperationFactory; + this.pushAppsOperationFactory = pushAppsOperationFactory; } /** @@ -40,12 +45,17 @@ protected LongRunningOperation createOperation(OperationRunDetail detail) throws IllegalStateException { switch (detail.getRun().getType()) { case PULL_APPS: - PullAppsRequest request = detail.getPullAppsRequest(); - if (request == null) { + PullAppsRequest pullReq = detail.getPullAppsRequest(); + if (pullReq == null) { throw new IllegalStateException("Missing request for pull operation"); } - return pullOperationFactory.create(request); + return pullOperationFactory.create(pullReq); case PUSH_APPS: + PushAppsRequest pushReq = detail.getPushAppsRequest(); + if (pushReq == null) { + throw new IllegalStateException("Missing request for push operation"); + } + return pushAppsOperationFactory.create(pushReq); default: throw new IllegalStateException( String.format("Invalid operation type %s", detail.getRun().getType())); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRunner.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRunner.java index 8b6c4697849..513b160b5e3 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRunner.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRunner.java @@ -18,6 +18,7 @@ import com.google.inject.Inject; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory; /** * Implementation of {@link OperationRunner} to run an operation in the same service. @@ -33,8 +34,8 @@ public class InMemoryOperationRunner extends AbstractOperationRunner { */ @Inject public InMemoryOperationRunner(OperationStatePublisher statePublisher, - PullAppsOperationFactory pullOperationFactory) { - super(pullOperationFactory); + PullAppsOperationFactory pullOperationFactory, PushAppsOperationFactory pushAppsOperationFactory) { + super(pullOperationFactory, pushAppsOperationFactory); this.statePublisher = statePublisher; } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java index e2780790bd2..60dfadba751 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java @@ -19,6 +19,7 @@ import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationRun; import java.util.Arrays; @@ -57,15 +58,21 @@ public class OperationRunDetail { @Nullable private final PullAppsRequest pullAppsRequest; + @SerializedName("pullAppsRequest") + @Nullable + private final PushAppsRequest pushAppsRequest; + protected OperationRunDetail( OperationRunId runId, OperationRun run, byte[] sourceId, @Nullable String principal, - @Nullable PullAppsRequest pullAppsRequest) { + @Nullable PullAppsRequest pullAppsRequest, + @Nullable PushAppsRequest pushAppsRequest) { this.runId = runId; this.run = run; this.sourceId = sourceId; this.principal = principal; this.pullAppsRequest = pullAppsRequest; + this.pushAppsRequest = pushAppsRequest; } @Nullable @@ -82,6 +89,10 @@ public PullAppsRequest getPullAppsRequest() { return pullAppsRequest; } + public PushAppsRequest getPushAppsRequest() { + return pushAppsRequest; + } + public OperationRun getRun() { return run; } @@ -138,6 +149,7 @@ public static class Builder { protected byte[] sourceId; protected String principal; protected PullAppsRequest pullAppsRequest; + protected PushAppsRequest pushAppsRequest; protected Builder() { } @@ -148,6 +160,7 @@ protected Builder(OperationRunDetail detail) { run = detail.getRun(); runId = detail.getRunId(); pullAppsRequest = detail.getPullAppsRequest(); + pushAppsRequest = detail.getPushAppsRequest(); } public Builder setSourceId(byte[] sourceId) { @@ -176,6 +189,11 @@ public Builder setPullAppsRequest(PullAppsRequest pullAppsRequest) { return this; } + public Builder setPushAppsRequest(PushAppsRequest pushAppsRequest) { + this.pushAppsRequest = pushAppsRequest; + return this; + } + /** * Validates input and returns a OperationRunDetail. */ @@ -183,9 +201,6 @@ public OperationRunDetail build() { if (runId == null) { throw new IllegalArgumentException("run id must be specified."); } - if (sourceId == null) { - throw new IllegalArgumentException("Operation run source id must be specified."); - } if (run == null) { throw new IllegalArgumentException("Operation run must be specified."); } @@ -194,12 +209,12 @@ public OperationRunDetail build() { throw new IllegalArgumentException("Exactly one request type can be non-null"); } - return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest); + return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest, pushAppsRequest); } private boolean validateRequests() { // validate only one of the request is non-null - return Stream.of(pullAppsRequest).filter(java.util.Objects::nonNull).count() == 1; + return Stream.of(pullAppsRequest, pushAppsRequest).filter(java.util.Objects::nonNull).count() == 1; } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java index ed437d960d8..aeeee7e8a97 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java @@ -321,13 +321,13 @@ private Optional getOperationRunInternal(OperationRunId runId) private void writeOperationRun(OperationRunId runId, OperationRunDetail detail) throws IOException { Collection> fields = ImmutableList.of( - Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun()), Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, runId.getNamespace()), - Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, - detail.getRun().getStatus().name()), + Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun()), Fields.stringField(StoreDefinition.OperationRunsStore.TYPE_FIELD, detail.getRun().getType().name()), + Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, + detail.getRun().getStatus().name()), Fields.longField(StoreDefinition.OperationRunsStore.START_TIME_FIELD, detail.getRun().getMetadata().getCreateTime().toEpochMilli()), Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD, diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java index a019958545a..47bc52e8c95 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java @@ -21,6 +21,8 @@ import io.cdap.cdap.internal.app.sourcecontrol.LocalApplicationManager; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperation; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory; import io.cdap.cdap.internal.operation.InMemoryOperationRunner; import io.cdap.cdap.internal.operation.InMemoryOperationRuntime; import io.cdap.cdap.internal.operation.LongRunningOperation; @@ -41,6 +43,9 @@ protected void configure() { install(new FactoryModuleBuilder() .implement(LongRunningOperation.class, PullAppsOperation.class) .build(PullAppsOperationFactory.class)); + install(new FactoryModuleBuilder() + .implement(LongRunningOperation.class, PushAppsOperation.class) + .build(PushAppsOperationFactory.class)); // TODO(samik) change based on worker enabled on not bind(ApplicationManager.class).to(LocalApplicationManager.class); bind(OperationRunner.class).to(InMemoryOperationRunner.class); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 3178a7a8d67..28755a866f4 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2490,7 +2490,7 @@ public static final class Operation { public static final String STATUS_EVENT_NUM_PARTITIONS = "operation.status.event.topic.num.partitions"; public static final String STATUS_EVENT_FETCH_SIZE = "operation.status.event.fetch.size"; public static final String STATUS_EVENT_TX_SIZE = "operation.status.event.tx.size"; - public static final String STATUS_EVENT_POLL_DELAY_MILLIS = "operatopn.status.event.poll.delay.millis"; + public static final String STATUS_EVENT_POLL_DELAY_MILLIS = "operation.status.event.poll.delay.millis"; /** * Topic name for publishing program status recording events to the messaging system. */ diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index a6777d87bb8..ac6555c6819 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -2611,7 +2611,7 @@ messaging.system.topics - ${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0 + ${audit.topic},${metadata.messaging.topic},${data.event.topic},${metrics.topic.prefix}:${metrics.messaging.topic.num},${metrics.admin.topic},${time.event.topic},${program.status.event.topic},${program.status.event.topic}:${program.status.event.topic.num.partitions},${operation.status.event.topic}:${operation.status.event.topic.num.partitions},${program.status.record.event.topic},${log.tms.topic.prefix}:${log.publish.num.partitions},${preview.messaging.topic},previewlog0 A comma-separated list of topics that are always available in the system namespace. Multiple topics sharing the same prefix and @@ -4382,47 +4382,6 @@ - - operation.status.retry.policy.base.delay.ms - 1000 - - The base delay between retries in milliseconds - - - - - operation.status.retry.policy.max.delay.ms - 3000 - - The maximum delay between retries in milliseconds - - - - - operation.status.retry.policy.max.retries - 1000 - - The maximum number of retries to attempt before aborting - - - - - operation.status.retry.policy.max.time.secs - 600 - - The maximum elapsed time in seconds before retries are aborted - - - - - operation.status.retry.policy.type - fixed.delay - - The type of retry policy for programs. Allowed options: - "none", "fixed.delay", or "exponential.backoff". - - - @@ -6293,4 +6252,93 @@ + + + + operation.status.retry.policy.base.delay.ms + 1000 + + The base delay between retries in milliseconds + + + + + operation.status.retry.policy.max.delay.ms + 3000 + + The maximum delay between retries in milliseconds + + + + + operation.status.retry.policy.max.retries + 1000 + + The maximum number of retries to attempt before aborting + + + + + operation.status.retry.policy.max.time.secs + 600 + + The maximum elapsed time in seconds before retries are aborted + + + + + operation.status.retry.policy.type + fixed.delay + + The type of retry policy for programs. Allowed options: + "none", "fixed.delay", or "exponential.backoff". + + + + + operation.status.event.topic + operationstatusevent + + Topic prefix for publishing status transitioning events of operation runs to + the messaging system. + + f + + + operation.status.event.topic.num.partitions + 1 + + Number of topics to use for operation run events. + All events related to same run should always go to same topic. + + + + + operation.status.event.fetch.size + 10 + + Maximum number of events to fetch from the messaging system in each + processing cycle for operation status update events + + + + + operation.status.event.tx.size + ${operation.status.event.fetch.size} + + Maximum number of events to process in one transaction when messages are + fetched from the messaging system + in each processing cycle for operation status update events + + + + + operation.status.event.poll.delay.millis + 1000 + + The delay in milliseconds to check again for new operation status events + after it detects there was no event + + + diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java index 10cb3304e4a..2f86499b25d 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java @@ -1344,8 +1344,8 @@ public static final class OperationRunsStore { new StructuredTableSpecification.Builder() .withId(OPERATION_RUNS) .withFields( - Fields.stringType(ID_FIELD), Fields.stringType(NAMESPACE_FIELD), + Fields.stringType(ID_FIELD), Fields.stringType(TYPE_FIELD), Fields.stringType(STATUS_FIELD), Fields.longType(START_TIME_FIELD), diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/startup/ConfigurationCheck.java b/cdap-master/src/main/java/io/cdap/cdap/master/startup/ConfigurationCheck.java index a8f5bd445df..6ec373f8295 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/startup/ConfigurationCheck.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/startup/ConfigurationCheck.java @@ -193,6 +193,7 @@ private void checkMessagingTopics(Set problemKeys) { validateMessagingTopic(Constants.Scheduler.TIME_EVENT_TOPIC, problemKeys); validateMessagingTopic(Constants.AppFabric.PROGRAM_STATUS_EVENT_TOPIC, problemKeys); validateMessagingTopic(Constants.AppFabric.PROGRAM_STATUS_RECORD_EVENT_TOPIC, problemKeys); + validateMessagingTopic(Constants.Operation.STATUS_EVENT_TOPIC, problemKeys); } private void checkProgramConfigurations(Set problemKeys) {