Skip to content

Commit

Permalink
Merge pull request #15320 from cdapio/operation-store-apis
Browse files Browse the repository at this point in the history
Add operation run store methods
  • Loading branch information
samdgupi authored Oct 11, 2023
2 parents 10cf313 + 6ef10d8 commit 30a1b67
Show file tree
Hide file tree
Showing 17 changed files with 1,232 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operations;

import com.google.inject.Inject;
import io.cdap.cdap.internal.app.store.OperationRunDetail;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.util.function.Consumer;

/**
* Service that manages lifecycle of Operation.
*/
public class OperationLifecycleManager {

private final TransactionRunner transactionRunner;

@Inject
OperationLifecycleManager(TransactionRunner transactionRunner) {
this.transactionRunner = transactionRunner;
}

/**
* Scan operations in a namespace.
*
* @param request scan request including filters and limit
* @param txBatchSize batch size of transaction
* @param consumer {@link Consumer} to process each scanned run
* @return true if we have scanned till the request limit else return false. This will be used by
* the caller to identify if there is any further runs left to scan.
*/
public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
Consumer<OperationRunDetail<?>> consumer) throws OperationRunNotFoundException, IOException {
String lastKey = request.getScanAfter();
int currentLimit = request.getLimit();

while (currentLimit > 0) {
ScanOperationRunsRequest batchRequest = ScanOperationRunsRequest
.builder(request)
.setScanAfter(lastKey)
.setLimit(Math.min(txBatchSize, currentLimit))
.build();

request = batchRequest;

lastKey = TransactionRunners.run(transactionRunner, context -> {
return getOperationRunStore(context).scanOperations(batchRequest, consumer);
}, IOException.class, OperationRunNotFoundException.class);

if (lastKey == null) {
break;
}
currentLimit -= txBatchSize;
}
return currentLimit == 0;
}

private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operations;

import io.cdap.cdap.common.AlreadyExistsException;
import io.cdap.cdap.proto.operationrun.OperationRunStatus;

/**
* Thrown when an operation run already exists.
*/
public class OperationRunAlreadyExistsException extends AlreadyExistsException {

public OperationRunAlreadyExistsException(String operationId, OperationRunStatus status) {
super(String.format("Operation %s already exists with status %s", operationId, status));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operations;

import io.cdap.cdap.proto.operationrun.OperationRunStatus;
import javax.annotation.Nullable;

/**
* This class defined various filters that can be applied during operation runs scanning.
*/
public class OperationRunFilter {

@Nullable
private final String operationType;
@Nullable
private final OperationRunStatus status;
// TODO(samik) status and type filters as list

public OperationRunFilter(@Nullable String operationType, @Nullable OperationRunStatus status) {
this.operationType = operationType;
this.status = status;
}

@Nullable
public String getOperationType() {
return operationType;
}

@Nullable
public OperationRunStatus getStatus() {
return status;
}

public static OperationRunFilter emptyFilter() {
return new OperationRunFilter(null, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operations;

import io.cdap.cdap.common.NotFoundException;

/**
* Exception thrown when an operation run with the specified id not found in the specified
* namespace.
*/
public class OperationRunNotFoundException extends NotFoundException {

public OperationRunNotFoundException(String namespace, String runId) {
super(String.format("Operation run %s does not exist in namespace %s", runId, namespace));
}
}
Loading

0 comments on commit 30a1b67

Please sign in to comment.