Skip to content

Commit

Permalink
Commit segments only when they are covered by active locks (apache#15027
Browse files Browse the repository at this point in the history
)

* Commit segments only when they are covered by active locks
  • Loading branch information
AmatyaAvadhanula authored Sep 25, 2023
1 parent 48b6d2a commit f7a5491
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TaskLocks
{
Expand Down Expand Up @@ -93,6 +94,12 @@ public static String defaultLockVersion(TaskLockType lockType)
: DateTimes.nowUtc().toString();
}

/**
* Checks if the segments are covered by a non revoked lock
* @param taskLockMap task locks for a task
* @param segments segments to be checked
* @return true if each of the segments is covered by a non-revoked lock
*/
public static boolean isLockCoversSegments(
NavigableMap<DateTime, List<TaskLock>> taskLockMap,
Collection<DataSegment> segments
Expand All @@ -105,7 +112,11 @@ public static boolean isLockCoversSegments(
return false;
}

final List<TaskLock> locks = entry.getValue();
// taskLockMap may contain revoked locks which need to be filtered
final List<TaskLock> locks = entry.getValue()
.stream()
.filter(lock -> !lock.isRevoked())
.collect(Collectors.toList());
return locks.stream().anyMatch(
lock -> {
if (lock.getGranularity() == LockGranularity.TIME_CHUNK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ private void revokeLock(TaskLockPosse lockPosse)
* @param lock lock to be revoked
*/
@VisibleForTesting
protected void revokeLock(String taskId, TaskLock lock)
public void revokeLock(String taskId, TaskLock lock)
{
giant.lock();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
Expand All @@ -34,6 +35,7 @@
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.SpecificSegmentLockRequest;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.DateTimes;
Expand Down Expand Up @@ -62,11 +64,13 @@ public class TaskLocksTest
@Before
public void setup()
{
final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(
new HeapMemoryTaskStorage(new TaskStorageConfig(null)),
taskStorage,
new TestIndexerMetadataStorageCoordinator()
);
task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));
lockbox.add(task);
}

Expand Down Expand Up @@ -296,6 +300,19 @@ public void testFindSegmentLocksForSegments()
);
}

@Test
public void testRevokedLocksDoNotCoverSegments()
{
final Set<DataSegment> segments = createNumberedPartitionedSegments();
final Interval interval = Intervals.of("2017-01-01/2017-01-02");

final TaskLock lock = tryTimeChunkLock(task, interval, TaskLockType.EXCLUSIVE);
Assert.assertTrue(TaskLocks.isLockCoversSegments(task, lockbox, segments));

lockbox.revokeLock(task.getId(), lock);
Assert.assertFalse(TaskLocks.isLockCoversSegments(task, lockbox, segments));
}

@Test
public void testFindReplaceLocksCoveringSegments()
{
Expand Down

0 comments on commit f7a5491

Please sign in to comment.