Run compaction as a supervisor on Overlord #16768
    return 0;
  }

  public enum State implements SupervisorStateManager.State
CodeQL notice: Class has same name as super class (`org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager$State`).
How much will memory usage increase on the Overlord once it starts polling segments?
 * search policy to skip an interval if it has been recently compacted or if it
 * keeps failing repeatedly.
 */
public class CompactionStatusTracker
There should be a way to forget the state. That is, don't compact an interval if it has been failing, but then try again after X minutes.
That logic would reside in the policy itself.
I will write an experimental policy to demonstrate this usage.
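As an illustration only (this is not the experimental policy mentioned above), a policy could "forget" a failure after a cooldown roughly as sketched below. The classes `LatestTaskOutcome` and `RetryAfterCooldown`, and the method `isEligible`, are hypothetical names invented for this sketch and are not part of the PR.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical outcome of the latest compaction task for one interval. */
final class LatestTaskOutcome
{
  final boolean failed;
  final Instant finishedAt;

  LatestTaskOutcome(boolean failed, Instant finishedAt)
  {
    this.failed = failed;
    this.finishedAt = finishedAt;
  }
}

/** Hypothetical policy helper: skip a failing interval, then retry once a cooldown elapses. */
final class RetryAfterCooldown
{
  private final Duration cooldown;

  RetryAfterCooldown(Duration cooldown)
  {
    this.cooldown = cooldown;
  }

  /** Returns true if the interval may be submitted for compaction at time {@code now}. */
  boolean isEligible(LatestTaskOutcome latest, Instant now)
  {
    if (latest == null || !latest.failed) {
      return true; // never attempted, or the last attempt did not fail
    }
    // The failure is "forgotten" once the cooldown has elapsed.
    return !now.isBefore(latest.finishedAt.plus(cooldown));
  }
}
```

With a 10-minute cooldown, an interval whose last task failed at time T would be skipped until T plus 10 minutes and considered again afterwards.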
How does this interface allow for policies that operate at the interval level, e.g. compacting intervals with the least compacted data first?
All policies are at interval level since we can either choose to compact an interval or not compact it at all.
(The current state of compaction in an interval can be partial though, if segments were appended to that interval after a prior round of compaction.)
The object `SegmentsToCompact` represents all segments to compact in a single interval.
Do you think some rename of the base policy would help?
  private Response buildErrorResponseWhenRunningAsSupervisor()
  {
    return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
        ImmutableMap.of(
            "error",
            "Compaction has been disabled on the Coordinator."
            + " Use Overlord APIs to fetch compaction status."
        )
    ).build();
  }
This will be a problem during rolling config changes when you are going from Overlord-based compaction to Coordinator-based compaction. The Router will start issuing requests to the Coordinator, but the Coordinator will reject them until it has been restarted with the property. Maybe we can return a redirect (302) so that the Router can send the request to the Overlord?
Or we don't return an error but log a warning?
If we just log a warning, we might return stale information from the coordinator, without the client ever realizing it.
Returning 302 would have sufficed if the path was the same for the two resources, but one path has prefix `/druid/coordinator/v1/compaction` and the other has `/druid/indexer/v1/compaction`.
The Druid clients `DruidLeaderClient` and `ServiceClientImpl` handle redirects but only update the service location and not the path.
Another option could be to just not do anything at the router, and simply use `OverlordClient` on the coordinator side and forward requests to the overlord if compaction supervisors are enabled.
Let me know what you think.
@abhishekagarwal87, to deal with the issue with rolling upgrades, I decided to do this instead.
- The leader Overlord is the single source of truth on whether the feature is enabled.
- API `/druid/indexer/v1/compaction/isSupervisorEnabled` can be used by the Coordinator to determine if the feature is enabled.
- If enabled, Coordinator redirects the compaction status APIs to the leader Overlord using the `OverlordClient`.
- Router does not perform any redirecting between Overlord and Coordinator.
- Overlord does not redirect to Coordinator. When the feature is disabled, Overlord simply returns a 503 (service unavailable) with an appropriate error message. This is to keep things simple and to avoid getting into an infinite redirect situation.
The only reason we need the redirect is so that a client (like web-console) using the Coordinator compaction status APIs continues to work seamlessly when we toggle the feature on or off. With a client explicitly using the new Overlord APIs, we don't have the burden of backward compatibility and thus no redirect is required.
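The routing rules listed in this comment can be sketched as follows. This is a simplified illustration, not the code in the PR: the class `CompactionStatusRouting` and its methods are hypothetical, and the `BooleanSupplier` stands in for the Coordinator's call to the `isSupervisorEnabled` API on the leader Overlord.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: decides which service answers a compaction status request. */
final class CompactionStatusRouting
{
  enum Target { OVERLORD, COORDINATOR }

  static final int HTTP_OK = 200;
  static final int HTTP_SERVICE_UNAVAILABLE = 503;

  /**
   * Coordinator side. The supplier stands in for a call to the leader Overlord's
   * /druid/indexer/v1/compaction/isSupervisorEnabled API.
   */
  static Target resolveTarget(BooleanSupplier isSupervisorEnabled)
  {
    return isSupervisorEnabled.getAsBoolean() ? Target.OVERLORD : Target.COORDINATOR;
  }

  /** Overlord side: it never redirects back, so a redirect loop cannot form. */
  static int overlordStatusCode(boolean supervisorEnabled)
  {
    return supervisorEnabled ? HTTP_OK : HTTP_SERVICE_UNAVAILABLE;
  }
}
```

Because only the Coordinator forwards and the Overlord answers directly (or fails with a service-unavailable status), the decision flows in one direction only.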
      @JacksonInject CompactionScheduler scheduler
  )
  {
    final CompactionConfigValidationResult validationResult = scheduler.validateCompactionConfig(spec);
@gargvishesh, as discussed, I have added this validation.
Unfortunately, with the present structure, I cannot add a validation in `CoordinatorCompactionConfigsResource.updateClusterCompactionConfig()` since that class is in the `druid-server` module and the `CompactionSupervisorSpec` class resides in `druid-indexing-service`.
If needed, we can try to do it in a follow-up PR by moving `CoordinatorCompactionConfigsResource` to the `druid-indexing-service` module.
   * compacting intervals or segments that do not fulfil some required criteria.
   */
  boolean isEligibleForCompaction(
      CompactionCandidate candidate,
      CompactionStatus currentCompactionStatus,
      CompactionTaskStatus latestTaskStatus
CodeQL notice: Useless parameter (reported for each of `candidate`, `currentCompactionStatus` and `latestTaskStatus`).
Description
-----------
Auto-compaction currently poses several challenges as it:
1. may get stuck on a failing interval.
2. may get stuck on the latest interval if more data keeps coming into it.
3. always picks the latest interval regardless of the level of compaction in it.
4. may never pick a datasource if its intervals are not very recent.
5. requires setting an explicit period which does not cater to the changing needs of a Druid cluster.
This PR introduces various improvements to compaction scheduling to tackle the above problems.

Change Summary
--------------
1. Run compaction for a datasource as a supervisor of type `autocompact` on Overlord.
2. Make compaction policy extensible and configurable.
3. Track status of recently submitted compaction tasks and pass this info to policy.
4. Add `/simulate` API on both Coordinator and Overlord to run compaction simulations.
5. Redirect compaction status APIs to the Overlord when compaction supervisors are enabled.
Description: #16768 introduces new compaction APIs on the Overlord `/compact/status` and `/compact/progress`. But the corresponding `OverlordClient` methods do not return an object compatible with the actual endpoints defined in `OverlordCompactionResource`. This patch ensures that the objects are compatible.
Changes:
- Add `CompactionStatusResponse` and `CompactionProgressResponse`
- Use these as the return type in `OverlordClient` methods and as the response entity in `OverlordCompactionResource`
- Add `SupervisorCleanupModule` bound on the Coordinator to perform cleanup of supervisors. Without this module, Coordinator cannot deserialize compaction supervisors.
#16768 added the functionality to run compaction as a supervisor on the overlord. This patch builds on top of that to restrict MSQ engine to compaction in the supervisor-mode only. With these changes, users can no longer add MSQ engine as part of datasource compaction config, or as the default cluster-level compaction engine, on the Coordinator. The patch also adds an Overlord runtime property `druid.supervisor.compaction.engine=<msq/native>` to specify the default engine for compaction supervisors.
Since these updates require major changes to existing MSQ compaction integration tests, this patch disables MSQ-specific compaction integration tests -- they will be taken up in a follow-up PR.
Key changed/added classes in this patch:
* CompactionSupervisor
* CompactionSupervisorSpec
* CoordinatorCompactionConfigsResource
* OverlordCompactionScheduler
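Description
Auto-compaction currently poses several challenges as it:
1. may get stuck on a failing interval.
2. may get stuck on the latest interval if more data keeps coming into it.
3. always picks the latest interval regardless of the level of compaction in it.
4. may never pick a datasource if its intervals are not very recent.
5. requires setting an explicit period which does not cater to the changing needs of a Druid cluster.
This PR introduces various improvements to compaction scheduling to tackle the above problems.
Change Summary
1. Run compaction for a datasource as a supervisor of type `autocompact` on Overlord.
2. Make compaction policy extensible and configurable.
3. Track status of recently submitted compaction tasks and pass this info to policy.
4. Add `/simulate` API on both Coordinator and Overlord to run compaction simulations.
5. Redirect compaction status APIs to the Overlord when compaction supervisors are enabled.
Usage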
1. Enable compaction to run as supervisors
Set `druid.supervisor.compaction.enabled=true` in the `runtime.properties` for Overlord. The property is `false` by default.
Accepted values: `true`, `false` (default).
NOTE: This does not migrate existing compaction configs to the new supervisor flow.
2. Submit a compaction supervisor
POST `/druid/indexer/v1/supervisor`
Sample payload:
3. Suspend a compaction supervisor
POST `/druid/indexer/v1/supervisor/{id}/suspend`
4. Resume a compaction supervisor
POST `/druid/indexer/v1/supervisor/{id}/resume`
5. Update cluster-level compaction configs
POST `/druid/coordinator/v1/config/compaction/cluster`
This API was added in #16803
Payload structure:
6. Update policy in the compaction config
POST `/druid/coordinator/v1/config/compaction/cluster`
This API was added in #16803
Sample payloads:
[A] Update policy to `smallestSegmentFirst` and `priorityDatasource` to `wikipedia`
7. Simulate compaction run on Overlord (when `druid.supervisor.compaction.enabled=true`)
POST `/druid/indexer/v1/compaction/simulate`
Payload structure: same as `/druid/coordinator/v1/config/compaction/cluster`
Sample payloads: (see next section)
8. Simulate compaction run on Coordinator (when `druid.supervisor.compaction.enabled=false`)
POST `/druid/coordinator/v1/compaction/simulate`
Sample payloads:
[A] Simulate with no config update
[B] Simulate with new policy
[C] Simulate with new policy and priority datasource
Implementation
1. Run compaction for a datasource as a supervisor of type `autocompact` on Overlord.
Why run compaction as a supervisor?
- Auto-compaction performs this same duty for tasks of type `compact`.
- [...] `TaskRunner` and be more reactive in nature.
Add `OverlordCompactionScheduler` to manage compaction on the leader Overlord
- This may lead to some increase in memory footprint of Overlord.
- Runs the `CompactSegments` duty every 5 seconds to identify compactible intervals across all datasources.
- Reusing the `CompactSegments` duty ensures that the logic remains unchanged from traditional auto-compaction.
- Uses a `LocalOverlordClient` which redirects all queries to `TaskQueryTool` or `TaskQueue`. In traditional auto-compaction, these queries are HTTP API calls from Coordinator to the Overlord.
2. Make compaction policy extensible and configurable.
Policy can now be easily extended and specified as a cluster-level dynamic config.
The base policy is `PriorityBasedSegmentSearchPolicy`.
A concrete policy can choose to implement the following methods:
- `getSegmentComparator()` to prioritize between segments and intervals
- `shouldSkipCompaction()` to completely skip a set of segments/intervals if say [...]
- [...] `priorityDatasource`. Other datasources are picked for compaction only after all the eligible intervals of the priority datasource have been compacted.
Policy `newestSegmentFirst`
- [...] `newestSegmentFirst` policy.
- [...] `priorityDatasource`.
Policy `smallestSegmentFirst`
- [...] `priorityDatasource`
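To make the ordering concrete, here is a minimal sketch of a comparator that puts the priority datasource first and then prefers smaller candidates. The classes `Candidate` and `SmallestFirstOrdering` are hypothetical, and ordering by total bytes is an assumption made for this example; the real `getSegmentComparator()` implementations in the PR may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical compaction candidate: one interval of one datasource. */
final class Candidate
{
  final String dataSource;
  final long totalBytes;

  Candidate(String dataSource, long totalBytes)
  {
    this.dataSource = dataSource;
    this.totalBytes = totalBytes;
  }
}

/** Hypothetical ordering: priority datasource first, then smallest candidate first. */
final class SmallestFirstOrdering
{
  static Comparator<Candidate> comparator(String priorityDatasource)
  {
    return Comparator
        // false sorts before true, so candidates of the priority datasource come first
        .comparing((Candidate c) -> !c.dataSource.equals(priorityDatasource))
        // within the same group, smaller candidates are picked earlier (assumed criterion)
        .thenComparingLong(c -> c.totalBytes);
  }

  public static void main(String[] args)
  {
    List<Candidate> candidates = new ArrayList<>(Arrays.asList(
        new Candidate("koalas", 10),
        new Candidate("wikipedia", 500),
        new Candidate("wikipedia", 50)
    ));
    candidates.sort(comparator("wikipedia"));
    for (Candidate c : candidates) {
      System.out.println(c.dataSource + " " + c.totalBytes);
    }
  }
}
```

Passing `null` as the priority datasource makes the first comparison a no-op, so the ordering falls back to size alone.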
3. Track status of recently submitted compaction tasks and pass this info to policy
- Add `CompactionStatusTracker` that tracks status of recently submitted and completed compaction tasks.
- [...] `shouldSkipCompaction()` method of the segment search policy.
4. Add `/simulate` API to run compaction simulations
- [...] `CompactSegments` duty assuming unlimited task slots
5. Redirect compaction status APIs to the Overlord when compaction supervisor is enabled
Overlord exposes a new API `/druid/indexer/v1/compaction/isSupervisorEnabled` which can be used by the leader Coordinator to determine if compaction is enabled to run as a supervisor.
If this API returns true, the Coordinator does not run the auto-compaction duty at all and redirects the compaction progress APIs `/compaction/status` and `/compaction/progress` to Overlord instead.
Main classes to review
- `CompactionSupervisor`
- `CompactionScheduler`
- `OverlordCompactionScheduler`
- `CompactionStatusTracker`
- `PriorityBasedSegmentSearchPolicy`
- `CompactionRunSimulator`
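As a rough mental model for the status tracking described in item 3 of the implementation notes, the sketch below keeps a count of consecutive failures per interval that a policy could consult before picking that interval again. `SimpleCompactionStatusTracker`, its methods, and the string key format are all hypothetical simplifications; the actual `CompactionStatusTracker` in the PR may track more and expose a different API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for a compaction status tracker. */
final class SimpleCompactionStatusTracker
{
  // Key is "<dataSource>/<interval>", a simplification chosen for this sketch.
  private final Map<String, Integer> consecutiveFailures = new ConcurrentHashMap<>();

  private static String key(String dataSource, String interval)
  {
    return dataSource + "/" + interval;
  }

  /** Records the outcome of a finished compaction task for the given interval. */
  void onTaskFinished(String dataSource, String interval, boolean success)
  {
    if (success) {
      // A success clears the failure history of the interval.
      consecutiveFailures.remove(key(dataSource, interval));
    } else {
      consecutiveFailures.merge(key(dataSource, interval), 1, Integer::sum);
    }
  }

  /** Number of failures since the last success; 0 if none were recorded. */
  int getConsecutiveFailures(String dataSource, String interval)
  {
    return consecutiveFailures.getOrDefault(key(dataSource, interval), 0);
  }
}
```

A policy could then skip any interval whose failure count exceeds a threshold of its choosing.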
Testing
Pending items
- [...] `LocalOverlordClient` does not hit the metadata store too often
Release note
This PR has: