Skip to content

Commit

Permalink
Fail concurrent replace tasks with finer segment granularity than append (apache#17265)
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula authored Oct 8, 2024
1 parent 5d7c7a8 commit f42ecc9
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
// Intervals used by the test cases below; each is built from the ISO-8601 string shown.

// The year 2023.
private static final Interval YEAR_23 = Intervals.of("2023/2024");
// January 2023.
private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
// December 2023.
private static final Interval DEC_23 = Intervals.of("2023-12/2024-01");
// First quarter of 2023 (January through March).
private static final Interval JAN_FEB_MAR_23 = Intervals.of("2023-01-01/2023-04-01");
// Second quarter of 2023 (April through June).
private static final Interval APR_MAY_JUN_23 = Intervals.of("2023-04-01/2023-07-01");
// Third quarter of 2023 (July through September).
private static final Interval JUL_AUG_SEP_23 = Intervals.of("2023-07-01/2023-10-01");
// Fourth quarter of 2023 (October through December).
private static final Interval OCT_NOV_DEC_23 = Intervals.of("2023-10-01/2024-01-01");
// The single day 2023-01-01.
private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");

Expand Down Expand Up @@ -599,6 +602,185 @@ public void testAllocateLockReplaceDayAppendMonth()
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
}

/**
 * Order of operations: the replace task locks the year and commits four quarterly
 * segments; only then does the append task allocate with YEAR granularity and commit.
 * <p>
 * The assertions expect the allocation to come back with the first-quarter interval
 * and the replace lock's version, and the appended segment to be both used and
 * visible alongside the four quarterly segments.
 */
@Test
public void testLockReplaceQuarterAllocateAppendYear()
{
  final TaskLock yearLock = replaceTask.acquireReplaceLockOn(YEAR_23);
  Assert.assertNotNull(yearLock);
  final String replaceVersion = yearLock.getVersion();

  // One replacing segment per quarter, all at the version of the replace lock
  final DataSegment q1Segment = createSegment(JAN_FEB_MAR_23, replaceVersion);
  final DataSegment q2Segment = createSegment(APR_MAY_JUN_23, replaceVersion);
  final DataSegment q3Segment = createSegment(JUL_AUG_SEP_23, replaceVersion);
  final DataSegment q4Segment = createSegment(OCT_NOV_DEC_23, replaceVersion);

  final boolean replaceCommitted =
      replaceTask.commitReplaceSegments(q1Segment, q2Segment, q3Segment, q4Segment).isSuccess();
  Assert.assertTrue(replaceCommitted);
  verifyIntervalHasUsedSegments(YEAR_23, q1Segment, q2Segment, q3Segment, q4Segment);
  verifyIntervalHasVisibleSegments(YEAR_23, q1Segment, q2Segment, q3Segment, q4Segment);

  // Allocation is requested with YEAR granularity but is expected to match the first quarter
  final SegmentIdWithShardSpec allocatedId =
      appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
  Assert.assertEquals(JAN_FEB_MAR_23, allocatedId.getInterval());
  Assert.assertEquals(replaceVersion, allocatedId.getVersion());

  final DataSegment appended = asSegment(allocatedId);
  appendTask.commitAppendSegments(appended);

  verifyIntervalHasUsedSegments(YEAR_23, appended, q1Segment, q2Segment, q3Segment, q4Segment);
  verifyIntervalHasVisibleSegments(YEAR_23, appended, q1Segment, q2Segment, q3Segment, q4Segment);
}

/**
 * Order of operations: replace lock on the year, append allocation with YEAR
 * granularity, append commit, and finally a replace commit of four quarterly segments.
 * <p>
 * The replace commit is expected to fail, leaving the year-long appended segment as
 * the only used and visible segment of the year.
 */
