From 42bc105ff463e661c6216e9e1afbfa421f57f739 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 25 Jul 2024 10:14:39 +0530 Subject: [PATCH] Add API to fetch conflicting locks --- .../druid/indexing/common/TaskLock.java | 6 + .../druid/indexing/overlord/TaskLockbox.java | 107 ++------ .../indexing/overlord/TaskQueryTool.java | 25 +- .../overlord/http/OverlordResource.java | 21 +- .../indexing/overlord/TaskLockboxTest.java | 258 ++---------------- .../overlord/http/OverlordResourceTest.java | 42 ++- .../druid/metadata/LockFilterPolicy.java | 23 +- .../apache/druid/metadata/TaskLockInfo.java | 108 ++++++++ .../druid/rpc/indexing/OverlordClient.java | 9 +- .../rpc/indexing/OverlordClientImpl.java | 9 +- .../coordinator/duty/CompactSegments.java | 20 +- .../client/indexing/NoopOverlordClient.java | 3 +- .../rpc/indexing/OverlordClientImplTest.java | 28 +- .../coordinator/duty/CompactSegmentsTest.java | 38 ++- 14 files changed, 289 insertions(+), 408 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java index eb96eb4fed91..e3f5c0a42809 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.LockRequest; +import org.apache.druid.metadata.TaskLockInfo; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -77,4 +78,9 @@ default void assertNotRevoked() .build("Lock of type[%s] for interval[%s] was revoked", getType(), getInterval()); } } + + default TaskLockInfo toTaskLockInfo() + { + return new TaskLockInfo(getGranularity().name(), getType().name(), getNonNullPriority(), getInterval()); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 2155ac2c2655..bb033aea9c08 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -40,8 +40,8 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateResult; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; @@ -49,6 +49,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.ReplaceTaskLock; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -925,11 +926,11 @@ private Set getNonRevokedReplaceLocks(List posse /** * @param lockFilterPolicies Lock filters for the given datasources - * @return Map from datasource to intervals locked by tasks satisfying the lock filter condititions + * @return Map from datasource to list of non-revoked lock infos with at least as much priority and an overlapping interval */ - public Map> getLockedIntervals(List lockFilterPolicies) + public Map> getConflictingLockInfos(List lockFilterPolicies) { - final Map> datasourceToIntervals = new HashMap<>(); + final Map> datasourceToLocks = new HashMap<>(); // Take a lock and populate the maps giant.lock(); @@ -943,19 +944,12 @@ public Map> getLockedIntervals(List loc } final int priority = lockFilter.getPriority(); - final boolean isReplaceLock = TaskLockType.REPLACE.name().equals( - lockFilter.getContext().getOrDefault( - Tasks.TASK_LOCK_TYPE, - Tasks.DEFAULT_TASK_LOCK_TYPE - ) - ); - final boolean isUsingConcurrentLocks = Boolean.TRUE.equals( - lockFilter.getContext().getOrDefault( - Tasks.USE_CONCURRENT_LOCKS, - Tasks.DEFAULT_USE_CONCURRENT_LOCKS - ) - ); - final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock; + final List intervals; + if (lockFilter.getIntervals() != null) { + intervals = lockFilter.getIntervals(); + } else { + intervals = Collections.singletonList(Intervals.ETERNITY); + } running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( @@ -963,15 +957,17 @@ public Map> getLockedIntervals(List loc taskLockPosse -> { if (taskLockPosse.getTaskLock().isRevoked()) { // do nothing - } else if (ignoreAppendLocks - && TaskLockType.APPEND.equals(taskLockPosse.getTaskLock().getType())) { - // do nothing } else if (taskLockPosse.getTaskLock().getPriority() == null || taskLockPosse.getTaskLock().getPriority() < priority) { // do nothing } else { - datasourceToIntervals.computeIfAbsent(datasource, k -> new HashSet<>()) - .add(interval); + for (Interval filterInterval : intervals) { + if (interval.overlaps(filterInterval)) { + datasourceToLocks.computeIfAbsent(datasource, ds -> new ArrayList<>()) + .add(taskLockPosse.getTaskLock().toTaskLockInfo()); + break; + } + } } } ) @@ -984,72 +980,7 @@ public Map> getLockedIntervals(List loc giant.unlock(); } - return datasourceToIntervals.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> new ArrayList<>(entry.getValue()) - )); - } - - /** - * Gets a List of Intervals locked by higher priority tasks for each datasource. - * Here, Segment Locks are being treated the same as Time Chunk Locks i.e. - * a Task with a Segment Lock is assumed to lock a whole Interval and not just - * the corresponding Segment. - * - * @param minTaskPriority Minimum task priority for each datasource. Only the - * Intervals that are locked by Tasks with equal or - * higher priority than this are returned. Locked intervals - * for datasources that are not present in this Map are - * not returned. - * @return Map from Datasource to List of Intervals locked by Tasks that have - * priority greater than or equal to the {@code minTaskPriority} for that datasource. - */ - public Map> getLockedIntervals(Map minTaskPriority) - { - final Map> datasourceToIntervals = new HashMap<>(); - - // Take a lock and populate the maps - giant.lock(); - try { - running.forEach( - (datasource, datasourceLocks) -> { - // If this datasource is not requested, do not proceed - if (!minTaskPriority.containsKey(datasource)) { - return; - } - - datasourceLocks.forEach( - (startTime, startTimeLocks) -> startTimeLocks.forEach( - (interval, taskLockPosses) -> taskLockPosses.forEach( - taskLockPosse -> { - if (taskLockPosse.getTaskLock().isRevoked()) { - // Do not proceed if the lock is revoked - return; - } else if (taskLockPosse.getTaskLock().getPriority() == null - || taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) { - // Do not proceed if the lock has a priority strictly less than the minimum - return; - } - - datasourceToIntervals - .computeIfAbsent(datasource, k -> new HashSet<>()) - .add(interval); - }) - ) - ); - } - ); - } - finally { - giant.unlock(); - } - - return datasourceToIntervals.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> new ArrayList<>(entry.getValue()) - )); + return datasourceToLocks; } public void unlock(final Task task, final Interval interval) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index f5351d7c6e51..e5dab8526a91 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.Duration; @@ -85,28 +86,12 @@ public TaskQueryTool( } /** - * @param lockFilterPolicies Requests for conflicing lock intervals for various datasources - * @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked + * @param lockFilterPolicies Requests for conflicing locks for various datasources + * @return Map from datasource to conflicting lock infos */ - public Map> getLockedIntervals(List lockFilterPolicies) + public Map> getConflictingLockInfos(List lockFilterPolicies) { - return taskLockbox.getLockedIntervals(lockFilterPolicies); - } - - /** - * Gets a List of Intervals locked by higher priority tasks for each datasource. - * - * @param minTaskPriority Minimum task priority for each datasource. Only the - * Intervals that are locked by Tasks with equal or - * higher priority than this are returned. Locked intervals - * for datasources that are not present in this Map are - * not returned. - * @return Map from Datasource to List of Intervals locked by Tasks that have - * priority greater than or equal to the {@code minTaskPriority} for that datasource. - */ - public Map> getLockedIntervals(Map minTaskPriority) - { - return taskLockbox.getLockedIntervals(minTaskPriority); + return taskLockbox.getConflictingLockInfos(lockFilterPolicies); } public List> getActiveTaskInfo(@Nullable String dataSource) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 54ada7cb2b43..fa87b415a782 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -241,33 +241,18 @@ public Response isLeader() } } - @Deprecated - @POST - @Path("/lockedIntervals") - @Produces(MediaType.APPLICATION_JSON) - @ResourceFilters(StateResourceFilter.class) - public Response getDatasourceLockedIntervals(Map minTaskPriority) - { - if (minTaskPriority == null || minTaskPriority.isEmpty()) { - return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build(); - } - - // Build the response - return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build(); - } - @POST - @Path("/lockedIntervals/v2") + @Path("/conflictingLocks") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(StateResourceFilter.class) - public Response getDatasourceLockedIntervalsV2(List lockFilterPolicies) + public Response getConflictingLockInfos(List lockFilterPolicies) { if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) { return Response.status(Status.BAD_REQUEST).entity("No filter provided").build(); } // Build the response - return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build(); + return Response.ok(taskQueryTool.getConflictingLockInfos(lockFilterPolicies)).build(); } @GET diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index a02b51087675..d3ce5b702bd8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.indexer.TaskStatus; @@ -42,7 +41,6 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -55,6 +53,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; @@ -75,7 +74,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -1175,200 +1173,48 @@ public void testGetTimeChunkAndSegmentLockForDifferentGroup() } @Test - public void testGetLockedIntervals() + public void testGetConflictingLockInfos() { - // Acquire locks for task1 - final Task task1 = NoopTask.forDatasource("ds1"); - lockbox.add(task1); - - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task1, - Intervals.of("2017-01-01/2017-02-01") - ); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task1, - Intervals.of("2017-04-01/2017-05-01") - ); + final Set expectedConflicts = new HashSet<>(); + final TaskLockInfo overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingReplaceLock); - // Acquire locks for task2 - final Task task2 = NoopTask.forDatasource("ds2"); - lockbox.add(task2); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task2, - Intervals.of("2017-03-01/2017-04-01") - ); + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); - // Verify the locked intervals - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(task1.getDataSource(), 10); - minTaskPriority.put(task2.getDataSource(), 10); - final Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(2, lockedIntervals.size()); + final TaskLockInfo overlappingAppendLock = + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75) + .toTaskLockInfo(); + expectedConflicts.add(overlappingAppendLock); - Assert.assertEquals( - Arrays.asList( - Intervals.of("2017-01-01/2017-02-01"), - Intervals.of("2017-04-01/2017-05-01") - ), - lockedIntervals.get(task1.getDataSource()) - ); + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017-03-01/2017-04-01")), - lockedIntervals.get(task2.getDataSource()) - ); - } + final TaskLockInfo overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingExclusiveLock); - @Test - public void testGetLockedIntervalsForLowPriorityTask() - { - // Acquire lock for a low priority task - final Task lowPriorityTask = NoopTask.ofPriority(5); - lockbox.add(lowPriorityTask); - taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - lowPriorityTask, - Intervals.of("2017/2018") + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")) ); - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(lowPriorityTask.getDataSource(), 10); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertTrue(lockedIntervals.isEmpty()); - } - - @Test - public void testGetLockedIntervalsForEqualPriorityTask() - { - // Acquire lock for a low priority task - final Task task = NoopTask.ofPriority(5); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task, - Intervals.of("2017/2018") - ); - - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(task.getDataSource(), 5); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList(Intervals.of("2017/2018")), - lockedIntervals.get(task.getDataSource()) - ); - } - - @Test - public void testGetLockedIntervalsForHigherPriorityExclusiveLock() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 75, + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, null ); - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); - Assert.assertTrue(conflictingIntervals.isEmpty()); + Map> conflictingLocks = + lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, conflictingLocks.size()); + Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); } - @Test - public void testGetLockedIntervalsForLowerPriorityExclusiveLock() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 25, - null - ); - - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); - Assert.assertEquals(1, conflictingIntervals.size()); - Assert.assertEquals( - Collections.singletonList(Intervals.of("2017/2018")), - conflictingIntervals.get(task.getDataSource()) - ); - } - - @Test - public void testGetLockedIntervalsForLowerPriorityReplaceLock() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 25, - ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name()) - ); - - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); - Assert.assertTrue(conflictingIntervals.isEmpty()); - } - - @Test - public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 25, - ImmutableMap.of( - Tasks.TASK_LOCK_TYPE, - TaskLockType.EXCLUSIVE.name(), - Tasks.USE_CONCURRENT_LOCKS, - true - ) - ); - - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); - Assert.assertTrue(conflictingIntervals.isEmpty()); - } - - @Test public void testExclusiveLockCompatibility() { @@ -1770,50 +1616,6 @@ public void testTimechunkLockTypeTransitionForSameTaskGroup() validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask, Intervals.of("2024/2025")); } - @Test - public void testGetLockedIntervalsForRevokedLocks() - { - // Acquire lock for a low priority task - final Task lowPriorityTask = NoopTask.ofPriority(5); - lockbox.add(lowPriorityTask); - taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - lowPriorityTask, - Intervals.of("2017/2018") - ); - - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(lowPriorityTask.getDataSource(), 1); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017/2018")), - lockedIntervals.get(lowPriorityTask.getDataSource()) - ); - - // Revoke the lowPriorityTask - final Task highPriorityTask = NoopTask.ofPriority(10); - lockbox.add(highPriorityTask); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - highPriorityTask, - Intervals.of("2017-05-01/2017-06-01") - ); - - // Verify the locked intervals - minTaskPriority.put(highPriorityTask.getDataSource(), 1); - lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017-05-01/2017-06-01")), - lockedIntervals.get(highPriorityTask.getDataSource()) - ); - } - @Test public void testFailedToReacquireTaskLock() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index e6dee0c7e403..4fdf206d449d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -61,6 +61,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; @@ -1055,44 +1057,56 @@ public void testGetTaskStatus() throws Exception } @Test - public void testGetLockedIntervals() throws Exception + public void testGetConflictingLockInfos() throws Exception { - final Map minTaskPriority = Collections.singletonMap("ds1", 0); - final Map> expectedLockedIntervals = Collections.singletonMap( + final List lockFilterPolicies = ImmutableList.of( + new LockFilterPolicy("ds1", 25, null) + ); + final Map> expectedLocks = Collections.singletonMap( "ds1", Arrays.asList( - Intervals.of("2012-01-01/2012-01-02"), - Intervals.of("2012-01-02/2012-01-03") + new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 25, + Intervals.of("2012-01-01/2012-01-02") + ), + new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 75, + Intervals.of("2012-01-01/2012-01-02") + ) ) ); - EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority)) - .andReturn(expectedLockedIntervals); + EasyMock.expect(taskLockbox.getConflictingLockInfos(lockFilterPolicies)) + .andReturn(expectedLocks); replayAll(); - final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority); + final Response response = overlordResource.getConflictingLockInfos(lockFilterPolicies); Assert.assertEquals(200, response.getStatus()); final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - Map> observedLockedIntervals = jsonMapper.readValue( + Map> observedLocks = jsonMapper.readValue( jsonMapper.writeValueAsString(response.getEntity()), - new TypeReference>>() + new TypeReference>>() { } ); - Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals); + Assert.assertEquals(expectedLocks, observedLocks); } @Test - public void testGetLockedIntervalsWithEmptyBody() + public void testGetConflictingLockInfosWithEmptyBody() { replayAll(); - Response response = overlordResource.getDatasourceLockedIntervals(null); + Response response = overlordResource.getConflictingLockInfos(null); Assert.assertEquals(400, response.getStatus()); - response = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap()); + response = overlordResource.getConflictingLockInfos(Collections.emptyList()); Assert.assertEquals(400, response.getStatus()); } diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java index 88ab4673aa8a..d4d2541c3696 100644 --- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java +++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java @@ -21,8 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -33,18 +36,18 @@ public class LockFilterPolicy { private final String datasource; private final int priority; - private final Map context; + private final List intervals; @JsonCreator public LockFilterPolicy( @JsonProperty("datasource") String datasource, @JsonProperty("priority") int priority, - @JsonProperty("context") Map context + @JsonProperty("intervals") @Nullable List intervals ) { this.datasource = datasource; this.priority = priority; - this.context = context == null ? Collections.emptyMap() : context; + this.intervals = intervals; } @JsonProperty @@ -59,10 +62,18 @@ public int getPriority() return priority; } + @Deprecated @JsonProperty public Map getContext() { - return context; + return Collections.emptyMap(); + } + + @Nullable + @JsonProperty + public List getIntervals() + { + return intervals; } @Override @@ -77,12 +88,12 @@ public boolean equals(Object o) LockFilterPolicy that = (LockFilterPolicy) o; return Objects.equals(datasource, that.datasource) && priority == that.priority - && Objects.equals(context, that.context); + && Objects.equals(intervals, that.intervals); } @Override public int hashCode() { - return Objects.hash(datasource, priority, context); + return Objects.hash(datasource, priority, intervals); } } diff --git a/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java b/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java new file mode 100644 index 000000000000..444ef5fb6ee8 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; + +import java.util.Objects; + +/** + * Contains information about an active task lock for a given datasource + */ +public class TaskLockInfo +{ + private final String granularity; + private final String type; + private final int priority; + private final Interval interval; + + @JsonCreator + public TaskLockInfo( + @JsonProperty("granularity") String granularity, + @JsonProperty("type") String type, + @JsonProperty("priority") int priority, + @JsonProperty("interval") Interval interval + ) + { + this.granularity = granularity; + this.type = type; + this.priority = priority; + this.interval = interval; + } + + @JsonProperty + public String getGranularity() + { + return granularity; + } + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public int getPriority() + { + return priority; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskLockInfo that = (TaskLockInfo) o; + return Objects.equals(granularity, that.granularity) + && Objects.equals(type, that.type) + && priority == that.priority + && Objects.equals(interval, that.interval); + } + + @Override + public int hashCode() + { + return Objects.hash(granularity, type, priority, interval); + } + + @Override + public String toString() + { + return "TaskLockInfo{" + + "granularity=" + granularity + + ", type=" + type + + ", interval=" + interval + + ", priority=" + priority + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 310684206d28..925408664071 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.ServiceRetryPolicy; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -180,15 +181,13 @@ ListenableFuture> taskStatuses( ListenableFuture> supervisorStatuses(); /** - * Returns a list of intervals locked by higher priority conflicting lock types + * Returns a list of Locks of higher priority with conflicting intervals * * @param lockFilterPolicies List of all filters for different datasources - * @return Map from datasource name to list of intervals locked by tasks that have a conflicting lock type with + * @return Map from datasource name to list of locks held by tasks that have conflicting intervals with * priority greater than or equal to the {@code minTaskPriority} for that datasource. */ - ListenableFuture>> findLockedIntervals( - List lockFilterPolicies - ); + ListenableFuture>> findConflictingLockInfos(List lockFilterPolicies); /** * Deletes pending segment records from the metadata store for a particular datasource. Records with diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index 3e3d86ca5f25..b539f6955692 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; @@ -190,11 +191,11 @@ public ListenableFuture taskStatus(final String taskId) } @Override - public ListenableFuture>> findLockedIntervals( + public ListenableFuture>> findConflictingLockInfos( List lockFilterPolicies ) { - final String path = "/druid/indexer/v1/lockedIntervals/v2"; + final String path = "/druid/indexer/v1/conflictingLocks"; return FutureUtils.transform( client.asyncRequest( @@ -203,10 +204,10 @@ public ListenableFuture>> findLockedIntervals( new BytesFullResponseHandler() ), holder -> { - final Map> response = JacksonUtils.readValue( + final Map> response = JacksonUtils.readValue( jsonMapper, holder.getContent(), - new TypeReference>>() {} + new TypeReference>>() {} ); return response == null ? Collections.emptyMap() : response; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 01f3bc77e9ee..8933ff53c370 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -46,6 +46,7 @@ import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -67,8 +68,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -272,16 +275,25 @@ private boolean cancelTaskIfGranularityChanged( * higher priority Task * */ - private Map> getLockedIntervals( + private Map> getLockedIntervals( List compactionConfigs ) { final List lockFilterPolicies = compactionConfigs .stream() - .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext())) + .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null)) .collect(Collectors.toList()); - final Map> datasourceToLockedIntervals = - new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true)); + final Map> datasourceToLocks = + FutureUtils.getUnchecked(overlordClient.findConflictingLockInfos(lockFilterPolicies), true); + final Map> datasourceToLockedIntervals = new HashMap<>(); + for (Map.Entry> locks : datasourceToLocks.entrySet()) { + final String datasource = locks.getKey(); + datasourceToLockedIntervals.put(datasource, new HashSet<>()); + for (TaskLockInfo lock : locks.getValue()) { + datasourceToLockedIntervals.get(datasource) + .add(lock.getInterval()); + } + } LOG.debug( "Skipping the following intervals for Compaction as they are currently locked: %s", datasourceToLockedIntervals diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index 10ebeb53af26..8832998239fe 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.joda.time.Interval; @@ -97,7 +98,7 @@ public ListenableFuture> supervisorStatuses( } @Override - public ListenableFuture>> findLockedIntervals( + public ListenableFuture>> findConflictingLockInfos( List lockFilterPolicies ) { diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 8c3b867e3681..49e5495eae1c 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.MockServiceClient; import org.apache.druid.rpc.RequestBuilder; @@ -53,7 +54,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpVersion; -import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -220,16 +220,26 @@ public void test_taskStatuses_null_null_zero() throws Exception } @Test - public void test_findLockedIntervals() throws Exception + public void test_findConflictingLockInfos() throws Exception { - final Map> lockMap = - ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001"))); + final Map> lockMap = + ImmutableMap.of( + "foo", + Collections.singletonList( + new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2000/2001") + ) + ) + ); final List requests = ImmutableList.of( new LockFilterPolicy("foo", 3, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/conflictingLocks") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), @@ -238,19 +248,19 @@ public void test_findLockedIntervals() throws Exception Assert.assertEquals( lockMap, - overlordClient.findLockedIntervals(requests).get() + overlordClient.findConflictingLockInfos(requests).get() ); } @Test - public void test_findLockedIntervals_nullReturn() throws Exception + public void test_findConflictingLockInfos_nullReturn() throws Exception { final List requests = ImmutableList.of( new LockFilterPolicy("foo", 3, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/conflictingLocks") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), @@ -259,7 +269,7 @@ public void test_findLockedIntervals_nullReturn() throws Exception Assert.assertEquals( Collections.emptyMap(), - overlordClient.findLockedIntervals(requests).get() + overlordClient.findConflictingLockInfos(requests).get() ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 236cfaf7da54..f8c196de48f4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -61,6 +61,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; @@ -1137,7 +1138,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() .thenReturn( Futures.immediateFuture( CloseableIterators.withEmptyBaggage(ImmutableList.of(runningConflictCompactionTask).iterator()))); - Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any())) + Mockito.when(mockClient.findConflictingLockInfos(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.cancelTask(conflictTaskId)) .thenReturn(Futures.immediateFuture(null)); @@ -1229,20 +1230,35 @@ public void testRunWithLockedIntervals() // Lock all intervals for dataSource_1 and dataSource_2 final String datasource1 = DATA_SOURCE_PREFIX + 1; - overlordClient.lockedIntervals + overlordClient.conflictingLockInfos .computeIfAbsent(datasource1, k -> new ArrayList<>()) - .add(Intervals.of("2017/2018")); + .add(new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2017/2018") + )); final String datasource2 = DATA_SOURCE_PREFIX + 2; - overlordClient.lockedIntervals + overlordClient.conflictingLockInfos .computeIfAbsent(datasource2, k -> new ArrayList<>()) - .add(Intervals.of("2017/2018")); + .add(new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2017/2018") + )); // Lock all intervals but one for dataSource_0 final String datasource0 = DATA_SOURCE_PREFIX + 0; - overlordClient.lockedIntervals + overlordClient.conflictingLockInfos .computeIfAbsent(datasource0, k -> new ArrayList<>()) - .add(Intervals.of("2017-01-01T13:00:00Z/2017-02-01")); + .add(new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2017-01-01T13:00:00Z/2017-02-01") + )); // Verify that locked intervals are skipped and only one compaction task // is submitted for dataSource_0 @@ -2026,7 +2042,7 @@ private class TestOverlordClient extends NoopOverlordClient private final ObjectMapper jsonMapper; // Map from Task Id to the intervals locked by that task - private final Map> lockedIntervals = new HashMap<>(); + private final Map> conflictingLockInfos = new HashMap<>(); // List of submitted compaction tasks for verification in the tests private final List submittedCompactionTasks = new ArrayList<>(); @@ -2069,11 +2085,11 @@ public ListenableFuture runTask(String taskId, Object taskObject) @Override - public ListenableFuture>> findLockedIntervals( + public ListenableFuture>> findConflictingLockInfos( List lockFilterPolicies ) { - return Futures.immediateFuture(lockedIntervals); + return Futures.immediateFuture(conflictingLockInfos); } @Override @@ -2262,7 +2278,7 @@ private static ArgumentCaptor setUpMockClient(final OverlordClient mockC final ArgumentCaptor payloadCaptor = ArgumentCaptor.forClass(Object.class); Mockito.when(mockClient.taskStatuses(null, null, 0)) .thenReturn(Futures.immediateFuture(CloseableIterators.withEmptyBaggage(Collections.emptyIterator()))); - Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any())) + Mockito.when(mockClient.findConflictingLockInfos(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.getTotalWorkerCapacity()) .thenReturn(Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(0, 0)));