From 954aaafe0c85c1f4967bdb5798c17d4dc813ddd4 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 29 Jul 2024 23:47:25 -0700 Subject: [PATCH] Refactor: Clean up compaction config classes (#16810) Changes: - Rename `CoordinatorCompactionConfig` to `DruidCompactionConfig` - Rename `CompactionConfigUpdateRequest` to `ClusterCompactionConfig` - Refactor methods in `DruidCompactionConfig` - Clean up `DataSourceCompactionConfigHistory` and its tests - Clean up tests and add new tests - Change API path `/druid/coordinator/v1/config/global` to `/druid/coordinator/v1/config/cluster` --- .../clients/CompactionResourceTestClient.java | 8 +- .../druid/testing/utils/CompactionUtil.java | 83 ------ .../ITAutoCompactionLockContentionTest.java | 27 +- .../duty/ITAutoCompactionTest.java | 48 ++-- .../duty/ITAutoCompactionUpgradeTest.java | 91 +++---- .../ClusterCompactionConfig.java} | 44 ++- .../coordinator/CoordinatorConfigManager.java | 28 +- .../DataSourceCompactionConfig.java | 128 ++++++++- .../DataSourceCompactionConfigAuditEntry.java | 58 +--- .../DataSourceCompactionConfigHistory.java | 61 ++--- ...Config.java => DruidCompactionConfig.java} | 109 +++++--- .../server/coordinator/DruidCoordinator.java | 2 +- .../DruidCoordinatorRuntimeParams.java | 26 +- .../DataSourceCompactionConfigBuilder.java | 155 ----------- .../coordinator/duty/CompactSegments.java | 6 +- .../duty/KillCompactionConfig.java | 12 +- .../CoordinatorCompactionConfigsResource.java | 79 +++--- .../CoordinatorCompactionConfigTest.java | 40 --- ...aSourceCompactionConfigAuditEntryTest.java | 108 ++++---- ...DataSourceCompactionConfigHistoryTest.java | 254 ++++++++++-------- .../DruidCompactionConfigTest.java | 104 +++++++ .../coordinator/DruidCoordinatorTest.java | 8 +- .../coordinator/duty/CompactSegmentsTest.java | 4 +- .../duty/KillCompactionConfigTest.java | 64 ++--- .../CoordinatorSimulationBuilder.java | 8 +- ...rdinatorCompactionConfigsResourceTest.java | 40 +-- 26 files changed, 759 insertions(+), 836 deletions(-) delete mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java rename server/src/main/java/org/apache/druid/server/{http/CompactionConfigUpdateRequest.java => coordinator/ClusterCompactionConfig.java} (63%) rename server/src/main/java/org/apache/druid/server/coordinator/{CoordinatorCompactionConfig.java => DruidCompactionConfig.java} (59%) delete mode 100644 server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java delete mode 100644 server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java index 8e5ee737f03c..74d56065ef45 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java @@ -28,8 +28,8 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import 
org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -86,7 +86,7 @@ public void submitCompactionConfig(final DataSourceCompactionConfig dataSourceCo } } - public void deleteCompactionConfig(final String dataSource) throws Exception + public void deleteDataSourceCompactionConfig(final String dataSource) throws Exception { String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); StatusResponseHolder response = httpClient.go(new Request(HttpMethod.DELETE, new URL(url)), responseHandler).get(); @@ -100,7 +100,7 @@ public void deleteCompactionConfig(final String dataSource) throws Exception } } - public CoordinatorCompactionConfig getCoordinatorCompactionConfigs() throws Exception + public DruidCompactionConfig getCompactionConfig() throws Exception { String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL()); StatusResponseHolder response = httpClient.go( @@ -113,7 +113,7 @@ public CoordinatorCompactionConfig getCoordinatorCompactionConfigs() throws Exce response.getContent() ); } - return jsonMapper.readValue(response.getContent(), new TypeReference() {}); + return jsonMapper.readValue(response.getContent(), new TypeReference() {}); } public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSource) throws Exception diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java deleted file mode 100644 index 63316e76687d..000000000000 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.testing.utils; - -import org.apache.druid.data.input.MaxSizeSplitHintSpec; -import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; -import org.apache.druid.server.coordinator.DataSourceCompactionConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; -import org.joda.time.Period; - -/** - * Contains utility methods for Compaction. 
- */ -public class CompactionUtil -{ - - private CompactionUtil() - { - // no instantiation - } - - public static DataSourceCompactionConfig createCompactionConfig( - String fullDatasourceName, - Integer maxRowsPerSegment, - Period skipOffsetFromLatest - ) - { - return new DataSourceCompactionConfig( - fullDatasourceName, - null, - null, - null, - skipOffsetFromLatest, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - new MaxSizeSplitHintSpec(null, 1), - new DynamicPartitionsSpec(maxRowsPerSegment, null), - null, - null, - null, - null, - null, - 1, - null, - null, - null, - null, - null, - 1, - null - ), - null, - null, - null, - null, - new UserCompactionTaskIOConfig(true), - null, - null - ); - } - -} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 79d63cb4550a..0a1071c38e71 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -27,12 +27,11 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.testing.clients.CompactionResourceTestClient; import org.apache.druid.testing.clients.TaskResponseObject; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.CompactionUtil; import org.apache.druid.testing.utils.EventSerializer; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.KafkaUtil; @@ -305,26 +304,26 @@ private boolean shouldSkipTest(boolean testEnableTransaction) */ private void submitAndVerifyCompactionConfig() throws Exception { - final DataSourceCompactionConfig compactionConfig = CompactionUtil - .createCompactionConfig(fullDatasourceName, Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO); + final DataSourceCompactionConfig dataSourceCompactionConfig = DataSourceCompactionConfig + .builder() + .forDataSource(fullDatasourceName) + .withSkipOffsetFromLatest(Period.ZERO) + .withMaxRowsPerSegment(Specs.MAX_ROWS_PER_SEGMENT) + .build(); compactionResource.updateCompactionTaskSlot(0.5, 10, null); - compactionResource.submitCompactionConfig(compactionConfig); + compactionResource.submitCompactionConfig(dataSourceCompactionConfig); // Wait for compaction config to persist Thread.sleep(2000); // Verify that the compaction config is updated correctly. 
- CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - DataSourceCompactionConfig observedCompactionConfig = null; - for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { - if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) { - observedCompactionConfig = dataSourceCompactionConfig; - } - } - Assert.assertEquals(observedCompactionConfig, compactionConfig); + DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig(); + DataSourceCompactionConfig observedCompactionConfig + = compactionConfig.findConfigForDatasource(fullDatasourceName).orNull(); + Assert.assertEquals(observedCompactionConfig, dataSourceCompactionConfig); observedCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName); - Assert.assertEquals(observedCompactionConfig, compactionConfig); + Assert.assertEquals(observedCompactionConfig, dataSourceCompactionConfig); } /** diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index e0400d3cce2e..d09bced3313c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -54,8 +54,8 @@ import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; @@ -1605,17 +1605,17 @@ public void testUpdateCompactionTaskSlotWithUseAutoScaleSlots() throws Exception { // First try update without useAutoScaleSlots updateCompactionTaskSlot(3, 5, null); - CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); + DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig(); // Should be default value which is false - Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots()); + Assert.assertFalse(compactionConfig.isUseAutoScaleSlots()); // Now try update from default value to useAutoScaleSlots=true updateCompactionTaskSlot(3, 5, true); - coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - Assert.assertTrue(coordinatorCompactionConfig.isUseAutoScaleSlots()); + compactionConfig = compactionResource.getCompactionConfig(); + Assert.assertTrue(compactionConfig.isUseAutoScaleSlots()); // Now try update from useAutoScaleSlots=true to useAutoScaleSlots=false updateCompactionTaskSlot(3, 5, false); - coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots()); + compactionConfig = compactionResource.getCompactionConfig(); + Assert.assertFalse(compactionConfig.isUseAutoScaleSlots()); } private void loadData(String indexTask) throws 
Exception @@ -1802,7 +1802,7 @@ private void submitCompactionConfig( @Nullable CompactionEngine engine ) throws Exception { - DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig( + DataSourceCompactionConfig dataSourceCompactionConfig = new DataSourceCompactionConfig( fullDatasourceName, null, null, @@ -1837,19 +1837,15 @@ private void submitCompactionConfig( engine, ImmutableMap.of("maxNumTasks", 2) ); - compactionResource.submitCompactionConfig(compactionConfig); + compactionResource.submitCompactionConfig(dataSourceCompactionConfig); // Wait for compaction config to persist Thread.sleep(2000); // Verify that the compaction config is updated correctly. - CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - DataSourceCompactionConfig foundDataSourceCompactionConfig = null; - for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { - if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) { - foundDataSourceCompactionConfig = dataSourceCompactionConfig; - } - } + DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig(); + DataSourceCompactionConfig foundDataSourceCompactionConfig + = compactionConfig.findConfigForDatasource(fullDatasourceName).orNull(); Assert.assertNotNull(foundDataSourceCompactionConfig); Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig()); Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), partitionsSpec); @@ -1864,16 +1860,12 @@ private void submitCompactionConfig( private void deleteCompactionConfig() throws Exception { - compactionResource.deleteCompactionConfig(fullDatasourceName); + compactionResource.deleteDataSourceCompactionConfig(fullDatasourceName); // Verify that the compaction config is updated correctly. - CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - DataSourceCompactionConfig foundDataSourceCompactionConfig = null; - for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { - if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) { - foundDataSourceCompactionConfig = dataSourceCompactionConfig; - } - } + DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig(); + DataSourceCompactionConfig foundDataSourceCompactionConfig + = compactionConfig.findConfigForDatasource(fullDatasourceName).orNull(); Assert.assertNull(foundDataSourceCompactionConfig); } @@ -1955,11 +1947,11 @@ private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCom { compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots); // Verify that the compaction config is updated correctly. 
- CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio); - Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots); + DruidCompactionConfig compactionConfig = compactionResource.getCompactionConfig(); + Assert.assertEquals(compactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio); + Assert.assertEquals(compactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots); if (useAutoScaleSlots != null) { - Assert.assertEquals(coordinatorCompactionConfig.isUseAutoScaleSlots(), useAutoScaleSlots.booleanValue()); + Assert.assertEquals(compactionConfig.isUseAutoScaleSlots(), useAutoScaleSlots.booleanValue()); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java index 9e1b54143bbd..b2ca693a4422 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java @@ -24,9 +24,8 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; @@ -44,7 +43,6 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest { - private static final Logger LOG = new Logger(ITAutoCompactionUpgradeTest.class); private static final String UPGRADE_DATASOURCE_NAME = "upgradeTest"; @Inject @@ -58,67 +56,56 @@ public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVers { // Verify that compaction config already exist. This config was inserted manually into the database using SQL script. 
// This auto compaction configuration payload is from Druid 0.21.0 - CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - DataSourceCompactionConfig foundDataSourceCompactionConfig = null; - for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { - if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) { - foundDataSourceCompactionConfig = dataSourceCompactionConfig; - } - } + DruidCompactionConfig coordinatorCompactionConfig = compactionResource.getCompactionConfig(); + DataSourceCompactionConfig foundDataSourceCompactionConfig + = coordinatorCompactionConfig.findConfigForDatasource(UPGRADE_DATASOURCE_NAME).orNull(); Assert.assertNotNull(foundDataSourceCompactionConfig); // Now submit a new auto compaction configuration PartitionsSpec newPartitionsSpec = new DynamicPartitionsSpec(4000, null); Period newSkipOffset = Period.seconds(0); - DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig( - UPGRADE_DATASOURCE_NAME, - null, - null, - null, - newSkipOffset, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - new MaxSizeSplitHintSpec(null, 1), - newPartitionsSpec, - null, - null, - null, - null, - null, - 1, - null, - null, - null, - null, - null, - 1, - null - ), - new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null), - null, - null, - null, - new UserCompactionTaskIOConfig(true), - null, - null - ); + DataSourceCompactionConfig compactionConfig = DataSourceCompactionConfig + .builder() + .forDataSource(UPGRADE_DATASOURCE_NAME) + .withSkipOffsetFromLatest(newSkipOffset) + .withTuningConfig( + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + new MaxSizeSplitHintSpec(null, 1), + newPartitionsSpec, + null, + null, + null, + null, + null, + 1, + null, + null, + null, + null, + null, + 1, + null + ) + ) + .withGranularitySpec( + new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null) + ) + .withIoConfig(new UserCompactionTaskIOConfig(true)) + .build(); compactionResource.submitCompactionConfig(compactionConfig); // Wait for compaction config to persist Thread.sleep(2000); // Verify that compaction was successfully updated - coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); - foundDataSourceCompactionConfig = null; - for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { - if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) { - foundDataSourceCompactionConfig = dataSourceCompactionConfig; - } - } + coordinatorCompactionConfig = compactionResource.getCompactionConfig(); + foundDataSourceCompactionConfig + = coordinatorCompactionConfig.findConfigForDatasource(UPGRADE_DATASOURCE_NAME).orNull(); Assert.assertNotNull(foundDataSourceCompactionConfig); Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig()); Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), newPartitionsSpec); diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java similarity index 63% rename from server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java rename to server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java index 
72ab23fde8a7..6009dc12cf4c 100644 --- a/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java @@ -17,38 +17,39 @@ * under the License. */ -package org.apache.druid.server.http; +package org.apache.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexer.CompactionEngine; import javax.annotation.Nullable; +import java.util.Objects; /** - * Payload to update the cluster-level compaction config. - * All fields of this class must be nullable. A non-value indicates that the - * corresponding field is being updated. + * Cluster-level compaction configs. + * All fields of this class are nullable. A non-null value denotes that the + * corresponding field has been explicitly specified. */ -public class CompactionConfigUpdateRequest +public class ClusterCompactionConfig { private final Double compactionTaskSlotRatio; private final Integer maxCompactionTaskSlots; private final Boolean useAutoScaleSlots; - private final CompactionEngine compactionEngine; + private final CompactionEngine engine; @JsonCreator - public CompactionConfigUpdateRequest( + public ClusterCompactionConfig( @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots, - @JsonProperty("compactionEngine") @Nullable CompactionEngine compactionEngine + @JsonProperty("engine") @Nullable CompactionEngine engine ) { this.compactionTaskSlotRatio = compactionTaskSlotRatio; this.maxCompactionTaskSlots = maxCompactionTaskSlots; this.useAutoScaleSlots = useAutoScaleSlots; - this.compactionEngine = compactionEngine; + this.engine = engine; } @Nullable @@ -74,9 +75,30 @@ public Boolean getUseAutoScaleSlots() @Nullable @JsonProperty - public CompactionEngine getCompactionEngine() + public CompactionEngine getEngine() { - return compactionEngine; + return engine; } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClusterCompactionConfig that = (ClusterCompactionConfig) o; + return Objects.equals(compactionTaskSlotRatio, that.compactionTaskSlotRatio) + && Objects.equals(maxCompactionTaskSlots, that.maxCompactionTaskSlots) + && Objects.equals(useAutoScaleSlots, that.useAutoScaleSlots) + && engine == that.engine; + } + + @Override + public int hashCode() + { + return Objects.hash(compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, engine); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java index c82adfef1e8d..e8265bcdd1af 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java @@ -31,7 +31,7 @@ /** * Manager to fetch and update dynamic configs {@link CoordinatorDynamicConfig} - * and {@link CoordinatorCompactionConfig}. + * and {@link DruidCompactionConfig}. 
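 * <p>
 * Illustrative usage (a sketch, not part of this patch; assumes a {@code configManager}
 * instance and an {@code auditInfo} in scope): the compaction config can be read and
 * updated atomically through {@code getAndUpdateCompactionConfig}, for example
 * <pre>
 *   // apply a partial cluster-level update on top of the current config
 *   configManager.getAndUpdateCompactionConfig(
 *       current -> current.withClusterConfig(new ClusterCompactionConfig(0.5, 10, null, null)),
 *       auditInfo
 *   );
 * </pre>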
*/ public class CoordinatorConfigManager { @@ -71,12 +71,12 @@ public ConfigManager.SetResult setDynamicConfig(CoordinatorDynamicConfig config, ); } - public CoordinatorCompactionConfig getCurrentCompactionConfig() + public DruidCompactionConfig getCurrentCompactionConfig() { - CoordinatorCompactionConfig config = jacksonConfigManager.watch( - CoordinatorCompactionConfig.CONFIG_KEY, - CoordinatorCompactionConfig.class, - CoordinatorCompactionConfig.empty() + DruidCompactionConfig config = jacksonConfigManager.watch( + DruidCompactionConfig.CONFIG_KEY, + DruidCompactionConfig.class, + DruidCompactionConfig.empty() ).get(); return Preconditions.checkNotNull(config, "Got null config from watcher?!"); @@ -91,7 +91,7 @@ public CoordinatorCompactionConfig getCurrentCompactionConfig() * or if the update was successful. */ public ConfigManager.SetResult getAndUpdateCompactionConfig( - UnaryOperator operator, + UnaryOperator operator, AuditInfo auditInfo ) { @@ -102,16 +102,16 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig( tablesConfig.getConfigTable(), MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN, MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN, - CoordinatorCompactionConfig.CONFIG_KEY + DruidCompactionConfig.CONFIG_KEY ); - CoordinatorCompactionConfig current = convertBytesToCompactionConfig(currentBytes); - CoordinatorCompactionConfig updated = operator.apply(current); + DruidCompactionConfig current = convertBytesToCompactionConfig(currentBytes); + DruidCompactionConfig updated = operator.apply(current); if (current.equals(updated)) { return ConfigManager.SetResult.ok(); } else { return jacksonConfigManager.set( - CoordinatorCompactionConfig.CONFIG_KEY, + DruidCompactionConfig.CONFIG_KEY, currentBytes, updated, auditInfo @@ -119,12 +119,12 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig( } } - public CoordinatorCompactionConfig convertBytesToCompactionConfig(byte[] bytes) + public DruidCompactionConfig convertBytesToCompactionConfig(byte[] bytes) { return jacksonConfigManager.convertByteToConfig( bytes, - CoordinatorCompactionConfig.class, - CoordinatorCompactionConfig.empty() + DruidCompactionConfig.class, + DruidCompactionConfig.empty() ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index 91af8b6267d8..193b53a5d058 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder; import org.joda.time.Period; import javax.annotation.Nullable; @@ -44,9 +43,9 @@ public class DataSourceCompactionConfig private final int taskPriority; private final long inputSegmentSizeBytes; - public static DataSourceCompactionConfigBuilder builder() + public static Builder builder() { - return new DataSourceCompactionConfigBuilder(); + return new Builder(); } /** @@ -234,4 +233,127 @@ public int hashCode() result = 31 * result + Arrays.hashCode(metricsSpec); return result; } + + public static class Builder + { + private String dataSource; + private Integer taskPriority; + private Long inputSegmentSizeBytes; + private Integer 
maxRowsPerSegment; + private Period skipOffsetFromLatest; + private UserCompactionTaskQueryTuningConfig tuningConfig; + private UserCompactionTaskGranularityConfig granularitySpec; + private UserCompactionTaskDimensionsConfig dimensionsSpec; + private AggregatorFactory[] metricsSpec; + private UserCompactionTaskTransformConfig transformSpec; + private UserCompactionTaskIOConfig ioConfig; + private CompactionEngine engine; + private Map taskContext; + + public DataSourceCompactionConfig build() + { + return new DataSourceCompactionConfig( + dataSource, + taskPriority, + inputSegmentSizeBytes, + maxRowsPerSegment, + skipOffsetFromLatest, + tuningConfig, + granularitySpec, + dimensionsSpec, + metricsSpec, + transformSpec, + ioConfig, + engine, + taskContext + ); + } + + public Builder forDataSource(String dataSource) + { + this.dataSource = dataSource; + return this; + } + + public Builder withTaskPriority(Integer taskPriority) + { + this.taskPriority = taskPriority; + return this; + } + + public Builder withInputSegmentSizeBytes(Long inputSegmentSizeBytes) + { + this.inputSegmentSizeBytes = inputSegmentSizeBytes; + return this; + } + + @Deprecated + public Builder withMaxRowsPerSegment(Integer maxRowsPerSegment) + { + this.maxRowsPerSegment = maxRowsPerSegment; + return this; + } + + public Builder withSkipOffsetFromLatest(Period skipOffsetFromLatest) + { + this.skipOffsetFromLatest = skipOffsetFromLatest; + return this; + } + + public Builder withTuningConfig( + UserCompactionTaskQueryTuningConfig tuningConfig + ) + { + this.tuningConfig = tuningConfig; + return this; + } + + public Builder withGranularitySpec( + UserCompactionTaskGranularityConfig granularitySpec + ) + { + this.granularitySpec = granularitySpec; + return this; + } + + public Builder withDimensionsSpec( + UserCompactionTaskDimensionsConfig dimensionsSpec + ) + { + this.dimensionsSpec = dimensionsSpec; + return this; + } + + public Builder withMetricsSpec(AggregatorFactory[] metricsSpec) + { + this.metricsSpec = metricsSpec; + return this; + } + + public Builder withTransformSpec( + UserCompactionTaskTransformConfig transformSpec + ) + { + this.transformSpec = transformSpec; + return this; + } + + public Builder withIoConfig(UserCompactionTaskIOConfig ioConfig) + { + this.ioConfig = ioConfig; + return this; + } + + public Builder withEngine(CompactionEngine engine) + { + this.engine = engine; + return this; + } + + public Builder withTaskContext(Map taskContext) + { + this.taskContext = taskContext; + return this; + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java index 3424212446f2..fdbbb57e53ef 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java @@ -20,24 +20,25 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.audit.AuditInfo; import org.joda.time.DateTime; +import java.util.Objects; + /** * A DTO containing audit information for compaction config for a datasource. 
*/ public class DataSourceCompactionConfigAuditEntry { - private final GlobalCompactionConfig globalConfig; + private final ClusterCompactionConfig globalConfig; private final DataSourceCompactionConfig compactionConfig; private final AuditInfo auditInfo; private final DateTime auditTime; @JsonCreator public DataSourceCompactionConfigAuditEntry( - @JsonProperty("globalConfig") GlobalCompactionConfig globalConfig, + @JsonProperty("globalConfig") ClusterCompactionConfig globalConfig, @JsonProperty("compactionConfig") DataSourceCompactionConfig compactionConfig, @JsonProperty("auditInfo") AuditInfo auditInfo, @JsonProperty("auditTime") DateTime auditTime @@ -50,7 +51,7 @@ public DataSourceCompactionConfigAuditEntry( } @JsonProperty - public GlobalCompactionConfig getGlobalConfig() + public ClusterCompactionConfig getGlobalConfig() { return globalConfig; } @@ -73,52 +74,9 @@ public DateTime getAuditTime() return auditTime; } - /** - * A DTO containing compaction config for that affects the entire cluster. - */ - public static class GlobalCompactionConfig + public boolean hasSameConfig(DataSourceCompactionConfigAuditEntry other) { - private final double compactionTaskSlotRatio; - private final int maxCompactionTaskSlots; - private final boolean useAutoScaleSlots; - - @JsonCreator - public GlobalCompactionConfig( - @JsonProperty("compactionTaskSlotRatio") - double compactionTaskSlotRatio, - @JsonProperty("maxCompactionTaskSlots") int maxCompactionTaskSlots, - @JsonProperty("useAutoScaleSlots") boolean useAutoScaleSlots - ) - { - this.compactionTaskSlotRatio = compactionTaskSlotRatio; - this.maxCompactionTaskSlots = maxCompactionTaskSlots; - this.useAutoScaleSlots = useAutoScaleSlots; - } - - @JsonProperty - public double getCompactionTaskSlotRatio() - { - return compactionTaskSlotRatio; - } - - @JsonProperty - public int getMaxCompactionTaskSlots() - { - return maxCompactionTaskSlots; - } - - @JsonProperty - public boolean isUseAutoScaleSlots() - { - return useAutoScaleSlots; - } - - @JsonIgnore - public boolean hasSameConfig(CoordinatorCompactionConfig coordinatorCompactionConfig) - { - return useAutoScaleSlots == coordinatorCompactionConfig.isUseAutoScaleSlots() && - compactionTaskSlotRatio == coordinatorCompactionConfig.getCompactionTaskSlotRatio() && - coordinatorCompactionConfig.getMaxCompactionTaskSlots() == maxCompactionTaskSlots; - } + return Objects.equals(this.compactionConfig, other.compactionConfig) + && Objects.equals(this.globalConfig, other.globalConfig); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java index ceb4be0d8a6c..4ff9d57465b0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java @@ -27,7 +27,7 @@ /** * A utility class to build the config history for a datasource from audit entries for - * {@link CoordinatorCompactionConfig}. The {@link CoordinatorCompactionConfig} contains the entire config for the + * {@link DruidCompactionConfig}. The {@link DruidCompactionConfig} contains the entire config for the * cluster, so this class creates adds audit entires to the history only when a setting for this datasource or a global * setting has changed. 
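 * <p>
 * For example (an illustrative sketch, not part of this patch; assumes {@code config} is a
 * {@code DruidCompactionConfig} containing an entry for datasource "wiki" and {@code auditInfo}
 * is in scope), repeated identical configs do not create duplicate entries:
 * <pre>
 *   DataSourceCompactionConfigHistory history = new DataSourceCompactionConfigHistory("wiki");
 *   history.add(config, auditInfo, DateTimes.nowUtc());
 *   history.add(config, auditInfo, DateTimes.nowUtc());  // unchanged config, no new entry
 *   history.getEntries().size();  // 1
 * </pre>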
*/ @@ -41,54 +41,29 @@ public DataSourceCompactionConfigHistory(String dataSource) this.dataSource = dataSource; } - public void add(CoordinatorCompactionConfig coordinatorCompactionConfig, AuditInfo auditInfo, DateTime auditTime) + public void add(DruidCompactionConfig compactionConfig, AuditInfo auditInfo, DateTime auditTime) { - DataSourceCompactionConfigAuditEntry current = auditEntries.isEmpty() ? null : auditEntries.peek(); - DataSourceCompactionConfigAuditEntry newEntry = null; - boolean hasDataSourceCompactionConfig = false; - for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { - if (dataSource.equals(dataSourceCompactionConfig.getDataSource())) { - hasDataSourceCompactionConfig = true; - if ( - current == null || - ( - !dataSourceCompactionConfig.equals(current.getCompactionConfig()) || - !current.getGlobalConfig().hasSameConfig(coordinatorCompactionConfig) - ) - ) { - current = new DataSourceCompactionConfigAuditEntry( - new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( - coordinatorCompactionConfig.getCompactionTaskSlotRatio(), - coordinatorCompactionConfig.getMaxCompactionTaskSlots(), - coordinatorCompactionConfig.isUseAutoScaleSlots() - ), - dataSourceCompactionConfig, - auditInfo, - auditTime - ); - newEntry = current; - } - break; - } + final DataSourceCompactionConfigAuditEntry previousEntry = auditEntries.isEmpty() ? null : auditEntries.peek(); + final DataSourceCompactionConfigAuditEntry newEntry = new DataSourceCompactionConfigAuditEntry( + compactionConfig.clusterConfig(), + compactionConfig.findConfigForDatasource(dataSource).orNull(), + auditInfo, + auditTime + ); + + final boolean shouldAddEntry; + if (previousEntry == null) { + shouldAddEntry = newEntry.getCompactionConfig() != null; + } else { + shouldAddEntry = !newEntry.hasSameConfig(previousEntry); } - if (newEntry != null) { - auditEntries.push(newEntry); - } else if (current != null && !hasDataSourceCompactionConfig) { - newEntry = new DataSourceCompactionConfigAuditEntry( - new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( - coordinatorCompactionConfig.getCompactionTaskSlotRatio(), - coordinatorCompactionConfig.getMaxCompactionTaskSlots(), - coordinatorCompactionConfig.isUseAutoScaleSlots() - ), - null, - auditInfo, - auditTime - ); + + if (shouldAddEntry) { auditEntries.push(newEntry); } } - public List getHistory() + public List getEntries() { return auditEntries; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java similarity index 59% rename from server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java rename to server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java index 15e19cdbd77d..7793b55c4b94 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java @@ -21,70 +21,72 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; +import com.google.common.base.Optional; import org.apache.druid.common.config.Configs; import org.apache.druid.indexer.CompactionEngine; -import org.apache.druid.server.http.CompactionConfigUpdateRequest; import javax.annotation.Nullable; +import 
java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; -public class CoordinatorCompactionConfig +public class DruidCompactionConfig { public static final String CONFIG_KEY = "coordinator.compaction.config"; - private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1; - private static final int DEFAULT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE; - private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false; - private static final CompactionEngine DEFAULT_COMPACTION_ENGINE = CompactionEngine.NATIVE; + private static final DruidCompactionConfig EMPTY_INSTANCE + = new DruidCompactionConfig(Collections.emptyList(), null, null, null, null); private final List compactionConfigs; private final double compactionTaskSlotRatio; private final int maxCompactionTaskSlots; private final boolean useAutoScaleSlots; - private final CompactionEngine compactionEngine; + private final CompactionEngine engine; - public static CoordinatorCompactionConfig from( - CoordinatorCompactionConfig baseConfig, + public DruidCompactionConfig withDatasourceConfigs( List compactionConfigs ) { - return new CoordinatorCompactionConfig( + return new DruidCompactionConfig( compactionConfigs, - baseConfig.compactionTaskSlotRatio, - baseConfig.maxCompactionTaskSlots, - baseConfig.useAutoScaleSlots, - baseConfig.compactionEngine + compactionTaskSlotRatio, + maxCompactionTaskSlots, + useAutoScaleSlots, + engine ); } - public static CoordinatorCompactionConfig from( - CoordinatorCompactionConfig baseConfig, - CompactionConfigUpdateRequest update + public DruidCompactionConfig withClusterConfig( + ClusterCompactionConfig update ) { - return new CoordinatorCompactionConfig( - baseConfig.compactionConfigs, - Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), baseConfig.compactionTaskSlotRatio), - Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), baseConfig.maxCompactionTaskSlots), - Configs.valueOrDefault(update.getUseAutoScaleSlots(), baseConfig.useAutoScaleSlots), - Configs.valueOrDefault(update.getCompactionEngine(), baseConfig.compactionEngine) + return new DruidCompactionConfig( + this.compactionConfigs, + Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), compactionTaskSlotRatio), + Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), maxCompactionTaskSlots), + Configs.valueOrDefault(update.getUseAutoScaleSlots(), useAutoScaleSlots), + Configs.valueOrDefault(update.getEngine(), engine) ); } - public static CoordinatorCompactionConfig from(List compactionConfigs) + public DruidCompactionConfig withDatasourceConfig(DataSourceCompactionConfig dataSourceConfig) { - return new CoordinatorCompactionConfig(compactionConfigs, null, null, null, null); + final Map configs = dataSourceToCompactionConfigMap(); + configs.put(dataSourceConfig.getDataSource(), dataSourceConfig); + return withDatasourceConfigs(new ArrayList<>(configs.values())); } - public static CoordinatorCompactionConfig empty() + public static DruidCompactionConfig empty() { - return new CoordinatorCompactionConfig(ImmutableList.of(), null, null, null, null); + return EMPTY_INSTANCE; } @JsonCreator - public CoordinatorCompactionConfig( + public DruidCompactionConfig( @JsonProperty("compactionConfigs") List compactionConfigs, @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, 
@@ -92,11 +94,11 @@ public CoordinatorCompactionConfig( @JsonProperty("engine") @Nullable CompactionEngine compactionEngine ) { - this.compactionConfigs = compactionConfigs; - this.compactionTaskSlotRatio = Configs.valueOrDefault(compactionTaskSlotRatio, DEFAULT_COMPACTION_TASK_RATIO); - this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, DEFAULT_MAX_COMPACTION_TASK_SLOTS); - this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, DEFAULT_USE_AUTO_SCALE_SLOTS); - this.compactionEngine = Configs.valueOrDefault(compactionEngine, DEFAULT_COMPACTION_ENGINE); + this.compactionConfigs = Configs.valueOrDefault(compactionConfigs, Collections.emptyList()); + this.compactionTaskSlotRatio = Configs.valueOrDefault(compactionTaskSlotRatio, 0.1); + this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE); + this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, false); + this.engine = Configs.valueOrDefault(compactionEngine, CompactionEngine.NATIVE); } @JsonProperty @@ -126,7 +128,36 @@ public boolean isUseAutoScaleSlots() @JsonProperty public CompactionEngine getEngine() { - return compactionEngine; + return engine; + } + + + // Null-safe getters not used for serialization + public ClusterCompactionConfig clusterConfig() + { + return new ClusterCompactionConfig( + compactionTaskSlotRatio, + maxCompactionTaskSlots, + useAutoScaleSlots, + engine + ); + } + + public Map dataSourceToCompactionConfigMap() + { + return getCompactionConfigs().stream().collect( + Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()) + ); + } + + public Optional findConfigForDatasource(String dataSource) + { + for (DataSourceCompactionConfig dataSourceConfig : getCompactionConfigs()) { + if (dataSource.equals(dataSourceConfig.getDataSource())) { + return Optional.of(dataSourceConfig); + } + } + return Optional.absent(); } @Override @@ -138,11 +169,11 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - CoordinatorCompactionConfig that = (CoordinatorCompactionConfig) o; + DruidCompactionConfig that = (DruidCompactionConfig) o; return Double.compare(that.compactionTaskSlotRatio, compactionTaskSlotRatio) == 0 && maxCompactionTaskSlots == that.maxCompactionTaskSlots && useAutoScaleSlots == that.useAutoScaleSlots && - compactionEngine == that.compactionEngine && + engine == that.engine && Objects.equals(compactionConfigs, that.compactionConfigs); } @@ -154,7 +185,7 @@ public int hashCode() compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - compactionEngine + engine ); } @@ -166,7 +197,7 @@ public String toString() ", compactionTaskSlotRatio=" + compactionTaskSlotRatio + ", maxCompactionTaskSlots=" + maxCompactionTaskSlots + ", useAutoScaleSlots=" + useAutoScaleSlots + - ", compactionEngine=" + compactionEngine + + ", engine=" + engine + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 9710bda79b44..5e468182fa71 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -706,7 +706,7 @@ public void run() = metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments(); final CoordinatorDynamicConfig dynamicConfig = metadataManager.configs().getCurrentDynamicConfig(); - final 
CoordinatorCompactionConfig compactionConfig = metadataManager.configs().getCurrentCompactionConfig(); + final DruidCompactionConfig compactionConfig = metadataManager.configs().getCurrentCompactionConfig(); DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams .newBuilder(coordinatorStartTime) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index ebdbd4f500e8..45f3993caa5e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -67,7 +67,7 @@ private static TreeSet createUsedSegmentsSet(Iterable private final @Nullable TreeSet usedSegments; private final @Nullable DataSourcesSnapshot dataSourcesSnapshot; private final CoordinatorDynamicConfig coordinatorDynamicConfig; - private final CoordinatorCompactionConfig coordinatorCompactionConfig; + private final DruidCompactionConfig compactionConfig; private final SegmentLoadingConfig segmentLoadingConfig; private final CoordinatorRunStats stats; private final BalancerStrategy balancerStrategy; @@ -81,7 +81,7 @@ private DruidCoordinatorRuntimeParams( @Nullable TreeSet usedSegments, @Nullable DataSourcesSnapshot dataSourcesSnapshot, CoordinatorDynamicConfig coordinatorDynamicConfig, - CoordinatorCompactionConfig coordinatorCompactionConfig, + DruidCompactionConfig compactionConfig, SegmentLoadingConfig segmentLoadingConfig, CoordinatorRunStats stats, BalancerStrategy balancerStrategy, @@ -95,7 +95,7 @@ private DruidCoordinatorRuntimeParams( this.usedSegments = usedSegments; this.dataSourcesSnapshot = dataSourcesSnapshot; this.coordinatorDynamicConfig = coordinatorDynamicConfig; - this.coordinatorCompactionConfig = coordinatorCompactionConfig; + this.compactionConfig = compactionConfig; this.segmentLoadingConfig = segmentLoadingConfig; this.stats = stats; this.balancerStrategy = balancerStrategy; @@ -151,9 +151,9 @@ public CoordinatorDynamicConfig getCoordinatorDynamicConfig() return coordinatorDynamicConfig; } - public CoordinatorCompactionConfig getCoordinatorCompactionConfig() + public DruidCompactionConfig getCompactionConfig() { - return coordinatorCompactionConfig; + return compactionConfig; } public SegmentLoadingConfig getSegmentLoadingConfig() @@ -197,7 +197,7 @@ public Builder buildFromExisting() usedSegments, dataSourcesSnapshot, coordinatorDynamicConfig, - coordinatorCompactionConfig, + compactionConfig, segmentLoadingConfig, stats, balancerStrategy, @@ -215,7 +215,7 @@ public static class Builder private @Nullable TreeSet usedSegments; private @Nullable DataSourcesSnapshot dataSourcesSnapshot; private CoordinatorDynamicConfig coordinatorDynamicConfig; - private CoordinatorCompactionConfig coordinatorCompactionConfig; + private DruidCompactionConfig compactionConfig; private SegmentLoadingConfig segmentLoadingConfig; private CoordinatorRunStats stats; private BalancerStrategy balancerStrategy; @@ -225,7 +225,7 @@ private Builder(DateTime coordinatorStartTime) { this.coordinatorStartTime = coordinatorStartTime; this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build(); - this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty(); + this.compactionConfig = DruidCompactionConfig.empty(); this.broadcastDatasources = Collections.emptySet(); } @@ -237,7 +237,7 @@ private Builder( @Nullable TreeSet 
usedSegments, @Nullable DataSourcesSnapshot dataSourcesSnapshot, CoordinatorDynamicConfig coordinatorDynamicConfig, - CoordinatorCompactionConfig coordinatorCompactionConfig, + DruidCompactionConfig compactionConfig, SegmentLoadingConfig segmentLoadingConfig, CoordinatorRunStats stats, BalancerStrategy balancerStrategy, @@ -251,7 +251,7 @@ private Builder( this.usedSegments = usedSegments; this.dataSourcesSnapshot = dataSourcesSnapshot; this.coordinatorDynamicConfig = coordinatorDynamicConfig; - this.coordinatorCompactionConfig = coordinatorCompactionConfig; + this.compactionConfig = compactionConfig; this.segmentLoadingConfig = segmentLoadingConfig; this.stats = stats; this.balancerStrategy = balancerStrategy; @@ -271,7 +271,7 @@ public DruidCoordinatorRuntimeParams build() usedSegments, dataSourcesSnapshot, coordinatorDynamicConfig, - coordinatorCompactionConfig, + compactionConfig, segmentLoadingConfig, stats, balancerStrategy, @@ -367,9 +367,9 @@ public Builder withSegmentLoadingConfig(SegmentLoadingConfig config) return this; } - public Builder withCompactionConfig(CoordinatorCompactionConfig config) + public Builder withCompactionConfig(DruidCompactionConfig config) { - this.coordinatorCompactionConfig = config; + this.compactionConfig = config; return this; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java b/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java deleted file mode 100644 index 9cb00dfa614e..000000000000 --- a/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.server.coordinator.config; - -import org.apache.druid.indexer.CompactionEngine; -import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.server.coordinator.DataSourceCompactionConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig; -import org.joda.time.Period; - -import java.util.Map; - -public class DataSourceCompactionConfigBuilder -{ - private String dataSource; - private Integer taskPriority; - private Long inputSegmentSizeBytes; - private Integer maxRowsPerSegment; - private Period skipOffsetFromLatest; - private UserCompactionTaskQueryTuningConfig tuningConfig; - private UserCompactionTaskGranularityConfig granularitySpec; - private UserCompactionTaskDimensionsConfig dimensionsSpec; - private AggregatorFactory[] metricsSpec; - private UserCompactionTaskTransformConfig transformSpec; - private UserCompactionTaskIOConfig ioConfig; - private CompactionEngine engine; - private Map taskContext; - - public DataSourceCompactionConfig build() - { - return new DataSourceCompactionConfig( - dataSource, - taskPriority, - inputSegmentSizeBytes, - maxRowsPerSegment, - skipOffsetFromLatest, - tuningConfig, - granularitySpec, - dimensionsSpec, - metricsSpec, - transformSpec, - ioConfig, - engine, - taskContext - ); - } - - public DataSourceCompactionConfigBuilder forDataSource(String dataSource) - { - this.dataSource = dataSource; - return this; - } - - public DataSourceCompactionConfigBuilder withTaskPriority(Integer taskPriority) - { - this.taskPriority = taskPriority; - return this; - } - - public DataSourceCompactionConfigBuilder withInputSegmentSizeBytes(Long inputSegmentSizeBytes) - { - this.inputSegmentSizeBytes = inputSegmentSizeBytes; - return this; - } - - @Deprecated - public DataSourceCompactionConfigBuilder withMaxRowsPerSegment(Integer maxRowsPerSegment) - { - this.maxRowsPerSegment = maxRowsPerSegment; - return this; - } - - public DataSourceCompactionConfigBuilder withSkipOffsetFromLatest(Period skipOffsetFromLatest) - { - this.skipOffsetFromLatest = skipOffsetFromLatest; - return this; - } - - public DataSourceCompactionConfigBuilder withTuningConfig( - UserCompactionTaskQueryTuningConfig tuningConfig - ) - { - this.tuningConfig = tuningConfig; - return this; - } - - public DataSourceCompactionConfigBuilder withGranularitySpec( - UserCompactionTaskGranularityConfig granularitySpec - ) - { - this.granularitySpec = granularitySpec; - return this; - } - - public DataSourceCompactionConfigBuilder withDimensionsSpec( - UserCompactionTaskDimensionsConfig dimensionsSpec - ) - { - this.dimensionsSpec = dimensionsSpec; - return this; - } - - public DataSourceCompactionConfigBuilder withMetricsSpec(AggregatorFactory[] metricsSpec) - { - this.metricsSpec = metricsSpec; - return this; - } - - public DataSourceCompactionConfigBuilder withTransformSpec( - UserCompactionTaskTransformConfig transformSpec - ) - { - this.transformSpec = transformSpec; - return this; - } - - public DataSourceCompactionConfigBuilder withIoConfig(UserCompactionTaskIOConfig ioConfig) - { - this.ioConfig = ioConfig; - return this; - } - - public DataSourceCompactionConfigBuilder withEngine(CompactionEngine engine) - { - 
this.engine = engine; - return this; - } - - public DataSourceCompactionConfigBuilder withTaskContext(Map taskContext) - { - this.taskContext = taskContext; - return this; - } -} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index d6e25fc9ac69..7b2392b8c66c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -49,8 +49,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator; import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy; @@ -120,7 +120,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { LOG.info("Running CompactSegments duty"); - final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig(); + final DruidCompactionConfig dynamicConfig = params.getCompactionConfig(); final int maxCompactionTaskSlots = dynamicConfig.getMaxCompactionTaskSlots(); if (maxCompactionTaskSlots <= 0) { LOG.info("Skipping compaction as maxCompactionTaskSlots is [%d].", maxCompactionTaskSlots); @@ -344,7 +344,7 @@ private static boolean useRangePartitions(ClientCompactionTaskQueryTuningConfig return tuningConfig.getPartitionsSpec() instanceof DimensionRangePartitionsSpec; } - private int getCompactionTaskCapacity(CoordinatorCompactionConfig dynamicConfig) + private int getCompactionTaskCapacity(DruidCompactionConfig dynamicConfig) { int totalWorkerCapacity = CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java index c231c31c5436..7395be6d8434 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java @@ -26,9 +26,9 @@ import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.SegmentsMetadataManager; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.config.MetadataCleanupConfig; import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.DateTime; @@ -82,12 +82,12 @@ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) /** * Creates a new compaction config by deleting entries for inactive datasources. 
*/ - private CoordinatorCompactionConfig deleteConfigsForInactiveDatasources( - CoordinatorCompactionConfig current + private DruidCompactionConfig deleteConfigsForInactiveDatasources( + DruidCompactionConfig current ) { // If current compaction config is empty then there is nothing to do - if (CoordinatorCompactionConfig.empty().equals(current)) { + if (DruidCompactionConfig.empty().equals(current)) { log.info("Nothing to do as compaction config is already empty."); return current; } @@ -102,7 +102,7 @@ private CoordinatorCompactionConfig deleteConfigsForInactiveDatasources( .filter(dataSourceCompactionConfig -> activeDatasources.contains(dataSourceCompactionConfig.getDataSource())) .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - return CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())); + return current.withDatasourceConfigs(ImmutableList.copyOf(updated.values())); } /** @@ -116,7 +116,7 @@ private int tryDeleteCompactionConfigs() throws RetryableException ConfigManager.SetResult result = configManager.getAndUpdateCompactionConfig( current -> { - final CoordinatorCompactionConfig updated = deleteConfigsForInactiveDatasources(current); + final DruidCompactionConfig updated = deleteConfigsForInactiveDatasources(current); int numCurrentConfigs = current.getCompactionConfigs() == null ? 0 : current.getCompactionConfigs().size(); int numUpdatedConfigs = updated.getCompactionConfigs() == null ? 0 : updated.getCompactionConfigs().size(); compactionConfigRemoved.set(Math.max(0, numCurrentConfigs - numUpdatedConfigs)); diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index e8baf8250909..7bd4ee851244 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -19,6 +19,7 @@ package org.apache.druid.server.http; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; @@ -33,11 +34,12 @@ import org.apache.druid.error.NotFound; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.utils.CollectionUtils; @@ -60,9 +62,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Function; import java.util.function.UnaryOperator; -import java.util.stream.Collectors; @Path("/druid/coordinator/v1/config/compaction") @ResourceFilters(ConfigResourceFilter.class) @@ -87,21 +87,21 @@ public CoordinatorCompactionConfigsResource( @GET 
   @Produces(MediaType.APPLICATION_JSON)
-  public Response getClusterCompactionConfig()
+  public Response getCompactionConfig()
   {
     return Response.ok(configManager.getCurrentCompactionConfig()).build();
   }
 
   @POST
-  @Path("/global")
+  @Path("/cluster")
   @Consumes(MediaType.APPLICATION_JSON)
   public Response updateClusterCompactionConfig(
-      CompactionConfigUpdateRequest updatePayload,
+      ClusterCompactionConfig updatePayload,
       @Context HttpServletRequest req
   )
   {
-    UnaryOperator<CoordinatorCompactionConfig> operator = current -> {
-      final CoordinatorCompactionConfig newConfig = CoordinatorCompactionConfig.from(current, updatePayload);
+    UnaryOperator<DruidCompactionConfig> operator = current -> {
+      final DruidCompactionConfig newConfig = current.withClusterConfig(updatePayload);
 
       final List<DataSourceCompactionConfig> datasourceConfigs = newConfig.getCompactionConfigs();
       if (CollectionUtils.isNullOrEmpty(datasourceConfigs)
@@ -127,7 +127,11 @@ public Response updateClusterCompactionConfig(
     return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req));
   }
 
+  /**
+   * @deprecated in favor of {@link #updateClusterCompactionConfig}.
+   */
   @POST
+  @Deprecated
   @Path("/taskslots")
   @Consumes(MediaType.APPLICATION_JSON)
   public Response setCompactionTaskLimit(
@@ -138,7 +142,7 @@
   )
   {
     return updateClusterCompactionConfig(
-        new CompactionConfigUpdateRequest(
+        new ClusterCompactionConfig(
             compactionTaskSlotRatio,
             maxCompactionTaskSlots,
             useAutoScaleSlots,
@@ -155,22 +159,14 @@ public Response addOrUpdateDatasourceCompactionConfig(
       @Context HttpServletRequest req
   )
   {
-    UnaryOperator<CoordinatorCompactionConfig> callable = current -> {
-      final CoordinatorCompactionConfig newCompactionConfig;
-      final Map<String, DataSourceCompactionConfig> newConfigs = current
-          .getCompactionConfigs()
-          .stream()
-          .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
+    UnaryOperator<DruidCompactionConfig> callable = current -> {
       CompactionConfigValidationResult validationResult =
           ClientCompactionRunnerInfo.validateCompactionConfig(newConfig, current.getEngine());
-      if (!validationResult.isValid()) {
+      if (validationResult.isValid()) {
+        return current.withDatasourceConfig(newConfig);
+      } else {
         throw InvalidInput.exception("Compaction config not supported. Reason[%s].", validationResult.getReason());
       }
-      // Don't persist config with the default engine if engine not specified, to enable update of the default.
-      newConfigs.put(newConfig.getDataSource(), newConfig);
-      newCompactionConfig = CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(newConfigs.values()));
-
-      return newCompactionConfig;
     };
     return updateConfigHelper(
         callable,
@@ -183,18 +179,13 @@
   @Produces(MediaType.APPLICATION_JSON)
   public Response getDatasourceCompactionConfig(@PathParam("dataSource") String dataSource)
   {
-    final CoordinatorCompactionConfig current = configManager.getCurrentCompactionConfig();
-    final Map<String, DataSourceCompactionConfig> configs = current
-        .getCompactionConfigs()
-        .stream()
-        .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-
-    final DataSourceCompactionConfig config = configs.get(dataSource);
-    if (config == null) {
+    final DruidCompactionConfig current = configManager.getCurrentCompactionConfig();
+    final Optional<DataSourceCompactionConfig> config = current.findConfigForDatasource(dataSource);
+    if (config.isPresent()) {
+      return Response.ok().entity(config.get()).build();
+    } else {
       return Response.status(Response.Status.NOT_FOUND).build();
     }
-
-    return Response.ok().entity(config).build();
   }
 
   @GET
@@ -211,25 +202,25 @@ public Response getCompactionConfigHistory(
       List<AuditEntry> auditEntries;
       if (theInterval == null && count != null) {
         auditEntries = auditManager.fetchAuditHistory(
-            CoordinatorCompactionConfig.CONFIG_KEY,
-            CoordinatorCompactionConfig.CONFIG_KEY,
+            DruidCompactionConfig.CONFIG_KEY,
+            DruidCompactionConfig.CONFIG_KEY,
             count
         );
       } else {
         auditEntries = auditManager.fetchAuditHistory(
-            CoordinatorCompactionConfig.CONFIG_KEY,
-            CoordinatorCompactionConfig.CONFIG_KEY,
+            DruidCompactionConfig.CONFIG_KEY,
+            DruidCompactionConfig.CONFIG_KEY,
             theInterval
         );
       }
       DataSourceCompactionConfigHistory history = new DataSourceCompactionConfigHistory(dataSource);
       for (AuditEntry audit : auditEntries) {
-        CoordinatorCompactionConfig coordinatorCompactionConfig = configManager.convertBytesToCompactionConfig(
+        DruidCompactionConfig compactionConfig = configManager.convertBytesToCompactionConfig(
            audit.getPayload().serialized().getBytes(StandardCharsets.UTF_8)
        );
-        history.add(coordinatorCompactionConfig, audit.getAuditInfo(), audit.getAuditTime());
+        history.add(compactionConfig, audit.getAuditInfo(), audit.getAuditTime());
      }
-      return Response.ok(history.getHistory()).build();
+      return Response.ok(history.getEntries()).build();
    }
    catch (IllegalArgumentException e) {
      return Response.status(Response.Status.BAD_REQUEST)
@@ -246,24 +237,20 @@ public Response deleteCompactionConfig(
       @Context HttpServletRequest req
   )
   {
-    UnaryOperator<CoordinatorCompactionConfig> callable = current -> {
-      final Map<String, DataSourceCompactionConfig> configs = current
-          .getCompactionConfigs()
-          .stream()
-          .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-
+    UnaryOperator<DruidCompactionConfig> callable = current -> {
+      final Map<String, DataSourceCompactionConfig> configs = current.dataSourceToCompactionConfigMap();
       final DataSourceCompactionConfig config = configs.remove(dataSource);
       if (config == null) {
         throw NotFound.exception("datasource not found");
       }
-      return CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values()));
+      return current.withDatasourceConfigs(ImmutableList.copyOf(configs.values()));
     };
     return updateConfigHelper(callable, AuthorizationUtils.buildAuditInfo(req));
   }
 
   private Response updateConfigHelper(
-      UnaryOperator<CoordinatorCompactionConfig> configOperator,
+      UnaryOperator<DruidCompactionConfig> configOperator,
       AuditInfo auditInfo
   )
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java deleted file mode 100644 index 6f39f5527916..000000000000 --- a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.coordinator; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -public class CoordinatorCompactionConfigTest -{ - private static final ObjectMapper MAPPER = new DefaultObjectMapper(); - - @Test - public void testSerdeDefaultConfig() throws Exception - { - final CoordinatorCompactionConfig defaultConfig = CoordinatorCompactionConfig.empty(); - final String json = MAPPER.writeValueAsString(defaultConfig); - - CoordinatorCompactionConfig deserialized = MAPPER.readValue(json, CoordinatorCompactionConfig.class); - Assert.assertEquals(defaultConfig, deserialized); - } -} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java index 365c872165e6..27d80e3b5683 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java @@ -19,81 +19,85 @@ package org.apache.druid.server.coordinator; +import org.apache.druid.audit.AuditInfo; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.java.util.common.DateTimes; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class DataSourceCompactionConfigAuditEntryTest { - private static final double COMPACTION_TASK_SLOT_RATIO = 0.1; - private static final int MAX_COMPACTION_SLOTS = 9; - private static final boolean USE_AUTO_SCALE_SLOTS = true; - - @Mock - private CoordinatorCompactionConfig coordinatorCompactionConfig; - - @Before - public void setUp() - { - Mockito.when(coordinatorCompactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO); - Mockito.when(coordinatorCompactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_SLOTS); - Mockito.when(coordinatorCompactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS); - } + private static final String DS_WIKI = "wiki"; + private final AuditInfo auditInfo = new AuditInfo("author", "identity", 
"comment", "ip"); + + private final DataSourceCompactionConfigAuditEntry firstEntry = new DataSourceCompactionConfigAuditEntry( + new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ), + DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(), + auditInfo, + DateTimes.nowUtc() + ); @Test - public void testhasSameConfigWithSameBaseConfigShouldReturnTrue() + public void testhasSameConfigWithSameBaseConfigIsTrue() { - DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = - new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( - COMPACTION_TASK_SLOT_RATIO, - MAX_COMPACTION_SLOTS, - USE_AUTO_SCALE_SLOTS - ); - - Assert.assertTrue(config.hasSameConfig(coordinatorCompactionConfig)); + final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( + new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ), + DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(), + auditInfo, + DateTimes.nowUtc() + ); + Assert.assertTrue(firstEntry.hasSameConfig(secondEntry)); + Assert.assertTrue(secondEntry.hasSameConfig(firstEntry)); } @Test - public void testhasSameConfigWithDifferentUseAutoScaleSlotsShouldReturnFalse() + public void testhasSameConfigWithDifferentClusterConfigIsFalse() { - DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = - new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( - COMPACTION_TASK_SLOT_RATIO, - MAX_COMPACTION_SLOTS, - !USE_AUTO_SCALE_SLOTS - ); + DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( + new ClusterCompactionConfig(0.1, 9, false, CompactionEngine.MSQ), + DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(), + auditInfo, + DateTimes.nowUtc() + ); + Assert.assertFalse(firstEntry.hasSameConfig(secondEntry)); + Assert.assertFalse(secondEntry.hasSameConfig(firstEntry)); - Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig)); + secondEntry = new DataSourceCompactionConfigAuditEntry( + new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE), + DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(), + auditInfo, + DateTimes.nowUtc() + ); + Assert.assertFalse(firstEntry.hasSameConfig(secondEntry)); + Assert.assertFalse(secondEntry.hasSameConfig(firstEntry)); } @Test - public void testhasSameConfigWithDifferentMaxCompactionSlotsShouldReturnFalse() + public void testhasSameConfigWithDifferentDatasourceConfigIsFalse() { - DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = - new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( - COMPACTION_TASK_SLOT_RATIO, - MAX_COMPACTION_SLOTS + 1, - USE_AUTO_SCALE_SLOTS - ); - - Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig)); + DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( + new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE), + DataSourceCompactionConfig.builder().forDataSource(DS_WIKI).build(), + auditInfo, + DateTimes.nowUtc() + ); + Assert.assertFalse(firstEntry.hasSameConfig(secondEntry)); + Assert.assertFalse(secondEntry.hasSameConfig(firstEntry)); } @Test - public void testhasSameConfigWithDifferentCompactionSlotRatioShouldReturnFalse() + public void testhasSameConfigWithNullDatasourceConfigIsFalse() { - DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = - new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( - COMPACTION_TASK_SLOT_RATIO - 0.03, - MAX_COMPACTION_SLOTS, - 
USE_AUTO_SCALE_SLOTS - ); - - Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig)); + final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( + new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE), + null, + auditInfo, + DateTimes.nowUtc() + ); + Assert.assertFalse(firstEntry.hasSameConfig(secondEntry)); + Assert.assertFalse(secondEntry.hasSameConfig(firstEntry)); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java index bf4badb44db7..60a638974f46 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java @@ -19,167 +19,183 @@ package org.apache.druid.server.coordinator; -import com.google.common.collect.ImmutableList; import org.apache.druid.audit.AuditInfo; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; import org.joda.time.DateTime; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Answers; -import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.util.List; + @RunWith(MockitoJUnitRunner.class) public class DataSourceCompactionConfigHistoryTest { - private static final String DATASOURCE = "DATASOURCE"; - private static final String DATASOURCE_2 = "DATASOURCE_2"; - private static final String DATASOURCE_NOT_EXISTS = "DATASOURCE_NOT_EXISTS"; - private static final double COMPACTION_TASK_SLOT_RATIO = 0.1; - private static final int MAX_COMPACTION_TASK_SLOTS = 9; - private static final boolean USE_AUTO_SCALE_SLOTS = false; - private static final DateTime AUDIT_TIME = DateTimes.of(2023, 1, 13, 9, 0); - private static final DateTime AUDIT_TIME_2 = DateTimes.of(2023, 1, 13, 9, 30); - private static final DateTime AUDIT_TIME_3 = DateTimes.of(2023, 1, 13, 10, 0); - - @Mock - private CoordinatorCompactionConfig compactionConfig; - @Mock(answer = Answers.RETURNS_MOCKS) - private DataSourceCompactionConfig configForDataSource; - @Mock(answer = Answers.RETURNS_MOCKS) - private DataSourceCompactionConfig configForDataSourceWithChange; - @Mock(answer = Answers.RETURNS_MOCKS) - private DataSourceCompactionConfig configForDataSource2; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private AuditInfo auditInfo; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private AuditInfo auditInfo2; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private AuditInfo auditInfo3; - - private DataSourceCompactionConfigHistory target; + private final AuditInfo auditInfo = new AuditInfo("author", "identity", "comment", "ip"); + private final DataSourceCompactionConfig wikiCompactionConfig + = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(); + + private DataSourceCompactionConfigHistory wikiAuditHistory; @Before - public void setUp() + public void setup() { - Mockito.when(compactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO); - Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_TASK_SLOTS); - Mockito.when(compactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS); - 
Mockito.when(configForDataSource.getDataSource()).thenReturn(DATASOURCE); - Mockito.when(configForDataSourceWithChange.getDataSource()).thenReturn(DATASOURCE); - Mockito.when(configForDataSource2.getDataSource()).thenReturn(DATASOURCE_2); - Mockito.when(compactionConfig.getCompactionConfigs()) - .thenReturn(ImmutableList.of(configForDataSource, configForDataSource2)); - target = new DataSourceCompactionConfigHistory(DATASOURCE); + wikiAuditHistory = new DataSourceCompactionConfigHistory(DS.WIKI); } @Test - public void testAddCompactionConfigShouldAddToHistory() + public void testAddDatasourceConfigShouldAddToHistory() { - target.add(compactionConfig, auditInfo, AUDIT_TIME); - Assert.assertEquals(1, target.getHistory().size()); - DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + final DateTime auditTime = DateTimes.nowUtc(); + wikiAuditHistory.add( + DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig), + auditInfo, + auditTime + ); + + Assert.assertEquals(1, wikiAuditHistory.getEntries().size()); + DataSourceCompactionConfigAuditEntry auditEntry = wikiAuditHistory.getEntries().get(0); + Assert.assertEquals(wikiCompactionConfig, auditEntry.getCompactionConfig()); Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + Assert.assertEquals(auditTime, auditEntry.getAuditTime()); } @Test - public void testAddAndDeleteCompactionConfigShouldAddBothToHistory() + public void testAddDeleteDatasourceConfigShouldAddBothToHistory() { - target.add(compactionConfig, auditInfo, AUDIT_TIME); - Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2)); - target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); - Assert.assertEquals(2, target.getHistory().size()); - DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); - Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); - auditEntry = target.getHistory().get(1); - Assert.assertEquals(null, auditEntry.getCompactionConfig()); - Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime()); + final DateTime auditTime = DateTimes.nowUtc(); + wikiAuditHistory.add( + DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig), + auditInfo, + auditTime + ); + wikiAuditHistory.add(DruidCompactionConfig.empty(), auditInfo, auditTime.plusHours(2)); + + final List entries = wikiAuditHistory.getEntries(); + Assert.assertEquals(2, entries.size()); + + final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0); + Assert.assertEquals(wikiCompactionConfig, firstEntry.getCompactionConfig()); + Assert.assertEquals(auditInfo, firstEntry.getAuditInfo()); + Assert.assertEquals(auditTime, firstEntry.getAuditTime()); + + final DataSourceCompactionConfigAuditEntry secondEntry = entries.get(1); + Assert.assertNull(secondEntry.getCompactionConfig()); + Assert.assertEquals(firstEntry.getGlobalConfig(), secondEntry.getGlobalConfig()); + Assert.assertEquals(auditInfo, secondEntry.getAuditInfo()); + Assert.assertEquals(auditTime.plusHours(2), secondEntry.getAuditTime()); } @Test - public void testAddAndDeleteAnotherCompactionConfigShouldNotAddToHistory() + public void 
testAddDeleteAnotherDatasourceConfigShouldNotAddToHistory() { - target.add(compactionConfig, auditInfo, AUDIT_TIME); - Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource)); - target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); - Assert.assertEquals(1, target.getHistory().size()); - DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); - Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + final DataSourceCompactionConfig koalaCompactionConfig + = DataSourceCompactionConfig.builder().forDataSource(DS.KOALA).build(); + + wikiAuditHistory.add( + DruidCompactionConfig.empty().withDatasourceConfig(koalaCompactionConfig), + auditInfo, + DateTimes.nowUtc() + ); + wikiAuditHistory.add(DruidCompactionConfig.empty(), auditInfo, DateTimes.nowUtc()); + + Assert.assertTrue(wikiAuditHistory.getEntries().isEmpty()); } @Test - public void testAddDeletedAddCompactionConfigShouldAddAllToHistory() + public void testAddDeleteAddDatasourceConfigShouldAddAllToHistory() { - target.add(compactionConfig, auditInfo, AUDIT_TIME); - Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2)); - target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); - Mockito.when(compactionConfig.getCompactionConfigs()) - .thenReturn(ImmutableList.of(configForDataSourceWithChange, configForDataSource2)); - target.add(compactionConfig, auditInfo3, AUDIT_TIME_3); - Assert.assertEquals(3, target.getHistory().size()); - DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); - Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); - auditEntry = target.getHistory().get(2); - Assert.assertEquals(configForDataSourceWithChange, auditEntry.getCompactionConfig()); - Assert.assertEquals(auditInfo3, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME_3, auditEntry.getAuditTime()); + final DateTime auditTime = DateTimes.nowUtc(); + wikiAuditHistory.add( + DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig), + auditInfo, + auditTime + ); + wikiAuditHistory.add( + DruidCompactionConfig.empty(), + auditInfo, + auditTime.plusHours(2) + ); + wikiAuditHistory.add( + DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig), + auditInfo, + auditTime.plusHours(3) + ); + + final List entries = wikiAuditHistory.getEntries(); + Assert.assertEquals(3, entries.size()); + + final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0); + final DataSourceCompactionConfigAuditEntry thirdEntry = entries.get(2); + Assert.assertTrue(firstEntry.hasSameConfig(thirdEntry)); } @Test - public void testAddAndChangeCompactionConfigShouldAddBothToHistory() + public void testAddModifyDatasourceConfigShouldAddBothToHistory() { - target.add(compactionConfig, auditInfo, AUDIT_TIME); - Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSourceWithChange)); - target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); - Assert.assertEquals(2, target.getHistory().size()); - DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); - 
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); - auditEntry = target.getHistory().get(1); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); - Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime()); + final DateTime auditTime = DateTimes.nowUtc(); + wikiAuditHistory.add( + DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig), + auditInfo, + auditTime + ); + + + final DataSourceCompactionConfig updatedWikiConfig + = DataSourceCompactionConfig.builder() + .forDataSource(DS.WIKI) + .withSkipOffsetFromLatest(Period.hours(5)) + .build(); + wikiAuditHistory.add( + DruidCompactionConfig.empty().withDatasourceConfig(updatedWikiConfig), + auditInfo, + auditTime.plusHours(3) + ); + + final List entries = wikiAuditHistory.getEntries(); + Assert.assertEquals(2, entries.size()); + + final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0); + final DataSourceCompactionConfigAuditEntry secondEntry = entries.get(1); + Assert.assertEquals(firstEntry.getGlobalConfig(), secondEntry.getGlobalConfig()); + + Assert.assertEquals(wikiCompactionConfig, firstEntry.getCompactionConfig()); + Assert.assertEquals(updatedWikiConfig, secondEntry.getCompactionConfig()); + Assert.assertFalse(firstEntry.hasSameConfig(secondEntry)); } @Test - public void testAddAndChangeGlobalSettingsShouldAddTwice() + public void testAddAndModifyClusterConfigShouldAddTwice() { - target.add(compactionConfig, auditInfo, AUDIT_TIME); - int newMaxTaskSlots = MAX_COMPACTION_TASK_SLOTS - 1; - Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(newMaxTaskSlots); - target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); - Assert.assertEquals(2, target.getHistory().size()); - DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); - Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); - Assert.assertEquals(MAX_COMPACTION_TASK_SLOTS, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots()); - auditEntry = target.getHistory().get(1); - Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); - Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo()); - Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime()); - Assert.assertEquals(newMaxTaskSlots, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots()); + final DruidCompactionConfig originalConfig + = DruidCompactionConfig.empty().withDatasourceConfig(wikiCompactionConfig); + + wikiAuditHistory.add(originalConfig, auditInfo, DateTimes.nowUtc()); + + final DruidCompactionConfig updatedConfig = originalConfig.withClusterConfig( + new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ) + ); + wikiAuditHistory.add(updatedConfig, auditInfo, DateTimes.nowUtc()); + + final List entries = wikiAuditHistory.getEntries(); + Assert.assertEquals(2, entries.size()); + + final DataSourceCompactionConfigAuditEntry firstEntry = entries.get(0); + final DataSourceCompactionConfigAuditEntry secondEntry = entries.get(1); + Assert.assertEquals(secondEntry.getCompactionConfig(), firstEntry.getCompactionConfig()); + + Assert.assertEquals(originalConfig.clusterConfig(), firstEntry.getGlobalConfig()); + Assert.assertEquals(updatedConfig.clusterConfig(), secondEntry.getGlobalConfig()); 
+ Assert.assertFalse(firstEntry.hasSameConfig(secondEntry)); } - @Test - public void testAddCompactionConfigDoesNotHaveDataSourceWithNoHistoryShouldNotAdd() + private static class DS { - target = new DataSourceCompactionConfigHistory(DATASOURCE_NOT_EXISTS); - target.add(compactionConfig, auditInfo, AUDIT_TIME); - Assert.assertTrue(target.getHistory().isEmpty()); + static final String KOALA = "koala"; + static final String WIKI = "wiki"; } - } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java new file mode 100644 index 000000000000..bf13a94e0080 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +public class DruidCompactionConfigTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testSerdeDefaultConfig() throws Exception + { + final DruidCompactionConfig defaultConfig = DruidCompactionConfig.empty(); + final String json = MAPPER.writeValueAsString(defaultConfig); + + DruidCompactionConfig deserialized = MAPPER.readValue(json, DruidCompactionConfig.class); + Assert.assertEquals(defaultConfig, deserialized); + } + + @Test + public void testSerdeWithDatasourceConfigs() throws Exception + { + final DruidCompactionConfig config = new DruidCompactionConfig( + Arrays.asList( + DataSourceCompactionConfig + .builder() + .forDataSource(DS.WIKI) + .withSkipOffsetFromLatest(Period.hours(1)) + .build(), + DataSourceCompactionConfig + .builder() + .forDataSource(DS.KOALA) + .withSkipOffsetFromLatest(Period.hours(2)) + .build() + ), + null, + null, + null, + CompactionEngine.MSQ + ); + + final String json = MAPPER.writeValueAsString(config); + DruidCompactionConfig deserialized = MAPPER.readValue(json, DruidCompactionConfig.class); + Assert.assertEquals(config, deserialized); + } + + @Test + public void testCopyWithClusterConfig() + { + final DruidCompactionConfig config = DruidCompactionConfig.empty(); + + final ClusterCompactionConfig clusterConfig = new ClusterCompactionConfig(0.5, 10, false, CompactionEngine.MSQ); + final DruidCompactionConfig copy = config.withClusterConfig(clusterConfig); + + Assert.assertEquals(clusterConfig, copy.clusterConfig()); + Assert.assertNotEquals(clusterConfig, 
config.clusterConfig()); + } + + @Test + public void testCopyWithDatasourceConfigs() + { + final DruidCompactionConfig config = DruidCompactionConfig.empty(); + Assert.assertTrue(config.getCompactionConfigs().isEmpty()); + + final DataSourceCompactionConfig dataSourceConfig + = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).withEngine(CompactionEngine.NATIVE).build(); + final DruidCompactionConfig copy = config.withDatasourceConfigs(Collections.singletonList(dataSourceConfig)); + + Assert.assertEquals(1, copy.getCompactionConfigs().size()); + Assert.assertEquals(dataSourceConfig, copy.findConfigForDatasource(DS.WIKI).orNull()); + } + + private static class DS + { + static final String WIKI = "wiki"; + static final String KOALA = "koala"; + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 8c7e0ae14e5f..801016eb819a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -136,11 +136,11 @@ public void setUp() throws Exception ).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes(); EasyMock.expect( configManager.watch( - EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY), + EasyMock.eq(DruidCompactionConfig.CONFIG_KEY), EasyMock.anyObject(Class.class), EasyMock.anyObject() ) - ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes(); + ).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes(); EasyMock.replay(configManager); setupServerAndCurator(); curator.start(); @@ -734,11 +734,11 @@ public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception ).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes(); EasyMock.expect( configManager.watch( - EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY), + EasyMock.eq(DruidCompactionConfig.CONFIG_KEY), EasyMock.anyObject(Class.class), EasyMock.anyObject() ) - ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes(); + ).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes(); EasyMock.replay(configManager); DruidDataSource dataSource = new DruidDataSource("dataSource1", Collections.emptyMap()); DataSegment dataSegment = new DataSegment( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 236cfaf7da54..eb03acc48bfa 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -69,8 +69,8 @@ import org.apache.druid.segment.indexing.BatchIOConfig; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; @@ -1834,7 +1834,7 @@ 
private CoordinatorRunStats doCompactSegments( .newBuilder(DateTimes.nowUtc()) .withDataSourcesSnapshot(dataSources) .withCompactionConfig( - new CoordinatorCompactionConfig( + new DruidCompactionConfig( compactionConfigs, numCompactionTaskSlots == null ? null : 1.0, // 100% when numCompactionTaskSlots is not null numCompactionTaskSlots, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java index 3d441d9b06d9..3056a75c5649 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java @@ -28,9 +28,9 @@ import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SqlSegmentsMetadataManager; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.config.MetadataCleanupConfig; @@ -107,13 +107,13 @@ public void testRunDoNothingIfCurrentConfigIsEmpty() ArgumentMatchers.anyString(), ArgumentMatchers.eq("name"), ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY)) ).thenReturn(null); Mockito.when(mockJacksonConfigManager.convertByteToConfig( ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(CoordinatorCompactionConfig.empty()); + ArgumentMatchers.eq(DruidCompactionConfig.class), + ArgumentMatchers.eq(DruidCompactionConfig.empty())) + ).thenReturn(DruidCompactionConfig.empty()); final MetadataCleanupConfig config = new MetadataCleanupConfig(true, new Duration("PT6S"), null); @@ -129,14 +129,14 @@ public void testRunDoNothingIfCurrentConfigIsEmpty() Mockito.verify(mockJacksonConfigManager).convertByteToConfig( ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ArgumentMatchers.eq(DruidCompactionConfig.class), + ArgumentMatchers.eq(DruidCompactionConfig.empty()) ); Mockito.verify(mockConnector).lookup( ArgumentMatchers.anyString(), ArgumentMatchers.eq("name"), ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY) ); Mockito.verifyNoMoreInteractions(mockJacksonConfigManager); } @@ -177,24 +177,26 @@ public void testRunRemoveInactiveDatasourceCompactionConfig() null, ImmutableMap.of("key", "val") ); - CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig)); + DruidCompactionConfig originalCurrentConfig = DruidCompactionConfig.empty().withDatasourceConfigs( + ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig) + ); byte[] originalCurrentConfigBytes = {1, 2, 3}; Mockito.when(mockConnector.lookup( 
ArgumentMatchers.anyString(), ArgumentMatchers.eq("name"), ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY)) ).thenReturn(originalCurrentConfigBytes); Mockito.when(mockJacksonConfigManager.convertByteToConfig( ArgumentMatchers.eq(originalCurrentConfigBytes), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) + ArgumentMatchers.eq(DruidCompactionConfig.class), + ArgumentMatchers.eq(DruidCompactionConfig.empty())) ).thenReturn(originalCurrentConfig); Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName)); final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(DruidCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY), oldConfigCaptor.capture(), newConfigCaptor.capture(), ArgumentMatchers.any()) @@ -221,19 +223,19 @@ public void testRunRemoveInactiveDatasourceCompactionConfig() Mockito.verify(mockJacksonConfigManager).convertByteToConfig( ArgumentMatchers.eq(originalCurrentConfigBytes), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ArgumentMatchers.eq(DruidCompactionConfig.class), + ArgumentMatchers.eq(DruidCompactionConfig.empty()) ); Mockito.verify(mockConnector).lookup( ArgumentMatchers.anyString(), ArgumentMatchers.eq("name"), ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY) ); Mockito.verify(mockJacksonConfigManager).set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY), ArgumentMatchers.any(byte[].class), - ArgumentMatchers.any(CoordinatorCompactionConfig.class), + ArgumentMatchers.any(DruidCompactionConfig.class), ArgumentMatchers.any() ); Mockito.verifyNoMoreInteractions(mockJacksonConfigManager); @@ -261,24 +263,26 @@ public void testRunRetryForRetryableException() ImmutableMap.of("key", "val") ); - CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig)); + DruidCompactionConfig originalCurrentConfig = DruidCompactionConfig.empty().withDatasourceConfigs( + ImmutableList.of(inactiveDatasourceConfig) + ); byte[] originalCurrentConfigBytes = {1, 2, 3}; Mockito.when(mockConnector.lookup( ArgumentMatchers.anyString(), ArgumentMatchers.eq("name"), ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY)) ).thenReturn(originalCurrentConfigBytes); Mockito.when(mockJacksonConfigManager.convertByteToConfig( ArgumentMatchers.eq(originalCurrentConfigBytes), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) + ArgumentMatchers.eq(DruidCompactionConfig.class), + ArgumentMatchers.eq(DruidCompactionConfig.empty())) ).thenReturn(originalCurrentConfig); Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of()); 
Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY), ArgumentMatchers.any(byte[].class), - ArgumentMatchers.any(CoordinatorCompactionConfig.class), + ArgumentMatchers.any(DruidCompactionConfig.class), ArgumentMatchers.any()) ).thenReturn( // Return fail result with RetryableException the first three calls to updated set @@ -304,21 +308,21 @@ public void testRunRetryForRetryableException() // Should call convertByteToConfig and lookup (to refresh current compaction config) four times due to RetryableException when failed Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig( ArgumentMatchers.eq(originalCurrentConfigBytes), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ArgumentMatchers.eq(DruidCompactionConfig.class), + ArgumentMatchers.eq(DruidCompactionConfig.empty()) ); Mockito.verify(mockConnector, Mockito.times(4)).lookup( ArgumentMatchers.anyString(), ArgumentMatchers.eq("name"), ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY) ); // Should call set (to try set new updated compaction config) four times due to RetryableException when failed Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + ArgumentMatchers.eq(DruidCompactionConfig.CONFIG_KEY), ArgumentMatchers.any(byte[].class), - ArgumentMatchers.any(CoordinatorCompactionConfig.class), + ArgumentMatchers.any(DruidCompactionConfig.class), ArgumentMatchers.any() ); Mockito.verifyNoMoreInteractions(mockJacksonConfigManager); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 7b6eb280a398..b32f8c055ca2 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -37,9 +37,9 @@ import org.apache.druid.java.util.metrics.MetricsVerifier; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.MetadataManager; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; @@ -512,11 +512,11 @@ private JacksonConfigManager mockConfigManager() EasyMock.expect( jacksonConfigManager.watch( - EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY), - EasyMock.eq(CoordinatorCompactionConfig.class), + EasyMock.eq(DruidCompactionConfig.CONFIG_KEY), + EasyMock.eq(DruidCompactionConfig.class), EasyMock.anyObject() ) - ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes(); + ).andReturn(new AtomicReference<>(DruidCompactionConfig.empty())).anyTimes(); return jacksonConfigManager; } diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 5be533c0e465..c44b9b2f358a 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -37,12 +37,12 @@ import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.TestMetadataStorageConnector; import org.apache.druid.metadata.TestMetadataStorageTablesConfig; -import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; +import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; -import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.After; @@ -97,9 +97,9 @@ public void tearDown() @Test public void testGetDefaultClusterConfig() { - Response response = resource.getClusterCompactionConfig(); - final CoordinatorCompactionConfig defaultConfig - = verifyAndGetPayload(response, CoordinatorCompactionConfig.class); + Response response = resource.getCompactionConfig(); + final DruidCompactionConfig defaultConfig + = verifyAndGetPayload(response, DruidCompactionConfig.class); Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(), DELTA); Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots()); @@ -112,14 +112,14 @@ public void testGetDefaultClusterConfig() public void testUpdateGlobalConfig() { Response response = resource.updateClusterCompactionConfig( - new CompactionConfigUpdateRequest(0.5, 10, true, CompactionEngine.MSQ), + new ClusterCompactionConfig(0.5, 10, true, CompactionEngine.MSQ), mockHttpServletRequest ); verifyStatus(Response.Status.OK, response); - final CoordinatorCompactionConfig updatedConfig = verifyAndGetPayload( - resource.getClusterCompactionConfig(), - CoordinatorCompactionConfig.class + final DruidCompactionConfig updatedConfig = verifyAndGetPayload( + resource.getCompactionConfig(), + DruidCompactionConfig.class ); Assert.assertNotNull(updatedConfig); @@ -132,14 +132,14 @@ public void testUpdateGlobalConfig() @Test public void testSetCompactionTaskLimit() { - final CoordinatorCompactionConfig defaultConfig - = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + final DruidCompactionConfig defaultConfig + = verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class); Response response = resource.setCompactionTaskLimit(0.5, 9, true, mockHttpServletRequest); verifyStatus(Response.Status.OK, response); - final CoordinatorCompactionConfig updatedConfig - = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + final DruidCompactionConfig updatedConfig + = verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class); // Verify that the task slot fields have been updated Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA); 
@@ -170,8 +170,8 @@ public void testAddDatasourceConfig() = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class); Assert.assertEquals(newDatasourceConfig, fetchedDatasourceConfig); - final CoordinatorCompactionConfig fullCompactionConfig - = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + final DruidCompactionConfig fullCompactionConfig + = verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class); Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size()); Assert.assertEquals(newDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } @@ -214,8 +214,8 @@ public void testUpdateDatasourceConfig() = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class); Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig); - final CoordinatorCompactionConfig fullCompactionConfig - = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + final DruidCompactionConfig fullCompactionConfig + = verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class); Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size()); Assert.assertEquals(updatedDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } @@ -274,7 +274,7 @@ public void testUpdateIsNotRetriedIfFailureIsNotRetryable() @Test public void testGetDatasourceConfigHistory() { - final DataSourceCompactionConfigBuilder builder + final DataSourceCompactionConfig.Builder builder = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI); final DataSourceCompactionConfig configV1 = builder.build(); @@ -340,7 +340,7 @@ public void testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest() verifyStatus(Response.Status.OK, response); response = resource.updateClusterCompactionConfig( - new CompactionConfigUpdateRequest(null, null, null, CompactionEngine.MSQ), + new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ), mockHttpServletRequest ); verifyStatus(Response.Status.BAD_REQUEST, response); @@ -480,7 +480,7 @@ public String getConfigTable() @Override public ConfigManager.SetResult getAndUpdateCompactionConfig( - UnaryOperator operator, + UnaryOperator operator, AuditInfo auditInfo ) {