Skip to content

Commit

Permalink
LoadRules with 0 replicas should be treated as handoff complete (apac…
Browse files Browse the repository at this point in the history
…he#15274)

* LoadRules with 0 replicas should be treated as handoff complete

* fix it

* pr feedback

* fixit
  • Loading branch information
suneet-s authored Oct 30, 2023
1 parent 3173093 commit e6b7c36
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.server.coordinator.rules;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DruidServer;
Expand All @@ -40,11 +41,14 @@ public abstract class LoadRule implements Rule
*/
private final boolean useDefaultTierForNull;

private final boolean shouldSegmentBeLoaded;

protected LoadRule(Map<String, Integer> tieredReplicants, Boolean useDefaultTierForNull)
{
this.useDefaultTierForNull = Configs.valueOrDefault(useDefaultTierForNull, true);
this.tieredReplicants = handleNullTieredReplicants(tieredReplicants, this.useDefaultTierForNull);
validateTieredReplicants(this.tieredReplicants);
this.shouldSegmentBeLoaded = this.tieredReplicants.values().stream().reduce(0, Integer::sum) > 0;
}

@JsonProperty
Expand All @@ -65,6 +69,18 @@ public void run(DataSegment segment, SegmentActionHandler handler)
handler.replicateSegment(segment, getTieredReplicants());
}


/**
* @return Whether a segment that matches this rule needs to be loaded on a tier.
*
* Used in making handoff decisions.
*/
@JsonIgnore
public boolean shouldMatchingSegmentBeLoaded()
{
return shouldSegmentBeLoaded;
}

/**
* Returns the given {@code tieredReplicants} map unchanged if it is non-null (including empty).
* Returns the following default values if the given map is null.
Expand All @@ -73,10 +89,16 @@ public void run(DataSegment segment, SegmentActionHandler handler)
* <li>If {@code useDefaultTierForNull} is false, returns an empty map. This causes segments to have a replication factor of 0 and not get assigned to any historical.</li>
* </ul>
*/
private static Map<String, Integer> handleNullTieredReplicants(final Map<String, Integer> tieredReplicants, boolean useDefaultTierForNull)
private static Map<String, Integer> handleNullTieredReplicants(
final Map<String, Integer> tieredReplicants,
boolean useDefaultTierForNull
)
{
if (useDefaultTierForNull) {
return Configs.valueOrDefault(tieredReplicants, ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS));
return Configs.valueOrDefault(
tieredReplicants,
ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)
);
} else {
return Configs.valueOrDefault(tieredReplicants, ImmutableMap.of());
}
Expand All @@ -86,10 +108,17 @@ private static void validateTieredReplicants(final Map<String, Integer> tieredRe
{
for (Map.Entry<String, Integer> entry : tieredReplicants.entrySet()) {
if (entry.getValue() == null) {
throw InvalidInput.exception("Invalid number of replicas for tier [%s]. Value must not be null.", entry.getKey());
throw InvalidInput.exception(
"Invalid number of replicas for tier [%s]. Value must not be null.",
entry.getKey()
);
}
if (entry.getValue() < 0) {
throw InvalidInput.exception("Invalid number of replicas for tier [%s]. Value [%d] must be positive.", entry.getKey(), entry.getValue());
throw InvalidInput.exception(
"Invalid number of replicas for tier [%s]. Value [%d] must be positive.",
entry.getKey(),
entry.getValue()
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.util.stream.Collectors;

/**
*
*/
@Path("/druid/coordinator/v1/datasources")
public class DataSourcesResource
Expand Down Expand Up @@ -186,7 +187,8 @@ private interface MarkSegments
@ResourceFilters(DatasourceResourceFilter.class)
public Response markAsUsedAllNonOvershadowedSegments(@PathParam("dataSourceName") final String dataSourceName)
{
MarkSegments markSegments = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(dataSourceName);
MarkSegments markSegments = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(
dataSourceName);
return doMarkSegments("markAsUsedAllNonOvershadowedSegments", dataSourceName, markSegments);
}

Expand Down Expand Up @@ -480,7 +482,8 @@ public Response getDatasourceLoadstatus(
return Response.ok(
ImmutableMap.of(
dataSourceName,
100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments())
100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments())
/ (double) segmentsLoadStatistics.getNumPublishedSegments())
)
).build();
}
Expand Down Expand Up @@ -873,16 +876,14 @@ public Response isHandOffComplete(
final DateTime now = DateTimes.nowUtc();

// A segment that is not eligible for load will never be handed off
boolean notEligibleForLoad = true;
boolean eligibleForLoad = false;
for (Rule rule : rules) {
if (rule.appliesTo(theInterval, now)) {
if (rule instanceof LoadRule) {
notEligibleForLoad = false;
}
eligibleForLoad = rule instanceof LoadRule && ((LoadRule) rule).shouldMatchingSegmentBeLoaded();
break;
}
}
if (notEligibleForLoad) {
if (!eligibleForLoad) {
return Response.ok(true).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void testLoadRuleAssignsSegments()

final DataSegment segment = createDataSegment(DS_WIKI);
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 2));
Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);

Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
Expand Down Expand Up @@ -267,6 +268,7 @@ public void testSegmentsAreDroppedIfLoadRuleHasZeroReplicas()
.build();

LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0, Tier.T2, 0));
Assert.assertFalse(rule.shouldMatchingSegmentBeLoaded());
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);

Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI));
Expand All @@ -284,7 +286,7 @@ public void testLoadIgnoresInvalidTiers()

final DataSegment segment = createDataSegment(DS_WIKI);
LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));

Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.ASSIGNED, "invalidTier", DS_WIKI));
Expand Down Expand Up @@ -347,6 +349,7 @@ public void testMaxLoadingQueueSize()
.build();

final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
CoordinatorRunStats stats1 = runRuleAndGetStats(rule, dataSegment1, params);
CoordinatorRunStats stats2 = runRuleAndGetStats(rule, dataSegment2, params);
CoordinatorRunStats stats3 = runRuleAndGetStats(rule, dataSegment3, params);
Expand All @@ -370,6 +373,7 @@ public void testSegmentIsAssignedOnlyToActiveServer()

// Load rule requires 1 replica on each tier
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 1));
Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);

Expand Down Expand Up @@ -427,7 +431,7 @@ public void testDropDuringDecommissioning()

DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, segment1, segment2);
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0));

Assert.assertFalse(rule.shouldMatchingSegmentBeLoaded());
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment1, params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, segment1.getDataSource()));
Assert.assertTrue(server1.getPeon().getSegmentsToDrop().contains(segment1));
Expand Down Expand Up @@ -531,6 +535,7 @@ public void testEquals()
{
EqualsVerifier.forClass(LoadRule.class)
.withNonnullFields("tieredReplicants")
.withIgnoredFields("shouldSegmentBeLoaded")
.usingGetClass()
.verify();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public void testEquals()
{
EqualsVerifier.forClass(PeriodLoadRule.class)
.withNonnullFields("tieredReplicants")
.withIgnoredFields("shouldSegmentBeLoaded")
.usingGetClass()
.verify();
}
Expand Down

0 comments on commit e6b7c36

Please sign in to comment.