diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 1cb365ce1b5e..d22d88fa463d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -46,7 +46,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; @@ -960,10 +960,10 @@ private Set getNonRevokedReplaceLocks(List posse } /** - * @param conflictingLockRequests Requests for conflicing lock intervals for various datasources - * @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked + * @param lockFilterPolicies Lock filters for the given datasources + * @return Map from datasource to intervals locked by tasks satisfying the lock filter condititions */ - public Map> getConflictingLockIntervals(List conflictingLockRequests) + public Map> getLockedIntervalsV2(List lockFilterPolicies) { final Map> datasourceToIntervals = new HashMap<>(); @@ -971,12 +971,12 @@ public Map> getConflictingLockIntervals(List { - final String datasource = conflictingLockRequest.getDatasource(); - final int priority = conflictingLockRequest.getPriority(); + lockFilterPolicies.forEach( + lockFilter -> { + final String datasource = lockFilter.getDatasource(); + final int priority = lockFilter.getPriority(); final boolean ignoreAppendLocks = - TaskLockType.REPLACE.name().equals(conflictingLockRequest.getContext().get(Tasks.TASK_LOCK_TYPE)); + TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE)); if (!running.containsKey(datasource)) { return; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 6730e363858f..ba984401d3c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -28,7 +28,7 @@ import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; @@ -62,12 +62,12 @@ public List getActiveTasks() } /** - * @param conflictingLockRequests Requests for conflicing lock intervals for various datasources + * @param lockFilterPolicies Requests for conflicing lock intervals for various datasources * @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked */ - public Map> getConflictingLockIntervals(List conflictingLockRequests) + public Map> getLockedIntervalsV2(List lockFilterPolicies) { - return taskLockbox.getConflictingLockIntervals(conflictingLockRequests); + return taskLockbox.getLockedIntervalsV2(lockFilterPolicies); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 06491a47ad53..ff3f7e23b6c7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -63,7 +63,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; @@ -277,17 +277,17 @@ public Response getDatasourceLockedIntervals(Map minTaskPriorit } @POST - @Path("/conflictingLockIntervals") + @Path("/getLockedIntervals/v2") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(StateResourceFilter.class) - public Response getConflictingLockIntervals(List conflictingLockRequests) + public Response getDatasourceLockedIntervalsV2(List lockFilterPolicies) { - if (conflictingLockRequests == null || conflictingLockRequests.isEmpty()) { + if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) { return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build(); } // Build the response - return Response.ok(taskStorageQueryAdapter.getConflictingLockIntervals(conflictingLockRequests)).build(); + return Response.ok(taskStorageQueryAdapter.getLockedIntervalsV2(lockFilterPolicies)).build(); } @GET diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index fd74827f4ae4..18e9fa9ccfb1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -54,10 +54,10 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.metadata.ConflictingLockRequest; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.segment.TestHelper; @@ -1252,7 +1252,7 @@ public void testGetLockedIntervalsForEqualPriorityTask() } @Test - public void testGetConflictingLockIntervalsForHigherPriorityExclusiveLock() + public void testGetLockedIntervalsV2ForHigherPriorityExclusiveLock() { final Task task = NoopTask.ofPriority(50); lockbox.add(task); @@ -1263,19 +1263,19 @@ public void testGetConflictingLockIntervalsForHigherPriorityExclusiveLock() Intervals.of("2017/2018") ); - ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest( + LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( task.getDataSource(), 75, null ); Map> conflictingIntervals = - lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); + lockbox.getLockedIntervalsV2(ImmutableList.of(requestForExclusiveLowerPriorityLock)); Assert.assertTrue(conflictingIntervals.isEmpty()); } @Test - public void testGetConflictingLockIntervalsForLowerPriorityExclusiveLock() + public void testGetLockedIntervalsForLowerPriorityExclusiveLock() { final Task task = NoopTask.ofPriority(50); lockbox.add(task); @@ -1286,14 +1286,14 @@ public void testGetConflictingLockIntervalsForLowerPriorityExclusiveLock() Intervals.of("2017/2018") ); - ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest( + LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( task.getDataSource(), 25, null ); Map> conflictingIntervals = - lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); + lockbox.getLockedIntervalsV2(ImmutableList.of(requestForExclusiveLowerPriorityLock)); Assert.assertEquals(1, conflictingIntervals.size()); Assert.assertEquals( Collections.singletonList(Intervals.of("2017/2018")), @@ -1302,7 +1302,7 @@ public void testGetConflictingLockIntervalsForLowerPriorityExclusiveLock() } @Test - public void testGetConflictingLockIntervalsForLowerPriorityReplaceLock() + public void testGetLockedIntervalsV2ForLowerPriorityReplaceLock() { final Task task = NoopTask.ofPriority(50); lockbox.add(task); @@ -1313,14 +1313,14 @@ public void testGetConflictingLockIntervalsForLowerPriorityReplaceLock() Intervals.of("2017/2018") ); - ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest( + LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( task.getDataSource(), 25, ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name()) ); Map> conflictingIntervals = - lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); + lockbox.getLockedIntervalsV2(ImmutableList.of(requestForExclusiveLowerPriorityLock)); Assert.assertTrue(conflictingIntervals.isEmpty()); } diff --git a/server/src/main/java/org/apache/druid/metadata/ConflictingLockRequest.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java similarity index 90% rename from server/src/main/java/org/apache/druid/metadata/ConflictingLockRequest.java rename to server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java index 54d44ae9eb03..88ab4673aa8a 100644 --- a/server/src/main/java/org/apache/druid/metadata/ConflictingLockRequest.java +++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java @@ -27,16 +27,16 @@ import java.util.Objects; /** - * Request object used by CompactSegments to determine intervals with conflicting locks on the Overlord + * Specifies a policy to filter active locks held by a datasource */ -public class ConflictingLockRequest +public class LockFilterPolicy { private final String datasource; private final int priority; private final Map context; @JsonCreator - public ConflictingLockRequest( + public LockFilterPolicy( @JsonProperty("datasource") String datasource, @JsonProperty("priority") int priority, @JsonProperty("context") Map context @@ -74,7 +74,7 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - ConflictingLockRequest that = (ConflictingLockRequest) o; + LockFilterPolicy that = (LockFilterPolicy) o; return Objects.equals(datasource, that.datasource) && priority == that.priority && Objects.equals(context, that.context); diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 7e43a43614c8..a18af000d476 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -32,7 +32,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.ServiceRetryPolicy; import org.joda.time.Interval; @@ -188,13 +188,12 @@ ListenableFuture> taskStatuses( /** * Returns a list of intervals locked by higher priority conflicting lock types * - * @param conflictingLockRequests List of all requests for different datasources - * + * @param lockFilterPolicies List of all filters for different datasources * @return Map from datasource name to list of intervals locked by tasks that have a conflicting lock type with * priority greater than or equal to the {@code minTaskPriority} for that datasource. */ - ListenableFuture>> findConflictingLockIntervals( - List conflictingLockRequests + ListenableFuture>> findLockedIntervalsV2( + List lockFilterPolicies ); /** diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index 7a8c57a4b27b..a3b39b6d3e55 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -39,7 +39,7 @@ import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; @@ -189,16 +189,16 @@ public ListenableFuture taskStatus(final String taskId) } @Override - public ListenableFuture>> findConflictingLockIntervals( - List conflictingLockRequests + public ListenableFuture>> findLockedIntervalsV2( + List lockFilterPolicies ) { - final String path = "/druid/indexer/v1/conflictingLockIntervals"; + final String path = "/druid/indexer/v1/lockedIntervals/v2"; return FutureUtils.transform( client.asyncRequest( new RequestBuilder(HttpMethod.POST, path) - .jsonContent(jsonMapper, conflictingLockRequests), + .jsonContent(jsonMapper, lockFilterPolicies), new BytesFullResponseHandler() ), holder -> { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index 3076c1797e2f..da89040f50a4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.metadata.ConflictingLockRequest; import org.apache.druid.query.aggregation.AggregatorFactory; import org.joda.time.Period; @@ -215,9 +214,4 @@ public int hashCode() result = 31 * result + Arrays.hashCode(metricsSpec); return result; } - - public ConflictingLockRequest toConflictingLockRequest() - { - return new ConflictingLockRequest(dataSource, taskPriority, taskContext); - } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index f0a7ababd40a..0ffcc54055ff 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -42,7 +42,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -176,7 +176,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // Skip all the intervals locked by higher priority tasks for each datasource // This must be done after the invalid compaction tasks are cancelled // in the loop above so that their intervals are not considered locked - getConflictingLockIntervals(compactionConfigList).forEach( + getLockedIntervalsV2(compactionConfigList).forEach( (dataSource, intervals) -> intervalsToSkipCompaction .computeIfAbsent(dataSource, ds -> new ArrayList<>()) @@ -261,16 +261,16 @@ private boolean cancelTaskIfGranularityChanged( * higher priority Task * */ - private Map> getConflictingLockIntervals( + private Map> getLockedIntervalsV2( List compactionConfigs ) { - final List conflictingLockRequests = compactionConfigs + final List lockFilterPolicies = compactionConfigs .stream() - .map(DataSourceCompactionConfig::toConflictingLockRequest) + .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext())) .collect(Collectors.toList()); final Map> datasourceToLockedIntervals = - new HashMap<>(FutureUtils.getUnchecked(overlordClient.findConflictingLockIntervals(conflictingLockRequests), true)); + new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervalsV2(lockFilterPolicies), true)); LOG.debug( "Skipping the following intervals for Compaction as they are currently locked: %s", datasourceToLockedIntervals diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index 23395462bb6d..b123945cb5bd 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -24,7 +24,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.joda.time.Interval; @@ -96,8 +96,8 @@ public ListenableFuture> supervisorStatuses( } @Override - public ListenableFuture>> findConflictingLockIntervals( - List conflictingLockRequests + public ListenableFuture>> findLockedIntervalsV2( + List lockFilterPolicies ) { throw new UnsupportedOperationException(); diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 0698d54b7718..e9d6f287b632 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -41,7 +41,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.MockServiceClient; import org.apache.druid.rpc.RequestBuilder; @@ -222,12 +222,12 @@ public void test_findLockedIntervals() throws Exception { final Map> lockMap = ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001"))); - final List requests = ImmutableList.of( - new ConflictingLockRequest("foo", 3, null) + final List requests = ImmutableList.of( + new LockFilterPolicy("foo", 3, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/conflictingLockIntervals") + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), @@ -236,19 +236,19 @@ public void test_findLockedIntervals() throws Exception Assert.assertEquals( lockMap, - overlordClient.findConflictingLockIntervals(requests).get() + overlordClient.findLockedIntervalsV2(requests).get() ); } @Test public void test_findLockedIntervals_nullReturn() throws Exception { - final List requests = ImmutableList.of( - new ConflictingLockRequest("foo", 3, null) + final List requests = ImmutableList.of( + new LockFilterPolicy("foo", 3, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/conflictingLockIntervals") + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), @@ -257,7 +257,7 @@ public void test_findLockedIntervals_nullReturn() throws Exception Assert.assertEquals( Collections.emptyMap(), - overlordClient.findConflictingLockIntervals(requests).get() + overlordClient.findLockedIntervalsV2(requests).get() ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 6b34e7a4e2dd..35e0bf5467f1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -58,7 +58,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.metadata.ConflictingLockRequest; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; @@ -1093,7 +1093,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() .thenReturn( Futures.immediateFuture( CloseableIterators.withEmptyBaggage(ImmutableList.of(runningConflictCompactionTask).iterator()))); - Mockito.when(mockClient.findConflictingLockIntervals(ArgumentMatchers.any())) + Mockito.when(mockClient.findLockedIntervalsV2(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.cancelTask(conflictTaskId)) .thenReturn(Futures.immediateFuture(null)); @@ -1997,8 +1997,8 @@ public ListenableFuture runTask(String taskId, Object taskObject) @Override - public ListenableFuture>> findConflictingLockIntervals( - List conflictingLockRequests + public ListenableFuture>> findLockedIntervalsV2( + List lockFilterPolicies ) { return Futures.immediateFuture(lockedIntervals); @@ -2190,7 +2190,7 @@ private static ArgumentCaptor setUpMockClient(final OverlordClient mockC final ArgumentCaptor payloadCaptor = ArgumentCaptor.forClass(Object.class); Mockito.when(mockClient.taskStatuses(null, null, 0)) .thenReturn(Futures.immediateFuture(CloseableIterators.withEmptyBaggage(Collections.emptyIterator()))); - Mockito.when(mockClient.findConflictingLockIntervals(ArgumentMatchers.any())) + Mockito.when(mockClient.findLockedIntervalsV2(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.getTotalWorkerCapacity()) .thenReturn(Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(0, 0)));