diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000000..c995aa5cefc3 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.debug.settings.onBuildFailureProceed": true +} \ No newline at end of file diff --git a/cdap-app-fabric/.gitignore b/cdap-app-fabric/.gitignore index 321af1077cac..37a691f73e9c 100644 --- a/cdap-app-fabric/.gitignore +++ b/cdap-app-fabric/.gitignore @@ -17,6 +17,9 @@ lib/ .idea data/ +# VSCode Files & Dir +.vscode/ + # Gradle Files & Dir # build/ .gradle/ diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java index 6e1673b883de..964f84cd3406 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java @@ -17,53 +17,111 @@ package io.cdap.cdap.gateway.handlers; import com.google.gson.Gson; +import com.google.inject.Inject; +import io.cdap.cdap.api.feature.FeatureFlagsProvider; +import io.cdap.cdap.common.BadRequestException; +import io.cdap.cdap.common.ForbiddenException; +import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.conf.Constants.AppFabric; +import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider; +import io.cdap.cdap.features.Feature; import io.cdap.cdap.gateway.handlers.util.AbstractAppFabricHttpHandler; +import io.cdap.cdap.internal.operation.OperationLifecycleManager; +import io.cdap.cdap.internal.operation.OperationRunFilter; +import io.cdap.cdap.internal.operation.OperationRunNotFoundException; +import io.cdap.cdap.internal.operation.ScanOperationRunsRequest; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; import io.cdap.http.HttpHandler; import io.cdap.http.HttpResponder; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; -import java.util.ArrayList; -import java.util.List; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; -/** - * The {@link HttpHandler} for handling REST calls to operation endpoints. - */ +/** The {@link HttpHandler} for handling REST calls to operation endpoints. */ @Path(Constants.Gateway.API_VERSION_3 + "/namespaces/{namespace-id}/operations") public class OperationHttpHandler extends AbstractAppFabricHttpHandler { - + private final CConfiguration cConf; + private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(\"?)(\\w+)=(\\w+)(\"?)"); + private static final String FILTER_SPLITTER = "AND"; + private final FeatureFlagsProvider featureFlagsProvider; private static final Gson GSON = new Gson(); - - OperationHttpHandler() { + private final OperationLifecycleManager operationLifecycleManager; + private final int batchSize; + public static final String OPERATIONS_LIST_PAGINATED_KEY = "operations"; + + @Inject + OperationHttpHandler(CConfiguration cConf, OperationLifecycleManager operationLifecycleManager) + throws Exception { + this.cConf = cConf; + this.batchSize = this.cConf.getInt(AppFabric.STREAMING_BATCH_SIZE); + this.operationLifecycleManager = operationLifecycleManager; + this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); } + // TODO[CDAP-20881] : Add RBAC check /** * API to fetch all running operations in a namespace. * * @param namespaceId Namespace to fetch runs from - * @param pageToken the token identifier for the current page requested in a paginated - * request + * @param pageToken the token identifier for the current page requested in a paginated request * @param pageSize the number of application details returned in a paginated request - * @param filter optional filters in EBNF grammar. Currently Only one status and one type - * filter is supported with AND expression. + * @param filter optional filters in EBNF grammar. Currently Only one status and one type filter + * is supported with AND expression. */ @GET @Path("/") - public void scanOperations(HttpRequest request, HttpResponder responder, + public void scanOperations( + HttpRequest request, + HttpResponder responder, @PathParam("namespace-id") String namespaceId, @QueryParam("pageToken") String pageToken, @QueryParam("pageSize") Integer pageSize, - @QueryParam("filter") String filter) { - // TODO(samik, CDAP-20812) fetch the operation runs from store - List runs = new ArrayList<>(); - responder.sendJson(HttpResponseStatus.OK, GSON.toJson(runs)); + @QueryParam("filter") String filter) + throws BadRequestException, IOException, ForbiddenException { + checkSourceControlMultiAppFeatureFlag(); + validateNamespaceId(namespaceId); + JsonPaginatedListResponder.respond( + GSON, + responder, + OPERATIONS_LIST_PAGINATED_KEY, + jsonListResponder -> { + AtomicReference lastRun = new AtomicReference<>(); + ScanOperationRunsRequest scanRequest = getScanRequest(namespaceId, pageToken, pageSize, filter); + boolean pageLimitReached = false; + try { + pageLimitReached = + operationLifecycleManager.scanOperations( + scanRequest, + batchSize, + runDetail -> { + OperationRun run = runDetail.getRun(); + jsonListResponder.send(run); + lastRun.set(run); + }); + } catch (IOException e) { + responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } catch (OperationRunNotFoundException e) { + responder.sendString(HttpResponseStatus.BAD_REQUEST, e.getMessage()); + } + OperationRun run = lastRun.get(); + return !pageLimitReached || run == null ? null : run.getId(); + }); } /** @@ -74,12 +132,23 @@ public void scanOperations(HttpRequest request, HttpResponder responder, */ @GET @Path("/{id}") - public void getOperationRun(HttpRequest request, HttpResponder responder, + public void getOperationRun( + HttpRequest request, + HttpResponder responder, @PathParam("namespace-id") String namespaceId, - @PathParam("id") String runId) { - // // TODO(samik, CDAP-20813) fetch the operation runs from store - OperationRun run = null; - responder.sendJson(HttpResponseStatus.OK, GSON.toJson(run)); + @PathParam("id") String runId) + throws BadRequestException, OperationRunNotFoundException, IOException, ForbiddenException { + checkSourceControlMultiAppFeatureFlag(); + validateNamespaceId(namespaceId); + if (runId == null || runId.isEmpty()) { + throw new BadRequestException("Path parameter runId cannot be empty"); + } + responder.sendJson( + HttpResponseStatus.OK, + GSON.toJson( + operationLifecycleManager + .getOperationRun(new OperationRunId(namespaceId, runId)) + .getRun())); } /** @@ -98,4 +167,96 @@ public void failOperation(FullHttpRequest request, HttpResponder responder, String.format("Updated status for operation run %s in namespace '%s'.", runId, namespaceId)); } + + private ScanOperationRunsRequest getScanRequest( + String namespaceId, String pageToken, Integer pageSize, String filterStr) + throws IllegalArgumentException { + ScanOperationRunsRequest.Builder builder = ScanOperationRunsRequest.builder(); + builder.setNamespace(namespaceId); + if (pageSize != null) { + builder.setLimit(pageSize); + } + if (pageToken != null) { + builder.setScanAfter(pageToken); + } + if (filterStr != null && !filterStr.isEmpty()) { + OperationRunFilter operationRunFilter = getFilter(filterStr); + builder.setFilter(operationRunFilter); + } + return builder.build(); + } + + // TODO[CDAP-20895] : Add unit tests for extracting OperationRunFilter from filter string + private OperationRunFilter getFilter(String filterStr) throws IllegalArgumentException { + Map filterKeyValMap = parseKeyValStr(filterStr, FILTER_SPLITTER); + OperationType operationType = null; + OperationRunStatus operationStatus = null; + + for (Map.Entry entry : filterKeyValMap.entrySet()) { + String filterValue = entry.getValue(); + OperationFilterKey filterKey = OperationFilterKey.valueOf(entry.getKey()); + + try { + switch (filterKey) { + case TYPE: + operationType = OperationType.valueOf(filterValue); + break; + case STATUS: + operationStatus = OperationRunStatus.valueOf(filterValue); + break; + default: + throw new IllegalArgumentException("Unknown filter key: " + filterKey); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid " + filterKey.name() + ": " + filterValue, e); + } + } + return new OperationRunFilter(operationType, operationStatus); + } + + /** + * Parses a string containing key-value pairs separated by a specified splitter. The string is + * enclosed within quotes. + * + * @param input The input string containing key-value pairs. + * @param splitter The string used to split key-value pairs. + * @return A {@code Map} containing the parsed key-value pairs. + * @throws IllegalArgumentException If the input does not match the expected key=val pair pattern. + */ + private static Map parseKeyValStr(String input, String splitter) { + Map keyValMap = new HashMap<>(); + String[] keyValPairs = input.split(splitter); + + for (String keyValPair : keyValPairs) { + Matcher matcher = KEY_VALUE_PATTERN.matcher(keyValPair.trim()); + + if (matcher.matches()) { + keyValMap.put(matcher.group(2).trim().toUpperCase(), matcher.group(3).trim().toUpperCase()); + } else { + throw new IllegalArgumentException("Invalid filter key=val pair: " + keyValPair); + } + } + return keyValMap; + } + + private NamespaceId validateNamespaceId(String namespaceId) throws BadRequestException { + try { + return new NamespaceId(namespaceId); + } catch (IllegalArgumentException e) { + throw new BadRequestException(e.getMessage(), e); + } + } + + /** throws {@link ForbiddenException} if the feature is disabled */ + private void checkSourceControlMultiAppFeatureFlag() throws ForbiddenException { + if (!Feature.SOURCE_CONTROL_MANAGEMENT_MULTI_APP.isEnabled(featureFlagsProvider)) { + throw new ForbiddenException( + "Source Control Management Multiple Apps feature is not enabled."); + } + } + + private enum OperationFilterKey { + TYPE, + STATUS + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java index febad517f029..beddd5e47467 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java @@ -17,6 +17,8 @@ package io.cdap.cdap.internal.operation; import com.google.inject.Inject; + +import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationError; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.transaction.TransactionRunner; @@ -78,6 +80,27 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, return currentLimit == 0; } + /** + * Retrieves details of an operation run identified by the provided {@code OperationRunId}. + * + * @param runId The unique identifier for the operation run. + * @return An {@code OperationRunDetail} object containing information about the specified operation run. + * @throws OperationRunNotFoundException If the specified operation run is not found. + */ + public OperationRunDetail getOperationRun(OperationRunId runId) + throws IOException, OperationRunNotFoundException { + OperationRunDetail operationRunDetail = + TransactionRunners.run( + transactionRunner, + context -> { + return getOperationRunStore(context).getOperation(runId); + }, + IOException.class, + OperationRunNotFoundException.class); + + return operationRunDetail; + } + /** * Runs a given operation. It is the responsibility of the caller to validate state transition. * diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java new file mode 100644 index 000000000000..49c6908df4fe --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java @@ -0,0 +1,192 @@ +package io.cdap.cdap.internal.operation; + +import com.google.common.io.Closeables; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.AppFabric; +import io.cdap.cdap.common.guice.ConfigModule; +import io.cdap.cdap.common.guice.LocalLocationModule; +import io.cdap.cdap.common.id.Id.Namespace; +import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; +import io.cdap.cdap.data.runtime.StorageModule; +import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; +import io.cdap.cdap.spi.data.StructuredTableAdmin; +import io.cdap.cdap.spi.data.sql.PostgresInstantiator; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import io.cdap.cdap.store.StoreDefinition; +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +public class OperationLifecycleManagerTest extends OperationTestBase { + protected static TransactionRunner transactionRunner; + private static final String testNamespace = "test"; + private static OperationLifecycleManager operationLifecycleManager; + private static int batchSize; + + @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + private static EmbeddedPostgres postgres; + + @Before + public void before() throws Exception { + TransactionRunners.run( + transactionRunner, + context -> { + OperationRunStore operationRunsStore = new OperationRunStore(context); + operationRunsStore.clearData(); + }); + } + + @BeforeClass + public static void beforeClass() throws Exception { + CConfiguration cConf = CConfiguration.create(); + postgres = PostgresInstantiator.createAndStart(cConf, TEMP_FOLDER.newFolder()); + Injector injector = + Guice.createInjector( + new ConfigModule(cConf), + new LocalLocationModule(), + new SystemDatasetRuntimeModule().getInMemoryModules(), + new StorageModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(MetricsCollectionService.class) + .to(NoOpMetricsCollectionService.class) + .in(Scopes.SINGLETON); + } + }); + + transactionRunner = injector.getInstance(TransactionRunner.class); + operationLifecycleManager = + new OperationLifecycleManager(transactionRunner, Mockito.mock(OperationRuntime.class)); + StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class)); + batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE); + } + + @AfterClass + public static void afterClass() { + Closeables.closeQuietly(postgres); + } + + @Test + public void testScanOperations() throws Exception { + List insertedRuns = insertTestRuns(transactionRunner); + // get a filtered list of testNamespace runs + List testNamespaceRuns = + insertedRuns.stream() + .filter(detail -> detail.getRunId().getNamespace().equals(testNamespace)) + .collect(Collectors.toList()); + + TransactionRunners.run( + transactionRunner, + context -> { + List gotRuns = new ArrayList<>(); + List expectedRuns; + ScanOperationRunsRequest request; + + // verify the scan without filters picks all runs for testNamespace + request = ScanOperationRunsRequest.builder().setNamespace(testNamespace).build(); + operationLifecycleManager.scanOperations(request, batchSize, d -> gotRuns.add(d)); + expectedRuns = testNamespaceRuns; + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify limit + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder().setNamespace(testNamespace).setLimit(2).build(); + operationLifecycleManager.scanOperations(request, batchSize, d -> gotRuns.add(d)); + expectedRuns = testNamespaceRuns.stream().limit(2).collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with type filter + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setFilter(new OperationRunFilter(OperationType.PUSH_APPS, null)) + .build(); + operationLifecycleManager.scanOperations(request, batchSize, d -> gotRuns.add(d)); + expectedRuns = + testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals(OperationType.PUSH_APPS)) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with status filter and limit + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setLimit(2) + .setFilter( + new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED)) + .build(); + operationLifecycleManager.scanOperations(request, batchSize, d -> gotRuns.add(d)); + expectedRuns = + testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS)) + .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) + .limit(2) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with status filter + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setFilter( + new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED)) + .build(); + operationLifecycleManager.scanOperations(request, batchSize, d -> gotRuns.add(d)); + expectedRuns = + testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS)) + .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + }); + } + + @Test + public void testGetOperation() throws Exception { + OperationRunDetail expectedDetail = + insertRun( + testNamespace, OperationType.PUSH_APPS, OperationRunStatus.RUNNING, transactionRunner); + String testId = expectedDetail.getRun().getId(); + OperationRunId runId = new OperationRunId(testNamespace, testId); + + TransactionRunners.run( + transactionRunner, + context -> { + OperationRunDetail gotDetail = operationLifecycleManager.getOperationRun(runId); + Assert.assertEquals(expectedDetail, gotDetail); + try { + operationLifecycleManager.getOperationRun( + new OperationRunId(Namespace.DEFAULT.getId(), testId)); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, + Exception.class); + } +} diff --git a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java index 4f15fab8a6a7..a68c37634b23 100644 --- a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java +++ b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java @@ -38,6 +38,7 @@ public enum Feature { STREAMING_PIPELINE_NATIVE_STATE_TRACKING("6.8.0", false), PUSHDOWN_TRANSFORMATION_WINDOWAGGREGATION("6.9.1"), SOURCE_CONTROL_MANAGEMENT_GIT("6.9.0"), + SOURCE_CONTROL_MANAGEMENT_MULTI_APP("6.10.0"), WRANGLER_PRECONDITION_SQL("6.9.1"), WRANGLER_EXECUTION_SQL("6.10.0"), WRANGLER_SCHEMA_MANAGEMENT("6.10.0"),