Skip to content

Commit

Permalink
Use task actions to fetch used segments in MSQ (apache#15284)
Browse files Browse the repository at this point in the history
* Use task actions to fetch used segments in MSQ

* Fix tests

* Fixing tests.

* Revert "Fix tests"

This reverts commit 95ab649

* Removing conditional check in tests.

* Pulling in latest changes.

---------

Co-authored-by: cryptoe <[email protected]>
  • Loading branch information
AmatyaAvadhanula and cryptoe authored Dec 1, 2023
1 parent 9f3b266 commit 4a594bb
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
Expand Down Expand Up @@ -1200,8 +1201,18 @@ private DataSegmentTimelineView makeDataSegmentTimelineView()
}

// Fetch all published, used segments (all non-realtime segments) from the metadata store.
final Collection<DataSegment> publishedUsedSegments =
FutureUtils.getUnchecked(context.coordinatorClient().fetchUsedSegments(dataSource, intervals), true);
// If the task is operating with a REPLACE lock,
// any segment created after the lock was acquired for its interval will not be considered.
final Collection<DataSegment> publishedUsedSegments;
try {
publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction(
dataSource,
intervals
));
}
catch (IOException e) {
throw new MSQException(e, UnknownFault.forException(e));
}

int realtimeCount = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public SqlEngine createEngine(
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper),
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public SqlEngine createEngine(
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper),
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public SqlEngine createEngine(
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper),
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public SqlEngine createEngine(
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper),
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public String getFormatString()

doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());

testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper));
testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper, injector));
indexingServiceClient = new MSQTestOverlordServiceClient(
objectMapper,
injector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
Expand Down Expand Up @@ -104,20 +103,6 @@ public MSQTestControllerContext(
this.injector = injector;
this.taskActionClient = taskActionClient;
coordinatorClient = Mockito.mock(CoordinatorClient.class);
Mockito.when(coordinatorClient.fetchUsedSegments(
ArgumentMatchers.anyString(),
ArgumentMatchers.anyList()
)
).thenAnswer(invocation ->
Futures.immediateFuture(
injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
.getSegments()
.stream()
.filter(dataSegment -> dataSegment.getDataSource()
.equals(invocation.getArguments()[0]))
.collect(Collectors.toList())
)
);

Mockito.when(coordinatorClient.fetchServerViewSegments(
ArgumentMatchers.anyString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
Expand All @@ -39,6 +41,7 @@
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
Expand All @@ -62,12 +65,15 @@ public class MSQTestTaskActionClient implements TaskActionClient
"foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D"))
);
private final Set<DataSegment> publishedSegments = new HashSet<>();
private final Injector injector;

public MSQTestTaskActionClient(
ObjectMapper mapper
ObjectMapper mapper,
Injector injector
)
{
this.mapper = mapper;
this.injector = injector;
}

@Override
Expand Down Expand Up @@ -122,6 +128,14 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction)
.build()
).collect(Collectors.toSet());
}
} else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {
String dataSource = ((RetrieveSegmentsToReplaceAction) taskAction).getDataSource();
return (RetType) injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
.getSegments()
.stream()
.filter(dataSegment -> dataSegment.getDataSource()
.equals(dataSource))
.collect(Collectors.toSet());
} else if (taskAction instanceof SegmentTransactionalInsertAction) {
final Set<DataSegment> segments = ((SegmentTransactionalInsertAction) taskAction).getSegments();
publishedSegments.addAll(segments);
Expand Down

0 comments on commit 4a594bb

Please sign in to comment.