Skip to content

Commit

Permalink
added API handler functions (scan and get) for SCM LRO
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed Nov 16, 2023
1 parent 815f131 commit 17414ca
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.debug.settings.onBuildFailureProceed": true
}
3 changes: 3 additions & 0 deletions cdap-app-fabric/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ lib/
.idea
data/

# VSCode Files & Dir
.vscode/

# Gradle Files & Dir #
build/
.gradle/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,111 @@
package io.cdap.cdap.gateway.handlers;

import com.google.gson.Gson;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.gateway.handlers.util.AbstractAppFabricHttpHandler;
import io.cdap.cdap.internal.operation.OperationLifecycleManager;
import io.cdap.cdap.internal.operation.OperationRunFilter;
import io.cdap.cdap.internal.operation.OperationRunNotFoundException;
import io.cdap.cdap.internal.operation.ScanOperationRunsRequest;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.http.HttpHandler;
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.ArrayList;
import java.util.List;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;

/**
* The {@link HttpHandler} for handling REST calls to operation endpoints.
*/
/** The {@link HttpHandler} for handling REST calls to operation endpoints. */
@Path(Constants.Gateway.API_VERSION_3 + "/namespaces/{namespace-id}/operations")
public class OperationHttpHandler extends AbstractAppFabricHttpHandler {

private final CConfiguration cConf;
private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(\"?)(\\w+)=(\\w+)(\"?)");
private static final String FILTER_SPLITTER = "AND";
private final FeatureFlagsProvider featureFlagsProvider;
private static final Gson GSON = new Gson();

OperationHttpHandler() {
private final OperationLifecycleManager operationLifecycleManager;
private final int batchSize;
public static final String OPERATIONS_LIST_PAGINATED_KEY = "operations";

@Inject
OperationHttpHandler(CConfiguration cConf, OperationLifecycleManager operationLifecycleManager)
throws Exception {
this.cConf = cConf;
this.batchSize = this.cConf.getInt(AppFabric.STREAMING_BATCH_SIZE);
this.operationLifecycleManager = operationLifecycleManager;
this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
}

// TODO[CDAP-20881] : Add RBAC check
/**
* API to fetch all running operations in a namespace.
*
* @param namespaceId Namespace to fetch runs from
* @param pageToken the token identifier for the current page requested in a paginated
* request
* @param pageToken the token identifier for the current page requested in a paginated request
* @param pageSize the number of application details returned in a paginated request
* @param filter optional filters in EBNF grammar. Currently Only one status and one type
* filter is supported with AND expression.
* @param filter optional filters in EBNF grammar. Currently Only one status and one type filter
* is supported with AND expression.
*/
@GET
@Path("/")
public void scanOperations(HttpRequest request, HttpResponder responder,
public void scanOperations(
HttpRequest request,
HttpResponder responder,
@PathParam("namespace-id") String namespaceId,
@QueryParam("pageToken") String pageToken,
@QueryParam("pageSize") Integer pageSize,
@QueryParam("filter") String filter) {
// TODO(samik, CDAP-20812) fetch the operation runs from store
List<OperationRun> runs = new ArrayList<>();
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(runs));
@QueryParam("filter") String filter)
throws BadRequestException, IOException, ForbiddenException {
checkSourceControlMultiAppFeatureFlag();
validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(
GSON,
responder,
OPERATIONS_LIST_PAGINATED_KEY,
jsonListResponder -> {
AtomicReference<OperationRun> lastRun = new AtomicReference<>();
ScanOperationRunsRequest scanRequest = getScanRequest(namespaceId, pageToken, pageSize, filter);
boolean pageLimitReached = false;
try {
pageLimitReached =
operationLifecycleManager.scanOperations(
scanRequest,
batchSize,
runDetail -> {
OperationRun run = runDetail.getRun();
jsonListResponder.send(run);
lastRun.set(run);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (OperationRunNotFoundException e) {
responder.sendString(HttpResponseStatus.BAD_REQUEST, e.getMessage());
}
OperationRun run = lastRun.get();
return !pageLimitReached || run == null ? null : run.getId();
});
}

/**
Expand All @@ -74,12 +132,23 @@ public void scanOperations(HttpRequest request, HttpResponder responder,
*/
@GET
@Path("/{id}")
public void getOperationRun(HttpRequest request, HttpResponder responder,
public void getOperationRun(
HttpRequest request,
HttpResponder responder,
@PathParam("namespace-id") String namespaceId,
@PathParam("id") String runId) {
// // TODO(samik, CDAP-20813) fetch the operation runs from store
OperationRun run = null;
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(run));
@PathParam("id") String runId)
throws BadRequestException, OperationRunNotFoundException, IOException, ForbiddenException {
checkSourceControlMultiAppFeatureFlag();
validateNamespaceId(namespaceId);
if (runId == null || runId.isEmpty()) {
throw new BadRequestException("Path parameter runId cannot be empty");
}
responder.sendJson(
HttpResponseStatus.OK,
GSON.toJson(
operationLifecycleManager
.getOperationRun(new OperationRunId(namespaceId, runId))
.getRun()));
}

/**
Expand All @@ -98,4 +167,96 @@ public void failOperation(FullHttpRequest request, HttpResponder responder,
String.format("Updated status for operation run %s in namespace '%s'.", runId,
namespaceId));
}

private ScanOperationRunsRequest getScanRequest(
String namespaceId, String pageToken, Integer pageSize, String filterStr)
throws IllegalArgumentException {
ScanOperationRunsRequest.Builder builder = ScanOperationRunsRequest.builder();
builder.setNamespace(namespaceId);
if (pageSize != null) {
builder.setLimit(pageSize);
}
if (pageToken != null) {
builder.setScanAfter(pageToken);
}
if (filterStr != null && !filterStr.isEmpty()) {
OperationRunFilter operationRunFilter = getFilter(filterStr);
builder.setFilter(operationRunFilter);
}
return builder.build();
}

// TODO[CDAP-20895] : Add unit tests for extracting OperationRunFilter from filter string
private OperationRunFilter getFilter(String filterStr) throws IllegalArgumentException {
Map<String, String> filterKeyValMap = parseKeyValStr(filterStr, FILTER_SPLITTER);
OperationType operationType = null;
OperationRunStatus operationStatus = null;

for (Map.Entry<String, String> entry : filterKeyValMap.entrySet()) {
String filterValue = entry.getValue();
OperationFilterKey filterKey = OperationFilterKey.valueOf(entry.getKey());

try {
switch (filterKey) {
case TYPE:
operationType = OperationType.valueOf(filterValue);
break;
case STATUS:
operationStatus = OperationRunStatus.valueOf(filterValue);
break;
default:
throw new IllegalArgumentException("Unknown filter key: " + filterKey);
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid " + filterKey.name() + ": " + filterValue, e);
}
}
return new OperationRunFilter(operationType, operationStatus);
}

/**
* Parses a string containing key-value pairs separated by a specified splitter. The string is
* enclosed within quotes.
*
* @param input The input string containing key-value pairs.
* @param splitter The string used to split key-value pairs.
* @return A {@code Map<String, String>} containing the parsed key-value pairs.
* @throws IllegalArgumentException If the input does not match the expected key=val pair pattern.
*/
private static Map<String, String> parseKeyValStr(String input, String splitter) {
Map<String, String> keyValMap = new HashMap<>();
String[] keyValPairs = input.split(splitter);

for (String keyValPair : keyValPairs) {
Matcher matcher = KEY_VALUE_PATTERN.matcher(keyValPair.trim());

if (matcher.matches()) {
keyValMap.put(matcher.group(2).trim().toUpperCase(), matcher.group(3).trim().toUpperCase());
} else {
throw new IllegalArgumentException("Invalid filter key=val pair: " + keyValPair);
}
}
return keyValMap;
}

private NamespaceId validateNamespaceId(String namespaceId) throws BadRequestException {
try {
return new NamespaceId(namespaceId);
} catch (IllegalArgumentException e) {
throw new BadRequestException(e.getMessage(), e);
}
}

/** throws {@link ForbiddenException} if the feature is disabled */
private void checkSourceControlMultiAppFeatureFlag() throws ForbiddenException {
if (!Feature.SOURCE_CONTROL_MANAGEMENT_MULTI_APP.isEnabled(featureFlagsProvider)) {
throw new ForbiddenException(
"Source Control Management Multiple Apps feature is not enabled.");
}
}

private enum OperationFilterKey {
TYPE,
STATUS
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;

import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
Expand Down Expand Up @@ -78,6 +80,27 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
return currentLimit == 0;
}

/**
* Retrieves details of an operation run identified by the provided {@code OperationRunId}.
*
* @param runId The unique identifier for the operation run.
* @return An {@code OperationRunDetail} object containing information about the specified operation run.
* @throws OperationRunNotFoundException If the specified operation run is not found.
*/
public OperationRunDetail getOperationRun(OperationRunId runId)
throws IOException, OperationRunNotFoundException {
OperationRunDetail operationRunDetail =
TransactionRunners.run(
transactionRunner,
context -> {
return getOperationRunStore(context).getOperation(runId);
},
IOException.class,
OperationRunNotFoundException.class);

return operationRunDetail;
}

/**
* Runs a given operation. It is the responsibility of the caller to validate state transition.
*
Expand Down
Loading

0 comments on commit 17414ca

Please sign in to comment.