@Test
public void testLockAllocateAppendYearReplaceQuarter()
{
  final TaskLock yearLock = replaceTask.acquireReplaceLockOn(YEAR_23);
  Assert.assertNotNull(yearLock);

  final SegmentIdWithShardSpec allocatedId =
      appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
  Assert.assertEquals(YEAR_23, allocatedId.getInterval());
  Assert.assertEquals(SEGMENT_V0, allocatedId.getVersion());

  final DataSegment yearSegment = asSegment(allocatedId);
  appendTask.commitAppendSegments(yearSegment);

  verifyIntervalHasUsedSegments(YEAR_23, yearSegment);
  verifyIntervalHasVisibleSegments(YEAR_23, yearSegment);

  // One replacing segment per quarter, all at the version of the replace lock
  final String replaceVersion = yearLock.getVersion();
  final DataSegment q1Segment = createSegment(JAN_FEB_MAR_23, replaceVersion);
  final DataSegment q2Segment = createSegment(APR_MAY_JUN_23, replaceVersion);
  final DataSegment q3Segment = createSegment(JUL_AUG_SEP_23, replaceVersion);
  final DataSegment q4Segment = createSegment(OCT_NOV_DEC_23, replaceVersion);

  final boolean replaceCommitted =
      replaceTask.commitReplaceSegments(q1Segment, q2Segment, q3Segment, q4Segment).isSuccess();
  Assert.assertFalse(replaceCommitted);

  // Nothing must have changed after the failed replace
  verifyIntervalHasUsedSegments(YEAR_23, yearSegment);
  verifyIntervalHasVisibleSegments(YEAR_23, yearSegment);
}

/**
 * Order of operations: replace lock on the year, append allocation with YEAR
 * granularity, a replace commit of four quarterly segments, and finally the append commit.
 * <p>
 * The replace commit is expected to fail; after the append commit, the year-long
 * appended segment is expected to be the only used and visible segment of the year.
 */
@Test
public void testLockAllocateReplaceQuarterAppendYear()
{
  final TaskLock yearLock = replaceTask.acquireReplaceLockOn(YEAR_23);
  Assert.assertNotNull(yearLock);

  final SegmentIdWithShardSpec allocatedId =
      appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
  Assert.assertEquals(YEAR_23, allocatedId.getInterval());
  Assert.assertEquals(SEGMENT_V0, allocatedId.getVersion());

  // One replacing segment per quarter, all at the version of the replace lock
  final String replaceVersion = yearLock.getVersion();
  final DataSegment q1Segment = createSegment(JAN_FEB_MAR_23, replaceVersion);
  final DataSegment q2Segment = createSegment(APR_MAY_JUN_23, replaceVersion);
  final DataSegment q3Segment = createSegment(JUL_AUG_SEP_23, replaceVersion);
  final DataSegment q4Segment = createSegment(OCT_NOV_DEC_23, replaceVersion);

  final boolean replaceCommitted =
      replaceTask.commitReplaceSegments(q1Segment, q2Segment, q3Segment, q4Segment).isSuccess();
  Assert.assertFalse(replaceCommitted);

  // The pending segment is committed only after the replace has been attempted
  final DataSegment yearSegment = asSegment(allocatedId);
  appendTask.commitAppendSegments(yearSegment);

  verifyIntervalHasUsedSegments(YEAR_23, yearSegment);
  verifyIntervalHasVisibleSegments(YEAR_23, yearSegment);
}

/**
 * Order of operations: append allocation with YEAR granularity, replace lock on the
 * year, a replace commit of four quarterly segments, and finally the append commit.
 * <p>
 * The replace commit is expected to fail; after the append commit, the year-long
 * appended segment is expected to be the only used and visible segment of the year.
 */
@Test
public void testAllocateLockReplaceQuarterAppendYear()
{
  final SegmentIdWithShardSpec allocatedId =
      appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
  Assert.assertEquals(YEAR_23, allocatedId.getInterval());
  Assert.assertEquals(SEGMENT_V0, allocatedId.getVersion());

  final TaskLock yearLock = replaceTask.acquireReplaceLockOn(YEAR_23);
  Assert.assertNotNull(yearLock);
  final String replaceVersion = yearLock.getVersion();

  // One replacing segment per quarter, all at the version of the replace lock
  final DataSegment q1Segment = createSegment(JAN_FEB_MAR_23, replaceVersion);
  final DataSegment q2Segment = createSegment(APR_MAY_JUN_23, replaceVersion);
  final DataSegment q3Segment = createSegment(JUL_AUG_SEP_23, replaceVersion);
  final DataSegment q4Segment = createSegment(OCT_NOV_DEC_23, replaceVersion);

  final boolean replaceCommitted =
      replaceTask.commitReplaceSegments(q1Segment, q2Segment, q3Segment, q4Segment).isSuccess();
  Assert.assertFalse(replaceCommitted);

  // The pending segment is committed only after the replace has been attempted
  final DataSegment yearSegment = asSegment(allocatedId);
  appendTask.commitAppendSegments(yearSegment);

  verifyIntervalHasUsedSegments(YEAR_23, yearSegment);
  verifyIntervalHasVisibleSegments(YEAR_23, yearSegment);
}

