diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index 5d7b724c8451..5463dd854986 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator.rules; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DruidServer; @@ -40,11 +41,14 @@ public abstract class LoadRule implements Rule */ private final boolean useDefaultTierForNull; + private final boolean shouldSegmentBeLoaded; + protected LoadRule(Map tieredReplicants, Boolean useDefaultTierForNull) { this.useDefaultTierForNull = Configs.valueOrDefault(useDefaultTierForNull, true); this.tieredReplicants = handleNullTieredReplicants(tieredReplicants, this.useDefaultTierForNull); validateTieredReplicants(this.tieredReplicants); + this.shouldSegmentBeLoaded = this.tieredReplicants.values().stream().reduce(0, Integer::sum) > 0; } @JsonProperty @@ -65,6 +69,18 @@ public void run(DataSegment segment, SegmentActionHandler handler) handler.replicateSegment(segment, getTieredReplicants()); } + + /** + * @return Whether a segment that matches this rule needs to be loaded on a tier. + * + * Used in making handoff decisions. + */ + @JsonIgnore + public boolean shouldMatchingSegmentBeLoaded() + { + return shouldSegmentBeLoaded; + } + /** * Returns the given {@code tieredReplicants} map unchanged if it is non-null (including empty). * Returns the following default values if the given map is null. @@ -73,10 +89,16 @@ public void run(DataSegment segment, SegmentActionHandler handler) *
  • If {@code useDefaultTierForNull} is false, returns an empty map. This causes segments to have a replication factor of 0 and not get assigned to any historical.
  • * */ - private static Map handleNullTieredReplicants(final Map tieredReplicants, boolean useDefaultTierForNull) + private static Map handleNullTieredReplicants( + final Map tieredReplicants, + boolean useDefaultTierForNull + ) { if (useDefaultTierForNull) { - return Configs.valueOrDefault(tieredReplicants, ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)); + return Configs.valueOrDefault( + tieredReplicants, + ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS) + ); } else { return Configs.valueOrDefault(tieredReplicants, ImmutableMap.of()); } @@ -86,10 +108,17 @@ private static void validateTieredReplicants(final Map tieredRe { for (Map.Entry entry : tieredReplicants.entrySet()) { if (entry.getValue() == null) { - throw InvalidInput.exception("Invalid number of replicas for tier [%s]. Value must not be null.", entry.getKey()); + throw InvalidInput.exception( + "Invalid number of replicas for tier [%s]. Value must not be null.", + entry.getKey() + ); } if (entry.getValue() < 0) { - throw InvalidInput.exception("Invalid number of replicas for tier [%s]. Value [%d] must be positive.", entry.getKey(), entry.getValue()); + throw InvalidInput.exception( + "Invalid number of replicas for tier [%s]. Value [%d] must be positive.", + entry.getKey(), + entry.getValue() + ); } } } diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 301d9631b7d8..e362745a72a1 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -96,6 +96,7 @@ import java.util.stream.Collectors; /** + * */ @Path("/druid/coordinator/v1/datasources") public class DataSourcesResource @@ -186,7 +187,8 @@ private interface MarkSegments @ResourceFilters(DatasourceResourceFilter.class) public Response markAsUsedAllNonOvershadowedSegments(@PathParam("dataSourceName") final String dataSourceName) { - MarkSegments markSegments = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(dataSourceName); + MarkSegments markSegments = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource( + dataSourceName); return doMarkSegments("markAsUsedAllNonOvershadowedSegments", dataSourceName, markSegments); } @@ -480,7 +482,8 @@ public Response getDatasourceLoadstatus( return Response.ok( ImmutableMap.of( dataSourceName, - 100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments()) + 100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) + / (double) segmentsLoadStatistics.getNumPublishedSegments()) ) ).build(); } @@ -873,16 +876,14 @@ public Response isHandOffComplete( final DateTime now = DateTimes.nowUtc(); // A segment that is not eligible for load will never be handed off - boolean notEligibleForLoad = true; + boolean eligibleForLoad = false; for (Rule rule : rules) { if (rule.appliesTo(theInterval, now)) { - if (rule instanceof LoadRule) { - notEligibleForLoad = false; - } + eligibleForLoad = rule instanceof LoadRule && ((LoadRule) rule).shouldMatchingSegmentBeLoaded(); break; } } - if (notEligibleForLoad) { + if (!eligibleForLoad) { return Response.ok(true).build(); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index 801df8ebd76d..1e43d89bdda4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -114,6 +114,7 @@ public void testLoadRuleAssignsSegments() final DataSegment segment = createDataSegment(DS_WIKI); LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 2)); + Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded()); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); @@ -267,6 +268,7 @@ public void testSegmentsAreDroppedIfLoadRuleHasZeroReplicas() .build(); LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0, Tier.T2, 0)); + Assert.assertFalse(rule.shouldMatchingSegmentBeLoaded()); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI)); @@ -284,7 +286,7 @@ public void testLoadIgnoresInvalidTiers() final DataSegment segment = createDataSegment(DS_WIKI); LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1)); - + Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded()); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.ASSIGNED, "invalidTier", DS_WIKI)); @@ -347,6 +349,7 @@ public void testMaxLoadingQueueSize() .build(); final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); + Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded()); CoordinatorRunStats stats1 = runRuleAndGetStats(rule, dataSegment1, params); CoordinatorRunStats stats2 = runRuleAndGetStats(rule, dataSegment2, params); CoordinatorRunStats stats3 = runRuleAndGetStats(rule, dataSegment3, params); @@ -370,6 +373,7 @@ public void testSegmentIsAssignedOnlyToActiveServer() // Load rule requires 1 replica on each tier LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 1)); + Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded()); DataSegment segment = createDataSegment(DS_WIKI); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); @@ -427,7 +431,7 @@ public void testDropDuringDecommissioning() DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, segment1, segment2); final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0)); - + Assert.assertFalse(rule.shouldMatchingSegmentBeLoaded()); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment1, params); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, segment1.getDataSource())); Assert.assertTrue(server1.getPeon().getSegmentsToDrop().contains(segment1)); @@ -531,6 +535,7 @@ public void testEquals() { EqualsVerifier.forClass(LoadRule.class) .withNonnullFields("tieredReplicants") + .withIgnoredFields("shouldSegmentBeLoaded") .usingGetClass() .verify(); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java index 7b1dd2085f02..86ef92a3ed5c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java @@ -280,6 +280,7 @@ public void testEquals() { EqualsVerifier.forClass(PeriodLoadRule.class) .withNonnullFields("tieredReplicants") + .withIgnoredFields("shouldSegmentBeLoaded") .usingGetClass() .verify(); }