From 94ffb456a263e1164dc5a20036d6e5d0a9272e28 Mon Sep 17 00:00:00 2001
From: Vishesh Garg
Date: Tue, 24 Sep 2024 17:19:16 +0530
Subject: [PATCH] Allow MSQ engine only for compaction supervisors (#17033)

#16768 added the functionality to run compaction as a supervisor on the
Overlord. This patch builds on top of that to restrict the MSQ engine to
supervisor-based compaction only. With these changes, users can no longer
set the MSQ engine in a datasource compaction config, or as the default
cluster-level compaction engine, on the Coordinator.

The patch also adds an Overlord runtime property
`druid.supervisor.compaction.engine` to specify the default engine for
compaction supervisors.

Since these updates require major changes to the existing MSQ compaction
integration tests, this patch disables the MSQ-specific compaction
integration tests -- they will be taken up in a follow-up PR.

Key changed/added classes in this patch:
 * CompactionSupervisor
 * CompactionSupervisorSpec
 * CoordinatorCompactionConfigsResource
 * OverlordCompactionScheduler
---
 .../compact/CompactionSupervisor.java | 18 ++++
 .../compact/CompactionSupervisorSpec.java | 13 ++--
 .../compact/OverlordCompactionScheduler.java | 7 ++-
 .../compact/CompactionSupervisorSpecTest.java | 23 ++++---
 .../OverlordCompactionSchedulerTest.java | 12 ++--
 .../http/OverlordCompactionResourceTest.java | 4 +-
 .../duty/ITAutoCompactionTest.java | 2 +-
 .../compaction/CompactionRunSimulator.java | 7 ++-
 .../coordinator/AutoCompactionSnapshot.java | 23 ++++++-
 .../coordinator/ClusterCompactionConfig.java | 17 +-----
 .../CompactionSupervisorConfig.java | 19 ++++--
 .../coordinator/DruidCompactionConfig.java | 18 +-----
 .../server/coordinator/DruidCoordinator.java | 4 +-
 .../coordinator/duty/CompactSegments.java | 5 +-
 .../CoordinatorCompactionConfigsResource.java | 39 ++----------
 .../CompactionRunSimulatorTest.java | 4 +-
 .../AutoCompactionSnapshotTest.java | 5 +-
 .../CompactionSupervisorConfigTest.java | 60 +++++++++++++++++++
 ...aSourceCompactionConfigAuditEntryTest.java | 15 +++--
 ...DataSourceCompactionConfigHistoryTest.java | 3 +-
 .../DruidCompactionConfigTest.java | 2 -
 .../coordinator/DruidCoordinatorTest.java | 18 ++++++
 .../coordinator/duty/CompactSegmentsTest.java | 1 -
 ...rdinatorCompactionConfigsResourceTest.java | 58 +++++++-----------
 .../CoordinatorCompactionResourceTest.java | 1 +
 25 files changed, 229 insertions(+), 149 deletions(-)
 create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
index d4a3c46ba0b1..a00a968b107a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
@@ -25,6 +25,7 @@
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 
@@ -55,6 +56,13 @@ public void start()
     if (supervisorSpec.isSuspended()) {
       log.info("Suspending compaction for dataSource[%s].", dataSource);
       scheduler.stopCompaction(dataSource);
+    }
else if (!supervisorSpec.getValidationResult().isValid()) { + log.warn( + "Cannot start compaction supervisor for datasource[%s] since the compaction supervisor spec is invalid. " + + "Reason[%s].", + dataSource, + supervisorSpec.getValidationResult().getReason() + ); } else { log.info("Starting compaction for dataSource[%s].", dataSource); scheduler.startCompaction(dataSource, supervisorSpec.getSpec()); @@ -76,6 +84,13 @@ public SupervisorReport getStatus() snapshot = AutoCompactionSnapshot.builder(dataSource) .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED) .build(); + } else if (!supervisorSpec.getValidationResult().isValid()) { + snapshot = AutoCompactionSnapshot.builder(dataSource) + .withMessage(StringUtils.format( + "Compaction supervisor spec is invalid. Reason[%s].", + supervisorSpec.getValidationResult().getReason() + )) + .build(); } else { snapshot = scheduler.getCompactionSnapshot(dataSource); } @@ -90,6 +105,8 @@ public SupervisorStateManager.State getState() return State.SCHEDULER_STOPPED; } else if (supervisorSpec.isSuspended()) { return State.SUSPENDED; + } else if (!supervisorSpec.getValidationResult().isValid()) { + return State.INVALID_SPEC; } else { return State.RUNNING; } @@ -132,6 +149,7 @@ public enum State implements SupervisorStateManager.State SCHEDULER_STOPPED(true), RUNNING(true), SUSPENDED(true), + INVALID_SPEC(false), UNHEALTHY(false); private final boolean healthy; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java index 6911f35f96e3..66e54e971cdb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -40,6 +39,7 @@ public class CompactionSupervisorSpec implements SupervisorSpec private final boolean suspended; private final DataSourceCompactionConfig spec; private final CompactionScheduler scheduler; + private final CompactionConfigValidationResult validationResult; @JsonCreator public CompactionSupervisorSpec( @@ -48,14 +48,10 @@ public CompactionSupervisorSpec( @JacksonInject CompactionScheduler scheduler ) { - final CompactionConfigValidationResult validationResult = scheduler.validateCompactionConfig(spec); - if (!validationResult.isValid()) { - throw InvalidInput.exception("Compaction supervisor 'spec' is invalid. 
Reason[%s].", validationResult.getReason()); - } - this.spec = spec; this.suspended = Configs.valueOrDefault(suspended, false); this.scheduler = scheduler; + this.validationResult = scheduler.validateCompactionConfig(spec); } @JsonProperty @@ -77,6 +73,11 @@ public String getId() return ID_PREFIX + spec.getDataSource(); } + public CompactionConfigValidationResult getValidationResult() + { + return validationResult; + } + @Override public CompactionSupervisor createSupervisor() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java index e7b6440deb65..9e2668c81097 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java @@ -202,7 +202,7 @@ public CompactionConfigValidationResult validateCompactionConfig(DataSourceCompa } else { return ClientCompactionRunnerInfo.validateCompactionConfig( compactionConfig, - compactionConfigSupplier.get().getEngine() + supervisorConfig.getEngine() ); } } @@ -272,7 +272,7 @@ private synchronized void scheduledRun() private synchronized void runCompactionDuty() { final CoordinatorRunStats stats = new CoordinatorRunStats(); - duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), stats); + duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getEngine(), stats); // Emit stats only if emission period has elapsed if (!sinceStatsEmitted.isRunning() || sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) { @@ -309,7 +309,8 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon if (isRunning()) { return new CompactionRunSimulator(statusTracker, overlordClient).simulateRunWithConfig( getLatestConfig().withClusterConfig(updateRequest), - getCurrentDatasourceTimelines() + getCurrentDatasourceTimelines(), + supervisorConfig.getEngine() ); } else { return new CompactionSimulateResult(Collections.emptyMap()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java index 2e6a1cf8cc37..4b21ee7cca61 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java @@ -82,20 +82,29 @@ public void testSerdeOfSuspendedSpec() } @Test - public void testInvalidSpecThrowsException() + public void testGetStatusWithInvalidSpec() { Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any())) .thenReturn(CompactionConfigValidationResult.failure("bad spec")); - final DruidException exception = Assert.assertThrows( - DruidException.class, - () -> new CompactionSupervisorSpec(null, false, scheduler) - ); Assert.assertEquals( - "Compaction supervisor 'spec' is invalid. Reason[bad spec].", - exception.getMessage() + "Compaction supervisor spec is invalid. 
Reason[bad spec].", new CompactionSupervisorSpec( + new DataSourceCompactionConfig.Builder().forDataSource("datasource").build(), + false, + scheduler + ).createSupervisor().getStatus().getPayload().getMessage() ); } + @Test + public void testGetValidationResultForInvalidSpec() + { + Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any())) + .thenReturn(CompactionConfigValidationResult.failure("bad spec")); + CompactionConfigValidationResult validationResult = new CompactionSupervisorSpec(null, false, scheduler).getValidationResult(); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals("bad spec", validationResult.getReason()); + } + @Test public void testGetIdAndDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index 3133e2b9466a..f48c1d87a2b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -120,7 +120,7 @@ public void setUp() serviceEmitter = new StubServiceEmitter(); segmentsMetadataManager = new TestSegmentsMetadataManager(); - supervisorConfig = new CompactionSupervisorConfig(true); + supervisorConfig = new CompactionSupervisorConfig(true, null); compactionConfig = DruidCompactionConfig.empty(); coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(false, null); @@ -149,7 +149,7 @@ private void initScheduler() @Test public void testStartStopWhenSchedulerIsEnabled() { - supervisorConfig = new CompactionSupervisorConfig(true); + supervisorConfig = new CompactionSupervisorConfig(true, null); Assert.assertFalse(scheduler.isRunning()); scheduler.start(); @@ -168,7 +168,7 @@ public void testStartStopWhenSchedulerIsEnabled() @Test public void testStartStopWhenScheduledIsDisabled() { - supervisorConfig = new CompactionSupervisorConfig(false); + supervisorConfig = new CompactionSupervisorConfig(false, null); initScheduler(); Assert.assertFalse(scheduler.isRunning()); @@ -183,7 +183,7 @@ public void testStartStopWhenScheduledIsDisabled() @Test public void testSegmentsAreNotPolledWhenSchedulerIsDisabled() { - supervisorConfig = new CompactionSupervisorConfig(false); + supervisorConfig = new CompactionSupervisorConfig(false, null); initScheduler(); verifySegmentPolling(false); @@ -337,7 +337,7 @@ public void testRunSimulation() ); final CompactionSimulateResult simulateResult = scheduler.simulateRunWithConfigUpdate( - new ClusterCompactionConfig(null, null, null, null, null) + new ClusterCompactionConfig(null, null, null, null) ); Assert.assertEquals(1, simulateResult.getCompactionStates().size()); final Table pendingCompactionTable = simulateResult.getCompactionStates().get(CompactionStatus.State.PENDING); @@ -362,7 +362,7 @@ public void testRunSimulation() scheduler.stopCompaction(TestDataSource.WIKI); final CompactionSimulateResult simulateResultWhenDisabled = scheduler.simulateRunWithConfigUpdate( - new ClusterCompactionConfig(null, null, null, null, null) + new ClusterCompactionConfig(null, null, null, null) ); Assert.assertTrue(simulateResultWhenDisabled.getCompactionStates().isEmpty()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java index b93e6e7c1ac2..d0aba195c423 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java @@ -44,9 +44,9 @@ public class OverlordCompactionResourceTest { private static final CompactionSupervisorConfig SUPERVISOR_ENABLED - = new CompactionSupervisorConfig(true); + = new CompactionSupervisorConfig(true, null); private static final CompactionSupervisorConfig SUPERVISOR_DISABLED - = new CompactionSupervisorConfig(false); + = new CompactionSupervisorConfig(false, null); private CompactionScheduler scheduler; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index a7bae1b30b94..7895423268a9 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -118,7 +118,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest @DataProvider(name = "engine") public static Object[][] engine() { - return new Object[][]{{CompactionEngine.NATIVE}, {CompactionEngine.MSQ}}; + return new Object[][]{{CompactionEngine.NATIVE}}; } @Inject diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java index b51777c9e3b5..e3b1c66fba8e 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java @@ -27,6 +27,7 @@ import org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.report.TaskReport; @@ -75,7 +76,8 @@ public CompactionRunSimulator( */ public CompactionSimulateResult simulateRunWithConfig( DruidCompactionConfig compactionConfig, - Map datasourceTimelines + Map datasourceTimelines, + CompactionEngine defaultEngine ) { final Table compactedIntervals @@ -138,13 +140,14 @@ public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCan // Unlimited task slots to ensure that simulator does not skip any interval final DruidCompactionConfig configWithUnlimitedTaskSlots = compactionConfig.withClusterConfig( - new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null) + new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null) ); final CoordinatorRunStats stats = new CoordinatorRunStats(); new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run( configWithUnlimitedTaskSlots, datasourceTimelines, + defaultEngine, stats ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java index d6fa4835b48e..8aa8882b1cb0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.server.compaction.CompactionStatistics; +import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.util.Objects; @@ -41,6 +42,8 @@ public enum AutoCompactionScheduleStatus @JsonProperty private final AutoCompactionScheduleStatus scheduleStatus; @JsonProperty + private final String message; + @JsonProperty private final long bytesAwaitingCompaction; @JsonProperty private final long bytesCompacted; @@ -68,6 +71,7 @@ public static Builder builder(String dataSource) public AutoCompactionSnapshot( @JsonProperty("dataSource") @NotNull String dataSource, @JsonProperty("scheduleStatus") @NotNull AutoCompactionScheduleStatus scheduleStatus, + @JsonProperty("message") @Nullable String message, @JsonProperty("bytesAwaitingCompaction") long bytesAwaitingCompaction, @JsonProperty("bytesCompacted") long bytesCompacted, @JsonProperty("bytesSkipped") long bytesSkipped, @@ -81,6 +85,7 @@ public AutoCompactionSnapshot( { this.dataSource = dataSource; this.scheduleStatus = scheduleStatus; + this.message = message; this.bytesAwaitingCompaction = bytesAwaitingCompaction; this.bytesCompacted = bytesCompacted; this.bytesSkipped = bytesSkipped; @@ -104,6 +109,12 @@ public AutoCompactionScheduleStatus getScheduleStatus() return scheduleStatus; } + @Nullable + public String getMessage() + { + return message; + } + public long getBytesAwaitingCompaction() { return bytesAwaitingCompaction; @@ -169,7 +180,8 @@ public boolean equals(Object o) intervalCountCompacted == that.intervalCountCompacted && intervalCountSkipped == that.intervalCountSkipped && dataSource.equals(that.dataSource) && - scheduleStatus == that.scheduleStatus; + scheduleStatus == that.scheduleStatus && + Objects.equals(message, that.message); } @Override @@ -178,6 +190,7 @@ public int hashCode() return Objects.hash( dataSource, scheduleStatus, + message, bytesAwaitingCompaction, bytesCompacted, bytesSkipped, @@ -194,6 +207,7 @@ public static class Builder { private final String dataSource; private AutoCompactionScheduleStatus scheduleStatus; + private String message; private final CompactionStatistics compactedStats = new CompactionStatistics(); private final CompactionStatistics skippedStats = new CompactionStatistics(); @@ -215,6 +229,12 @@ public Builder withStatus(AutoCompactionScheduleStatus status) return this; } + public Builder withMessage(String message) + { + this.message = message; + return this; + } + public void incrementWaitingStats(CompactionStatistics entry) { waitingStats.increment(entry); @@ -235,6 +255,7 @@ public AutoCompactionSnapshot build() return new AutoCompactionSnapshot( dataSource, scheduleStatus, + message, waitingStats.getTotalBytes(), compactedStats.getTotalBytes(), skippedStats.getTotalBytes(), diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java index e2b98a32a92c..2e0c070b1ed2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.indexer.CompactionEngine; import 
org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; import javax.annotation.Nullable; @@ -37,7 +36,6 @@ public class ClusterCompactionConfig private final Double compactionTaskSlotRatio; private final Integer maxCompactionTaskSlots; private final Boolean useAutoScaleSlots; - private final CompactionEngine engine; private final CompactionCandidateSearchPolicy compactionPolicy; @JsonCreator @@ -45,7 +43,6 @@ public ClusterCompactionConfig( @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots, - @JsonProperty("engine") @Nullable CompactionEngine engine, @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy ) { @@ -53,7 +50,6 @@ public ClusterCompactionConfig( this.maxCompactionTaskSlots = maxCompactionTaskSlots; this.useAutoScaleSlots = useAutoScaleSlots; this.compactionPolicy = compactionPolicy; - this.engine = engine; } @Nullable @@ -77,13 +73,6 @@ public Boolean getUseAutoScaleSlots() return useAutoScaleSlots; } - @Nullable - @JsonProperty - public CompactionEngine getEngine() - { - return engine; - } - @Nullable @JsonProperty public CompactionCandidateSearchPolicy getCompactionPolicy() @@ -104,8 +93,7 @@ public boolean equals(Object o) return Objects.equals(compactionTaskSlotRatio, that.compactionTaskSlotRatio) && Objects.equals(maxCompactionTaskSlots, that.maxCompactionTaskSlots) && Objects.equals(useAutoScaleSlots, that.useAutoScaleSlots) - && Objects.equals(compactionPolicy, that.compactionPolicy) - && engine == that.engine; + && Objects.equals(compactionPolicy, that.compactionPolicy); } @Override @@ -115,8 +103,7 @@ public int hashCode() compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - compactionPolicy, - engine + compactionPolicy ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java index e738a3e7f0bb..2cc2f0d133f7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.indexer.CompactionEngine; import javax.annotation.Nullable; import java.util.Objects; @@ -33,10 +34,12 @@ */ public class CompactionSupervisorConfig { - private static final CompactionSupervisorConfig DEFAULT = new CompactionSupervisorConfig(null); + private static final CompactionSupervisorConfig DEFAULT = new CompactionSupervisorConfig(null, null); @JsonProperty private final boolean enabled; + @JsonProperty + private final CompactionEngine engine; public static CompactionSupervisorConfig defaultConfig() { @@ -45,10 +48,12 @@ public static CompactionSupervisorConfig defaultConfig() @JsonCreator public CompactionSupervisorConfig( - @JsonProperty("enabled") @Nullable Boolean enabled + @JsonProperty("enabled") @Nullable Boolean enabled, + @JsonProperty("engine") @Nullable CompactionEngine engine ) { this.enabled = Configs.valueOrDefault(enabled, false); + this.engine = Configs.valueOrDefault(engine, CompactionEngine.NATIVE); } public boolean isEnabled() @@ -56,6 +61,11 @@ public boolean 
isEnabled() return enabled; } + public CompactionEngine getEngine() + { + return engine; + } + @Override public boolean equals(Object o) { @@ -66,13 +76,13 @@ public boolean equals(Object o) return false; } CompactionSupervisorConfig that = (CompactionSupervisorConfig) o; - return enabled == that.enabled; + return enabled == that.enabled && engine == that.engine; } @Override public int hashCode() { - return Objects.hashCode(enabled); + return Objects.hash(enabled, engine); } @Override @@ -80,6 +90,7 @@ public String toString() { return "CompactionSchedulerConfig{" + "enabled=" + enabled + + "engine=" + engine + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java index b35ba7d29389..96338f5b2ef6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import org.apache.druid.common.config.Configs; -import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; import org.apache.druid.server.compaction.NewestSegmentFirstPolicy; @@ -43,13 +42,12 @@ public class DruidCompactionConfig private static final CompactionCandidateSearchPolicy DEFAULT_COMPACTION_POLICY = new NewestSegmentFirstPolicy(null); private static final DruidCompactionConfig EMPTY_INSTANCE - = new DruidCompactionConfig(Collections.emptyList(), null, null, null, null, null); + = new DruidCompactionConfig(Collections.emptyList(), null, null, null, null); private final List compactionConfigs; private final double compactionTaskSlotRatio; private final int maxCompactionTaskSlots; private final boolean useAutoScaleSlots; - private final CompactionEngine engine; private final CompactionCandidateSearchPolicy compactionPolicy; public DruidCompactionConfig withDatasourceConfigs( @@ -61,7 +59,6 @@ public DruidCompactionConfig withDatasourceConfigs( compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - engine, compactionPolicy ); } @@ -75,7 +72,6 @@ public DruidCompactionConfig withClusterConfig( Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), compactionTaskSlotRatio), Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), maxCompactionTaskSlots), Configs.valueOrDefault(update.getUseAutoScaleSlots(), useAutoScaleSlots), - Configs.valueOrDefault(update.getEngine(), engine), Configs.valueOrDefault(update.getCompactionPolicy(), compactionPolicy) ); } @@ -98,7 +94,6 @@ public DruidCompactionConfig( @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots, - @JsonProperty("engine") @Nullable CompactionEngine compactionEngine, @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy ) { @@ -106,7 +101,6 @@ public DruidCompactionConfig( this.compactionTaskSlotRatio = Configs.valueOrDefault(compactionTaskSlotRatio, 0.1); this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE); this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, false); - this.engine = Configs.valueOrDefault(compactionEngine, 
CompactionEngine.NATIVE); this.compactionPolicy = Configs.valueOrDefault(compactionPolicy, DEFAULT_COMPACTION_POLICY); } @@ -134,12 +128,6 @@ public boolean isUseAutoScaleSlots() return useAutoScaleSlots; } - @JsonProperty - public CompactionEngine getEngine() - { - return engine; - } - // Null-safe getters not used for serialization public ClusterCompactionConfig clusterConfig() @@ -148,7 +136,6 @@ public ClusterCompactionConfig clusterConfig() compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - engine, compactionPolicy ); } @@ -189,7 +176,6 @@ public boolean equals(Object o) return Double.compare(that.compactionTaskSlotRatio, compactionTaskSlotRatio) == 0 && maxCompactionTaskSlots == that.maxCompactionTaskSlots && useAutoScaleSlots == that.useAutoScaleSlots && - engine == that.engine && Objects.equals(compactionPolicy, that.compactionPolicy) && Objects.equals(compactionConfigs, that.compactionConfigs); } @@ -202,7 +188,6 @@ public int hashCode() compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - engine, compactionPolicy ); } @@ -215,7 +200,6 @@ public String toString() ", compactionTaskSlotRatio=" + compactionTaskSlotRatio + ", maxCompactionTaskSlots=" + maxCompactionTaskSlots + ", useAutoScaleSlots=" + useAutoScaleSlots + - ", engine=" + engine + ", compactionPolicy=" + compactionPolicy + '}'; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 6f907354f081..262750b527a8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -40,6 +40,7 @@ import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -362,7 +363,8 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon metadataManager.configs().getCurrentCompactionConfig().withClusterConfig(updateRequest), metadataManager.segments() .getSnapshotOfDataSourcesWithAllUsedSegments() - .getUsedSegmentsTimelinesPerDataSource() + .getUsedSegmentsTimelinesPerDataSource(), + CompactionEngine.NATIVE ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index a2f97f298afc..bf27af6358cb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -120,9 +120,11 @@ public OverlordClient getOverlordClient() @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + // Coordinator supports only native engine for compaction run( params.getCompactionConfig(), params.getUsedSegmentsTimelinesPerDataSource(), + CompactionEngine.NATIVE, params.getCoordinatorStats() ); return params; @@ -131,6 +133,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) public void run( DruidCompactionConfig dynamicConfig, Map dataSources, + CompactionEngine defaultEngine, CoordinatorRunStats stats ) { @@ -234,7 +237,7 @@ public 
void run( currentRunAutoCompactionSnapshotBuilders, availableCompactionTaskSlots, iterator, - dynamicConfig.getEngine() + defaultEngine ); stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity); diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index 8f9dfb9ca853..5215f7de4e26 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -27,22 +27,20 @@ import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; -import org.apache.druid.client.indexing.ClientCompactionRunnerInfo; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.error.NotFound; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.ClusterCompactionConfig; -import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory; import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -100,30 +98,7 @@ public Response updateClusterCompactionConfig( @Context HttpServletRequest req ) { - UnaryOperator operator = current -> { - final DruidCompactionConfig newConfig = current.withClusterConfig(updatePayload); - - final List datasourceConfigs = newConfig.getCompactionConfigs(); - if (CollectionUtils.isNullOrEmpty(datasourceConfigs) - || current.getEngine() == newConfig.getEngine()) { - return newConfig; - } - - // Validate all the datasource configs against the new engine - for (DataSourceCompactionConfig datasourceConfig : datasourceConfigs) { - CompactionConfigValidationResult validationResult = - ClientCompactionRunnerInfo.validateCompactionConfig(datasourceConfig, newConfig.getEngine()); - if (!validationResult.isValid()) { - throw InvalidInput.exception( - "Cannot update engine to [%s] as it does not support" - + " compaction config of DataSource[%s]. 
Reason[%s].", - newConfig.getEngine(), datasourceConfig.getDataSource(), validationResult.getReason() - ); - } - } - - return newConfig; - }; + UnaryOperator operator = current -> current.withClusterConfig(updatePayload); return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req)); } @@ -146,7 +121,6 @@ public Response setCompactionTaskLimit( compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - null, null ), req @@ -161,12 +135,11 @@ public Response addOrUpdateDatasourceCompactionConfig( ) { UnaryOperator callable = current -> { - CompactionConfigValidationResult validationResult = - ClientCompactionRunnerInfo.validateCompactionConfig(newConfig, current.getEngine()); - if (validationResult.isValid()) { - return current.withDatasourceConfig(newConfig); + if (newConfig.getEngine() == CompactionEngine.MSQ) { + throw InvalidInput.exception( + "MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord."); } else { - throw InvalidInput.exception("Compaction config not supported. Reason[%s].", validationResult.getReason()); + return current.withDatasourceConfig(newConfig); } }; return updateConfigHelper( diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java index 8b5c6bab6c40..aacf4216de78 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.indexing.NoopOverlordClient; import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.jackson.DefaultObjectMapper; @@ -76,7 +77,8 @@ public void testSimulateClusterCompactionConfigUpdate() DataSourceCompactionConfig.builder().forDataSource("wiki").build() ), segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments() - .getUsedSegmentsTimelinesPerDataSource() + .getUsedSegmentsTimelinesPerDataSource(), + CompactionEngine.NATIVE ); Assert.assertNotNull(simulateResult); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java index 9a8cd3cc8772..4ba65fe2df8c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java @@ -29,6 +29,7 @@ public class AutoCompactionSnapshotTest public void testAutoCompactionSnapshotBuilder() { final String expectedDataSource = "data"; + final String expectedMessage = "message"; final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource); // Increment every stat twice @@ -38,7 +39,7 @@ public void testAutoCompactionSnapshotBuilder() builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13)); } - final AutoCompactionSnapshot actual = builder.build(); + final AutoCompactionSnapshot actual = builder.withMessage(expectedMessage).build(); Assert.assertNotNull(actual); Assert.assertEquals(26, actual.getSegmentCountSkipped()); @@ -52,10 +53,12 @@ public void testAutoCompactionSnapshotBuilder() 
Assert.assertEquals(26, actual.getSegmentCountAwaitingCompaction()); Assert.assertEquals(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, actual.getScheduleStatus()); Assert.assertEquals(expectedDataSource, actual.getDataSource()); + Assert.assertEquals(expectedMessage, actual.getMessage()); AutoCompactionSnapshot expected = new AutoCompactionSnapshot( expectedDataSource, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, + expectedMessage, 26, 26, 26, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java new file mode 100644 index 000000000000..59cc3ecf1718 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class CompactionSupervisorConfigTest +{ + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); + + @Test + public void testCompactionSupervisorConfigSerde() throws JsonProcessingException + { + final boolean enabled = true; + final CompactionEngine defaultEngine = CompactionEngine.MSQ; + CompactionSupervisorConfig compactionSupervisorConfig = + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "engine", defaultEngine)), + CompactionSupervisorConfig.class + ); + Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig); + } + + @Test + public void testCompactionSupervisorConfigEquality() + { + Assert.assertEquals( + new CompactionSupervisorConfig(true, CompactionEngine.MSQ), + new CompactionSupervisorConfig(true, CompactionEngine.MSQ) + ); + Assert.assertNotEquals( + new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), + new CompactionSupervisorConfig(true, CompactionEngine.MSQ) + ); + Assert.assertNotEquals(new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), "true"); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java index bdd028469211..4647ad8d9853 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java @@ -20,7 +20,6 @@ package org.apache.druid.server.coordinator; import org.apache.druid.audit.AuditInfo; -import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestDataSource; import org.junit.Assert; @@ -34,7 +33,7 @@ public class DataSourceCompactionConfigAuditEntryTest private final AuditInfo auditInfo = new AuditInfo("author", "identity", "comment", "ip"); private final DataSourceCompactionConfigAuditEntry firstEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.1, 9, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -44,7 +43,7 @@ public class DataSourceCompactionConfigAuditEntryTest public void testhasSameConfigWithSameBaseConfigIsTrue() { final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.1, 9, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -57,7 +56,7 @@ public void testhasSameConfigWithSameBaseConfigIsTrue() public void testhasSameConfigWithDifferentClusterConfigIsFalse() { DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, false, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.2, 9, false, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -66,7 +65,7 @@ public void testhasSameConfigWithDifferentClusterConfigIsFalse() Assert.assertFalse(secondEntry.hasSameConfig(firstEntry)); secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null), + new ClusterCompactionConfig(0.1, 10, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -79,8 +78,8 @@ public void testhasSameConfigWithDifferentClusterConfigIsFalse() public void testhasSameConfigWithDifferentDatasourceConfigIsFalse() { DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null), - DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), + new ClusterCompactionConfig(0.1, 9, true, null), + DataSourceCompactionConfig.builder().forDataSource(TestDataSource.KOALA).build(), auditInfo, DateTimes.nowUtc() ); @@ -92,7 +91,7 @@ public void testhasSameConfigWithDifferentDatasourceConfigIsFalse() public void testhasSameConfigWithNullDatasourceConfigIsFalse() { final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null), + new ClusterCompactionConfig(0.1, 9, true, null), null, auditInfo, DateTimes.nowUtc() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java index 4426d58b258e..bcdee91ecd52 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java @@ -20,7 +20,6 @@ package org.apache.druid.server.coordinator; import org.apache.druid.audit.AuditInfo; -import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestDataSource; import org.joda.time.DateTime; @@ -178,7 +177,7 @@ public void testAddAndModifyClusterConfigShouldAddTwice() wikiAuditHistory.add(originalConfig, auditInfo, DateTimes.nowUtc()); final DruidCompactionConfig updatedConfig = originalConfig.withClusterConfig( - new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ, null) + new ClusterCompactionConfig(0.2, null, null, null) ); wikiAuditHistory.add(updatedConfig, auditInfo, DateTimes.nowUtc()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java index c8ed8d9ba530..2f481727fa10 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java @@ -64,7 +64,6 @@ public void testSerdeWithDatasourceConfigs() throws Exception null, null, null, - CompactionEngine.MSQ, null ); @@ -82,7 +81,6 @@ public void testCopyWithClusterConfig() 0.5, 10, false, - CompactionEngine.MSQ, new NewestSegmentFirstPolicy(null) ); final DruidCompactionConfig copy = config.withClusterConfig(clusterConfig); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 1ebeb991ccbb..c59de936afee 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -50,6 +50,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusTracker; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; @@ -930,6 +931,23 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception EasyMock.verify(metadataRuleManager); } + @Test + public void testSimulateRunWithEmptyDatasourceCompactionConfigs() + { + DruidDataSource dataSource = new DruidDataSource("dataSource", Collections.emptyMap()); + DataSourcesSnapshot dataSourcesSnapshot = + new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource())); + EasyMock + .expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()) + .andReturn(dataSourcesSnapshot) + .anyTimes(); + EasyMock.replay(segmentsMetadataManager); + CompactionSimulateResult result = coordinator.simulateRunWithConfigUpdate( + new ClusterCompactionConfig(0.2, null, null, null) + ); + Assert.assertEquals(Collections.emptyMap(), result.getCompactionStates()); + } + 
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch( int latchCount, PathChildrenCache pathChildrenCache, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 8f84e860deae..7a49222e098d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -1888,7 +1888,6 @@ private CoordinatorRunStats doCompactSegments( numCompactionTaskSlots == null ? null : 1.0, // 100% when numCompactionTaskSlots is not null numCompactionTaskSlots, useAutoScaleSlots, - null, null ) ) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 24641c81a156..4cef66d2ac55 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -106,14 +106,13 @@ public void testGetDefaultClusterConfig() Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots()); Assert.assertFalse(defaultConfig.isUseAutoScaleSlots()); Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty()); - Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine()); } @Test public void testUpdateClusterConfig() { Response response = resource.updateClusterCompactionConfig( - new ClusterCompactionConfig(0.5, 10, true, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.5, 10, true, null), mockHttpServletRequest ); verifyStatus(Response.Status.OK, response); @@ -127,7 +126,6 @@ public void testUpdateClusterConfig() Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA); Assert.assertEquals(10, updatedConfig.getMaxCompactionTaskSlots()); Assert.assertTrue(updatedConfig.isUseAutoScaleSlots()); - Assert.assertEquals(CompactionEngine.MSQ, updatedConfig.getEngine()); } @Test @@ -149,7 +147,6 @@ public void testSetCompactionTaskLimit() // Verify that the other fields are unchanged Assert.assertEquals(defaultConfig.getCompactionConfigs(), updatedConfig.getCompactionConfigs()); - Assert.assertEquals(defaultConfig.getEngine(), updatedConfig.getEngine()); } @Test @@ -177,6 +174,23 @@ public void testAddDatasourceConfig() Assert.assertEquals(newDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } + @Test + public void testAddDatasourceConfigWithMSQEngineIsInvalid() + { + final DataSourceCompactionConfig newDatasourceConfig + = DataSourceCompactionConfig.builder() + .forDataSource(TestDataSource.WIKI) + .withEngine(CompactionEngine.MSQ) + .build(); + Response response = resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, mockHttpServletRequest); + verifyStatus(Response.Status.BAD_REQUEST, response); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord.", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); + } + @Test public void testUpdateDatasourceConfig() { @@ -209,16 +223,16 @@ public void testUpdateDatasourceConfig() .build(); response = 
resource.addOrUpdateDatasourceCompactionConfig(updatedDatasourceConfig, mockHttpServletRequest); - verifyStatus(Response.Status.OK, response); + verifyStatus(Response.Status.BAD_REQUEST, response); final DataSourceCompactionConfig latestDatasourceConfig = verifyAndGetPayload(resource.getDatasourceCompactionConfig(TestDataSource.WIKI), DataSourceCompactionConfig.class); - Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig); + Assert.assertEquals(originalDatasourceConfig, latestDatasourceConfig); final DruidCompactionConfig fullCompactionConfig = verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class); Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size()); - Assert.assertEquals(updatedDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); + Assert.assertEquals(originalDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } @Test @@ -285,7 +299,7 @@ public void testGetDatasourceConfigHistory() resource.addOrUpdateDatasourceCompactionConfig(configV2, mockHttpServletRequest); final DataSourceCompactionConfig configV3 = builder - .withEngine(CompactionEngine.MSQ) + .withEngine(CompactionEngine.NATIVE) .withSkipOffsetFromLatest(Period.hours(1)) .build(); resource.addOrUpdateDatasourceCompactionConfig(configV3, mockHttpServletRequest); @@ -323,36 +337,10 @@ public void testAddInvalidDatasourceConfigThrowsBadRequest() verifyStatus(Response.Status.BAD_REQUEST, response); Assert.assertTrue(response.getEntity() instanceof ErrorResponse); Assert.assertEquals( - "Compaction config not supported. Reason[MSQ: Context maxNumTasks[1]" - + " must be at least 2 (1 controller + 1 worker)].", + "MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord.", ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() ); } - - @Test - public void testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest() - { - final DataSourceCompactionConfig datasourceConfig = DataSourceCompactionConfig - .builder() - .forDataSource(TestDataSource.WIKI) - .withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1)) - .build(); - Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest); - verifyStatus(Response.Status.OK, response); - - response = resource.updateClusterCompactionConfig( - new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ, null), - mockHttpServletRequest - ); - verifyStatus(Response.Status.BAD_REQUEST, response); - Assert.assertTrue(response.getEntity() instanceof ErrorResponse); - Assert.assertEquals( - "Cannot update engine to [msq] as it does not support compaction config of DataSource[wiki]." 
- + " Reason[MSQ: Context maxNumTasks[1] must be at least 2 (1 controller + 1 worker)].", - ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() - ); - } - @SuppressWarnings("unchecked") private T verifyAndGetPayload(Response response, Class type) { diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index 4a73047d1955..91b348b72f40 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -50,6 +50,7 @@ public class CoordinatorCompactionResourceTest private final AutoCompactionSnapshot expectedSnapshot = new AutoCompactionSnapshot( dataSourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, + null, 1, 1, 1,