/**
 * Order of operations: append allocation with YEAR granularity, replace lock on the
 * year, append commit, and finally a replace commit of four quarterly segments.
 * <p>
 * The replace commit is expected to fail, leaving the year-long appended segment as
 * the only used and visible segment of the year.
 */
@Test
public void testAllocateLockAppendYearReplaceQuarter()
{
  final SegmentIdWithShardSpec allocatedId =
      appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
  Assert.assertEquals(YEAR_23, allocatedId.getInterval());
  Assert.assertEquals(SEGMENT_V0, allocatedId.getVersion());

  final TaskLock yearLock = replaceTask.acquireReplaceLockOn(YEAR_23);
  Assert.assertNotNull(yearLock);

  final DataSegment yearSegment = asSegment(allocatedId);
  appendTask.commitAppendSegments(yearSegment);

  verifyIntervalHasUsedSegments(YEAR_23, yearSegment);
  verifyIntervalHasVisibleSegments(YEAR_23, yearSegment);

  // One replacing segment per quarter, all at the version of the replace lock
  final String replaceVersion = yearLock.getVersion();
  final DataSegment q1Segment = createSegment(JAN_FEB_MAR_23, replaceVersion);
  final DataSegment q2Segment = createSegment(APR_MAY_JUN_23, replaceVersion);
  final DataSegment q3Segment = createSegment(JUL_AUG_SEP_23, replaceVersion);
  final DataSegment q4Segment = createSegment(OCT_NOV_DEC_23, replaceVersion);

  final boolean replaceCommitted =
      replaceTask.commitReplaceSegments(q1Segment, q2Segment, q3Segment, q4Segment).isSuccess();
  Assert.assertFalse(replaceCommitted);

  // Nothing must have changed after the failed replace
  verifyIntervalHasUsedSegments(YEAR_23, yearSegment);
  verifyIntervalHasVisibleSegments(YEAR_23, yearSegment);
}

/**
 * Order of operations: append allocation with YEAR granularity, append commit, and
 * only afterwards the replace lock on the year and a replace commit of four
 * quarterly segments.
 * <p>
 * The replace commit is expected to succeed. Afterwards the year-long appended
 * segment and the four quarterly segments are all expected to be used, while only
 * the quarterly segments are expected to be visible.
 */
@Test
public void testAllocateAppendLockYearReplaceQuarter()
{
  final SegmentIdWithShardSpec allocatedId =
      appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
  Assert.assertEquals(YEAR_23, allocatedId.getInterval());
  Assert.assertEquals(SEGMENT_V0, allocatedId.getVersion());

  final DataSegment yearSegment = asSegment(allocatedId);
  appendTask.commitAppendSegments(yearSegment);

  verifyIntervalHasUsedSegments(YEAR_23, yearSegment);
  verifyIntervalHasVisibleSegments(YEAR_23, yearSegment);

  // The replace lock is acquired only after the append has been committed
  final TaskLock yearLock = replaceTask.acquireReplaceLockOn(YEAR_23);
  Assert.assertNotNull(yearLock);
  final String replaceVersion = yearLock.getVersion();

  final DataSegment q1Segment = createSegment(JAN_FEB_MAR_23, replaceVersion);
  final DataSegment q2Segment = createSegment(APR_MAY_JUN_23, replaceVersion);
  final DataSegment q3Segment = createSegment(JUL_AUG_SEP_23, replaceVersion);
  final DataSegment q4Segment = createSegment(OCT_NOV_DEC_23, replaceVersion);

  final boolean replaceCommitted =
      replaceTask.commitReplaceSegments(q1Segment, q2Segment, q3Segment, q4Segment).isSuccess();
  Assert.assertTrue(replaceCommitted);

  verifyIntervalHasUsedSegments(YEAR_23, yearSegment, q1Segment, q2Segment, q3Segment, q4Segment);
  verifyIntervalHasVisibleSegments(YEAR_23, q1Segment, q2Segment, q3Segment, q4Segment);
}

