[Backport] Allow MSQ engine only for compaction supervisors (#17033) (#17143)

#16768 added the ability to run compaction as a supervisor on the Overlord.
This patch builds on top of that to restrict the MSQ engine to supervisor-based compaction only.
With these changes, users can no longer set the MSQ engine in a datasource compaction config
or as the default cluster-level compaction engine on the Coordinator.
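
For example, a datasource compaction config along these lines would now be rejected when submitted
to the Coordinator as a datasource compaction config. The payload is illustrative; only the restriction
on the `engine` field comes from this patch, the other fields are standard compaction settings:

```json
{
  "dataSource": "wikipedia",
  "skipOffsetFromLatest": "P1D",
  "engine": "msq"
}
```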

The patch also adds an Overlord runtime property `druid.supervisor.compaction.engine=<msq/native>`
to specify the default engine for compaction supervisors.
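
As a minimal sketch of the resulting Overlord configuration: the `engine` property name comes from this
patch, while the `enabled` flag and the values shown are assumptions based on CompactionSupervisorConfig:

```properties
# Overlord runtime.properties (illustrative)
# Enable the compaction scheduler on the Overlord (flag assumed from CompactionSupervisorConfig)
druid.supervisor.compaction.enabled=true
# Default engine used by compaction supervisors when a spec does not set one
druid.supervisor.compaction.engine=msq
```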

Since these updates require major changes to the existing MSQ compaction integration tests,
this patch disables the MSQ-specific tests for now; they will be taken up in a follow-up PR.

Key changed/added classes in this patch:
* CompactionSupervisor
* CompactionSupervisorSpec
* CoordinatorCompactionConfigsResource
* OverlordCompactionScheduler
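
Taken together, these changes mean an invalid spec no longer fails CompactionSupervisorSpec construction;
the validation result is stored and surfaced through the supervisor instead. A minimal sketch, condensed
from the updated tests below (mock setup and imports assumed, following the classes touched in this patch):

```java
// Mock scheduler that rejects every compaction config (as in the updated tests).
CompactionScheduler scheduler = Mockito.mock(CompactionScheduler.class);
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
       .thenReturn(CompactionConfigValidationResult.failure("bad spec"));

// Previously this constructor threw an InvalidInput exception; now it only records the result.
CompactionSupervisorSpec spec = new CompactionSupervisorSpec(
    new DataSourceCompactionConfig.Builder().forDataSource("wiki").build(),
    false,
    scheduler
);
Assert.assertFalse(spec.getValidationResult().isValid());

// The failure is reported through the supervisor's status (and an INVALID_SPEC state) instead.
Assert.assertEquals(
    "Compaction supervisor spec is invalid. Reason[bad spec].",
    spec.createSupervisor().getStatus().getPayload().getMessage()
);
```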

Co-authored-by: Vishesh Garg <[email protected]>
kfaraz and gargvishesh authored Sep 25, 2024
1 parent cf00b4c commit d06327a
Showing 25 changed files with 229 additions and 149 deletions.
Changes to CompactionSupervisor:
@@ -25,6 +25,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;

@@ -55,6 +56,13 @@ public void start()
if (supervisorSpec.isSuspended()) {
log.info("Suspending compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
} else if (!supervisorSpec.getValidationResult().isValid()) {
log.warn(
"Cannot start compaction supervisor for datasource[%s] since the compaction supervisor spec is invalid. "
+ "Reason[%s].",
dataSource,
supervisorSpec.getValidationResult().getReason()
);
} else {
log.info("Starting compaction for dataSource[%s].", dataSource);
scheduler.startCompaction(dataSource, supervisorSpec.getSpec());
@@ -76,6 +84,13 @@ public SupervisorReport<AutoCompactionSnapshot> getStatus()
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
.build();
} else if (!supervisorSpec.getValidationResult().isValid()) {
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withMessage(StringUtils.format(
"Compaction supervisor spec is invalid. Reason[%s].",
supervisorSpec.getValidationResult().getReason()
))
.build();
} else {
snapshot = scheduler.getCompactionSnapshot(dataSource);
}
@@ -90,6 +105,8 @@ public SupervisorStateManager.State getState()
return State.SCHEDULER_STOPPED;
} else if (supervisorSpec.isSuspended()) {
return State.SUSPENDED;
} else if (!supervisorSpec.getValidationResult().isValid()) {
return State.INVALID_SPEC;
} else {
return State.RUNNING;
}
@@ -132,6 +149,7 @@ public enum State implements SupervisorStateManager.State
SCHEDULER_STOPPED(true),
RUNNING(true),
SUSPENDED(true),
INVALID_SPEC(false),
UNHEALTHY(false);

private final boolean healthy;
Changes to CompactionSupervisorSpec:
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -40,6 +39,7 @@ public class CompactionSupervisorSpec implements SupervisorSpec
private final boolean suspended;
private final DataSourceCompactionConfig spec;
private final CompactionScheduler scheduler;
private final CompactionConfigValidationResult validationResult;

@JsonCreator
public CompactionSupervisorSpec(
@@ -48,14 +48,10 @@ public CompactionSupervisorSpec(
@JacksonInject CompactionScheduler scheduler
)
{
final CompactionConfigValidationResult validationResult = scheduler.validateCompactionConfig(spec);
if (!validationResult.isValid()) {
throw InvalidInput.exception("Compaction supervisor 'spec' is invalid. Reason[%s].", validationResult.getReason());
}

this.spec = spec;
this.suspended = Configs.valueOrDefault(suspended, false);
this.scheduler = scheduler;
this.validationResult = scheduler.validateCompactionConfig(spec);
}

@JsonProperty
@@ -77,6 +73,11 @@ public String getId()
return ID_PREFIX + spec.getDataSource();
}

public CompactionConfigValidationResult getValidationResult()
{
return validationResult;
}

@Override
public CompactionSupervisor createSupervisor()
{
Changes to OverlordCompactionScheduler:
@@ -202,7 +202,7 @@ public CompactionConfigValidationResult validateCompactionConfig(DataSourceCompa
} else {
return ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
compactionConfigSupplier.get().getEngine()
supervisorConfig.getEngine()
);
}
}
@@ -272,7 +272,7 @@ private synchronized void scheduledRun()
private synchronized void runCompactionDuty()
{
final CoordinatorRunStats stats = new CoordinatorRunStats();
duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), stats);
duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getEngine(), stats);

// Emit stats only if emission period has elapsed
if (!sinceStatsEmitted.isRunning() || sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) {
@@ -309,7 +309,8 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon
if (isRunning()) {
return new CompactionRunSimulator(statusTracker, overlordClient).simulateRunWithConfig(
getLatestConfig().withClusterConfig(updateRequest),
getCurrentDatasourceTimelines()
getCurrentDatasourceTimelines(),
supervisorConfig.getEngine()
);
} else {
return new CompactionSimulateResult(Collections.emptyMap());
Changes to the CompactionSupervisorSpec tests:
@@ -82,20 +82,29 @@ public void testSerdeOfSuspendedSpec()
}

@Test
public void testInvalidSpecThrowsException()
public void testGetStatusWithInvalidSpec()
{
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
.thenReturn(CompactionConfigValidationResult.failure("bad spec"));
final DruidException exception = Assert.assertThrows(
DruidException.class,
() -> new CompactionSupervisorSpec(null, false, scheduler)
);
Assert.assertEquals(
"Compaction supervisor 'spec' is invalid. Reason[bad spec].",
exception.getMessage()
"Compaction supervisor spec is invalid. Reason[bad spec].", new CompactionSupervisorSpec(
new DataSourceCompactionConfig.Builder().forDataSource("datasource").build(),
false,
scheduler
).createSupervisor().getStatus().getPayload().getMessage()
);
}

@Test
public void testGetValidationResultForInvalidSpec()
{
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
.thenReturn(CompactionConfigValidationResult.failure("bad spec"));
CompactionConfigValidationResult validationResult = new CompactionSupervisorSpec(null, false, scheduler).getValidationResult();
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals("bad spec", validationResult.getReason());
}

@Test
public void testGetIdAndDataSources()
{
Changes to the OverlordCompactionScheduler tests:
@@ -120,7 +120,7 @@ public void setUp()
serviceEmitter = new StubServiceEmitter();
segmentsMetadataManager = new TestSegmentsMetadataManager();

supervisorConfig = new CompactionSupervisorConfig(true);
supervisorConfig = new CompactionSupervisorConfig(true, null);
compactionConfig = DruidCompactionConfig.empty();
coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(false, null);

@@ -149,7 +149,7 @@ private void initScheduler()
@Test
public void testStartStopWhenSchedulerIsEnabled()
{
supervisorConfig = new CompactionSupervisorConfig(true);
supervisorConfig = new CompactionSupervisorConfig(true, null);
Assert.assertFalse(scheduler.isRunning());

scheduler.start();
@@ -168,7 +168,7 @@ public void testStartStopWhenSchedulerIsEnabled()
@Test
public void testStartStopWhenScheduledIsDisabled()
{
supervisorConfig = new CompactionSupervisorConfig(false);
supervisorConfig = new CompactionSupervisorConfig(false, null);
initScheduler();

Assert.assertFalse(scheduler.isRunning());
@@ -183,7 +183,7 @@ public void testStartStopWhenScheduledIsDisabled()
@Test
public void testSegmentsAreNotPolledWhenSchedulerIsDisabled()
{
supervisorConfig = new CompactionSupervisorConfig(false);
supervisorConfig = new CompactionSupervisorConfig(false, null);
initScheduler();

verifySegmentPolling(false);
@@ -337,7 +337,7 @@ public void testRunSimulation()
);

final CompactionSimulateResult simulateResult = scheduler.simulateRunWithConfigUpdate(
new ClusterCompactionConfig(null, null, null, null, null)
new ClusterCompactionConfig(null, null, null, null)
);
Assert.assertEquals(1, simulateResult.getCompactionStates().size());
final Table pendingCompactionTable = simulateResult.getCompactionStates().get(CompactionStatus.State.PENDING);
@@ -362,7 +362,7 @@ public void testRunSimulation()
scheduler.stopCompaction(TestDataSource.WIKI);

final CompactionSimulateResult simulateResultWhenDisabled = scheduler.simulateRunWithConfigUpdate(
new ClusterCompactionConfig(null, null, null, null, null)
new ClusterCompactionConfig(null, null, null, null)
);
Assert.assertTrue(simulateResultWhenDisabled.getCompactionStates().isEmpty());

Changes to OverlordCompactionResourceTest:
@@ -44,9 +44,9 @@
public class OverlordCompactionResourceTest
{
private static final CompactionSupervisorConfig SUPERVISOR_ENABLED
= new CompactionSupervisorConfig(true);
= new CompactionSupervisorConfig(true, null);
private static final CompactionSupervisorConfig SUPERVISOR_DISABLED
= new CompactionSupervisorConfig(false);
= new CompactionSupervisorConfig(false, null);

private CompactionScheduler scheduler;

Changes to ITAutoCompactionTest:
@@ -118,7 +118,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
@DataProvider(name = "engine")
public static Object[][] engine()
{
return new Object[][]{{CompactionEngine.NATIVE}, {CompactionEngine.MSQ}};
return new Object[][]{{CompactionEngine.NATIVE}};
}

@Inject
Changes to CompactionRunSimulator:
@@ -27,6 +27,7 @@
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
@@ -75,7 +76,8 @@ public CompactionRunSimulator(
*/
public CompactionSimulateResult simulateRunWithConfig(
DruidCompactionConfig compactionConfig,
Map<String, SegmentTimeline> datasourceTimelines
Map<String, SegmentTimeline> datasourceTimelines,
CompactionEngine defaultEngine
)
{
final Table compactedIntervals
@@ -138,13 +140,14 @@ public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCan

// Unlimited task slots to ensure that simulator does not skip any interval
final DruidCompactionConfig configWithUnlimitedTaskSlots = compactionConfig.withClusterConfig(
new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null)
new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null)
);

final CoordinatorRunStats stats = new CoordinatorRunStats();
new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
configWithUnlimitedTaskSlots,
datasourceTimelines,
defaultEngine,
stats
);

Changes to AutoCompactionSnapshot:
@@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.compaction.CompactionStatistics;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Objects;

@@ -41,6 +42,8 @@ public enum AutoCompactionScheduleStatus
@JsonProperty
private final AutoCompactionScheduleStatus scheduleStatus;
@JsonProperty
private final String message;
@JsonProperty
private final long bytesAwaitingCompaction;
@JsonProperty
private final long bytesCompacted;
@@ -68,6 +71,7 @@ public static Builder builder(String dataSource)
public AutoCompactionSnapshot(
@JsonProperty("dataSource") @NotNull String dataSource,
@JsonProperty("scheduleStatus") @NotNull AutoCompactionScheduleStatus scheduleStatus,
@JsonProperty("message") @Nullable String message,
@JsonProperty("bytesAwaitingCompaction") long bytesAwaitingCompaction,
@JsonProperty("bytesCompacted") long bytesCompacted,
@JsonProperty("bytesSkipped") long bytesSkipped,
@@ -81,6 +85,7 @@ public AutoCompactionSnapshot(
{
this.dataSource = dataSource;
this.scheduleStatus = scheduleStatus;
this.message = message;
this.bytesAwaitingCompaction = bytesAwaitingCompaction;
this.bytesCompacted = bytesCompacted;
this.bytesSkipped = bytesSkipped;
@@ -104,6 +109,12 @@ public AutoCompactionScheduleStatus getScheduleStatus()
return scheduleStatus;
}

@Nullable
public String getMessage()
{
return message;
}

public long getBytesAwaitingCompaction()
{
return bytesAwaitingCompaction;
@@ -169,7 +180,8 @@ public boolean equals(Object o)
intervalCountCompacted == that.intervalCountCompacted &&
intervalCountSkipped == that.intervalCountSkipped &&
dataSource.equals(that.dataSource) &&
scheduleStatus == that.scheduleStatus;
scheduleStatus == that.scheduleStatus &&
Objects.equals(message, that.message);
}

@Override
@@ -178,6 +190,7 @@ public int hashCode()
return Objects.hash(
dataSource,
scheduleStatus,
message,
bytesAwaitingCompaction,
bytesCompacted,
bytesSkipped,
@@ -194,6 +207,7 @@ public static class Builder
{
private final String dataSource;
private AutoCompactionScheduleStatus scheduleStatus;
private String message;

private final CompactionStatistics compactedStats = new CompactionStatistics();
private final CompactionStatistics skippedStats = new CompactionStatistics();
@@ -215,6 +229,12 @@ public Builder withStatus(AutoCompactionScheduleStatus status)
return this;
}

public Builder withMessage(String message)
{
this.message = message;
return this;
}

public void incrementWaitingStats(CompactionStatistics entry)
{
waitingStats.increment(entry);
@@ -235,6 +255,7 @@ public AutoCompactionSnapshot build()
return new AutoCompactionSnapshot(
dataSource,
scheduleStatus,
message,
waitingStats.getTotalBytes(),
compactedStats.getTotalBytes(),
skippedStats.getTotalBytes(),