Skip to content

Commit

Permalink
Allow intervals with append locks to be chosen by autocompaction with…
Browse files Browse the repository at this point in the history
… replace locks
  • Loading branch information
AmatyaAvadhanula committed Oct 12, 2023
1 parent 73707a8 commit e893d1a
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -957,6 +959,62 @@ private Set<ReplaceTaskLock> getNonRevokedReplaceLocks(List<TaskLockPosse> posse
return replaceLocks;
}

/**
* @param conflictingLockRequests Requests for conflicing lock intervals for various datasources
* @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked
*/
public Map<String, List<Interval>> getConflictingLockIntervals(List<ConflictingLockRequest> conflictingLockRequests)
{
final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();

// Take a lock and populate the maps
giant.lock();

try {
conflictingLockRequests.forEach(
conflictingLockRequest -> {
final String datasource = conflictingLockRequest.getDatasource();
final int priority = conflictingLockRequest.getPriority();
final boolean ignoreAppendLocks =
TaskLockType.REPLACE.name().equals(conflictingLockRequest.getContext().get(Tasks.TASK_LOCK_TYPE));
if (!running.containsKey(datasource)) {
return;
}

running.get(datasource).forEach(
(startTime, startTimeLocks) -> startTimeLocks.forEach(
(interval, taskLockPosses) -> taskLockPosses.forEach(
taskLockPosse -> {
if (taskLockPosse.getTaskLock().isRevoked()) {
// do nothing
} else if (ignoreAppendLocks
&& TaskLockType.APPEND.equals(taskLockPosse.getTaskLock().getType())) {
// do nothing
} else if (taskLockPosse.getTaskLock().getPriority() == null
|| taskLockPosse.getTaskLock().getPriority() < priority) {
// do nothing
} else {
datasourceToIntervals.computeIfAbsent(datasource, k -> new HashSet<>())
.add(interval);
}
}
)
)
);
}
);
}
finally {
giant.unlock();
}

return datasourceToIntervals.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new ArrayList<>(entry.getValue())
));
}

/**
* Gets a List of Intervals locked by higher priority tasks for each datasource.
* Here, Segment Locks are being treated the same as Time Chunk Locks i.e.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
Expand Down Expand Up @@ -60,6 +61,15 @@ public List<Task> getActiveTasks()
return storage.getActiveTasks();
}

/**
* @param conflictingLockRequests Requests for conflicing lock intervals for various datasources
* @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked
*/
public Map<String, List<Interval>> getConflictingLockIntervals(List<ConflictingLockRequest> conflictingLockRequests)
{
return taskLockbox.getConflictingLockIntervals(conflictingLockRequests);
}

