Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run compaction as a supervisor on Overlord #16768

Merged
merged 35 commits into from
Sep 2, 2024

Conversation

kfaraz
Copy link
Contributor

@kfaraz kfaraz commented Jul 22, 2024

Description

Auto-compaction currently poses several challenges as it:

  1. may get stuck on a failing interval.
  2. may get stuck on the latest interval if more data keeps coming into it.
  3. always picks the latest interval regardless of the level of compaction in it.
  4. may never pick a datasource if its intervals are not very recent.
  5. requires setting an explicit period which does not cater to the changing needs of a Druid cluster.

This PR introduces various improvements to compaction scheduling to tackle the above problems.

Change Summary

  1. Run compaction for a datasource as a supervisor of type autocompact on Overlord.
  2. Make compaction policy extensible and configurable.
  3. Track status of recently submitted compaction tasks and pass this info to policy.
  4. Add /simulate API on both Coordinator and Overlord to run compaction simulations.
  5. Redirect compaction status APIs to the Overlord when compaction supervisors are enabled.

Usage

1. Enable compaction to run as supervisors

druid.supervisor.compaction.enabled=true

  • Set the above property in the runtime.properties for Overlord
  • This property is false by default.
  • No period needs to be specified as compaction supervisors run and trigger compaction as soon as compaction slots become available.
  • When the above property is set to true
    • Coordinator does not run auto-compaction duty.
    • Coordinator redirects compaction progress APIs to the leader Overlord instead.
  • When the above property is set to false (default)
    • Compaction supervisors do not run but supervisors may still be created, updated or deleted
    • Overlord returns 504 (service unavailable) response on compaction progress APIs

NOTE: This does not migrate existing compaction configs to the new supervisor flow.

2. Submit a compaction supervisor

POST /druid/indexer/v1/supervisor

Sample payload:

{
   "type": "autocompact",    // required
   "suspended": false,         // optional
   "spec": {                           // required
       "dataSource": "wikipedia",          // required
       "tuningConfig": {...},                    // optional
       "granularitySpec": {...},               // optional
       ... other fields supported in compaction config ...
   }
}

3. Suspend a compaction supervisor

POST /druid/indexer/v1/supervisor/{id}/suspend

  • This suspends the compaction supervisor i.e. no more compaction tasks would be submitted for this datasource.
  • Once suspended, the supervisor may not report compacted stats correctly.

4. Resume a compaction supervisor

POST /druid/indexer/v1/supervisor/{id}/resume

5. Update cluster-level compaction configs

POST /druid/coordinator/v1/config/compaction/cluster

This API was added in #16803

Payload structure:

{
   "maxCompactionTaskSlots": 4,         // optional
   "compactionTaskSlotRatio": 0.1,       // optional
   "compactionPolicy": {                 // optional
       "type": "newestSegmentFirst",     // required
       "priorityDatasource": "wikipedia" // optional
   }
}

6. Update policy in the compaction config

POST /druid/coordinator/v1/config/compaction/cluster

This API was added in #16803

Sample payloads:

[A] Update policy to smallestSegmentFirst and priorityDatasource to wikipedia

{
   "compactionPolicy": {
       "type": "newestSegmentFirst",
       "priorityDatasource": "wikipedia"
   }
}

7. Simulate compaction run on Overlord (when druid.supervisor.compaction.enabled=true)

POST /druid/indexer/v1/compaction/simulate

Payload structure: same as `/druid/coordinator/v1/config/compaction/cluster` Sample payloads: (see next section)

8. Simulate compaction run on Coordinator (when druid.supervisor.compaction.enabled=false)

POST /druid/coordinator/v1/compaction/simulate

Sample payloads:

[A] Simulate with no config update

{ }

[B] Simulate with new policy

{
   "compactionPolicy": {
       "type": "smallestSegmentFirst"
   }
}

[C] Simulate with new policy and priority datasource

{
   "compactionPolicy": {
       "type": "smallestSegmentFirst",
       "priorityDatasource": "wikipedia"
   }
}

Implementation

1. Run compaction for a datasource as a supervisor of type autocompact on Overlord.

