Fix batch segment allocation failure with replicas (apache#17262)
Fixes apache#16587

Streaming ingestion tasks operate by allocating segments before ingesting rows.
These allocations happen across replicas, which may send different requests but
must receive the same segment id for a given (datasource, interval, version,
sequenceName).

The previousSegmentId sent by a replica is not deterministic and can differ
between replicas, so it must not be used to distinguish otherwise-identical
allocation requests. This patch fixes the bug by ignoring the previousSegmentId
when skipSegmentLineageCheck is true.
AmatyaAvadhanula authored Oct 7, 2024
1 parent 6a4352f commit ff97c67
Showing 2 changed files with 103 additions and 1 deletion.
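The crux of the change is how the batch allocation queue deduplicates requests: two replica requests that differ only in previousSegmentId must map to the same key when the lineage check is skipped. Below is a minimal sketch of that idea; the class and field names are illustrative, not the actual Druid types (the real change is the UniqueAllocateRequest constructor in the second hunk of this diff).

import java.util.Objects;

/**
 * Minimal sketch (illustrative names, not the actual Druid class) of a dedup key
 * for batched segment-allocation requests. When the lineage check is skipped,
 * the key drops previousSegmentId so that replica requests differing only in
 * that field compare equal and resolve to the same allocation.
 */
class AllocateRequestKey
{
  private final String interval;
  private final String sequenceName;
  private final String previousSegmentId;
  private final boolean skipSegmentLineageCheck;

  AllocateRequestKey(
      String interval,
      String sequenceName,
      String previousSegmentId,
      boolean skipSegmentLineageCheck
  )
  {
    this.interval = interval;
    this.sequenceName = sequenceName;
    // Replicas may pass different, non-deterministic previousSegmentId values,
    // so ignore the field when lineage checks are skipped (streaming ingestion).
    this.previousSegmentId = skipSegmentLineageCheck ? null : previousSegmentId;
    this.skipSegmentLineageCheck = skipSegmentLineageCheck;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof AllocateRequestKey)) {
      return false;
    }
    AllocateRequestKey that = (AllocateRequestKey) o;
    return skipSegmentLineageCheck == that.skipSegmentLineageCheck
           && Objects.equals(interval, that.interval)
           && Objects.equals(sequenceName, that.sequenceName)
           && Objects.equals(previousSegmentId, that.previousSegmentId);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(interval, sequenceName, previousSegmentId, skipSegmentLineageCheck);
  }
}

With a key built this way, replica requests for the same interval and sequenceName collapse into a single allocation even when their previousSegmentId values differ, so every replica receives the same segment id.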
@@ -27,13 +27,15 @@
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@@ -61,11 +63,18 @@
import org.junit.runners.Parameterized;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@@ -122,6 +131,63 @@ public void tearDown()
}
}

@Test
public void testManySegmentsSameInterval_noLineageCheck() throws Exception
{
  if (lockGranularity == LockGranularity.SEGMENT) {
    return;
  }

  final Task task = NoopTask.create();
  final int numTasks = 2;
  final int numRequests = 200;

  taskActionTestKit.getTaskLockbox().add(task);

  ExecutorService allocatorService = Execs.multiThreaded(4, "allocator-%d");

  // 200 concurrent requests spread over 2 sequences; requests sharing a sequence
  // behave like replica requests and must resolve to the same segment id.
  final List<Callable<SegmentIdWithShardSpec>> allocateTasks = new ArrayList<>();
  for (int i = 0; i < numRequests; i++) {
    final String sequence = "sequence_" + (i % numTasks);
    allocateTasks.add(() -> allocateWithoutLineageCheck(
        task,
        PARTY_TIME,
        Granularities.NONE,
        Granularities.HOUR,
        sequence,
        TaskLockType.APPEND
    ));
  }

  Set<SegmentIdWithShardSpec> allocatedIds = new HashSet<>();
  for (Future<SegmentIdWithShardSpec> future : allocatorService.invokeAll(allocateTasks)) {
    allocatedIds.add(future.get());
  }

  // Re-issue the same requests after a pause; they must still map to the same ids.
  Thread.sleep(1_000);
  for (Future<SegmentIdWithShardSpec> future : allocatorService.invokeAll(allocateTasks)) {
    allocatedIds.add(future.get());
  }

  final TaskLock lock = Iterables.getOnlyElement(
      FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
                    .filter(input -> input.getInterval().contains(PARTY_TIME))
  );

  // Exactly one segment per sequence is expected, i.e. two distinct ids in total.
  Set<SegmentIdWithShardSpec> expectedIds = new HashSet<>();
  for (int i = 0; i < numTasks; i++) {
    expectedIds.add(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            lock.getVersion(),
            new NumberedShardSpec(i, 0)
        )
    );
  }
  Assert.assertEquals(expectedIds, allocatedIds);
}

@Test
public void testManySegmentsSameInterval()
{
@@ -1122,6 +1188,41 @@ private SegmentIdWithShardSpec allocate(
);
}

private SegmentIdWithShardSpec allocateWithoutLineageCheck(
    final Task task,
    final DateTime timestamp,
    final Granularity queryGranularity,
    final Granularity preferredSegmentGranularity,
    final String sequenceName,
    final TaskLockType taskLockType
)
{
  final SegmentAllocateAction action = new SegmentAllocateAction(
      DATA_SOURCE,
      timestamp,
      queryGranularity,
      preferredSegmentGranularity,
      sequenceName,
      // prevSegmentId can vary across replicas and isn't deterministic
      "random_" + ThreadLocalRandom.current().nextInt(),
      // skipSegmentLineageCheck
      true,
      NumberedPartialShardSpec.instance(),
      lockGranularity,
      taskLockType
  );

  try {
    if (useBatch) {
      return action.performAsync(task, taskActionTestKit.getTaskActionToolbox()).get();
    } else {
      return action.perform(task, taskActionTestKit.getTaskActionToolbox());
    }
  }
  catch (Exception e) {
    throw new RuntimeException(e);
  }
}

private SegmentIdWithShardSpec allocate(
final Task task,
final DateTime timestamp,
@@ -1326,7 +1326,8 @@ public UniqueAllocateRequest(
 {
   this.interval = interval;
   this.sequenceName = request.getSequenceName();
-  this.previousSegmentId = request.getPreviousSegmentId();
+  // Even if the previousSegmentId is set, disregard it when skipping lineage check for streaming ingestion
+  this.previousSegmentId = skipSegmentLineageCheck ? null : request.getPreviousSegmentId();
   this.skipSegmentLineageCheck = skipSegmentLineageCheck;

   this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId, skipSegmentLineageCheck);