/**
* Gets a List of Intervals locked by higher priority tasks for each datasource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
Expand Down Expand Up @@ -274,6 +275,20 @@ public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriorit
return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
}

@POST
@Path("/conflictingLockIntervals")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getConflictingLockIntervals(List<ConflictingLockRequest> conflictingLockRequests)
{
if (conflictingLockRequests == null || conflictingLockRequests.isEmpty()) {
return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build();
}

// Build the response
return Response.ok(taskStorageQueryAdapter.getConflictingLockIntervals(conflictingLockRequests)).build();
}

@GET
@Path("/task/{taskid}")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -43,6 +44,7 @@
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
Expand All @@ -52,6 +54,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
Expand Down Expand Up @@ -1248,6 +1251,80 @@ public void testGetLockedIntervalsForEqualPriorityTask()
);
}

@Test
public void testGetConflictingLockIntervalsForHigherPriorityExclusiveLock()
{
final Task task = NoopTask.ofPriority(50);
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
tryTimeChunkLock(
TaskLockType.APPEND,
task,
Intervals.of("2017/2018")
);

ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest(
task.getDataSource(),
75,
null
);

Map<String, List<Interval>> conflictingIntervals =
lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
Assert.assertTrue(conflictingIntervals.isEmpty());
}

@Test
public void testGetConflictingLockIntervalsForLowerPriorityExclusiveLock()
{
final Task task = NoopTask.ofPriority(50);
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
tryTimeChunkLock(
TaskLockType.APPEND,
task,
Intervals.of("2017/2018")
);

ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest(
task.getDataSource(),
25,
null
);

Map<String, List<Interval>> conflictingIntervals =
lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
Assert.assertEquals(1, conflictingIntervals.size());
Assert.assertEquals(
Collections.singletonList(Intervals.of("2017/2018")),
conflictingIntervals.get(task.getDataSource())
);
}

@Test
public void testGetConflictingLockIntervalsForLowerPriorityReplaceLock()
{
final Task task = NoopTask.ofPriority(50);
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
tryTimeChunkLock(
TaskLockType.APPEND,
task,
Intervals.of("2017/2018")
);

ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest(
task.getDataSource(),
25,
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name())
);

Map<String, List<Interval>> conflictingIntervals =
lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
Assert.assertTrue(conflictingIntervals.isEmpty());
}


@Test
public void testExclusiveLockCompatibility()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
* Request object used by CompactSegments to determine intervals with conflicting locks on the Overlord
*/
public class ConflictingLockRequest
{
private final String datasource;
private final int priority;
private final Map<String, Object> context;

@JsonCreator
public ConflictingLockRequest(
@JsonProperty("datasource") String datasource,
@JsonProperty("priority") int priority,
@JsonProperty("context") Map<String, Object> context
)
{
this.datasource = datasource;
this.priority = priority;
this.context = context == null ? Collections.emptyMap() : context;
}

@JsonProperty
public String getDatasource()
{
return datasource;
}

@JsonProperty
public int getPriority()
{
return priority;
}

@JsonProperty
public Map<String, Object> getContext()
{
return context;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConflictingLockRequest that = (ConflictingLockRequest) o;
return Objects.equals(datasource, that.datasource)
&& priority == that.priority
&& Objects.equals(context, that.context);
}

@Override
public int hashCode()
{
return Objects.hash(datasource, priority, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.joda.time.Interval;

Expand Down Expand Up @@ -190,11 +191,23 @@ ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
* @param minTaskPriority Minimum task priority for each datasource. Only the intervals that are locked by tasks with
* equal or higher priority than this are returned.
*
* @return Map from dtasource name to list of intervals locked by tasks that have priority greater than or equal to
* @return Map from datasource name to list of intervals locked by tasks that have priority greater than or equal to
* the {@code minTaskPriority} for that datasource.
*/
ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(Map<String, Integer> minTaskPriority);

/**
* Returns a list of intervals locked by higher priority conflicting lock types
*
* @param conflictingLockRequests List of all requests for different datasources
*
* @return Map from datasource name to list of intervals locked by tasks that have a conflicting lock type with
* priority greater than or equal to the {@code minTaskPriority} for that datasource.
*/
ListenableFuture<Map<String, List<Interval>>> findConflictingLockIntervals(
List<ConflictingLockRequest> conflictingLockRequests
);

/**
* Deletes pending segment records from the metadata store for a particular datasource. Records with
* {@code created_date} within the provided {@code interval} are deleted; other records are left alone.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
Expand Down Expand Up @@ -210,6 +211,31 @@ public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(Map<Str
);
}

@Override
public ListenableFuture<Map<String, List<Interval>>> findConflictingLockIntervals(
List<ConflictingLockRequest> conflictingLockRequests
)
{
final String path = "/druid/indexer/v1/conflictingLockIntervals";

return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.POST, path)
.jsonContent(jsonMapper, conflictingLockRequests),
new BytesFullResponseHandler()
),
holder -> {
final Map<String, List<Interval>> response = JacksonUtils.readValue(
jsonMapper,
holder.getContent(),
new TypeReference<Map<String, List<Interval>>>() {}
);

return response == null ? Collections.emptyMap() : response;
}
);
}

@Override
public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
{
Expand Down
Loading

0 comments on commit e893d1a

Please sign in to comment.