
Integration tests for concurrent append and replace #16755

Merged (17 commits) on Sep 3, 2024

Conversation

@AmatyaAvadhanula (Contributor) commented Jul 18, 2024

Adds integration tests for concurrent append and replace:

  • Streaming ingestion concurrent with auto-compaction at various granularities
  • Concurrent batch and MSQ tasks

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@AmatyaAvadhanula AmatyaAvadhanula marked this pull request as ready for review July 19, 2024 03:53
@AmatyaAvadhanula AmatyaAvadhanula requested a review from kfaraz July 19, 2024 03:55
@kfaraz (Contributor) commented Jul 19, 2024

Thanks for the changes, @AmatyaAvadhanula !

At first glance, I have a couple of questions:

  • Why do we need the new locks API? Adding an API just for testing seems a little unnecessary. Is there a way to use any of the existing APIs for this purpose instead?
  • Please rename the test to ITConcurrentAppendReplaceTest or ITConcurrentStreamAppendReplaceTest (ConcurrentAutoCompactionTest sounds a little confusing)

},
"ioConfig": {
"%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
A reviewer (Member) commented with a suggested change:
Suggested change:
- "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+ "%%STREAM_PROPERTIES_KEY%%": "%%STREAM_PROPERTIES_VALUE%%",

@@ -0,0 +1,60 @@
{
A reviewer (Contributor) commented:
Is it possible to use an existing spec and just update the task context before submitting to the Overlord?

@@ -1383,6 +1383,20 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
return running;
}

Set<TaskLock> getLocksForDatasource(final String datasource)
A reviewer (Contributor) commented:
I guess this is not used anymore since we have already added the /activeLocks API.

AmatyaAvadhanula (author) replied:
Removed

/**
* Retries until the segment count is as expected.
*/
private void ensureSegmentsCount(int numExpectedSegments)
A reviewer (Contributor) commented:
It seems a lot of the private methods were copied over from other ITs.
Is it possible to reuse this code instead of copying it over?

checkAndSetConcurrentLocks();
}

// Verify the state with minute granularity
A reviewer (Contributor) commented:
Should the MINUTE and ALL granularity verifications be separate tests of their own?

@kfaraz (Contributor) left a review comment:

Both the tests added here (MSQ INSERT + concurrent compaction and streaming ingest + concurrent compaction) seem to be inherently flaky. We are triggering the INSERT/streaming ingest 5 times in the hope that we get at least one case where there was an active REPLACE lock that overlapped with an active APPEND lock.

It would be better if we could force this condition somehow, possibly by manipulating the readiness condition of the underlying tasks.

See if the following approach could be viable:

  • Write a small extension (Maybe there is already an extension to be used by ITs only. If there is, you could just use that.)
  • This extension should register a new task ITCompactionTask for type compact.
  • The task can extend CompactionTask and override the isReady method.
  • In the isReady() method, wait on some condition to happen and only then move on to the super.isReady(). The condition can be based on some task context parameter.
  • While posting the compaction config in the new IT, make sure to set the above-mentioned context parameter appropriately.
  • The setup for the new IT will need to have this extension in the load list. (I think this can be done more easily in the new revised IT framework).

Edit: A shorter alternative to writing an extension could be as follows:

  • Modify the isReady() method of AbstractBatchTask or CompactionTask to look for a context parameter readyCondition.
  • The parameter readyCondition can take a value of some Condition class which can be based on time delay for now.
  • The code should look for this parameter only if a certain environment variable is set (say DRUID_IT_IMAGE_NAME).
  • The variable being set indicates that the environment is an IT environment and we should honor the IT state.
  • If the environment variable is not set, the isReady() check behaves as usual.
  • While submitting compaction config, make sure to set the context parameter correctly.

Cons: The only con of this approach is that it pollutes production code with test related logic.

But maybe that's not a bad thing. Maybe in the future we could have a variety of DruidEnvironments and code could behave differently based on the current environment: PROD, TEST, INTEGRATION_TEST, SIMULATION 🤔 .
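
For illustration, a rough standalone sketch of the readiness-gating idea described above. The class name, the readyCondition context key, and the delay handling are assumptions standing in for the real CompactionTask, TaskActionClient, and super.isReady(); this is not code from the Druid codebase.

import java.util.Map;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the "gated readiness" idea: the task refuses to become ready
// until a test-controlled condition holds, forcing an APPEND lock and a REPLACE lock
// to coexist. These types are stand-ins, not the real Druid classes.
class GatedCompactionTaskSketch
{
  private final Map<String, Object> context;

  GatedCompactionTaskSketch(Map<String, Object> context)
  {
    this.context = context;
  }

  // In the real proposal this would override CompactionTask#isReady(TaskActionClient)
  // and fall through to super.isReady(), which is where the REPLACE lock is requested.
  boolean isReady() throws InterruptedException
  {
    Object delayMillis = context.get("readyCondition");
    if (delayMillis instanceof Number) {
      // Simplest possible condition: a fixed delay, giving the concurrent APPEND task
      // time to acquire its lock before the REPLACE lock is taken.
      TimeUnit.MILLISECONDS.sleep(((Number) delayMillis).longValue());
    }
    return true; // stand-in for super.isReady(taskActionClient)
  }
}

A test following the steps above would then submit the compaction config with a context entry such as "readyCondition": 30000, which this sketch interprets as a delay in milliseconds.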

private String fullDatasourceName;

private int currentRowCount = 0;
private boolean concurrentAppendAndReplaceLocksExisted;
A reviewer (Contributor) commented:
Why is this a field of the class? The checkAndSetConcurrentLocks method should return a boolean instead and the value should be verified in the relevant test.

AmatyaAvadhanula (author) replied:

checkAndSetConcurrentLocks is called from several methods that retry until a certain state is reached; this ensures that we check for these locks frequently.
Those methods have return values of their own, which is why I used a class-level variable for this check.
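
To make that pattern concrete, a minimal standalone sketch of what is being described: a retrying helper that also polls for overlapping APPEND/REPLACE locks on each attempt and records the observation in a class-level flag. The names and the polling stub are assumptions for illustration, not the actual IT code.

import java.util.concurrent.Callable;

// Sketch of a retrying helper that piggy-backs the concurrent-lock check on every
// attempt while keeping its own return value. Illustrative names only.
class ConcurrentLockObserverSketch
{
  private volatile boolean concurrentAppendAndReplaceLocksExisted = false;

  boolean retryUntilTrue(Callable<Boolean> condition, int maxRetries) throws Exception
  {
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      checkAndSetConcurrentLocks();                 // lock check runs on every retry
      if (Boolean.TRUE.equals(condition.call())) {
        return true;                                // the helper's own result is unchanged
      }
      Thread.sleep(1_000);
    }
    return false;
  }

  private void checkAndSetConcurrentLocks()
  {
    // Stand-in for querying the Overlord's active locks and checking whether an
    // APPEND lock and a REPLACE lock currently overlap for the datasource.
    concurrentAppendAndReplaceLocksExisted |= pollOverlordForOverlappingLocks();
  }

  private boolean pollOverlordForOverlappingLocks()
  {
    return false; // placeholder for the real active-locks API call
  }
}

The test can then assert the flag once at the end (the approach described here), or, as the reviewer suggests, have the check return a boolean that each call site verifies directly.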

Comment on lines 284 to 291
submitAndVerifyCompactionConfig(datasource, null);

for (int i = 0; i < 5; i++) {
// Submit the task and wait for the datasource to get loaded
msqHelper.submitMsqTaskSuccesfully(queryLocal, ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true));

ensureRowCount(datasource, (i + 2) * 3, function);
}
A reviewer (Contributor) commented:
Don't we need to forceTriggerCompaction for this test?

AmatyaAvadhanula (author) replied:

ensureRowCount is one of the methods that checks for concurrent locks and force-triggers compaction on every retry; ensureSegmentsCount is another.
Please let me know if this should be done differently.

The reviewer (Contributor) replied:
Yeah, ensureRowCount sounds like a verification method. It shouldn't be responsible for triggering compaction too.
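
A minimal sketch of the separation being suggested, with hypothetical names rather than the actual test code: the test triggers compaction explicitly, and the row-count helper only verifies.

import java.util.function.IntSupplier;

// Sketch of the suggested split: the action (force-triggering compaction) happens in
// the test body, and the verification helper retries with no side effects.
class RowCountVerificationSketch
{
  void runIteration(Runnable forceTriggerCompaction, IntSupplier rowCount, int expectedRows)
      throws InterruptedException
  {
    forceTriggerCompaction.run();           // explicit action, visible in the test body
    verifyRowCount(rowCount, expectedRows); // pure verification
  }

  private void verifyRowCount(IntSupplier rowCount, int expectedRows) throws InterruptedException
  {
    for (int attempt = 0; attempt < 30; attempt++) {
      if (rowCount.getAsInt() == expectedRows) {
        return;
      }
      Thread.sleep(1_000);                  // retry without re-triggering compaction
    }
    throw new AssertionError("Row count never reached " + expectedRows);
  }
}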

if (CollectionUtils.isNullOrEmpty(locks)) {
return;
}
LOG.info(locks.toString());
A reviewer (Contributor) commented:
Please add more info in this log line.

AmatyaAvadhanula (author) replied:
This was added for debugging and has been removed. Thanks

@AmatyaAvadhanula (author) commented:

@kfaraz, compaction is being forcefully triggered immediately after the submission of the MSQ and kafka tasks and I believe this should guarantee the existence of concurrent locks.
The reason we repeat it several times is to try to have multiple permutations of (replace lock, append lock, replace commit, append commit) within the same test.

@kfaraz (Contributor) commented Aug 6, 2024

@kfaraz, compaction is being forcefully triggered immediately after the submission of the MSQ and kafka tasks and I believe this should guarantee the existence of concurrent locks.

Okay, that works for me. If the test is not flaky, then we can proceed.
Are we asserting the presence of concurrent locks after every iteration or only at the end? Let's do it after every iteration to be sure.

The reason we repeat it several times is to try to have multiple permutations of (replace lock, append lock, replace commit, append commit) within the same test.

But these permutations are still not guaranteed, right? We might end up with different orders in each run of the IT.

@AmatyaAvadhanula (author) replied:

We are asserting the concurrent locks quite frequently when we verify the segment count or row count after each iteration.

Yes, the idea isn't to ensure that all 6 valid permutations happen in a single IT. It's just to ensure that the IT is more robust. The permutations themselves are exhaustively tested as Unit tests.

Thanks for your feedback. I'll address it ASAP

@kfaraz (Contributor) commented Aug 6, 2024

Yes, the idea isn't to ensure that all 6 valid permutations happen in a single IT. It's just to ensure that the IT is more robust. The permutations themselves are exhaustively tested as Unit tests.

Okay, we can proceed for now.

For completeness though, we need clarity on some points and would like to make these future improvements in this test: a test is robust if it is highly deterministic; a test that may end up trying out different things in each run is not. With the multiple runs, what you are ensuring is that the code being tested is more robust, but not in a very deterministic manner.

If unit tests have already covered most of the cases, then it is okay if we test only a few scenarios in the IT.
But whatever we decide to test, the test state must be exactly reproducible.

@kfaraz (Contributor) left a review comment:

+1 after CI passes

@@ -444,6 +444,12 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
A reviewer (Contributor) commented:
Sorry for the confusion, @AmatyaAvadhanula! I didn't realize that these tests were using TestNG.

You can leave out the Assume condition that I had suggested, as that is the only thing that requires JUnit. There is no point in having dependencies on both JUnit and TestNG.

@kfaraz (Contributor) commented Sep 3, 2024

Merging this PR.

@kfaraz kfaraz merged commit 70bad94 into apache:master Sep 3, 2024
90 checks passed
@kfaraz kfaraz deleted the its_concurrent_append_replace branch September 3, 2024 09:28
AmatyaAvadhanula added a commit to AmatyaAvadhanula/druid that referenced this pull request Sep 4, 2024
kfaraz pushed a commit that referenced this pull request Sep 4, 2024
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
IT for streaming tasks with concurrent compaction
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024