Why run compaction as a supervisor?

  • The job of a supervisor is to manage indexing tasks of a certain type and react to changes in the cluster.
    Auto-compaction performs this same duty for tasks of type compact.
  • Polling task statuses becomes inexpensive since the info is already cached on the Overlord.
  • Auto-compaction can receive task update notifications from TaskRunner and be more reactive in nature.
  • Leverage existing supervisor mechanism and APIs that align perfectly to the use case:
    • suspend/resume compaction
    • get compaction stats
    • monitor compaction health
    • update compaction spec
Add OverlordCompactionScheduler to manage compaction on the leader Overlord

  • Manages compaction of all active supervisors
  • Polls segments from the metadata store and builds a timeline to identify compactible segments.
    This may lead to some increase in memory footprint of Overlord.
  • Runs the CompactSegments duty every 5 seconds to identify compactible intervals across all datasources.
    • Using the CompactSegments duty ensures that the logic remains unchanged from traditional auto-compaction.
  • Uses LocalOverlordClient which redirects all queries to TaskQueryTool or TaskQueue. In traditional auto-compaction, these queries are HTTP API calls from Coordinator to the Overlord.

2. Make compaction policy extensible and configurable.

Policy can now be easily extended and specified as a cluster-level dynamic config.

The base policy is PriorityBasedSegmentSearchPolicy.
A concrete policy can choose to implement the following methods:

  • getSegmentComparator() to prioritize between segments and intervals
  • shouldSkipCompaction() to completely skip a set of segments/intervals if say
    • interval was compacted recently, or
    • compaction tasks for interval are failing repeatedly due to corrupt data, or
    • interval has very segments that require compaction, etc.
  • priorityDatasource. Other datasources are picked for compaction only after all the eligible intervals of the priority datasource have been compacted.
Policy newestSegmentFirst

  • There are no functional changes to the default behaviour of the newestSegmentFirst policy.
  • It continues to be the default policy picked for compaction.
  • It supports declaring a priorityDatasource.
Policy smallestSegmentFirst

  • This is a new experimental policy and has been added merely to demonstrate how new policies can be written.
  • Supports priorityDatasource

3. Track status of recently submitted compaction tasks and pass this info to policy

  • Add CompactionStatusTracker that tracks status of recently submitted and completed compaction tasks.
  • This info can be used by the shouldSkipCompaction() method of the segment search policy.
  • Intervals for which tasks have been recently submitted and are still running are always skipped.

4. Add /simulate API to run compaction simulations

  • Simulates a run of the CompactSegments duty assuming unlimited task slots
  • Users can see the which intervals are already compacted, skipped or queued for compaction, etc.
  • Users can see how the duty will behave with a new compaction dynamic config before even persisting the change.
  • Makes debugging easier in case compaction seems to be stuck or misbehaving.

5. Redirect compaction status APIs to the Overlord when compaction supervisor is enabled

Overlord exposes a new API /druid/indexer/v1/compaction/isSupervisorEnabled which can be used by
the leader Coordinator to determine if compaction is enabled to run as a supervisor.
If this API returns true, the Coordinator does not run the auto-compaction duty at all and redirects the
compaction progress APIs /compaction/status and /compaction/progress to Overlord instead.

Main classes to review

  • CompactionSupervisor
  • CompactionScheduler
  • OverlordCompactionScheduler
  • CompactionStatusTracker
  • PriorityBasedSegmentSearchPolicy
  • CompactionRunSimulator

Testing

  • Small local Druid cluster
  • Unit tests
  • Integration tests
  • Large Druid cluster

Pending items

  • Ensure that LocalOverlordClient does not hit the metadata store too often

Release note

  • Pending

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz changed the title Add Compaction Scheduler Run compaction as a supervisor on Overlord (aka Compaction Scheduler) Aug 8, 2024
return 0;
}

public enum State implements SupervisorStateManager.State

Check notice

Code scanning / CodeQL

Class has same name as super class Note

State has the same name as its supertype
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager$State
.
@kfaraz kfaraz changed the title Run compaction as a supervisor on Overlord (aka Compaction Scheduler) Run compaction as a supervisor on Overlord Aug 10, 2024
Copy link
Contributor

@abhishekagarwal87 abhishekagarwal87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how much memory usage is going to increase on Overlord after it starts doing the segment polling?

