-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integration tests for concurrent append and replace #16755
Integration tests for concurrent append and replace #16755
Conversation
Thanks for the changes, @AmatyaAvadhanula ! At first glance, I have a couple of questions:
|
}, | ||
"ioConfig": { | ||
"%%TOPIC_KEY%%": "%%TOPIC_VALUE%%", | ||
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%, | |
"%%STREAM_PROPERTIES_KEY%%": "%%STREAM_PROPERTIES_VALUE%%", |
integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
Outdated
Show resolved
Hide resolved
@@ -0,0 +1,60 @@ | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use an existing spec and just update the task context before submitting to the Overlord?
@@ -1383,6 +1383,20 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge | |||
return running; | |||
} | |||
|
|||
Set<TaskLock> getLocksForDatasource(final String datasource) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is not used anymore since we have already added the /activeLocks
API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
/** | ||
* Retries until the segment count is as expected. | ||
*/ | ||
private void ensureSegmentsCount(int numExpectedSegments) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a lot of the private methods were copied over from other ITs.
Is it possible reuse this code instead of copying it over?
checkAndSetConcurrentLocks(); | ||
} | ||
|
||
// Verify the state with minute granularity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the MINUTE and ALL granularity verifications be separate tests of their own?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both the tests added here (MSQ INSERT + concurrent compaction and streaming ingest + concurrent compaction) seem to be inherently flaky. We are triggering the INSERT/streaming ingest 5 times in the hope that we get at least one case where there was an active REPLACE lock that overlapped with an active APPEND lock.
It would be better if we could force this condition somehow, possibly by manipulating the readiness condition of the underlying tasks.
See if the following approach could be viable:
- Write a small extension (Maybe there is already an extension to be used by ITs only. If there is, you could just use that.)
- This extension should register a new task
ITCompactionTask
for typecompact
. - The task can extend
CompactionTask
and override theisReady
method. - In the
isReady()
method, wait on some condition to happen and only then move on to thesuper.isReady()
. The condition can be based on some task context parameter. - While posting the compaction config in the new IT, make sure to set the above mention context parameter appropriately.
- The setup for the new IT will need to have this extension in the load list. (I think this can be done more easily in the new revised IT framework).
Edit: A shorter alternative to writing an extension could be as follows:
- Modify the
isReady()
method ofAbstractBatchTask
orCompactionTask
to look for a context parameterreadyCondition
. - The parameter
readyCondition
can take a value of someCondition
class which can be based on time delay for now. - The code should look for this parameter only if a certain environment variable is set (say
DRUID_IT_IMAGE_NAME
). - The variable being set indicates that the environment is an IT environment and we should honor the IT state.
- If the environment variable, the
isReady()
check behaves as usual. - While submitting compaction config, make sure to set the context parameter correctly.
Cons: The only con of this approach is that it pollutes production code with test related logic.
But maybe that's not a bad thing. Maybe in the future we could have a variety of DruidEnvironment
s and code could behave differently based on the current environment: PROD, TEST, INTEGRATION_TEST, SIMULATION
🤔 .
...sts/src/test/java/org/apache/druid/tests/coordinator/duty/ITConcurrentAppendReplaceTest.java
Outdated
Show resolved
Hide resolved
...sts/src/test/java/org/apache/druid/tests/coordinator/duty/ITConcurrentAppendReplaceTest.java
Show resolved
Hide resolved
...sts/src/test/java/org/apache/druid/tests/coordinator/duty/ITConcurrentAppendReplaceTest.java
Outdated
Show resolved
Hide resolved
...sts/src/test/java/org/apache/druid/tests/coordinator/duty/ITConcurrentAppendReplaceTest.java
Outdated
Show resolved
Hide resolved
private String fullDatasourceName; | ||
|
||
private int currentRowCount = 0; | ||
private boolean concurrentAppendAndReplaceLocksExisted; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a field of the class? The checkAndSetConcurrentLocks
method should return a boolean instead and the value should be verified in the relevant test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkAndSetConcurrentLocks is called from several methods that retry to check a certain state. This is done to ensure that we check for these locks frequently.
The methods have return values of their own, which is why I tried using a class-level variable for this check.
...sts/src/test/java/org/apache/druid/tests/coordinator/duty/ITConcurrentAppendReplaceTest.java
Outdated
Show resolved
Hide resolved
...sts/src/test/java/org/apache/druid/tests/coordinator/duty/ITConcurrentAppendReplaceTest.java
Outdated
Show resolved
Hide resolved
submitAndVerifyCompactionConfig(datasource, null); | ||
|
||
for (int i = 0; i < 5; i++) { | ||
// Submit the task and wait for the datasource to get loaded | ||
msqHelper.submitMsqTaskSuccesfully(queryLocal, ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)); | ||
|
||
ensureRowCount(datasource, (i + 2) * 3, function); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to forceTriggerCompaction
for this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ensureRowCount is one of the methods that checks for concurrent locks and forceTriggers compaction for every retry. ensureSegmentsCount is another.
Please let me know if this should be done differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, ensureRowCount
sounds like a verification method. It shouldn't be responsible for triggering compaction too.
if (CollectionUtils.isNullOrEmpty(locks)) { | ||
return; | ||
} | ||
LOG.info(locks.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add more info in this log line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added for debugging and has been removed. Thanks
@kfaraz, compaction is being forcefully triggered immediately after the submission of the MSQ and kafka tasks and I believe this should guarantee the existence of concurrent locks. |
Okay, that works for me. If the test is not flaky, then we can proceed.
But these permutations are still not guaranteed, right? We might end up with different orders in each run of the IT. |
We are asserting the concurrent locks quite frequently when we verify the segment count or row count after each iteration. Yes, the idea isn't to ensure that all 6 valid permutations happen in a single IT. It's just to ensure that the IT is more robust. The permutations themselves are exhaustively tested as Unit tests. Thanks for your feedback. I'll address it ASAP |
Okay, we can proceed for now. For completeness though, we need clarity on some points and would like to make these future improvements in this test: If unit tests have already covered most of the cases, then it is okay if we test only a few scenarios in the IT. |
...sts/src/test/java/org/apache/druid/tests/coordinator/duty/ITConcurrentAppendReplaceTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 after CI passes
integration-tests/pom.xml
Outdated
@@ -444,6 +444,12 @@ | |||
<groupId>org.testng</groupId> | |||
<artifactId>testng</artifactId> | |||
</dependency> | |||
<dependency> | |||
<groupId>junit</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the confusion, @AmatyaAvadhanula ! I didn't realize that these tests were using TestNG.
You can leave out the Assume
condition that I had suggested as that is the only thing that requires junit. No point having dependencies on both junit and TestNG.
Merging this PR. |
IT for streaming tasks with concurrent compaction
…he#16755)" (apache#17000) This reverts commit 70bad94.
Adds Integration tests for concurrent append and replace
This PR has: