Skip to content

Commit

Permalink
Add small fixes for operation launch
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 21, 2023
1 parent 135cdb5 commit 4d4c682
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.cdap.cdap.gateway.handlers.ImpersonationHandler;
import io.cdap.cdap.gateway.handlers.InstanceOperationHttpHandler;
import io.cdap.cdap.gateway.handlers.NamespaceHttpHandler;
import io.cdap.cdap.gateway.handlers.OperationHttpHandler;
import io.cdap.cdap.gateway.handlers.OperationalStatsHttpHandler;
import io.cdap.cdap.gateway.handlers.OperationsDashboardHttpHandler;
import io.cdap.cdap.gateway.handlers.PreferencesHttpHandler;
Expand Down Expand Up @@ -471,6 +472,7 @@ protected void configure() {
handlerBinder.addBinding().to(AppStateHandler.class);
handlerBinder.addBinding().to(CredentialProviderHttpHandler.class);
handlerBinder.addBinding().to(CredentialProviderHttpHandlerInternal.class);
handlerBinder.addBinding().to(OperationHttpHandler.class);

FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
import io.cdap.cdap.internal.namespace.credential.NamespaceCredentialProviderService;
import io.cdap.cdap.internal.operation.OperationNotificationSubscriberService;
import io.cdap.cdap.internal.provision.ProvisioningService;
import io.cdap.cdap.internal.sysapp.SystemAppManagementService;
import io.cdap.cdap.proto.id.NamespaceId;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class AppFabricServer extends AbstractIdleService {
private final SystemAppManagementService systemAppManagementService;
private final SourceControlOperationRunner sourceControlOperationRunner;
private final RepositoryCleanupService repositoryCleanupService;
private final OperationNotificationSubscriberService operationNotificationSubscriberService;
private final CConfiguration cConf;
private final SConfiguration sConf;
private final boolean sslEnabled;
Expand Down Expand Up @@ -131,7 +133,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory,
RunRecordTimeToLiveService runRecordTimeToLiveService,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService) {
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -159,6 +162,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory;
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.repositoryCleanupService = repositoryCleanupService;
this.operationNotificationSubscriberService = operationNotificationSubscriberService;
}

/**
Expand Down Expand Up @@ -189,7 +193,8 @@ protected void startUp() throws Exception {
runRecordCounterService.start(),
runRecordTimeToLiveService.start(),
sourceControlOperationRunner.start(),
repositoryCleanupService.start()
repositoryCleanupService.start(),
operationNotificationSubscriberService.start()
));
Futures.allAsList(futuresList).get();

Expand Down Expand Up @@ -250,6 +255,7 @@ protected void shutDown() throws Exception {
repositoryCleanupService.stopAndWait();
credentialProviderService.stopAndWait();
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.internal.app.sourcecontrol;


/**
* Factory interface for creating {@link PushAppsOperation}.
* This interface is for Guice assisted binding, hence there will be no concrete implementation of it.
Expand All @@ -27,7 +26,7 @@ public interface PushAppsOperationFactory {
* Returns an implementation of {@link PushAppsOperation} that operates on the given {@link
* PushAppsRequest}.
*
* @param request contains list of apps to pull
* @param request contains list of apps to push
* @return a new instance of {@link PushAppsOperation}.
*/
PushAppsOperation create(PushAppsRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@

import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;

/**
* Abstract runner implementation with common functionality.
*/
public abstract class AbstractOperationRunner implements OperationRunner {

private final PullAppsOperationFactory pullOperationFactory;
private final PushAppsOperationFactory pushAppsOperationFactory;

AbstractOperationRunner(PullAppsOperationFactory pullOperationFactory) {
AbstractOperationRunner(PullAppsOperationFactory pullOperationFactory,
PushAppsOperationFactory pushAppsOperationFactory) {
this.pullOperationFactory = pullOperationFactory;
this.pushAppsOperationFactory = pushAppsOperationFactory;
}

/**
Expand All @@ -40,12 +45,17 @@ protected LongRunningOperation createOperation(OperationRunDetail detail)
throws IllegalStateException {
switch (detail.getRun().getType()) {
case PULL_APPS:
PullAppsRequest request = detail.getPullAppsRequest();
if (request == null) {
PullAppsRequest pullReq = detail.getPullAppsRequest();
if (pullReq == null) {
throw new IllegalStateException("Missing request for pull operation");
}
return pullOperationFactory.create(request);
return pullOperationFactory.create(pullReq);
case PUSH_APPS:
PushAppsRequest pushReq = detail.getPushAppsRequest();
if (pushReq == null) {
throw new IllegalStateException("Missing request for push operation");
}
return pushAppsOperationFactory.create(pushReq);
default:
throw new IllegalStateException(
String.format("Invalid operation type %s", detail.getRun().getType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.inject.Inject;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory;

/**
* Implementation of {@link OperationRunner} to run an operation in the same service.
Expand All @@ -33,8 +34,8 @@ public class InMemoryOperationRunner extends AbstractOperationRunner {
*/
@Inject
public InMemoryOperationRunner(OperationStatePublisher statePublisher,
PullAppsOperationFactory pullOperationFactory) {
super(pullOperationFactory);
PullAppsOperationFactory pullOperationFactory, PushAppsOperationFactory pushAppsOperationFactory) {
super(pullOperationFactory, pushAppsOperationFactory);
this.statePublisher = statePublisher;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationRun;
import java.util.Arrays;
Expand Down Expand Up @@ -57,15 +58,21 @@ public class OperationRunDetail {
@Nullable
private final PullAppsRequest pullAppsRequest;

@SerializedName("pullAppsRequest")
@Nullable
private final PushAppsRequest pushAppsRequest;

protected OperationRunDetail(
OperationRunId runId, OperationRun run,
byte[] sourceId, @Nullable String principal,
@Nullable PullAppsRequest pullAppsRequest) {
@Nullable PullAppsRequest pullAppsRequest,
@Nullable PushAppsRequest pushAppsRequest) {
this.runId = runId;
this.run = run;
this.sourceId = sourceId;
this.principal = principal;
this.pullAppsRequest = pullAppsRequest;
this.pushAppsRequest = pushAppsRequest;
}

@Nullable
Expand All @@ -82,6 +89,10 @@ public PullAppsRequest getPullAppsRequest() {
return pullAppsRequest;
}

public PushAppsRequest getPushAppsRequest() {
return pushAppsRequest;
}

public OperationRun getRun() {
return run;
}
Expand Down Expand Up @@ -138,6 +149,7 @@ public static class Builder {
protected byte[] sourceId;
protected String principal;
protected PullAppsRequest pullAppsRequest;
protected PushAppsRequest pushAppsRequest;

protected Builder() {
}
Expand All @@ -148,6 +160,7 @@ protected Builder(OperationRunDetail detail) {
run = detail.getRun();
runId = detail.getRunId();
pullAppsRequest = detail.getPullAppsRequest();
pushAppsRequest = detail.getPushAppsRequest();
}

public Builder setSourceId(byte[] sourceId) {
Expand Down Expand Up @@ -176,16 +189,18 @@ public Builder setPullAppsRequest(PullAppsRequest pullAppsRequest) {
return this;
}

public Builder setPushAppsRequest(PushAppsRequest pushAppsRequest) {
this.pushAppsRequest = pushAppsRequest;
return this;
}

/**
* Validates input and returns a OperationRunDetail.
*/
public OperationRunDetail build() {
if (runId == null) {
throw new IllegalArgumentException("run id must be specified.");
}
if (sourceId == null) {
throw new IllegalArgumentException("Operation run source id must be specified.");
}
if (run == null) {
throw new IllegalArgumentException("Operation run must be specified.");
}
Expand All @@ -194,12 +209,12 @@ public OperationRunDetail build() {
throw new IllegalArgumentException("Exactly one request type can be non-null");
}

return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest);
return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest, pushAppsRequest);
}

private boolean validateRequests() {
// validate only one of the request is non-null
return Stream.of(pullAppsRequest).filter(java.util.Objects::nonNull).count() == 1;
return Stream.of(pullAppsRequest, pushAppsRequest).filter(java.util.Objects::nonNull).count() == 1;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,13 @@ private Optional<StructuredRow> getOperationRunInternal(OperationRunId runId)
private void writeOperationRun(OperationRunId runId, OperationRunDetail detail)
throws IOException {
Collection<Field<?>> fields = ImmutableList.of(
Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun()),
Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD,
runId.getNamespace()),
Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD,
detail.getRun().getStatus().name()),
Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun()),
Fields.stringField(StoreDefinition.OperationRunsStore.TYPE_FIELD,
detail.getRun().getType().name()),
Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD,
detail.getRun().getStatus().name()),
Fields.longField(StoreDefinition.OperationRunsStore.START_TIME_FIELD,
detail.getRun().getMetadata().getCreateTime().toEpochMilli()),
Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.cdap.cdap.internal.app.sourcecontrol.LocalApplicationManager;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory;
import io.cdap.cdap.internal.operation.InMemoryOperationRunner;
import io.cdap.cdap.internal.operation.InMemoryOperationRuntime;
import io.cdap.cdap.internal.operation.LongRunningOperation;
Expand All @@ -41,6 +43,9 @@ protected void configure() {
install(new FactoryModuleBuilder()
.implement(LongRunningOperation.class, PullAppsOperation.class)
.build(PullAppsOperationFactory.class));
install(new FactoryModuleBuilder()
.implement(LongRunningOperation.class, PushAppsOperation.class)
.build(PushAppsOperationFactory.class));
// TODO(samik) change based on worker enabled on not
bind(ApplicationManager.class).to(LocalApplicationManager.class);
bind(OperationRunner.class).to(InMemoryOperationRunner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2490,7 +2490,7 @@ public static final class Operation {
public static final String STATUS_EVENT_NUM_PARTITIONS = "operation.status.event.topic.num.partitions";
public static final String STATUS_EVENT_FETCH_SIZE = "operation.status.event.fetch.size";
public static final String STATUS_EVENT_TX_SIZE = "operation.status.event.tx.size";
public static final String STATUS_EVENT_POLL_DELAY_MILLIS = "operatopn.status.event.poll.delay.millis";
public static final String STATUS_EVENT_POLL_DELAY_MILLIS = "operation.status.event.poll.delay.millis";
/**
* Topic name for publishing program status recording events to the messaging system.
*/
Expand Down
Loading

0 comments on commit 4d4c682

Please sign in to comment.