From bd0e9d93d62a7d37825e0baa4e0d00cbb1b073b4 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 26 Jul 2024 20:44:51 -0700 Subject: [PATCH] Add API to update compaction engine (#16803) Changes: - Add API `/druid/coordinator/v1/config/compaction/global` to update cluster level compaction config - Add class `CompactionConfigUpdateRequest` - Fix bug in `CoordinatorCompactionConfig` which caused compaction engine to not be persisted. Use json field name `engine` instead of `compactionEngine` because JSON field names must align with the getter name. - Update MSQ validation error messages - Complete overhaul of `CoordinatorCompactionConfigResourceTest` to remove unnecessary mocking and add more meaningful tests. - Add `TuningConfigBuilder` to easily build tuning configs for tests. - Add `DatasourceCompactionConfigBuilder` --- .../NewestSegmentFirstPolicyBenchmark.java | 21 +- .../msq/indexing/MSQCompactionRunner.java | 2 +- .../msq/indexing/MSQCompactionRunnerTest.java | 2 +- .../common/task/NativeCompactionRunner.java | 2 +- .../indexing/ClientCompactionRunnerInfo.java | 42 +- .../CompactionConfigValidationResult.java | 15 +- .../CoordinatorCompactionConfig.java | 17 +- .../DataSourceCompactionConfig.java | 7 + .../DataSourceCompactionConfigBuilder.java | 155 ++++ .../http/CompactionConfigUpdateRequest.java | 82 ++ .../CoordinatorCompactionConfigsResource.java | 59 +- .../ClientCompactionRunnerInfoTest.java | 9 +- .../CoordinatorCompactionConfigTest.java | 40 + .../DataSourceCompactionConfigTest.java | 372 +++----- ...rdinatorCompactionConfigsResourceTest.java | 849 +++++++++--------- 15 files changed, 926 insertions(+), 748 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java create mode 100644 server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index 98c27c4b2b8c5..87e92ab6fb1ca 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -92,21 +92,12 @@ public void setup() final String dataSource = DATA_SOURCE_PREFIX + i; compactionConfigs.put( dataSource, - new DataSourceCompactionConfig( - dataSource, - 0, - inputSegmentSizeBytes, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ) + DataSourceCompactionConfig + .builder() + .forDataSource(dataSource) + .withTaskPriority(0) + .withInputSegmentSizeBytes(inputSegmentSizeBytes) + .build() ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index fb25097e9800f..28ec422e2ace9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -143,7 +143,7 @@ public CompactionConfigValidationResult validateCompactionTask( return validationResults.stream() .filter(result -> !result.isValid()) .findFirst() - .orElse(new CompactionConfigValidationResult(true, null)); + .orElse(CompactionConfigValidationResult.success()); } @Override diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index b95243f7783f8..3f73a9b513efb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -211,7 +211,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid() CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "Different name[sum_added] and fieldName(s)[[added]] for aggregator unsupported for MSQ engine.", + "MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator", validationResult.getReason() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java index 30761b674e54d..2074d14f0f90c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java @@ -88,7 +88,7 @@ public CompactionConfigValidationResult validateCompactionTask( CompactionTask compactionTask ) { - return new CompactionConfigValidationResult(true, null); + return CompactionConfigValidationResult.success(); } /** diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index ed9e22dfaa290..74504184608d4 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -92,7 +92,7 @@ public static CompactionConfigValidationResult validateCompactionConfig( { CompactionEngine compactionEngine = newConfig.getEngine() == null ? defaultCompactionEngine : newConfig.getEngine(); if (compactionEngine == CompactionEngine.NATIVE) { - return new CompactionConfigValidationResult(true, null); + return CompactionConfigValidationResult.success(); } else { return compactionConfigSupportedByMSQEngine(newConfig); } @@ -125,7 +125,7 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn return validationResults.stream() .filter(result -> !result.isValid()) .findFirst() - .orElse(new CompactionConfigValidationResult(true, null)); + .orElse(CompactionConfigValidationResult.success()); } /** @@ -135,22 +135,19 @@ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(Part { if (!(partitionsSpec instanceof DimensionRangePartitionsSpec || partitionsSpec instanceof DynamicPartitionsSpec)) { - return new CompactionConfigValidationResult( - false, - "Invalid partitionsSpec type[%s] for MSQ engine. Type must be either 'dynamic' or 'range'.", + return CompactionConfigValidationResult.failure( + "MSQ: Invalid partitioning type[%s]. Must be either 'dynamic' or 'range'", partitionsSpec.getClass().getSimpleName() ); } if (partitionsSpec instanceof DynamicPartitionsSpec && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null) { - return new CompactionConfigValidationResult( - false, - "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ engine.", - ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() + return CompactionConfigValidationResult.failure( + "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning" ); } - return new CompactionConfigValidationResult(true, null); + return CompactionConfigValidationResult.success(); } /** @@ -162,12 +159,11 @@ public static CompactionConfigValidationResult validateRollupForMSQ( ) { if (metricsSpec != null && isRollup != null && !isRollup) { - return new CompactionConfigValidationResult( - false, - "rollup in granularitySpec must be set to True if metricsSpec is specifed for MSQ engine." + return CompactionConfigValidationResult.failure( + "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified" ); } - return new CompactionConfigValidationResult(true, null); + return CompactionConfigValidationResult.success(); } /** @@ -179,14 +175,13 @@ public static CompactionConfigValidationResult validateMaxNumTasksForMSQ(Map @@ -206,11 +201,10 @@ public static CompactionConfigValidationResult validateMetricsSpecForMSQ(Aggrega .equals(aggregatorFactory.getName()))) .findFirst() .map(aggregatorFactory -> - new CompactionConfigValidationResult( - false, - "Different name[%s] and fieldName(s)[%s] for aggregator unsupported for MSQ engine.", + CompactionConfigValidationResult.failure( + "MSQ: Different name[%s] and fieldName(s)[%s] for aggregator", aggregatorFactory.getName(), aggregatorFactory.requiredFields() - )).orElse(new CompactionConfigValidationResult(true, null)); + )).orElse(CompactionConfigValidationResult.success()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java index 88eaa3e923a35..d482903e0d6b7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java @@ -23,10 +23,23 @@ public class CompactionConfigValidationResult { + private static final CompactionConfigValidationResult SUCCESS + = new CompactionConfigValidationResult(true, null); + private final boolean valid; private final String reason; - public CompactionConfigValidationResult(boolean valid, String format, Object... args) + public static CompactionConfigValidationResult success() + { + return SUCCESS; + } + + public static CompactionConfigValidationResult failure(String msgFormat, Object... args) + { + return new CompactionConfigValidationResult(false, msgFormat, args); + } + + private CompactionConfigValidationResult(boolean valid, String format, Object... args) { this.valid = valid; this.reason = format == null ? null : StringUtils.format(format, args); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java index 036c53121e91b..15e19cdbd77de 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.common.config.Configs; import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.server.http.CompactionConfigUpdateRequest; import javax.annotation.Nullable; import java.util.List; @@ -54,23 +55,21 @@ public static CoordinatorCompactionConfig from( baseConfig.compactionTaskSlotRatio, baseConfig.maxCompactionTaskSlots, baseConfig.useAutoScaleSlots, - null + baseConfig.compactionEngine ); } public static CoordinatorCompactionConfig from( CoordinatorCompactionConfig baseConfig, - @Nullable Double compactionTaskSlotRatio, - @Nullable Integer maxCompactionTaskSlots, - @Nullable Boolean useAutoScaleSlots + CompactionConfigUpdateRequest update ) { return new CoordinatorCompactionConfig( baseConfig.compactionConfigs, - compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio : compactionTaskSlotRatio, - maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : maxCompactionTaskSlots, - useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots : useAutoScaleSlots, - null + Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), baseConfig.compactionTaskSlotRatio), + Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), baseConfig.maxCompactionTaskSlots), + Configs.valueOrDefault(update.getUseAutoScaleSlots(), baseConfig.useAutoScaleSlots), + Configs.valueOrDefault(update.getCompactionEngine(), baseConfig.compactionEngine) ); } @@ -90,7 +89,7 @@ public CoordinatorCompactionConfig( @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots, - @JsonProperty("compactionEngine") @Nullable CompactionEngine compactionEngine + @JsonProperty("engine") @Nullable CompactionEngine compactionEngine ) { this.compactionConfigs = compactionConfigs; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index 767e8218f3195..91af8b6267d84 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder; import org.joda.time.Period; import javax.annotation.Nullable; @@ -42,6 +43,12 @@ public class DataSourceCompactionConfig private final String dataSource; private final int taskPriority; private final long inputSegmentSizeBytes; + + public static DataSourceCompactionConfigBuilder builder() + { + return new DataSourceCompactionConfigBuilder(); + } + /** * The number of input segments is limited because the byte size of a serialized task spec is limited by * org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig.maxZnodeBytes. diff --git a/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java b/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java new file mode 100644 index 0000000000000..9cb00dfa614e1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.config; + +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig; +import org.joda.time.Period; + +import java.util.Map; + +public class DataSourceCompactionConfigBuilder +{ + private String dataSource; + private Integer taskPriority; + private Long inputSegmentSizeBytes; + private Integer maxRowsPerSegment; + private Period skipOffsetFromLatest; + private UserCompactionTaskQueryTuningConfig tuningConfig; + private UserCompactionTaskGranularityConfig granularitySpec; + private UserCompactionTaskDimensionsConfig dimensionsSpec; + private AggregatorFactory[] metricsSpec; + private UserCompactionTaskTransformConfig transformSpec; + private UserCompactionTaskIOConfig ioConfig; + private CompactionEngine engine; + private Map taskContext; + + public DataSourceCompactionConfig build() + { + return new DataSourceCompactionConfig( + dataSource, + taskPriority, + inputSegmentSizeBytes, + maxRowsPerSegment, + skipOffsetFromLatest, + tuningConfig, + granularitySpec, + dimensionsSpec, + metricsSpec, + transformSpec, + ioConfig, + engine, + taskContext + ); + } + + public DataSourceCompactionConfigBuilder forDataSource(String dataSource) + { + this.dataSource = dataSource; + return this; + } + + public DataSourceCompactionConfigBuilder withTaskPriority(Integer taskPriority) + { + this.taskPriority = taskPriority; + return this; + } + + public DataSourceCompactionConfigBuilder withInputSegmentSizeBytes(Long inputSegmentSizeBytes) + { + this.inputSegmentSizeBytes = inputSegmentSizeBytes; + return this; + } + + @Deprecated + public DataSourceCompactionConfigBuilder withMaxRowsPerSegment(Integer maxRowsPerSegment) + { + this.maxRowsPerSegment = maxRowsPerSegment; + return this; + } + + public DataSourceCompactionConfigBuilder withSkipOffsetFromLatest(Period skipOffsetFromLatest) + { + this.skipOffsetFromLatest = skipOffsetFromLatest; + return this; + } + + public DataSourceCompactionConfigBuilder withTuningConfig( + UserCompactionTaskQueryTuningConfig tuningConfig + ) + { + this.tuningConfig = tuningConfig; + return this; + } + + public DataSourceCompactionConfigBuilder withGranularitySpec( + UserCompactionTaskGranularityConfig granularitySpec + ) + { + this.granularitySpec = granularitySpec; + return this; + } + + public DataSourceCompactionConfigBuilder withDimensionsSpec( + UserCompactionTaskDimensionsConfig dimensionsSpec + ) + { + this.dimensionsSpec = dimensionsSpec; + return this; + } + + public DataSourceCompactionConfigBuilder withMetricsSpec(AggregatorFactory[] metricsSpec) + { + this.metricsSpec = metricsSpec; + return this; + } + + public DataSourceCompactionConfigBuilder withTransformSpec( + UserCompactionTaskTransformConfig transformSpec + ) + { + this.transformSpec = transformSpec; + return this; + } + + public DataSourceCompactionConfigBuilder withIoConfig(UserCompactionTaskIOConfig ioConfig) + { + this.ioConfig = ioConfig; + return this; + } + + public DataSourceCompactionConfigBuilder withEngine(CompactionEngine engine) + { + this.engine = engine; + return this; + } + + public DataSourceCompactionConfigBuilder withTaskContext(Map taskContext) + { + this.taskContext = taskContext; + return this; + } +} diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java new file mode 100644 index 0000000000000..72ab23fde8a70 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.CompactionEngine; + +import javax.annotation.Nullable; + +/** + * Payload to update the cluster-level compaction config. + * All fields of this class must be nullable. A non-value indicates that the + * corresponding field is being updated. + */ +public class CompactionConfigUpdateRequest +{ + private final Double compactionTaskSlotRatio; + private final Integer maxCompactionTaskSlots; + private final Boolean useAutoScaleSlots; + private final CompactionEngine compactionEngine; + + @JsonCreator + public CompactionConfigUpdateRequest( + @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, + @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, + @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots, + @JsonProperty("compactionEngine") @Nullable CompactionEngine compactionEngine + ) + { + this.compactionTaskSlotRatio = compactionTaskSlotRatio; + this.maxCompactionTaskSlots = maxCompactionTaskSlots; + this.useAutoScaleSlots = useAutoScaleSlots; + this.compactionEngine = compactionEngine; + } + + @Nullable + @JsonProperty + public Double getCompactionTaskSlotRatio() + { + return compactionTaskSlotRatio; + } + + @Nullable + @JsonProperty + public Integer getMaxCompactionTaskSlots() + { + return maxCompactionTaskSlots; + } + + @Nullable + @JsonProperty + public Boolean getUseAutoScaleSlots() + { + return useAutoScaleSlots; + } + + @Nullable + @JsonProperty + public CompactionEngine getCompactionEngine() + { + return compactionEngine; + } + +} diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index 0bba5cf63fa1b..e8baf82509099 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -40,6 +40,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -69,7 +70,7 @@ public class CoordinatorCompactionConfigsResource { private static final Logger LOG = new Logger(CoordinatorCompactionConfigsResource.class); private static final long UPDATE_RETRY_DELAY = 1000; - static final int UPDATE_NUM_RETRY = 5; + static final int MAX_UPDATE_RETRIES = 5; private final CoordinatorConfigManager configManager; private final AuditManager auditManager; @@ -86,11 +87,46 @@ public CoordinatorCompactionConfigsResource( @GET @Produces(MediaType.APPLICATION_JSON) - public Response getCompactionConfig() + public Response getClusterCompactionConfig() { return Response.ok(configManager.getCurrentCompactionConfig()).build(); } + @POST + @Path("/global") + @Consumes(MediaType.APPLICATION_JSON) + public Response updateClusterCompactionConfig( + CompactionConfigUpdateRequest updatePayload, + @Context HttpServletRequest req + ) + { + UnaryOperator operator = current -> { + final CoordinatorCompactionConfig newConfig = CoordinatorCompactionConfig.from(current, updatePayload); + + final List datasourceConfigs = newConfig.getCompactionConfigs(); + if (CollectionUtils.isNullOrEmpty(datasourceConfigs) + || current.getEngine() == newConfig.getEngine()) { + return newConfig; + } + + // Validate all the datasource configs against the new engine + for (DataSourceCompactionConfig datasourceConfig : datasourceConfigs) { + CompactionConfigValidationResult validationResult = + ClientCompactionRunnerInfo.validateCompactionConfig(datasourceConfig, newConfig.getEngine()); + if (!validationResult.isValid()) { + throw InvalidInput.exception( + "Cannot update engine to [%s] as it does not support" + + " compaction config of DataSource[%s]. Reason[%s].", + newConfig.getEngine(), datasourceConfig.getDataSource(), validationResult.getReason() + ); + } + } + + return newConfig; + }; + return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req)); + } + @POST @Path("/taskslots") @Consumes(MediaType.APPLICATION_JSON) @@ -101,19 +137,20 @@ public Response setCompactionTaskLimit( @Context HttpServletRequest req ) { - UnaryOperator operator = - current -> CoordinatorCompactionConfig.from( - current, + return updateClusterCompactionConfig( + new CompactionConfigUpdateRequest( compactionTaskSlotRatio, maxCompactionTaskSlots, - useAutoScaleSlots - ); - return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req)); + useAutoScaleSlots, + null + ), + req + ); } @POST @Consumes(MediaType.APPLICATION_JSON) - public Response addOrUpdateCompactionConfig( + public Response addOrUpdateDatasourceCompactionConfig( final DataSourceCompactionConfig newConfig, @Context HttpServletRequest req ) @@ -144,7 +181,7 @@ public Response addOrUpdateCompactionConfig( @GET @Path("/{dataSource}") @Produces(MediaType.APPLICATION_JSON) - public Response getCompactionConfig(@PathParam("dataSource") String dataSource) + public Response getDatasourceCompactionConfig(@PathParam("dataSource") String dataSource) { final CoordinatorCompactionConfig current = configManager.getCurrentCompactionConfig(); final Map configs = current @@ -233,7 +270,7 @@ private Response updateConfigHelper( int attemps = 0; SetResult setResult = null; try { - while (attemps < UPDATE_NUM_RETRY) { + while (attemps < MAX_UPDATE_RETRIES) { setResult = configManager.getAndUpdateCompactionConfig(configOperator, auditInfo); if (setResult.isOk() || !setResult.isRetryable()) { break; diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index f6d4a2b6e581d..01daa1da4af5f 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -64,8 +64,7 @@ public void testMSQEngineWithHashedPartitionsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "Invalid partitionsSpec type[HashedPartitionsSpec] for MSQ engine." - + " Type must be either 'dynamic' or 'range'.", + "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either 'dynamic' or 'range'", validationResult.getReason() ); } @@ -85,7 +84,7 @@ public void testMSQEngineWithMaxTotalRowsIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "maxTotalRows[100] in DynamicPartitionsSpec not supported for MSQ engine.", + "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning", validationResult.getReason() ); } @@ -144,7 +143,7 @@ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInValid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "rollup in granularitySpec must be set to True if metricsSpec is specifed for MSQ engine.", + "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified", validationResult.getReason() ); } @@ -167,7 +166,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "Different name[sum_added] and fieldName(s)[[added]] for aggregator unsupported for MSQ engine.", + "MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator", validationResult.getReason() ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java new file mode 100644 index 0000000000000..6f39f55279162 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class CoordinatorCompactionConfigTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testSerdeDefaultConfig() throws Exception + { + final CoordinatorCompactionConfig defaultConfig = CoordinatorCompactionConfig.empty(); + final String json = MAPPER.writeValueAsString(defaultConfig); + + CoordinatorCompactionConfig deserialized = MAPPER.readValue(json, CoordinatorCompactionConfig.class); + Assert.assertEquals(defaultConfig, deserialized); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index a9334f077a479..67f276f765120 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -42,9 +42,7 @@ import org.joda.time.Duration; import org.joda.time.Period; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.io.IOException; @@ -52,27 +50,15 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest { private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - @Test public void testSerdeBasic() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - null, - null, - new Period(3600), - null, - null, - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withSkipOffsetFromLatest(new Period(3600)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -90,21 +76,15 @@ public void testSerdeBasic() throws IOException @Test public void testSerdeWithMaxRowsPerSegment() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - 30, - new Period(3600), - null, - null, - null, - null, - null, - null, - CompactionEngine.MSQ, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withMaxRowsPerSegment(30) + .withSkipOffsetFromLatest(new Period(3600)) + .withEngine(CompactionEngine.MSQ) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -121,41 +101,14 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException @Test public void testSerdeWithMaxTotalRows() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - 10000L, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - CompactionEngine.NATIVE, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withEngine(CompactionEngine.NATIVE) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -172,42 +125,14 @@ public void testSerdeWithMaxTotalRows() throws IOException @Test public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - 10000, - new Period(3600), - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - 10000L, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); - + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withMaxRowsPerSegment(10000) + .withSkipOffsetFromLatest(new Period(3600)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -301,21 +226,14 @@ public void testSerdeUserCompactionTuningConfigWithAppendableIndexSpec() throws @Test public void testSerdeGranularitySpec() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -332,21 +250,14 @@ public void testSerdeGranularitySpec() throws IOException @Test public void testSerdeGranularitySpecWithQueryGranularity() throws Exception { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -366,21 +277,13 @@ public void testSerdeGranularitySpecWithQueryGranularity() throws Exception @Test public void testSerdeWithNullGranularitySpec() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - null, - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -397,21 +300,14 @@ public void testSerdeWithNullGranularitySpec() throws IOException @Test public void testSerdeGranularitySpecWithNullValues() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(null, null, null), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(null, null, null)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -428,21 +324,14 @@ public void testSerdeGranularitySpecWithNullValues() throws IOException @Test public void testSerdeGranularitySpecWithRollup() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(null, null, true), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(null, null, true)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -462,21 +351,15 @@ public void testSerdeGranularitySpecWithRollup() throws IOException @Test public void testSerdeIOConfigWithNonNullDropExisting() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - new UserCompactionTaskIOConfig(true), - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null)) + .withIoConfig(new UserCompactionTaskIOConfig(true)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -494,21 +377,15 @@ public void testSerdeIOConfigWithNonNullDropExisting() throws IOException @Test public void testSerdeIOConfigWithNullDropExisting() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - new UserCompactionTaskIOConfig(null), - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null)) + .withIoConfig(new UserCompactionTaskIOConfig(null)) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -526,21 +403,18 @@ public void testSerdeIOConfigWithNullDropExisting() throws IOException @Test public void testSerdeDimensionsSpec() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - null, - new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))), - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withDimensionsSpec( + new UserCompactionTaskDimensionsConfig( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")) + ) + ) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -558,21 +432,14 @@ public void testSerdeDimensionsSpec() throws IOException public void testSerdeTransformSpec() throws IOException { NullHandling.initializeForTests(); - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - null, - null, - null, - new UserCompactionTaskTransformConfig(new SelectorDimFilter("dim1", "foo", null)), - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withTransformSpec(new UserCompactionTaskTransformConfig(new SelectorDimFilter("dim1", "foo", null))) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); @@ -589,21 +456,14 @@ public void testSerdeTransformSpec() throws IOException @Test public void testSerdeMetricsSpec() throws IOException { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - null, - null, - new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, - null, - null, - null, - ImmutableMap.of("key", "val") - ); + final DataSourceCompactionConfig config = DataSourceCompactionConfig + .builder() + .forDataSource("dataSource") + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(new Period(3600)) + .withMetricsSpec(new AggregatorFactory[]{new CountAggregatorFactory("cnt")}) + .withTaskContext(ImmutableMap.of("key", "val")) + .build(); final String json = OBJECT_MAPPER.writeValueAsString(config); final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 17db22854779b..5be533c0e4655 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -19,528 +19,529 @@ package org.apache.druid.server.http; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; +import org.apache.druid.audit.AuditEntry; +import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.client.indexing.ClientMSQContext; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.JacksonConfigManager; -import org.apache.druid.error.DruidException; +import org.apache.druid.common.config.TestConfigManagerConfig; import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.CompactionEngine; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.metadata.MetadataStorageConnector; +import org.apache.druid.metadata.MetadataCASUpdate; import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.TestMetadataStorageConnector; +import org.apache.druid.metadata.TestMetadataStorageTablesConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder; +import org.joda.time.Interval; import org.joda.time.Period; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.skife.jdbi.v2.Handle; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; -import java.util.Collection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.UnaryOperator; @RunWith(MockitoJUnitRunner.class) public class CoordinatorCompactionConfigsResourceTest { - private static final DataSourceCompactionConfig OLD_CONFIG = new DataSourceCompactionConfig( - "oldDataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); - private static final DataSourceCompactionConfig NEW_CONFIG = new DataSourceCompactionConfig( - "newDataSource", - null, - 500L, - null, - new Period(1800), - null, - new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); - private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3}; - - private static final CoordinatorCompactionConfig ORIGINAL_CONFIG - = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG)); - - private static final String DATASOURCE_NOT_EXISTS = "notExists"; - - @Mock - private JacksonConfigManager mockJacksonConfigManager; + private static final double DELTA = 1e-9; + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); @Mock private HttpServletRequest mockHttpServletRequest; - @Mock - private MetadataStorageConnector mockConnector; - - @Mock - private MetadataStorageTablesConfig mockConnectorConfig; - - @Mock - private AuditManager mockAuditManager; - - private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource; + private TestCoordinatorConfigManager configManager; + private CoordinatorCompactionConfigsResource resource; @Before public void setup() { - Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) - ) - ).thenReturn(OLD_CONFIG_IN_BYTES); - Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) - ) - ).thenReturn(ORIGINAL_CONFIG); - Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config"); - Mockito.when(mockAuditManager.fetchAuditHistory( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - ArgumentMatchers.any() - ) - ).thenReturn(ImmutableList.of()); - coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource( - new CoordinatorConfigManager(mockJacksonConfigManager, mockConnector, mockConnectorConfig), - mockAuditManager - ); Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123"); + final AuditManager auditManager = new TestAuditManager(); + configManager = TestCoordinatorConfigManager.create(auditManager); + resource = new CoordinatorCompactionConfigsResource(configManager, auditManager); + configManager.delegate.start(); + } + + @After + public void tearDown() + { + configManager.delegate.stop(); } @Test - public void testSetCompactionTaskLimitWithExistingConfig() + public void testGetDefaultClusterConfig() { - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( - CoordinatorCompactionConfig.class); - Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any() - ) - ).thenReturn(ConfigManager.SetResult.ok()); - - double compactionTaskSlotRatio = 0.5; - int maxCompactionTaskSlots = 9; - Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit( - compactionTaskSlotRatio, - maxCompactionTaskSlots, - true, - mockHttpServletRequest - ); - Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); - Assert.assertNotNull(oldConfigCaptor.getValue()); - Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES); - Assert.assertNotNull(newConfigCaptor.getValue()); - Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots); - Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots()); - Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0); + Response response = resource.getClusterCompactionConfig(); + final CoordinatorCompactionConfig defaultConfig + = verifyAndGetPayload(response, CoordinatorCompactionConfig.class); + + Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(), DELTA); + Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots()); + Assert.assertFalse(defaultConfig.isUseAutoScaleSlots()); + Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty()); + Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine()); } @Test - public void testAddOrUpdateCompactionConfigWithExistingConfig() + public void testUpdateGlobalConfig() { - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( - CoordinatorCompactionConfig.class); - Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any() - ) - ).thenReturn(ConfigManager.SetResult.ok()); - - final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true), - null, - null, - null, - null, - CompactionEngine.NATIVE, - ImmutableMap.of("key", "val") - ); - Response result = coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig( - newConfig, + Response response = resource.updateClusterCompactionConfig( + new CompactionConfigUpdateRequest(0.5, 10, true, CompactionEngine.MSQ), mockHttpServletRequest ); - Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); - Assert.assertNotNull(oldConfigCaptor.getValue()); - Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES); - Assert.assertNotNull(newConfigCaptor.getValue()); - Assert.assertEquals(2, newConfigCaptor.getValue().getCompactionConfigs().size()); - Assert.assertEquals(OLD_CONFIG, newConfigCaptor.getValue().getCompactionConfigs().get(0)); - Assert.assertEquals(newConfig, newConfigCaptor.getValue().getCompactionConfigs().get(1)); - Assert.assertEquals(newConfig.getEngine(), newConfigCaptor.getValue().getEngine()); + verifyStatus(Response.Status.OK, response); + + final CoordinatorCompactionConfig updatedConfig = verifyAndGetPayload( + resource.getClusterCompactionConfig(), + CoordinatorCompactionConfig.class + ); + + Assert.assertNotNull(updatedConfig); + Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA); + Assert.assertEquals(10, updatedConfig.getMaxCompactionTaskSlots()); + Assert.assertTrue(updatedConfig.isUseAutoScaleSlots()); + Assert.assertEquals(CompactionEngine.MSQ, updatedConfig.getEngine()); } @Test - public void testDeleteCompactionConfigWithExistingConfig() + public void testSetCompactionTaskLimit() { - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( - CoordinatorCompactionConfig.class); - Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any() - ) - ).thenReturn(ConfigManager.SetResult.ok()); - final String datasourceName = "dataSource"; - final DataSourceCompactionConfig toDelete = new DataSourceCompactionConfig( - datasourceName, - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); - final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete)); - Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) - ) - ).thenReturn(originalConfig); - - Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig( - datasourceName, - mockHttpServletRequest - ); - Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); - Assert.assertNotNull(oldConfigCaptor.getValue()); - Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES); - Assert.assertNotNull(newConfigCaptor.getValue()); - Assert.assertEquals(0, newConfigCaptor.getValue().getCompactionConfigs().size()); + final CoordinatorCompactionConfig defaultConfig + = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + + Response response = resource.setCompactionTaskLimit(0.5, 9, true, mockHttpServletRequest); + verifyStatus(Response.Status.OK, response); + + final CoordinatorCompactionConfig updatedConfig + = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + + // Verify that the task slot fields have been updated + Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA); + Assert.assertEquals(9, updatedConfig.getMaxCompactionTaskSlots()); + Assert.assertTrue(updatedConfig.isUseAutoScaleSlots()); + + // Verify that the other fields are unchanged + Assert.assertEquals(defaultConfig.getCompactionConfigs(), updatedConfig.getCompactionConfigs()); + Assert.assertEquals(defaultConfig.getEngine(), updatedConfig.getEngine()); + } + + @Test + public void testGetUnknownDatasourceConfigThrowsNotFound() + { + Response response = resource.getDatasourceCompactionConfig(DS.WIKI); + verifyStatus(Response.Status.NOT_FOUND, response); + } + + @Test + public void testAddDatasourceConfig() + { + final DataSourceCompactionConfig newDatasourceConfig + = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(); + Response response = resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, mockHttpServletRequest); + verifyStatus(Response.Status.OK, response); + + final DataSourceCompactionConfig fetchedDatasourceConfig + = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class); + Assert.assertEquals(newDatasourceConfig, fetchedDatasourceConfig); + + final CoordinatorCompactionConfig fullCompactionConfig + = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size()); + Assert.assertEquals(newDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } @Test - public void testUpdateShouldRetryIfRetryableException() + public void testUpdateDatasourceConfig() { - Mockito.when( - mockJacksonConfigManager.set( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() + final DataSourceCompactionConfig originalDatasourceConfig = DataSourceCompactionConfig + .builder() + .forDataSource(DS.WIKI) + .withInputSegmentSizeBytes(500L) + .withSkipOffsetFromLatest(Period.hours(1)) + .withGranularitySpec( + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true) ) - ).thenReturn(ConfigManager.SetResult.retryableFailure(new ISE("retryable"))); + .withEngine(CompactionEngine.NATIVE) + .build(); - coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig( - NEW_CONFIG, + Response response = resource.addOrUpdateDatasourceCompactionConfig( + originalDatasourceConfig, mockHttpServletRequest ); + verifyStatus(Response.Status.OK, response); + + final DataSourceCompactionConfig updatedDatasourceConfig = DataSourceCompactionConfig + .builder() + .forDataSource(DS.WIKI) + .withInputSegmentSizeBytes(1000L) + .withSkipOffsetFromLatest(Period.hours(3)) + .withGranularitySpec( + new UserCompactionTaskGranularityConfig(Granularities.DAY, null, true) + ) + .withEngine(CompactionEngine.MSQ) + .build(); - // Verify that the update is retried upto the max number of retries - Mockito.verify( - mockJacksonConfigManager, - Mockito.times(CoordinatorCompactionConfigsResource.UPDATE_NUM_RETRY) - ).set( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); + response = resource.addOrUpdateDatasourceCompactionConfig(updatedDatasourceConfig, mockHttpServletRequest); + verifyStatus(Response.Status.OK, response); + + final DataSourceCompactionConfig latestDatasourceConfig + = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), DataSourceCompactionConfig.class); + Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig); + + final CoordinatorCompactionConfig fullCompactionConfig + = verifyAndGetPayload(resource.getClusterCompactionConfig(), CoordinatorCompactionConfig.class); + Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size()); + Assert.assertEquals(updatedDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } @Test - public void testUpdateShouldNotRetryIfNotRetryableException() + public void testDeleteDatasourceConfig() { - Mockito.when( - mockJacksonConfigManager.set( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ) - ).thenReturn(ConfigManager.SetResult.failure(new ISE("retryable"))); + final DataSourceCompactionConfig datasourceConfig + = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(); + Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest); + verifyStatus(Response.Status.OK, response); - coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig( - NEW_CONFIG, + response = resource.deleteCompactionConfig(DS.WIKI, mockHttpServletRequest); + verifyStatus(Response.Status.OK, response); + + response = resource.getDatasourceCompactionConfig(DS.WIKI); + verifyStatus(Response.Status.NOT_FOUND, response); + } + + @Test + public void testDeleteUnknownDatasourceConfigThrowsNotFound() + { + Response response = resource.deleteCompactionConfig(DS.WIKI, mockHttpServletRequest); + verifyStatus(Response.Status.NOT_FOUND, response); + } + + @Test + public void testUpdateIsRetriedIfFailureIsRetryable() + { + configManager.configUpdateResult + = ConfigManager.SetResult.retryableFailure(new Exception("retryable")); + resource.addOrUpdateDatasourceCompactionConfig( + DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(), mockHttpServletRequest ); - // Verify that the update is tried only once - Mockito.verify(mockJacksonConfigManager, Mockito.times(1)).set( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() + Assert.assertEquals( + CoordinatorCompactionConfigsResource.MAX_UPDATE_RETRIES, + configManager.numUpdateAttempts ); } @Test - public void testSetCompactionTaskLimitWithoutExistingConfig() + public void testUpdateIsNotRetriedIfFailureIsNotRetryable() { - Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) - ) - ).thenReturn(null); - Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) - ) - ).thenReturn(CoordinatorCompactionConfig.empty()); - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( - CoordinatorCompactionConfig.class); - Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any() - ) - ).thenReturn(ConfigManager.SetResult.ok()); - - double compactionTaskSlotRatio = 0.5; - int maxCompactionTaskSlots = 9; - Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit( - compactionTaskSlotRatio, - maxCompactionTaskSlots, - true, + configManager.configUpdateResult + = ConfigManager.SetResult.failure(new Exception("not retryable")); + resource.addOrUpdateDatasourceCompactionConfig( + DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(), mockHttpServletRequest ); - Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); - Assert.assertNull(oldConfigCaptor.getValue()); - Assert.assertNotNull(newConfigCaptor.getValue()); - Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots); - Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots()); - Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0); + + Assert.assertEquals(1, configManager.numUpdateAttempts); } @Test - public void testAddOrUpdateCompactionConfigWithoutExistingConfig() + public void testGetDatasourceConfigHistory() { - Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) - ) - ).thenReturn(null); - Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) - ) - ).thenReturn(CoordinatorCompactionConfig.empty()); - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( - CoordinatorCompactionConfig.class); - Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any() - ) - ).thenReturn(ConfigManager.SetResult.ok()); - - final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - null, - CompactionEngine.MSQ, - ImmutableMap.of("key", "val") - ); - String author = "maytas"; - String comment = "hello"; - Response result = coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig( - newConfig, - mockHttpServletRequest - ); - Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); - Assert.assertNull(oldConfigCaptor.getValue()); - Assert.assertNotNull(newConfigCaptor.getValue()); - Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size()); - Assert.assertEquals(newConfig, newConfigCaptor.getValue().getCompactionConfigs().get(0)); - Assert.assertEquals(newConfig.getEngine(), newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine()); + final DataSourceCompactionConfigBuilder builder + = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI); + + final DataSourceCompactionConfig configV1 = builder.build(); + resource.addOrUpdateDatasourceCompactionConfig(configV1, mockHttpServletRequest); + + final DataSourceCompactionConfig configV2 = builder.withEngine(CompactionEngine.NATIVE).build(); + resource.addOrUpdateDatasourceCompactionConfig(configV2, mockHttpServletRequest); + + final DataSourceCompactionConfig configV3 = builder + .withEngine(CompactionEngine.MSQ) + .withSkipOffsetFromLatest(Period.hours(1)) + .build(); + resource.addOrUpdateDatasourceCompactionConfig(configV3, mockHttpServletRequest); + + Response response = resource.getCompactionConfigHistory(DS.WIKI, null, null); + verifyStatus(Response.Status.OK, response); + + final List history + = (List) response.getEntity(); + Assert.assertEquals(3, history.size()); + Assert.assertEquals(configV1, history.get(0).getCompactionConfig()); + Assert.assertEquals(configV2, history.get(1).getCompactionConfig()); + Assert.assertEquals(configV3, history.get(2).getCompactionConfig()); } @Test - public void testAddOrUpdateCompactionConfigWithoutExistingConfigAndEngineAsNull() + public void testGetHistoryOfUnknownDatasourceReturnsEmpty() { - Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) - ) - ).thenReturn(null); - Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) - ) - ).thenReturn(CoordinatorCompactionConfig.empty()); - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( - CoordinatorCompactionConfig.class); - Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any() - ) - ).thenReturn(ConfigManager.SetResult.ok()); - - final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - null, - null, - ImmutableMap.of("key", "val") - ); - coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig( - newConfig, - mockHttpServletRequest - ); - Assert.assertEquals(null, newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine()); + Response response = resource.getCompactionConfigHistory(DS.WIKI, null, null); + verifyStatus(Response.Status.OK, response); + Assert.assertTrue(((List) response.getEntity()).isEmpty()); } @Test - public void testAddOrUpdateCompactionConfigWithInvalidMaxNumTasksForMSQEngine() + public void testAddInvalidDatasourceConfigThrowsBadRequest() { - Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) - ) - ).thenReturn(null); - Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) - ) - ).thenReturn(CoordinatorCompactionConfig.empty()); - - int maxNumTasks = 1; - - final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - null, - new Period(3600), - null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), - null, - null, - null, - null, - CompactionEngine.MSQ, - ImmutableMap.of(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks) - ); - Response response = coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig( - newConfig, - mockHttpServletRequest - ); - Assert.assertEquals(DruidException.Category.INVALID_INPUT.getExpectedStatus(), response.getStatus()); + final DataSourceCompactionConfig datasourceConfig = DataSourceCompactionConfig + .builder() + .forDataSource(DS.WIKI) + .withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1)) + .withEngine(CompactionEngine.MSQ) + .build(); + + final Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest); + verifyStatus(Response.Status.BAD_REQUEST, response); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); Assert.assertEquals( - "Compaction config not supported. Reason[MSQ context maxNumTasks [1] cannot be less than 2, " - + "since at least 1 controller and 1 worker is necessary.].", + "Compaction config not supported. Reason[MSQ: Context maxNumTasks[1]" + + " must be at least 2 (1 controller + 1 worker)].", ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() ); } @Test - public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist() + public void testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest() { - Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) - ) - ).thenReturn(null); - Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) - ) - ).thenReturn(CoordinatorCompactionConfig.empty()); - Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig( - DATASOURCE_NOT_EXISTS, + final DataSourceCompactionConfig datasourceConfig = DataSourceCompactionConfig + .builder() + .forDataSource(DS.WIKI) + .withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1)) + .build(); + Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest); + verifyStatus(Response.Status.OK, response); + + response = resource.updateClusterCompactionConfig( + new CompactionConfigUpdateRequest(null, null, null, CompactionEngine.MSQ), mockHttpServletRequest ); - Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), result.getStatus()); + verifyStatus(Response.Status.BAD_REQUEST, response); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "Cannot update engine to [msq] as it does not support compaction config of DataSource[wiki]." + + " Reason[MSQ: Context maxNumTasks[1] must be at least 2 (1 controller + 1 worker)].", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); } - @Test - public void testGetCompactionConfigHistoryForUnknownDataSourceShouldReturnEmptyList() + @SuppressWarnings("unchecked") + private T verifyAndGetPayload(Response response, Class type) { - Response response = coordinatorCompactionConfigsResource.getCompactionConfigHistory( - DATASOURCE_NOT_EXISTS, - null, - null - ); Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - Assert.assertTrue(((Collection) response.getEntity()).isEmpty()); + + Assert.assertTrue(type.isInstance(response.getEntity())); + return (T) response.getEntity(); + } + + private void verifyStatus(Response.Status expectedStatus, Response response) + { + Assert.assertEquals(expectedStatus.getStatusCode(), response.getStatus()); + } + + /** + * Test implementation of AuditManager that keeps audit entries in memory. + */ + private static class TestAuditManager implements AuditManager + { + private final List audits = new ArrayList<>(); + + @Override + public void doAudit(AuditEntry event, Handle handle) + { + // do nothing + } + + @Override + public void doAudit(AuditEntry event) + { + final String json; + try { + json = OBJECT_MAPPER.writeValueAsString(event.getPayload().raw()); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + final AuditEntry eventWithSerializedPayload + = AuditEntry.builder() + .key(event.getKey()) + .type(event.getType()) + .auditInfo(event.getAuditInfo()) + .auditTime(event.getAuditTime()) + .request(event.getRequest()) + .serializedPayload(json) + .build(); + audits.add(eventWithSerializedPayload); + } + + @Override + public List fetchAuditHistory(String key, String type, Interval interval) + { + return audits; + } + + @Override + public List fetchAuditHistory(String type, int limit) + { + return audits; + } + + @Override + public List fetchAuditHistory(String type, Interval interval) + { + return audits; + } + + @Override + public List fetchAuditHistory(String key, String type, int limit) + { + return audits; + } + + @Override + public int removeAuditLogsOlderThan(long timestamp) + { + return 0; + } + } + + /** + * Test implementation of CoordinatorConfigManager to track number of update attempts. + */ + private static class TestCoordinatorConfigManager extends CoordinatorConfigManager + { + private final ConfigManager delegate; + private int numUpdateAttempts; + private ConfigManager.SetResult configUpdateResult; + + static TestCoordinatorConfigManager create(AuditManager auditManager) + { + final MetadataStorageTablesConfig tablesConfig = new TestMetadataStorageTablesConfig() + { + @Override + public String getConfigTable() + { + return "druid_config"; + } + }; + + final TestDBConnector dbConnector = new TestDBConnector(); + final ConfigManager configManager = new ConfigManager( + dbConnector, + Suppliers.ofInstance(tablesConfig), + Suppliers.ofInstance(new TestConfigManagerConfig()) + ); + + return new TestCoordinatorConfigManager(configManager, dbConnector, tablesConfig, auditManager); + } + + TestCoordinatorConfigManager( + ConfigManager configManager, + TestDBConnector dbConnector, + MetadataStorageTablesConfig tablesConfig, + AuditManager auditManager + ) + { + super( + new JacksonConfigManager(configManager, OBJECT_MAPPER, auditManager), + dbConnector, + tablesConfig + ); + this.delegate = configManager; + } + + @Override + public ConfigManager.SetResult getAndUpdateCompactionConfig( + UnaryOperator operator, + AuditInfo auditInfo + ) + { + ++numUpdateAttempts; + if (configUpdateResult == null) { + return super.getAndUpdateCompactionConfig(operator, auditInfo); + } else { + return configUpdateResult; + } + } + } + + /** + * Test implementation for in-memory insert, lookup and compareAndSwap operations. + */ + private static class TestDBConnector extends TestMetadataStorageConnector + { + private final Map, byte[]> values = new HashMap<>(); + + @Override + public Void insertOrUpdate(String tableName, String keyColumn, String valueColumn, String key, byte[] value) + { + values.put( + Arrays.asList(tableName, keyColumn, valueColumn, key), + value + ); + return null; + } + + @Nullable + @Override + public byte[] lookup(String tableName, String keyColumn, String valueColumn, String key) + { + return values.get(Arrays.asList(tableName, keyColumn, valueColumn, key)); + } + + @Override + public boolean compareAndSwap(List updates) + { + for (MetadataCASUpdate update : updates) { + final List key = Arrays.asList( + update.getTableName(), + update.getKeyColumn(), + update.getValueColumn(), + update.getKey() + ); + + final byte[] currentValue = values.get(key); + if (currentValue == null || Arrays.equals(currentValue, update.getOldValue())) { + values.put(key, update.getNewValue()); + } else { + return false; + } + } + + return true; + } + } + + private static class DS + { + static final String WIKI = "wiki"; } }