Add operation run store methods #15320
5c8c266 to 1941854
93d5151 to 52f7e9d
52f7e9d to 03beee6
23a35de to 3d30d35
3d30d35 to 18300f7
* indexed column, or the type does not match the schema | ||
* @throws IOException if there is an error scanning the table | ||
*/ | ||
default CloseableIterator<StructuredRow> scan(Range keyRange, int limit, |
Currently only implemented in PostgreSQL. NoSQL will be handled in a separate PR.
this isn't needed anymore right?
removed
...abric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java
|
||
@VisibleForTesting | ||
// USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS | ||
public void deleteAllTables() throws IOException { |
methods like this should be package private
a better name would be clear() or clearData(). This name makes it sound like the table will be dropped.
Done
); | ||
} | ||
|
||
private long getStartTime(OperationRunId runId) |
Methods like this can lead to unnecessary DB reads over time, as callers may do a normal read and then call this method. We should remove this and force callers to get the start time from the row.
Done. Moved the logic directly to caller as there is only one usage
return fields; | ||
} | ||
|
||
private OperationRunDetail<?> getCurrentRunDetail(OperationRunId runId) |
This should be getRunDetail. The "current" prefix makes it sound like there could be a previous run with the same id.
Also, I'm not sure how this would work with the <?>. What does it get deserialized to?
The generic type T refers to the type of the operation request object. It is not needed in most cases when we retrieve the OperationRunDetail, except when we plan to launch the operation. The approach I am taking is that the request object type will be determined from the operation type, and the OperationRunDetail will be cast accordingly.
I'm still confused how this will work, doesn't the type need to be passed into gson in order for it to be deserialized properly? If you just deserialize it here as ?, don't you get a Map or a JsonObject or something as the field? I don't think you'd be able to cast it directly.
|
||
private Collection<Field<?>> getFilterIndexes(OperationRunFilter filter) { | ||
Collection<Field<?>> filterIndexes = new ArrayList<>(); | ||
if (filter == null) { |
don't need to handle this case, we should not allow the filter instance to be null
A null filter represents no filter applied. I have added a new method in OperationRunFilter to get an empty filter to avoid a null check.
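A minimal sketch of the empty-filter idea discussed above, assuming a simplified filter with two optional fields. The class `RunFilter`, its fields, and the `emptyFilter()` method name are hypothetical stand-ins, since the actual shape of OperationRunFilter is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class EmptyFilterSketch {

  /** Hypothetical, simplified filter container; field names are assumptions. */
  static final class RunFilter {
    private static final RunFilter EMPTY = new RunFilter(null, null);

    private final String operationType; // null means "any type"
    private final String status;        // null means "any status"

    RunFilter(String operationType, String status) {
      this.operationType = operationType;
      this.status = status;
    }

    /** Shared instance that applies no filtering, used instead of a null filter. */
    static RunFilter emptyFilter() {
      return EMPTY;
    }
  }

  /** Builds the list of index conditions; never needs to null-check the filter itself. */
  static List<String> filterIndexes(RunFilter filter) {
    List<String> indexes = new ArrayList<>();
    if (filter.operationType != null) {
      indexes.add("type=" + filter.operationType);
    }
    if (filter.status != null) {
      indexes.add("status=" + filter.status);
    }
    return indexes;
  }

  public static void main(String[] args) {
    System.out.println(filterIndexes(RunFilter.emptyFilter()));
    System.out.println(filterIndexes(new RunFilter("PULL_APPS", null)));
  }
}
```

Callers that want no filtering pass `RunFilter.emptyFilter()`, so the store code only inspects individual fields.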
* @throws OperationRunNotFoundException run with id does not exist in namespace | ||
*/ | ||
public OperationRunDetail<?> getOperation(OperationRunId runId) | ||
throws NotFoundException, IOException { |
throws OperationRunNotFoundException
Done
Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun())); | ||
fields.add( | ||
Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD, | ||
System.currentTimeMillis())); |
instead of System.currentTimeMillis(), it would be good to take a Clock in the constructor of this class and use that instead. It would make unit testing simpler, as you can expect a concrete time.
Done
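To illustrate the Clock suggestion above, here is a small sketch that uses `java.time.Clock`. That choice is an assumption: the Clock type actually used in the PR is not shown here, and `RunStore` is a hypothetical stand-in for the store class.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

public class ClockInjectionSketch {

  /** Hypothetical stand-in for the store; only the timestamp logic is shown. */
  static final class RunStore {
    private final Clock clock;

    RunStore(Clock clock) {
      this.clock = clock;
    }

    /** Value that would be written to the update-time field. */
    long updateTimeMillis() {
      return clock.millis();
    }
  }

  public static void main(String[] args) {
    // In a test, a fixed clock makes the written timestamp predictable.
    Clock fixed = Clock.fixed(Instant.ofEpochMilli(1_000L), ZoneOffset.UTC);
    RunStore store = new RunStore(fixed);
    System.out.println(store.updateTimeMillis()); // prints 1000
  }
}
```

Production code would pass `Clock.systemUTC()` (or the project's equivalent), while a unit test can assert on the exact value written.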
* @param limit number of result | ||
* @return a select query | ||
*/ | ||
private PreparedStatement prepareMultiScanQuery(Collection<Range> singletonRanges, |
are there any logical changes being made here or is it just getting moved? (In the future, keep methods in the same location so it is easier to review the diff)
This was done to fix a checkstyle warning about keeping all the override methods together. There is no logical change. I will revert it back to reduce the diff
I see, if it is fixing checkstyle it's ok to move it.
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
package io.cdap.cdap.proto.id; |
fix checkstyle
Done
@@ -0,0 +1,90 @@ | |||
/* | |||
* Copyright © 2015-2019 Cask Data, Inc. |
fix copyright year
Done
73ae585 to ce08626
* @param txBatchSize batch size of transaction | ||
* @param consumer {@link Consumer} to process each scanned run | ||
*/ | ||
public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, |
Unit test will be added in a followup PR. Will add a TODO
ce08626 to cb66ab8
|
||
AtomicReference<ScanOperationRunsRequest> requestRef = new AtomicReference<>(request); | ||
AtomicReference<String> lastKey = new AtomicReference<>(request.getScanAfter()); | ||
AtomicInteger currentLimit = new AtomicInteger(request.getLimit()); |
this looks like it can be a normal int
done
* | ||
* @param request scan request including filters and limit | ||
* @param txBatchSize batch size of transaction | ||
* @param consumer {@link Consumer} to process each scanned run |
javadoc should have @return as well
though returning whether the limit was reached is kind of a weird thing to return, not sure how it would be used. I would either make it void or return the total number of operations scanned.
The boolean would be used in the handler to determine whether all the operations were scanned and to send the last operation id accordingly. We could also use the total number of operations scanned, but I wanted to keep this analogous to the application scan
see:
cdap/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java
Line 307 in 550a8f3
return !pageLimitReached || record == null ? null : |
hm I see, that's ok then
requestRef.set(batchRequest); | ||
|
||
TransactionRunners.run(transactionRunner, context -> { | ||
lastKey.set( |
you can use the run method with a return value instead of setting lastKey in the closure. Then it can be a normal String instead of an AtomicReference.
done
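The suggestion above can be sketched in isolation. `runInTx` below is a hypothetical stand-in for a transaction runner that returns the closure's result; it is not the CDAP TransactionRunners API, whose exact signatures are not shown in this thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

public class ReturnValueSketch {

  /** Hypothetical stand-in for a transaction runner; not the CDAP API. */
  static <T> T runInTx(Callable<T> body) throws Exception {
    // A real runner would begin and commit a transaction around this call.
    return body.call();
  }

  /** Before: the closure passes its result out through an AtomicReference. */
  static String lastKeyViaReference() throws Exception {
    AtomicReference<String> lastKey = new AtomicReference<>();
    runInTx(() -> {
      lastKey.set("run-3");
      return null;
    });
    return lastKey.get();
  }

  /** After: the closure returns the value, so a plain local variable suffices. */
  static String lastKeyViaReturn() throws Exception {
    String lastKey = runInTx(() -> "run-3");
    return lastKey;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(lastKeyViaReference());
    System.out.println(lastKeyViaReturn());
  }
}
```

Both variants produce the same value; the second avoids a mutable holder that exists only to work around lambda capture rules.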
|
||
private static final String SMALLEST_POSSIBLE_STRING = ""; | ||
|
||
@Inject |
don't think we would use injection to create the store
done
} | ||
|
||
@VisibleForTesting | ||
// USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS |
checkstyle
done
Range range = Range.create(endFields, endBound, startFields, startBound); | ||
Collection<Field<?>> filterIndexes = getFilterIndexes(request.getFilter()); | ||
StructuredTable table = getOperationRunsTable(context); | ||
AtomicReference<String> lastKey = new AtomicReference<>(); |
this can be a normal String
done
/** | ||
* namespace to return applications for or null for all namespaces. | ||
*/ | ||
@Nullable |
this should not be nullable
done
} | ||
|
||
/** | ||
* namespace to return applications for or null for all namespaces. |
update javadoc to not include null
done
public void testScanOperation() throws Exception { | ||
insertTestRuns(); | ||
|
||
// TODO(samik) verify the actual list |
should do this now rather than later
done
"is not an indexed column or primary key"); | ||
} | ||
return scan(keyRange, limit, filterIndexes, Collections.singleton(orderByField), sortOrder, | ||
true); |
It is really confusing that this scan method is an 'and' filter while the other ones are an 'or' filter. Can we just make the operation type part of the primary key? That way it can go in the key range and we can use the existing methods without adding more to StructuredTable. Would like to avoid bloating the StructuredTable class, it's already pretty confusing as-is.
FYI, whenever we go to CDAP 7.0, we would like to remove Hadoop support, which would allow us to remove the NoSql implementations which would let us use normal SQL instead of this interface.
The reason the operation type is not part of the primary key is that the operation id and namespace should uniquely identify an operation. This is in line with the API design /namespace/{namespace}/operation/{id}. If we add the operation type to the primary key, we have to change the API as well.
The API is designed to be compatible with the GCP operations API, which also does not include types.
I think we do need an option to select how the filters should be joined. I can create two different top-level functions for the AND and OR filters.
Removed the AND filter and added the operation type to the range key
af6aa13 to 57a1c6e
df24aec to cf0f87b
} | ||
return GSON.fromJson( | ||
row.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), | ||
OperationRunDetail.class |
I'm still not sure how this will work without a concrete class for the generic. Maybe it will be more clear when there is code around the caller using the result, but I get the feeling it will need to be revisited.
* indexed column, or the type does not match the schema | ||
* @throws IOException if there is an error scanning the table | ||
*/ | ||
default CloseableIterator<StructuredRow> scan(Range keyRange, int limit, |
this isn't needed anymore right?
lgtm assuming the new scan method is removed from StructuredTable
cf0f87b to 26ad907
private final TransactionRunner transactionRunner; | ||
|
||
@Inject | ||
public OperationLifecycleService(TransactionRunner transactionRunner) { |
Remove public for injected constructor to avoid direct instantiation.
done
/** | ||
* Service that manages lifecycle of Operation. | ||
*/ | ||
public class OperationLifecycleService { |
Usually we only name a class with Service suffix if it is a guava Service. This class seems more like a Manager?
We do have classes with the Service suffix, like ProgramLifecycleService, which is not a guava service but encapsulates all the operations we can do on an entity. I was following the apparent naming convention.
Updated to use Manager Suffix
*/ | ||
public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, | ||
Consumer<OperationRunDetail<?>> consumer) throws OperationRunNotFoundException, IOException { | ||
AtomicReference<ScanOperationRunsRequest> requestRef = new AtomicReference<>(request); |
Why do we need an atomic reference? Seems like everything is happening in the local thread? A simple local variable would do the job.
Done
} | ||
currentLimit -= txBatchSize; | ||
} | ||
return currentLimit == 0; |
If the loop above exits because lastKey == null, currentLimit may not be 0. Is this expected? If the caller also has a while loop that runs until this method returns false, would it result in an infinite loop?
It is expected to break the loop, as lastKey == null signifies that no rows are left to scan.
The return value signifies whether the page limit was reached. The caller also needs to track the last record scanned, as in the following example from AppLifecycleHttpHandler:
boolean pageLimitReached = applicationLifecycleService.scanApplications(scanRequest,
appDetail -> {
ApplicationRecord record = new ApplicationRecord(appDetail);
jsonListResponder.send(record);
lastRecord.set(record);
});
ApplicationRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName() + EntityId.IDSTRING_PART_SEPARATOR + record.getAppVersion();
The caller is required to update the ScanAfter field in the request for the next call if this method is called from a while loop.
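The batching and return-value semantics described in this thread can be sketched with an in-memory sorted set standing in for the table. This is a simplified illustration under stated assumptions, not the PR's implementation: the method signature, the use of `NavigableSet`, and the way `remaining` is decremented are all hypothetical.

```java
import java.util.NavigableSet;
import java.util.function.Consumer;

public class BatchedScanSketch {

  /**
   * Scans up to limit keys strictly after scanAfter, txBatchSize keys at a time.
   * Returns true only if the page limit was reached.
   */
  static boolean scan(NavigableSet<String> keys, String scanAfter, int limit,
      int txBatchSize, Consumer<String> consumer) {
    String lastKey = scanAfter; // null means "start from the beginning"
    int remaining = limit;
    while (remaining > 0) {
      int batch = Math.min(txBatchSize, remaining);
      // Each loop iteration stands in for one transaction.
      NavigableSet<String> tail = lastKey == null ? keys : keys.tailSet(lastKey, false);
      int seen = 0;
      for (String key : tail) {
        if (seen == batch) {
          break;
        }
        consumer.accept(key);
        lastKey = key;
        seen++;
      }
      remaining -= seen;
      if (seen < batch) {
        break; // fewer rows than requested: nothing left to scan
      }
    }
    return remaining == 0;
  }

  public static void main(String[] args) {
    NavigableSet<String> keys = new java.util.TreeSet<>(java.util.Arrays.asList("a", "b", "c"));
    System.out.println(scan(keys, null, 2, 1, System.out::println));
  }
}
```

A caller that pages through results would remember the last key it consumed and pass it as `scanAfter` on the next call, stopping once the method returns false.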
private OperationRunsStore getOperationRunStore(StructuredTableContext context) { | ||
return new OperationRunsStore(context); | ||
} | ||
|
Remove extra new lines.
Done
/** | ||
* This class defined various filters that can be applied during operation runs scanning. | ||
*/ | ||
public class OperationRunFilter { |
Usually a Filter is kind of a Function instead of just having getters. How is it supposed to be used?
It is just a container class to encapsulate what filter options are available for operations.
/** | ||
* Store for operation runs. | ||
*/ | ||
public class OperationRunsStore { |
No plural in class name.
Done
|
||
private final Clock clock; | ||
|
||
private static final String SMALLEST_POSSIBLE_STRING = ""; |
group static fields together
Done
|
||
private void writeOperationRun(OperationRunId runId, OperationRunDetail<?> detail) | ||
throws IOException { | ||
Collection<Field<?>> fields = new ArrayList<>(); |
Consider using Arrays.asList or ImmutableList.of to improve readability.
Done
private OperationRunDetail<?> getRunDetail(OperationRunId runId) | ||
throws IOException, OperationRunNotFoundException { | ||
Optional<StructuredRow> row = getOperationRunInternal(runId); | ||
if (!row.isPresent()) { |
Avoid using isPresent on Optional. In this case, use .map and orElseThrow.
return getOperationRunInternal(runId)
.map(this::rowToRunDetail)
.orElseThrow(() -> new OperationRunNotFoundException(...));
Please also fix other places that use isPresent and replace them with a more expressive form.
Updated. There is one case where we throw an error if the row is present. As we can't throw a checked exception from within ifPresent, I have not made any change there.
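Both sides of this exchange can be shown in a small self-contained sketch. The exception classes and the map-backed store below are hypothetical stand-ins for the PR's types. The read path uses `map` and `orElseThrow`, while the create path keeps `isPresent` because the `Consumer` passed to `ifPresent` cannot throw a checked exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalSketch {

  /** Hypothetical checked exceptions standing in for the PR's exception types. */
  static class NotFoundException extends Exception {
    NotFoundException(String message) {
      super(message);
    }
  }

  static class AlreadyExistsException extends Exception {
    AlreadyExistsException(String message) {
      super(message);
    }
  }

  private final Map<String, String> rows = new HashMap<>();

  Optional<String> read(String id) {
    return Optional.ofNullable(rows.get(id));
  }

  /** map + orElseThrow replaces the isPresent/get pair on the read path. */
  String getDetail(String id) throws NotFoundException {
    return read(id)
        .map(String::toUpperCase)
        .orElseThrow(() -> new NotFoundException("run " + id + " not found"));
  }

  /** ifPresent takes a Consumer, which cannot throw checked exceptions, so isPresent stays. */
  void create(String id, String detail) throws AlreadyExistsException {
    if (read(id).isPresent()) {
      throw new AlreadyExistsException("run " + id + " already exists");
    }
    rows.put(id, detail);
  }

  public static void main(String[] args) throws Exception {
    OptionalSketch store = new OptionalSketch();
    store.create("r1", "pending");
    System.out.println(store.getDetail("r1"));
  }
}
```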
26ad907 to 6ef10d8
@chtyim Merging this PR to unblock followup PRs for LRO. Please add comments for any suggested change and I will raise a further PR to solve.
Added