Add operation run store methods #15320

Merged
merged 1 commit into from
Oct 11, 2023

Conversation

Contributor

@samdgupi samdgupi commented Sep 13, 2023

Added

  • create run
  • update run status
  • update run metadata
  • fail run
  • get run
  • scan run

@samdgupi samdgupi changed the base branch from develop to operation-store September 13, 2023 18:10
@samdgupi samdgupi force-pushed the operation-store branch 5 times, most recently from 5c8c266 to 1941854 Compare September 21, 2023 15:35
Base automatically changed from operation-store to develop September 22, 2023 06:54
@samdgupi samdgupi force-pushed the operation-store-apis branch 4 times, most recently from 93d5151 to 52f7e9d Compare September 26, 2023 07:24
@samdgupi samdgupi added the build Triggers github actions build label Sep 26, 2023
@samdgupi samdgupi force-pushed the operation-store-apis branch from 52f7e9d to 03beee6 Compare September 26, 2023 12:45
@samdgupi samdgupi requested review from albertshau and GnsP September 26, 2023 12:46
@samdgupi samdgupi marked this pull request as ready for review September 26, 2023 12:46
@samdgupi samdgupi force-pushed the operation-store-apis branch 2 times, most recently from 23a35de to 3d30d35 Compare September 27, 2023 06:35
@samdgupi samdgupi requested a review from tivv September 28, 2023 08:52
@samdgupi samdgupi force-pushed the operation-store-apis branch from 3d30d35 to 18300f7 Compare September 29, 2023 17:34
* indexed column, or the type does not match the schema
* @throws IOException if there is an error scanning the table
*/
default CloseableIterator<StructuredRow> scan(Range keyRange, int limit,
Contributor Author

Currently only implemented for PostgreSQL. Will handle NoSQL in a separate PR.

Contributor

this isn't needed anymore right?

Contributor Author

@samdgupi samdgupi Oct 10, 2023

removed


@VisibleForTesting
// USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS
public void deleteAllTables() throws IOException {
Contributor

methods like this should be package private

Contributor

a better name would be clear() or clearData(). This name makes it sound like the table will be dropped.

Contributor Author

Done

);
}

private long getStartTime(OperationRunId runId)
Contributor

Methods like this can lead to unnecessary DB reads over time, as callers may do a normal read and then call this method. We should remove this and force callers to get the start time from the row.

Contributor Author

Done. Moved the logic directly to caller as there is only one usage

return fields;
}

private OperationRunDetail<?> getCurrentRunDetail(OperationRunId runId)
Contributor

This should be getRunDetail. The "current" makes it sound like there could be a previous run with the same id.

Also, I'm not sure how this would work with the <?>. What does it get deserialized to?

Contributor Author

The generic type T refers to the type of the operation request object. It is not needed in most cases when we retrieve the OperationRunDetail, except when we plan to launch the operation. My current approach is to determine the request object type from the operation type and cast the OperationRunDetail accordingly.

Contributor

@albertshau albertshau Oct 5, 2023

I'm still confused how this will work, doesn't the type need to be passed into gson in order for it to be deserialized properly? If you just deserialize it here as ?, don't you get a Map or a JsonObject or something as the field? I don't think you'd be able to cast it directly.
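
The concern above can be illustrated without any JSON library. This is only a sketch under stated assumptions: RunDetailSketch and PullRequestSketch are hypothetical stand-ins for OperationRunDetail and a concrete request type, and deserializeRaw simulates a deserializer that was given the raw class only, so the request field ends up as a generic Map. The unchecked cast succeeds because generics are erased at runtime; the failure only shows up when the request is read as the concrete type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for OperationRunDetail<T>.
class RunDetailSketch<T> {
  private final T request;

  RunDetailSketch(T request) {
    this.request = request;
  }

  T getRequest() {
    return request;
  }
}

// Hypothetical concrete request type.
class PullRequestSketch {
  final String repo;

  PullRequestSketch(String repo) {
    this.repo = repo;
  }
}

class ErasureSketch {

  // Simulates deserializing with only the raw class available:
  // with no type argument, the request is materialized as a Map.
  static RunDetailSketch<?> deserializeRaw() {
    Map<String, Object> untyped = new HashMap<>();
    untyped.put("repo", "example");
    return new RunDetailSketch<Object>(untyped);
  }

  // Returns true when reading the request as the concrete type fails.
  @SuppressWarnings("unchecked")
  static boolean castFailsOnUse() {
    // The cast itself succeeds: the type argument is erased at runtime.
    RunDetailSketch<PullRequestSketch> typed =
        (RunDetailSketch<PullRequestSketch>) deserializeRaw();
    try {
      // The compiler inserts a checked cast here, which fails for a Map.
      PullRequestSketch request = typed.getRequest();
      return request == null;
    } catch (ClassCastException e) {
      return true;
    }
  }
}
```

If this matches what the real deserializer does, the request type would have to be supplied at deserialization time (for example, looked up from the operation type) rather than applied afterwards with a cast.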


private Collection<Field<?>> getFilterIndexes(OperationRunFilter filter) {
Collection<Field<?>> filterIndexes = new ArrayList<>();
if (filter == null) {
Contributor

don't need to handle this case, we should not allow the filter instance to be null

Contributor Author

@samdgupi samdgupi Oct 4, 2023

A null filter represents no filter applied. I have added a new method in OperationRunFilter that returns an empty filter, to avoid the null check.
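
A minimal sketch of the empty-filter idea, not the actual OperationRunFilter: the class name RunFilterSketch, its two fields, and the emptyFilter() method name are assumptions made for illustration. Callers that want no filtering pass the shared empty instance, so code that builds conditions never has to check the filter itself for null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified filter; field names are assumptions.
class RunFilterSketch {
  private static final RunFilterSketch EMPTY = new RunFilterSketch(null, null);

  private final String operationType; // null means "do not filter on type"
  private final String status;        // null means "do not filter on status"

  RunFilterSketch(String operationType, String status) {
    this.operationType = operationType;
    this.status = status;
  }

  // Shared instance that represents "no filter applied".
  static RunFilterSketch emptyFilter() {
    return EMPTY;
  }

  // Builds one "field=value" condition per populated field.
  List<String> toConditions() {
    List<String> conditions = new ArrayList<>();
    if (operationType != null) {
      conditions.add("type=" + operationType);
    }
    if (status != null) {
      conditions.add("status=" + status);
    }
    return conditions;
  }
}
```

With this shape, `RunFilterSketch.emptyFilter().toConditions()` yields an empty list, which a scan can treat as "match everything".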

* @throws OperationRunNotFoundException run with id does not exist in namespace
*/
public OperationRunDetail<?> getOperation(OperationRunId runId)
throws NotFoundException, IOException {
Contributor

throws OperationRunNotFoundException

Contributor Author

Done

Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun()));
fields.add(
Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD,
System.currentTimeMillis()));
Contributor

instead of System.currentTimeMillis(), it would be good to take a Clock in the constructor of this class and use that instead. It would make unit testing simpler, as you can expect a concrete time.

Contributor Author

Done
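
A minimal sketch of the Clock suggestion, assuming java.time.Clock is the clock type meant here (the thread does not say which Clock class the store takes). ClockedStoreSketch and nextUpdateTime are hypothetical names. Production code would pass Clock.systemUTC(), while a test passes a fixed clock and can assert an exact timestamp.

```java
import java.time.Clock;

// Hypothetical store fragment: only the time source is shown.
class ClockedStoreSketch {
  private final Clock clock;

  ClockedStoreSketch(Clock clock) {
    this.clock = clock;
  }

  // Value that would be written to the update-time field.
  long nextUpdateTime() {
    return clock.millis();
  }
}
```

In a unit test, `new ClockedStoreSketch(Clock.fixed(Instant.ofEpochMilli(1000L), ZoneOffset.UTC))` makes the written update time deterministic.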

* @param limit number of result
* @return a select query
*/
private PreparedStatement prepareMultiScanQuery(Collection<Range> singletonRanges,
Contributor

are there any logical changes being made here or is it just getting moved? (In the future, keep methods in the same location so it is easier to review the diff)

Contributor Author

This was done to fix a checkstyle warning about keeping all the override methods together. There is no logical change. I will revert it back to reduce the diff

Contributor

I see, if it is fixing checkstyle it's ok to move it.

* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.cdap.proto.id;
Contributor

fix checkstyle

Contributor Author

Done

@@ -0,0 +1,90 @@
/*
* Copyright © 2015-2019 Cask Data, Inc.
Contributor

fix copyright year

Contributor Author

Done

@samdgupi samdgupi force-pushed the operation-store-apis branch 2 times, most recently from 73ae585 to ce08626 Compare October 4, 2023 17:35
* @param txBatchSize batch size of transaction
* @param consumer {@link Consumer} to process each scanned run
*/
public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
Contributor Author

Unit test will be added in a followup PR. Will add a TODO

@samdgupi samdgupi force-pushed the operation-store-apis branch from ce08626 to cb66ab8 Compare October 4, 2023 17:45
@samdgupi samdgupi requested a review from albertshau October 5, 2023 08:21

AtomicReference<ScanOperationRunsRequest> requestRef = new AtomicReference<>(request);
AtomicReference<String> lastKey = new AtomicReference<>(request.getScanAfter());
AtomicInteger currentLimit = new AtomicInteger(request.getLimit());
Contributor

this looks like it can be a normal int

Contributor Author

done

*
* @param request scan request including filters and limit
* @param txBatchSize batch size of transaction
* @param consumer {@link Consumer} to process each scanned run
Contributor

javadoc should have @return as well

Contributor

though returning whether the limit was reached is kind of a weird thing to return, not sure how it would be used. I would either make it void or return the total number of operations scanned.

Contributor Author

The boolean would be used in the handler to determine whether all the operations have been scanned and to send the last operation id accordingly. We could also use the total number of operations scanned, but I wanted to keep this analogous to the application scan.

see:

return !pageLimitReached || record == null ? null :

Contributor

hm I see, that's ok then

requestRef.set(batchRequest);

TransactionRunners.run(transactionRunner, context -> {
lastKey.set(
Contributor

you can use the run method with a return value instead of setting lastKey in the closure. Then it can be a normal String instead of an AtomicReference.

Contributor Author

done
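
A sketch of the difference between the two styles. The runVoid and runAndReturn helpers below are hypothetical stand-ins that only invoke the body; they are not the TransactionRunners API, whose exact signatures are not shown in this thread. The point is that a closure which returns its result lets the caller keep a plain local variable instead of an AtomicReference.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class TxReturnSketch {

  // Hypothetical stand-in for running a body with no result.
  static void runVoid(Runnable body) {
    body.run();
  }

  // Hypothetical stand-in for running a body that produces a result.
  static <V> V runAndReturn(Supplier<V> body) {
    return body.get();
  }

  // Before: the lambda cannot assign a local, so a holder object is needed.
  static String lastKeyViaReference(List<String> keys) {
    AtomicReference<String> lastKey = new AtomicReference<>();
    runVoid(() -> {
      for (String key : keys) {
        lastKey.set(key);
      }
    });
    return lastKey.get();
  }

  // After: the lambda returns the value, so a plain String is enough.
  static String lastKeyViaReturn(List<String> keys) {
    String lastKey = runAndReturn(() -> {
      String last = null;
      for (String key : keys) {
        last = key;
      }
      return last;
    });
    return lastKey;
  }
}
```

Both methods compute the same value; the second avoids a concurrency primitive where no concurrency is involved.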


private static final String SMALLEST_POSSIBLE_STRING = "";

@Inject
Contributor

don't think we would use injection to create the store

Contributor Author

done

}

@VisibleForTesting
// USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS
Contributor

checkstyle

Contributor Author

done

Range range = Range.create(endFields, endBound, startFields, startBound);
Collection<Field<?>> filterIndexes = getFilterIndexes(request.getFilter());
StructuredTable table = getOperationRunsTable(context);
AtomicReference<String> lastKey = new AtomicReference<>();
Contributor

this can be a normal String

Contributor Author

done

/**
* namespace to return applications for or null for all namespaces.
*/
@Nullable
Contributor

this should not be nullable

Contributor Author

done

}

/**
* namespace to return applications for or null for all namespaces.
Contributor

update javadoc to not include null

Contributor Author

done

public void testScanOperation() throws Exception {
insertTestRuns();

// TODO(samik) verify the actual list
Contributor

should do this now rather than later

Contributor Author

done

"is not an indexed column or primary key");
}
return scan(keyRange, limit, filterIndexes, Collections.singleton(orderByField), sortOrder,
true);
Contributor

It is really confusing that this scan method is an 'and' filter while the other ones are an 'or' filter. Can we just make the operation type part of the primary key? That way it can go in the key range and we can use the existing methods without adding more to StructuredTable. Would like to avoid bloating the StructuredTable class, it's already pretty confusing as-is.

FYI, whenever we go to CDAP 7.0, we would like to remove Hadoop support, which would allow us to remove the NoSql implementations which would let us use normal SQL instead of this interface.

Contributor Author

The operation type is not part of the primary key because the operation id and namespace should uniquely identify an operation. This is in line with the API design /namespace/{namespace}/operation/{id}. If we add the operation type to the primary key, we also have to change the API.
The API is designed to be compatible with the GCP operations API, which also does not include types.
I think we do need an option to select how the filters should be joined. I can create two different top-level functions for AND and OR filters.

Contributor Author

Removed the AND filter and added the operation type to the range key.
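
A rough sketch of why putting the operation type into the key range removes the need for a separate AND filter. Everything here is an assumption for illustration: the key layout (namespace, type, id joined with ':'), the class KeyRangeSketch, and the in-memory TreeMap standing in for a sorted table. The actual table schema is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sorted table keyed by "namespace:type:id".
class KeyRangeSketch {
  private static final char SEP = ':';
  private final TreeMap<String, String> rows = new TreeMap<>();

  void put(String namespace, String type, String id, String detail) {
    rows.put(namespace + SEP + type + SEP + id, detail);
  }

  // A prefix range over (namespace, type) selects exactly the rows of that
  // type, so no extra filter has to be AND-ed onto the scan.
  List<String> scanByType(String namespace, String type) {
    String prefix = namespace + SEP + type + SEP;
    // Upper bound: the prefix followed by the largest char value (exclusive).
    return new ArrayList<>(rows.subMap(prefix, prefix + Character.MAX_VALUE).values());
  }
}
```

The sketch assumes that names never contain the separator character; a real key encoding would need to guarantee that.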

@samdgupi samdgupi force-pushed the operation-store-apis branch 2 times, most recently from af6aa13 to 57a1c6e Compare October 6, 2023 16:56
@samdgupi samdgupi force-pushed the operation-store-apis branch 3 times, most recently from df24aec to cf0f87b Compare October 9, 2023 17:39
@samdgupi samdgupi requested a review from albertshau October 9, 2023 17:40
}
return GSON.fromJson(
row.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD),
OperationRunDetail.class
Contributor

I'm still not sure how this will work without a concrete class for the generic. Maybe it will be clearer when there is code around the caller using the result, but I get the feeling it will need to be revisited.

Contributor

@albertshau albertshau left a comment

lgtm assuming the new scan method is removed from StructuredTable

@samdgupi samdgupi force-pushed the operation-store-apis branch from cf0f87b to 26ad907 Compare October 10, 2023 18:38
private final TransactionRunner transactionRunner;

@Inject
public OperationLifecycleService(TransactionRunner transactionRunner) {
Contributor

Remove public for injected constructor to avoid direct instantiation.

Contributor Author

done

/**
* Service that manages lifecycle of Operation.
*/
public class OperationLifecycleService {
Contributor

Usually we only name a class with Service suffix if it is a guava Service. This class seems more like a Manager?

Contributor Author

We do have classes with the Service suffix, like ProgramLifecycleService, which is not a guava Service but encapsulates all the operations we can perform on an entity. I was following the apparent naming convention.

Contributor Author

Updated to use the Manager suffix.

*/
public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
Consumer<OperationRunDetail<?>> consumer) throws OperationRunNotFoundException, IOException {
AtomicReference<ScanOperationRunsRequest> requestRef = new AtomicReference<>(request);
Contributor

Why do we need an atomic reference? It seems like everything is happening in the local thread, so a simple local variable would do the job.

Contributor Author

Done

}
currentLimit -= txBatchSize;
}
return currentLimit == 0;
Contributor

If lastKey == null in the loop above, the while loop breaks and currentLimit may not be 0. Is this expected? If the caller also has a while loop that runs until this method returns false, would it result in an infinite loop?

Contributor Author

@samdgupi samdgupi Oct 11, 2023

Breaking out of the loop is expected: lastKey == null signifies that there are no more rows left to scan.
The return value signifies whether the page limit was reached. The caller also needs to track the last record scanned, as in the following example from AppLifeCycleHttpHandler:

boolean pageLimitReached = applicationLifecycleService.scanApplications(scanRequest,
    appDetail -> {
      ApplicationRecord record = new ApplicationRecord(appDetail);
      jsonListResponder.send(record);
      lastRecord.set(record);
    });
ApplicationRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
    record.getName() + EntityId.IDSTRING_PART_SEPARATOR + record.getAppVersion();

The caller is required to update the ScanAfter field in the request for the next call if this method is called from a while loop.
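
The paging contract described in this reply can be sketched as follows. This is an illustration under assumptions, not the PR's scanOperations: PagedScanSketch, its in-memory NavigableMap, and the parameter names are hypothetical. The scan reads in batches, stops early when rows run out, and returns true only when the page limit was reached; the caller advances scanAfter from the last key it saw, which is what keeps an outer loop from repeating the same page.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.function.Consumer;

class PagedScanSketch {

  // Scans up to `limit` keys strictly after `scanAfter` (null = from the start),
  // reading at most `batchSize` keys per simulated transaction.
  // Returns true only when the page limit was reached.
  static boolean scan(NavigableMap<String, String> rows, String scanAfter, int limit,
      int batchSize, Consumer<String> consumer) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    String lastKey = scanAfter;
    int remaining = limit;
    while (remaining > 0) {
      int batchLimit = Math.min(batchSize, remaining);
      NavigableMap<String, String> view =
          lastKey == null ? rows : rows.tailMap(lastKey, false);
      Iterator<String> keys = view.keySet().iterator();
      int scanned = 0;
      while (keys.hasNext() && scanned < batchLimit) {
        lastKey = keys.next();
        consumer.accept(lastKey);
        scanned++;
      }
      remaining -= scanned;
      if (scanned < batchLimit) {
        break; // ran out of rows before filling the batch
      }
    }
    return remaining == 0;
  }

  // Caller-side loop: advances scanAfter from the last key seen on each page.
  static List<List<String>> readAllPages(NavigableMap<String, String> rows, int pageSize) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be positive");
    }
    List<List<String>> pages = new ArrayList<>();
    String scanAfter = null;
    boolean pageLimitReached = true;
    while (pageLimitReached) {
      List<String> page = new ArrayList<>();
      pageLimitReached = scan(rows, scanAfter, pageSize, 2, page::add);
      if (!page.isEmpty()) {
        pages.add(page);
        scanAfter = page.get(page.size() - 1);
      }
    }
    return pages;
  }
}
```

In this sketch the outer loop terminates because a scan that starts after the final key consumes nothing and therefore returns false.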

private OperationRunsStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunsStore(context);
}

Contributor

Remove extra new lines.

Contributor Author

Done

/**
* This class defined various filters that can be applied during operation runs scanning.
*/
public class OperationRunFilter {
Contributor

Usually a Filter is kind of a Function instead of just having getters. How is it supposed to be used?

Contributor Author

It is just a container class to encapsulate what filter options are available for operations.

/**
* Store for operation runs.
*/
public class OperationRunsStore {
Contributor

No plural in class name.

Contributor Author

Done


private final Clock clock;

private static final String SMALLEST_POSSIBLE_STRING = "";
Contributor

group static fields together

Contributor Author

Done


private void writeOperationRun(OperationRunId runId, OperationRunDetail<?> detail)
throws IOException {
Collection<Field<?>> fields = new ArrayList<>();
Contributor

Consider using Arrays.asList or ImmutableList.of to improve readability.

Contributor Author

Done

private OperationRunDetail<?> getRunDetail(OperationRunId runId)
throws IOException, OperationRunNotFoundException {
Optional<StructuredRow> row = getOperationRunInternal(runId);
if (!row.isPresent()) {
Contributor

Avoid using isPresent on Optional. In this case, use .map and orElseThrow.

return getOperationRunInternal(runId)
    .map(this::rowToRunDetail)
    .orElseThrow(() -> new OperationRunNotFoundException(...));

Contributor

Please also fix other places that use isPresent and replace with more expressive way.

Contributor Author

Updated. There is one case where we throw an error if the row is present. As we can't throw a checked exception from within ifPresent, I have not changed it.
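
A small sketch of both cases discussed in this thread, using hypothetical names (OptionalSketch, RunNotFoundSketch, RunAlreadyExistsSketch) and an in-memory map in place of the table. Optional.orElseThrow is generic in the exception type, so it can propagate a checked exception; Optional.ifPresent takes a Consumer, which cannot, so the "already exists" check keeps isPresent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical checked exceptions standing in for the store's own.
class RunNotFoundSketch extends Exception {
  RunNotFoundSketch(String id) {
    super("run not found: " + id);
  }
}

class RunAlreadyExistsSketch extends Exception {
  RunAlreadyExistsSketch(String id) {
    super("run already exists: " + id);
  }
}

class OptionalSketch {
  private final Map<String, String> rows = new HashMap<>();

  private Optional<String> findRow(String id) {
    return Optional.ofNullable(rows.get(id));
  }

  // "Absent is an error": map + orElseThrow works with a checked exception.
  String getRun(String id) throws RunNotFoundSketch {
    return findRow(id)
        .map(String::toUpperCase) // stands in for converting a row to a detail object
        .orElseThrow(() -> new RunNotFoundSketch(id));
  }

  // "Present is an error": ifPresent cannot throw a checked exception,
  // so an explicit isPresent check remains the straightforward option.
  void createRun(String id, String detail) throws RunAlreadyExistsSketch {
    if (findRow(id).isPresent()) {
      throw new RunAlreadyExistsSketch(id);
    }
    rows.put(id, detail);
  }
}
```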

@samdgupi samdgupi force-pushed the operation-store-apis branch from 26ad907 to 6ef10d8 Compare October 11, 2023 11:01
@samdgupi
Contributor Author

@chtyim Merging this PR to unblock followup PRs for LRO. Please add comments for any suggested change and I will raise a further PR to solve.

@samdgupi samdgupi merged commit 30a1b67 into develop Oct 11, 2023
5 checks passed
@samdgupi samdgupi deleted the operation-store-apis branch October 11, 2023 13:02
@samdgupi samdgupi added the LRO label for tracking Long Running Operation support label Nov 30, 2023