Skip to content

Commit

Permalink
Add separate test classes. Do some code cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekrb19 committed Sep 27, 2023
1 parent 320cd90 commit d729fdf
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -68,7 +70,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return includeFuture ? new Interval(referenceTimestamp.minus(period), referenceTimestamp.plus(period))
return includeFuture ? new Interval(referenceTimestamp.minus(period), DateTimes.utc(JodaUtils.MAX_INSTANT))
: new Interval(referenceTimestamp.minus(period), referenceTimestamp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -69,8 +68,14 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
// return new Interval(DateTimes.utc(JodaUtils.MIN_INSTANT), referenceTimestamp.minus(period));
return new Interval(DateTimes.utc(Long.MIN_VALUE), referenceTimestamp.minus(period));
final DateTime end = referenceTimestamp.minus(period);
if (end.isBefore(DateTimes.MIN)) {
// We use Long.MIN_VALUE as the start here (instead of DateTimes.MIN) when end is < DateTimes.MIN because the
// resulting interval will be invalid where start > end. This is true for referenceTimestamp = DateTimes.MIN.
return new Interval(DateTimes.utc(Long.MIN_VALUE), end);
} else {
return new Interval(DateTimes.MIN, end);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -87,7 +86,7 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return includeFuture ? new Interval(referenceTimestamp.minus(period), DateTimes.utc(JodaUtils.MAX_INSTANT))
return includeFuture ? new Interval(referenceTimestamp.minus(period), DateTimes.MAX)
: new Interval(referenceTimestamp.minus(period), referenceTimestamp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -88,7 +87,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
@Override
public Interval getEligibleInterval(DateTime referenceTimestamp)
{
return includeFuture ? new Interval(referenceTimestamp.minus(period), DateTimes.utc(JodaUtils.MAX_INSTANT))
return includeFuture ? new Interval(referenceTimestamp.minus(period), DateTimes.MAX)
: new Interval(referenceTimestamp.minus(period), referenceTimestamp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public interface Rule
void run(DataSegment segment, SegmentActionHandler segmentHandler);

/**
* Return an eligible interval from the reference timestamp. Implementations
* must return a valid interval based on the rule type.
* @param referenceTimestamp base timestamp
* Returns the interval eligible for this rule. The interval must be computed based on the rule type
* optionally using {@code referenceTimestamp}. {@code referenceTimestamp} must be a timestamp
* between [{@link org.apache.druid.java.util.common.DateTimes.MIN}, {@link org.apache.druid.java.util.common.DateTimes.MAX}).
*/
Interval getEligibleInterval(DateTime referenceTimestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,22 @@ private Rules()
}

/**
* Validate rules. This method throws an exception if a rule contain an interval that
* fully covers another subsequent rules' interval in the list. Rules that will be evaluated at some point
* are considered to be legitimate.
* @param rules Datasource rules.
* Validates the given list of retention rules for a datasource (or cluster default).
* A rule is considered valid only if it will be evaluated at some point, i.e. its eligible interval
* is not fully covered by the eligible interval of any preceding rule in the list.
* <p>
* Consider two rules r1 and r2. Assume r1 and r2's eligible intervals at the time when the rules are evaluated are
* i1 and i2 respectively. r1 and r2 are invalid if:
* <ul>
* <li> i1 is eternity. i.e., eternity fully covers i2 and any other interval that follows it. Or </li>
* <li> i1 fully contains i2 and </li>
* <li> r1's eligible interval at i1's start and end fully contain r2's eligible interval at i1's start and end
* respectively. This boundary check is used to identify rules that will fire at some point. i.e., period based rules
* will return distinct eligible intervals at the boundaries, whereas broadcast and interval based rules will return
* fixed intervals regardless of the boundary. </li>
* </ul>
* </p>
* @throws org.apache.druid.error.DruidException with error code "invalidInput" if any of the given rules is not valid.
*/
public static void validateRules(final List<Rule> rules)
{
Expand All @@ -70,9 +82,6 @@ public static void validateRules(final List<Rule> rules)
final Rule nextRule = rules.get(j);
final Interval nextInterval = nextRule.getEligibleInterval(now);
if (currInterval.contains(nextInterval)) {
// If the current rule has eternity, it covers everything following it.
// Or if the current rule still covers the next rule at the current interval boundaries, then the
// next rule will never fire at any time, so throw an exception.
if (Intervals.ETERNITY.equals(currInterval) ||
(currRule.getEligibleInterval(currInterval.getStart())
.contains(nextRule.getEligibleInterval(currInterval.getStart()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.rules;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class RulesEligibleIntervalTest
{
private final DateTime referenceTime;

@Parameterized.Parameters
public static Object[] getReferenceTimestamps()
{
final DateTime now = DateTimes.nowUtc();
return new Object[]{
now,
now.minusYears(5),
now.plusYears(10),
DateTimes.utc(0),
DateTimes.utc(JodaUtils.MIN_INSTANT + 1),
DateTimes.utc(JodaUtils.MIN_INSTANT),
DateTimes.utc(JodaUtils.MAX_INSTANT - 1),
DateTimes.utc(JodaUtils.MAX_INSTANT)
};
}

public RulesEligibleIntervalTest(final DateTime referenceTimestamp)
{
this.referenceTime = referenceTimestamp;
}

@Test
public void testPeriodLoadEligibleInterval()
{
final Period period = new Period("P50Y");
final PeriodLoadRule loadPT1H = new PeriodLoadRule(
period,
true,
null,
null
);
Assert.assertEquals(
new Interval(this.referenceTime.minus(period), DateTimes.MAX),
loadPT1H.getEligibleInterval(this.referenceTime)
);
}

@Test
public void testPeriodLoadExcludingFutureEligibleInterval()
{
final Period period = new Period("PT1H");
final PeriodLoadRule rule = new PeriodLoadRule(
period,
false,
null,
null
);
Assert.assertEquals(new Interval(period, this.referenceTime), rule.getEligibleInterval(this.referenceTime));
}

@Test
public void testIntervalLoadEligibleInterval()
{
final Interval interval = Intervals.of("2000/3000");
final IntervalLoadRule rule = new IntervalLoadRule(
interval,
null,
null
);
Assert.assertEquals(interval, rule.getEligibleInterval(this.referenceTime));
}

@Test
public void testForeverLoadEligibleInterval()
{
final ForeverLoadRule rule = new ForeverLoadRule(ImmutableMap.of(), false);
Assert.assertEquals(Intervals.ETERNITY, rule.getEligibleInterval(this.referenceTime));
}

@Test
public void testPeriodDropEligibleInterval()
{
final Period period = new Period("P5000Y");
final PeriodDropRule rule = new PeriodDropRule(
period,
true
);
Assert.assertEquals(
new Interval(this.referenceTime.minus(period), DateTimes.utc(JodaUtils.MAX_INSTANT)),
rule.getEligibleInterval(this.referenceTime)
);
}

@Test
public void testPeriodDropExcludingFutureEligibleInterval()
{
final Period period = new Period("P50Y");
final PeriodDropRule rule = new PeriodDropRule(
period,
false
);
Assert.assertEquals(
new Interval(this.referenceTime.minus(period), this.referenceTime),
rule.getEligibleInterval(this.referenceTime)
);
}

@Test
public void testPeriodDropBeforeEligibleInterval()
{
final Period period = new Period("P50Y");
final PeriodDropBeforeRule rule = new PeriodDropBeforeRule(period);

if (this.referenceTime.minus(period).isBefore(DateTimes.MIN)) {
Assert.assertEquals(
new Interval(DateTimes.utc(Long.MIN_VALUE), this.referenceTime.minus(period)),
rule.getEligibleInterval(this.referenceTime)
);
} else {
Assert.assertEquals(
new Interval(DateTimes.MIN, this.referenceTime.minus(period)),
rule.getEligibleInterval(this.referenceTime)
);
}
}

@Test
public void testForeverDropEligibleInterval()
{
final ForeverDropRule rule = new ForeverDropRule();
Assert.assertEquals(Intervals.ETERNITY, rule.getEligibleInterval(this.referenceTime));
}

@Test
public void testPeriodBroadcastEligibleInterval()
{
final Period period = new Period("P15Y");
final PeriodBroadcastDistributionRule rule = new PeriodBroadcastDistributionRule(period, true);
Assert.assertEquals(
new Interval(referenceTime.minus(period), DateTimes.MAX),
rule.getEligibleInterval(referenceTime)
);
}

@Test
public void testPeriodBroadcastExcludingFutureEligibleInterval()
{
final Period period = new Period("P15Y");
final PeriodBroadcastDistributionRule rule = new PeriodBroadcastDistributionRule(period, false);
Assert.assertEquals(new Interval(period, this.referenceTime), rule.getEligibleInterval(this.referenceTime));
}

@Test
public void testForeverBroadcastEligibleInterval()
{
final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
Assert.assertEquals(Intervals.ETERNITY, rule.getEligibleInterval(this.referenceTime));
}

@Test
public void testIntervalBroadcastEligibleInterval()
{
final Interval interval = Intervals.of("1993/2070");
final IntervalBroadcastDistributionRule rule = new IntervalBroadcastDistributionRule(interval);
Assert.assertEquals(interval, rule.getEligibleInterval(this.referenceTime));
}
}
Loading

0 comments on commit d729fdf

Please sign in to comment.