* search policy to skip an interval if it has been recently compacted or if it
* keeps failing repeatedly.
*/
public class CompactionStatusTracker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there should be a way to forget the state. That is don't compact an interval if its been failing but then try again after X minutes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That logic would reside in the policy itself.
I will write an experimental policy to demonstrate this usage.

services/src/main/java/org/apache/druid/cli/CliRouter.java Outdated Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this interface allow for policies that are at interval level e.g. compacting intervals with the least compacted data first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All policies are at interval level since we can either choose to compact interval or not compact it at all.
(The current state of compaction in an interval can be partial though, if segments were appended to that interval after a prior round of compaction.)

The object SegmentsToCompact represents all segments to compact in a single interval.
Do you think some rename of the base policy would help?

Comment on lines 134 to 143
private Response buildErrorResponseWhenRunningAsSupervisor()
{
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
ImmutableMap.of(
"error",
"Compaction has been disabled on the Coordinator."
+ " Use Overlord APIs to fetch compaction status."
)
).build();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a problem during rolling config changes when you are going from overlord based compaction to coordinator based compaction. Router will start issuing requests to coordinator but coordinator will reject till it has been restarted with the property. maybe we can return an error code (302) and router can send the request to overlord?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we don't return an error but log a warning?

Copy link
Contributor Author

@kfaraz kfaraz Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just log a warning, we might return stale information from the coordinator, without the client ever realizing it.

Returning 302 would have sufficed if the path was the same for the two resources, but one path has prefix /druid/coordinator/v1/compaction and the other has /druid/indexer/v1/compaction.
The Druid clients DruidLeaderClient and ServiceClientImpl handle redirects but only update the service location and not the path.

Another option could be to just not do anything at the router, and simply use OverlordClient on the coordinator side and forward requests to the overlord if compaction supervisors are enabled.

Let me know what you think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhishekagarwal87 , to deal with the issue with rolling upgrades, I decide to do this instead.

  • The leader Overlord is the single source of truth on whether the feature is enabled.
  • API /druid/indexer/v1/compaction/isSupervisorEnabled can be used by the Coordinator to determine if the feature is enabled.
  • If enabled, Coordinator redirects the compaction status APIs to the leader Overlord using the OverlordClient.
  • Router does not perform any redirecting between Overlord and Coordinator.
  • Overlord does not redirect to Coordinator. When the feature is disabled, Overlord simply returns a 504 (service unavailable) with an appropriate error message. This is to keep things simple and to avoid getting into an infinite redirect situation.

The only reason we need the redirect is that a client (like web-console) using the Coordinator compaction status APIs continues to work seamlessly when we toggle the feature on or off. With a client explicitly using the new Overlord APIs, we don't have the burden of backward compatibility and thus no redirect is required.

