From 08b5a8b88e942f0c7393a61f5cdbc6959924886e Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Mon, 22 Apr 2024 23:26:45 +0530 Subject: [PATCH] Ignore append locks for compaction when using concurrent locks (#16316) * Ignore append locks for compaction when using concurrent locks --- .../druid/indexing/overlord/TaskLockbox.java | 15 ++++++++-- .../indexing/overlord/TaskLockboxTest.java | 28 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 35ec79d74ec7..7248fcab865e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -960,8 +960,19 @@ public Map> getLockedIntervals(List loc } final int priority = lockFilter.getPriority(); - final boolean ignoreAppendLocks = - TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE)); + final boolean isReplaceLock = TaskLockType.REPLACE.name().equals( + lockFilter.getContext().getOrDefault( + Tasks.TASK_LOCK_TYPE, + Tasks.DEFAULT_TASK_LOCK_TYPE + ) + ); + final boolean isUsingConcurrentLocks = Boolean.TRUE.equals( + lockFilter.getContext().getOrDefault( + Tasks.USE_CONCURRENT_LOCKS, + Tasks.DEFAULT_USE_CONCURRENT_LOCKS + ) + ); + final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock; running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 999d4d0abb2e..ab4bf3a504fc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1325,6 +1325,34 @@ public void testGetLockedIntervalsForLowerPriorityReplaceLock() Assert.assertTrue(conflictingIntervals.isEmpty()); } + @Test + public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() + { + final Task task = NoopTask.ofPriority(50); + lockbox.add(task); + taskStorage.insert(task, TaskStatus.running(task.getId())); + tryTimeChunkLock( + TaskLockType.APPEND, + task, + Intervals.of("2017/2018") + ); + + LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( + task.getDataSource(), + 25, + ImmutableMap.of( + Tasks.TASK_LOCK_TYPE, + TaskLockType.EXCLUSIVE.name(), + Tasks.USE_CONCURRENT_LOCKS, + true + ) + ); + + Map> conflictingIntervals = + lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); + Assert.assertTrue(conflictingIntervals.isEmpty()); + } + @Test public void testExclusiveLockCompatibility()