@Test
public void testAllocateAppendMonthLockReplaceDay()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
Expand Down Expand Up @@ -900,7 +901,15 @@ private boolean shouldUpgradePendingSegment(
} else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) {
return false;
} else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) {
return false;
final SegmentId pendingSegmentId = pendingSegment.getId().asSegmentId();
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.UNSUPPORTED)
.build(
"Replacing with a finer segment granularity than a concurrent append is unsupported."
+ " Cannot upgrade pendingSegment[%s] to version[%s] as the replace interval[%s]"
+ " does not fully contain the pendingSegment interval[%s].",
pendingSegmentId, replaceVersion, replaceInterval, pendingSegmentId.getInterval()
);
} else {
// Do not upgrade already upgraded pending segment
return pendingSegment.getSequenceName() == null
Expand Down Expand Up @@ -2200,10 +2209,16 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
newInterval = replaceInterval;
break;
} else if (replaceInterval.overlaps(oldInterval)) {
throw new ISE(
"Incompatible segment intervals for commit: [%s] and [%s].",
oldInterval, replaceInterval
);
final String conflictingSegmentId = oldSegment.getId().toString();
final String upgradeVersion = upgradeSegmentToLockVersion.get(conflictingSegmentId);
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.UNSUPPORTED)
.build(
"Replacing with a finer segment granularity than a concurrent append is unsupported."
+ " Cannot upgrade segment[%s] to version[%s] as the replace interval[%s]"
+ " does not fully contain the pending segment interval[%s].",
conflictingSegmentId, upgradeVersion, replaceInterval, oldInterval
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,73 @@ public void testCommitAppendSegments()
Assert.assertEquals(replaceLock.getVersion(), Iterables.getOnlyElement(observedLockVersions));
}

/**
 * Expects {@code commitReplaceSegments} to report failure when the metadata store holds a
 * pending segment whose interval (2023-01-01/2024-01-01) extends beyond the interval
 * of the replace lock (2023-01-01/2023-02-01) and whose version (2023-01-02) is lower
 * than the replace lock's version (2023-02-01).
 */
@Test
public void testCommitReplaceSegments_partiallyOverlappingPendingSegmentUnsupported()
{
  final ReplaceTaskLock januaryLock =
      new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");

  // Pending segment spanning the whole year, i.e. wider than the locked interval
  final SegmentIdWithShardSpec yearLongPendingId = new SegmentIdWithShardSpec(
      "foo",
      Intervals.of("2023-01-01/2024-01-01"),
      "2023-01-02",
      new NumberedShardSpec(100, 0)
  );
  final PendingSegmentRecord yearLongPendingSegment =
      new PendingSegmentRecord(yearLongPendingId, "", "", null, "append");

  // Eight single-day segments, each mapped to the replace lock
  final Set<DataSegment> appendedSegments = new HashSet<>();
  final Map<DataSegment, ReplaceTaskLock> appendedSegmentToLock = new HashMap<>();
  for (int day = 1; day <= 8; day++) {
    final DataSegment dailySegment = new DataSegment(
        "foo",
        Intervals.of("2023-01-0" + day + "/2023-01-0" + (day + 1)),
        "2023-01-0" + day,
        ImmutableMap.of("path", "a-" + day),
        ImmutableList.of("dim1"),
        ImmutableList.of("m1"),
        new LinearShardSpec(0),
        9,
        100
    );
    appendedSegments.add(dailySegment);
    appendedSegmentToLock.put(dailySegment, januaryLock);
  }

  // Populate the metadata store: used segments, the pending segment, then the upgrade entries
  segmentSchemaTestUtils.insertUsedSegments(appendedSegments, Collections.emptyMap());
  derbyConnector.retryWithHandle(
      handle -> coordinator.insertPendingSegmentsIntoMetastore(
          handle,
          ImmutableList.of(yearLongPendingSegment),
          "foo",
          true
      )
  );
  insertIntoUpgradeSegmentsTable(
      appendedSegmentToLock,
      derbyConnectorRule.metadataTablesConfigSupplier().get()
  );

  // Eight replacing segments over the locked interval at the lock version
  final Set<DataSegment> replacingSegments = new HashSet<>();
  for (int partition = 1; partition <= 8; partition++) {
    replacingSegments.add(
        new DataSegment(
            "foo",
            Intervals.of("2023-01-01/2023-02-01"),
            "2023-02-01",
            ImmutableMap.of("path", "b-" + partition),
            ImmutableList.of("dim1"),
            ImmutableList.of("m1"),
            new NumberedShardSpec(partition, 9),
            9,
            100
        )
    );
  }

  final boolean replaceCommitted =
      coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(januaryLock), null)
                 .isSuccess();
  Assert.assertFalse(replaceCommitted);
}

@Test
public void testCommitReplaceSegments()
{
Expand Down

0 comments on commit f42ecc9

Please sign in to comment.