@JacksonInject CompactionScheduler scheduler
)
{
final CompactionConfigValidationResult validationResult = scheduler.validateCompactionConfig(spec);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gargvishesh , as discussed, I have added this validation.

Unfortunately, with the present structure, I cannot add a validation in CoordinatorCompactionConfigsResource.updateClusterCompactionConfig() since that class is in druid-server module and the CompactionSupervisorSpec class resides in druid-indexing-service.

If needed, we can try to do it in a follow up PR by moving CoordinatorCompactionConfigsResource to druid-indexing-service module.

@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Sep 1, 2024
* compacting intervals or segments that do not fulfil some required criteria.
*/
boolean isEligibleForCompaction(
CompactionCandidate candidate,

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'candidate' is never used.
*/
boolean isEligibleForCompaction(
CompactionCandidate candidate,
CompactionStatus currentCompactionStatus,

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'currentCompactionStatus' is never used.
boolean isEligibleForCompaction(
CompactionCandidate candidate,
CompactionStatus currentCompactionStatus,
CompactionTaskStatus latestTaskStatus

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'latestTaskStatus' is never used.
@kfaraz kfaraz closed this Sep 1, 2024
@kfaraz kfaraz reopened this Sep 1, 2024
@kfaraz kfaraz merged commit fe3d589 into apache:master Sep 2, 2024
101 checks passed
@kfaraz kfaraz deleted the compact_scheduler branch September 2, 2024 02:23
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
Description
-----------
Auto-compaction currently poses several challenges as it:
1. may get stuck on a failing interval.
2. may get stuck on the latest interval if more data keeps coming into it.
3. always picks the latest interval regardless of the level of compaction in it.
4. may never pick a datasource if its intervals are not very recent.
5. requires setting an explicit period which does not cater to the changing needs of a Druid cluster.

This PR introduces various improvements to compaction scheduling to tackle the above problems.

Change Summary
--------------
1. Run compaction for a datasource as a supervisor of type `autocompact` on Overlord.
2. Make compaction policy extensible and configurable.
3. Track status of recently submitted compaction tasks and pass this info to policy.
4. Add `/simulate` API on both Coordinator and Overlord to run compaction simulations.
5. Redirect compaction status APIs to the Overlord when compaction supervisors are enabled.
kfaraz added a commit that referenced this pull request Sep 5, 2024
Description:
#16768 introduces new compaction APIs on the Overlord `/compact/status` and `/compact/progress`.
But the corresponding `OverlordClient` methods do not return an object compatible with the actual
endpoints defined in `OverlordCompactionResource`.

This patch ensures that the objects are compatible.

Changes:
- Add `CompactionStatusResponse` and `CompactionProgressResponse`
- Use these as the return type in `OverlordClient` methods and as the response entity in `OverlordCompactionResource`
- Add `SupervisorCleanupModule` bound on the Coordinator to perform cleanup of supervisors.
Without this module, Coordinator cannot deserialize compaction supervisors.
kfaraz pushed a commit that referenced this pull request Sep 24, 2024
#16768 added the functionality to run compaction as a supervisor on the overlord.
This patch builds on top of that to restrict MSQ engine to compaction in the supervisor-mode only.
With these changes, users can no longer add MSQ engine as part of datasource compaction config,
or as the default cluster-level compaction engine, on the Coordinator. 

The patch also adds an Overlord runtime property `druid.supervisor.compaction.engine=<msq/native>`
to specify the default engine for compaction supervisors.

Since these updates require major changes to existing MSQ compaction integration tests,
this patch disables MSQ-specific compaction integration tests -- they will be taken up in a follow-up PR.

Key changed/added classes in this patch:
* CompactionSupervisor
* CompactionSupervisorSpec
* CoordinatorCompactionConfigsResource
* OverlordCompactionScheduler
kfaraz pushed a commit to kfaraz/druid that referenced this pull request Sep 24, 2024
apache#16768 added the functionality to run compaction as a supervisor on the overlord.
This patch builds on top of that to restrict MSQ engine to compaction in the supervisor-mode only.
With these changes, users can no longer add MSQ engine as part of datasource compaction config,
or as the default cluster-level compaction engine, on the Coordinator. 

The patch also adds an Overlord runtime property `druid.supervisor.compaction.engine=<msq/native>`
to specify the default engine for compaction supervisors.

Since these updates require major changes to existing MSQ compaction integration tests,
this patch disables MSQ-specific compaction integration tests -- they will be taken up in a follow-up PR.

Key changed/added classes in this patch:
* CompactionSupervisor
* CompactionSupervisorSpec
* CoordinatorCompactionConfigsResource
* OverlordCompactionScheduler
kfaraz added a commit that referenced this pull request Sep 25, 2024
…17143)

#16768 added the functionality to run compaction as a supervisor on the overlord.
This patch builds on top of that to restrict MSQ engine to compaction in the supervisor-mode only.
With these changes, users can no longer add MSQ engine as part of datasource compaction config,
or as the default cluster-level compaction engine, on the Coordinator. 

The patch also adds an Overlord runtime property `druid.supervisor.compaction.engine=<msq/native>`
to specify the default engine for compaction supervisors.

Since these updates require major changes to existing MSQ compaction integration tests,
this patch disables MSQ-specific compaction integration tests -- they will be taken up in a follow-up PR.

Key changed/added classes in this patch:
* CompactionSupervisor
* CompactionSupervisorSpec
* CoordinatorCompactionConfigsResource
* OverlordCompactionScheduler

Co-authored-by: Vishesh Garg <[email protected]>
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Batch Ingestion Area - Compaction Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants