Skip to content

Commit

Permalink
Renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Oct 12, 2023
1 parent ad084e7 commit 8779371
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -960,23 +960,23 @@ private Set<ReplaceTaskLock> getNonRevokedReplaceLocks(List<TaskLockPosse> posse
}

/**
* @param conflictingLockRequests Requests for conflicing lock intervals for various datasources
* @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked
* @param lockFilterPolicies Lock filters for the given datasources
* @return Map from datasource to intervals locked by tasks satisfying the lock filter condititions
*/
public Map<String, List<Interval>> getConflictingLockIntervals(List<ConflictingLockRequest> conflictingLockRequests)
public Map<String, List<Interval>> getLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)
{
final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();

// Take a lock and populate the maps
giant.lock();

try {
conflictingLockRequests.forEach(
conflictingLockRequest -> {
final String datasource = conflictingLockRequest.getDatasource();
final int priority = conflictingLockRequest.getPriority();
lockFilterPolicies.forEach(
lockFilter -> {
final String datasource = lockFilter.getDatasource();
final int priority = lockFilter.getPriority();
final boolean ignoreAppendLocks =
TaskLockType.REPLACE.name().equals(conflictingLockRequest.getContext().get(Tasks.TASK_LOCK_TYPE));
TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE));
if (!running.containsKey(datasource)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
Expand Down Expand Up @@ -62,12 +62,12 @@ public List<Task> getActiveTasks()
}

/**
* @param conflictingLockRequests Requests for conflicing lock intervals for various datasources
* @param lockFilterPolicies Requests for conflicing lock intervals for various datasources
* @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked
*/
public Map<String, List<Interval>> getConflictingLockIntervals(List<ConflictingLockRequest> conflictingLockRequests)
public Map<String, List<Interval>> getLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)
{
return taskLockbox.getConflictingLockIntervals(conflictingLockRequests);
return taskLockbox.getLockedIntervalsV2(lockFilterPolicies);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
Expand Down Expand Up @@ -277,17 +277,17 @@ public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriorit
}

@POST
@Path("/conflictingLockIntervals")
@Path("/getLockedIntervals/v2")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getConflictingLockIntervals(List<ConflictingLockRequest> conflictingLockRequests)
public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)
{
if (conflictingLockRequests == null || conflictingLockRequests.isEmpty()) {
if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build();
}

// Build the response
return Response.ok(taskStorageQueryAdapter.getConflictingLockIntervals(conflictingLockRequests)).build();
return Response.ok(taskStorageQueryAdapter.getLockedIntervalsV2(lockFilterPolicies)).build();
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
Expand Down Expand Up @@ -1252,7 +1252,7 @@ public void testGetLockedIntervalsForEqualPriorityTask()
}

@Test
public void testGetConflictingLockIntervalsForHigherPriorityExclusiveLock()
public void testGetLockedIntervalsV2ForHigherPriorityExclusiveLock()
{
final Task task = NoopTask.ofPriority(50);
lockbox.add(task);
Expand All @@ -1263,19 +1263,19 @@ public void testGetConflictingLockIntervalsForHigherPriorityExclusiveLock()
Intervals.of("2017/2018")
);

ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest(
LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
75,
null
);

Map<String, List<Interval>> conflictingIntervals =
lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
lockbox.getLockedIntervalsV2(ImmutableList.of(requestForExclusiveLowerPriorityLock));
Assert.assertTrue(conflictingIntervals.isEmpty());
}

@Test
public void testGetConflictingLockIntervalsForLowerPriorityExclusiveLock()
public void testGetLockedIntervalsForLowerPriorityExclusiveLock()
{
final Task task = NoopTask.ofPriority(50);
lockbox.add(task);
Expand All @@ -1286,14 +1286,14 @@ public void testGetConflictingLockIntervalsForLowerPriorityExclusiveLock()
Intervals.of("2017/2018")
);

ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest(
LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
25,
null
);

Map<String, List<Interval>> conflictingIntervals =
lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
lockbox.getLockedIntervalsV2(ImmutableList.of(requestForExclusiveLowerPriorityLock));
Assert.assertEquals(1, conflictingIntervals.size());
Assert.assertEquals(
Collections.singletonList(Intervals.of("2017/2018")),
Expand All @@ -1302,7 +1302,7 @@ public void testGetConflictingLockIntervalsForLowerPriorityExclusiveLock()
}

@Test
public void testGetConflictingLockIntervalsForLowerPriorityReplaceLock()
public void testGetLockedIntervalsV2ForLowerPriorityReplaceLock()
{
final Task task = NoopTask.ofPriority(50);
lockbox.add(task);
Expand All @@ -1313,14 +1313,14 @@ public void testGetConflictingLockIntervalsForLowerPriorityReplaceLock()
Intervals.of("2017/2018")
);

ConflictingLockRequest requestForExclusiveLowerPriorityLock = new ConflictingLockRequest(
LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
25,
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name())
);

Map<String, List<Interval>> conflictingIntervals =
lockbox.getConflictingLockIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
lockbox.getLockedIntervalsV2(ImmutableList.of(requestForExclusiveLowerPriorityLock));
Assert.assertTrue(conflictingIntervals.isEmpty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
import java.util.Objects;

/**
* Request object used by CompactSegments to determine intervals with conflicting locks on the Overlord
* Specifies a policy to filter active locks held by a datasource
*/
public class ConflictingLockRequest
public class LockFilterPolicy
{
private final String datasource;
private final int priority;
private final Map<String, Object> context;

@JsonCreator
public ConflictingLockRequest(
public LockFilterPolicy(
@JsonProperty("datasource") String datasource,
@JsonProperty("priority") int priority,
@JsonProperty("context") Map<String, Object> context
Expand Down Expand Up @@ -74,7 +74,7 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}
ConflictingLockRequest that = (ConflictingLockRequest) o;
LockFilterPolicy that = (LockFilterPolicy) o;
return Objects.equals(datasource, that.datasource)
&& priority == that.priority
&& Objects.equals(context, that.context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.joda.time.Interval;

Expand Down Expand Up @@ -188,13 +188,12 @@ ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
/**
* Returns a list of intervals locked by higher priority conflicting lock types
*
* @param conflictingLockRequests List of all requests for different datasources
*
* @param lockFilterPolicies List of all filters for different datasources
* @return Map from datasource name to list of intervals locked by tasks that have a conflicting lock type with
* priority greater than or equal to the {@code minTaskPriority} for that datasource.
*/
ListenableFuture<Map<String, List<Interval>>> findConflictingLockIntervals(
List<ConflictingLockRequest> conflictingLockRequests
ListenableFuture<Map<String, List<Interval>>> findLockedIntervalsV2(
List<LockFilterPolicy> lockFilterPolicies
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
Expand Down Expand Up @@ -189,16 +189,16 @@ public ListenableFuture<TaskStatusResponse> taskStatus(final String taskId)
}

@Override
public ListenableFuture<Map<String, List<Interval>>> findConflictingLockIntervals(
List<ConflictingLockRequest> conflictingLockRequests
public ListenableFuture<Map<String, List<Interval>>> findLockedIntervalsV2(
List<LockFilterPolicy> lockFilterPolicies
)
{
final String path = "/druid/indexer/v1/conflictingLockIntervals";
final String path = "/druid/indexer/v1/lockedIntervals/v2";

return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.POST, path)
.jsonContent(jsonMapper, conflictingLockRequests),
.jsonContent(jsonMapper, lockFilterPolicies),
new BytesFullResponseHandler()
),
holder -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Period;

Expand Down Expand Up @@ -215,9 +214,4 @@ public int hashCode()
result = 31 * result + Arrays.hashCode(metricsSpec);
return result;
}

public ConflictingLockRequest toConflictingLockRequest()
{
return new ConflictingLockRequest(dataSource, taskPriority, taskContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
Expand Down Expand Up @@ -176,7 +176,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
// Skip all the intervals locked by higher priority tasks for each datasource
// This must be done after the invalid compaction tasks are cancelled
// in the loop above so that their intervals are not considered locked
getConflictingLockIntervals(compactionConfigList).forEach(
getLockedIntervalsV2(compactionConfigList).forEach(
(dataSource, intervals) ->
intervalsToSkipCompaction
.computeIfAbsent(dataSource, ds -> new ArrayList<>())
Expand Down Expand Up @@ -261,16 +261,16 @@ private boolean cancelTaskIfGranularityChanged(
* higher priority Task</li>
* </ul>
*/
private Map<String, List<Interval>> getConflictingLockIntervals(
private Map<String, List<Interval>> getLockedIntervalsV2(
List<DataSourceCompactionConfig> compactionConfigs
)
{
final List<ConflictingLockRequest> conflictingLockRequests = compactionConfigs
final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
.stream()
.map(DataSourceCompactionConfig::toConflictingLockRequest)
.map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext()))
.collect(Collectors.toList());
final Map<String, List<Interval>> datasourceToLockedIntervals =
new HashMap<>(FutureUtils.getUnchecked(overlordClient.findConflictingLockIntervals(conflictingLockRequests), true));
new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervalsV2(lockFilterPolicies), true));
LOG.debug(
"Skipping the following intervals for Compaction as they are currently locked: %s",
datasourceToLockedIntervals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.ConflictingLockRequest;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.joda.time.Interval;
Expand Down Expand Up @@ -96,8 +96,8 @@ public ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses(
}

@Override
public ListenableFuture<Map<String, List<Interval>>> findConflictingLockIntervals(
List<ConflictingLockRequest> conflictingLockRequests
public ListenableFuture<Map<String, List<Interval>>> findLockedIntervalsV2(
List<LockFilterPolicy> lockFilterPolicies
)
{
throw new UnsupportedOperationException();
Expand Down
Loading

0 comments on commit 8779371

Please sign in to comment.