Skip to content

Commit

Permalink
Add realtime segment update and announcement to replace action
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Oct 10, 2023
1 parent 04ba92c commit 7274912
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);

for (String supervisorId : activeSupervisorIds) {
for (SegmentIdWithShardSpec pendingSegment : upgradedPendingSegments) {
toolbox.getSupervisorManager().updatePendingSegmentMapping(supervisorId, pendingSegment);
}
}

// These upgraded pending segments should be forwarded to the SupervisorManager
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
Expand Down Expand Up @@ -203,10 +202,16 @@ public ListenableFuture<Boolean> updatePendingSegmentMapping(
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
)
{
if (versionsOfPendingSegment.isEmpty()) {
return Futures.immediateFuture(true);
}
final List<SegmentIdWithShardSpec> allVersionsOfPendingSegment = new ArrayList<>();
allVersionsOfPendingSegment.add(rootPendingSegment);
allVersionsOfPendingSegment.addAll(versionsOfPendingSegment);
final RequestBuilder requestBuilder = new RequestBuilder(
HttpMethod.POST,
"pendingSegmentMapping"
).jsonContent(jsonMapper, Pair.of(rootPendingSegment, versionsOfPendingSegment));
"/pendingSegmentMapping"
).jsonContent(jsonMapper, allVersionsOfPendingSegment);

return makeRequest(id, requestBuilder)
.handler(IgnoreHttpResponseHandler.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1558,13 +1558,13 @@ public Response setEndOffsetsHTTP(
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response updatePendingSegmentMapping(
Pair<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> rootPendingSegmentToVersionMapping,
List<SegmentIdWithShardSpec> allVersionsOfPendingSegment,
// this field is only for internal purposes, shouldn't be usually set by users
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.WRITE);
return updatePendingSegmentMapping(rootPendingSegmentToVersionMapping.lhs, rootPendingSegmentToVersionMapping.rhs);
return updatePendingSegmentMapping(allVersionsOfPendingSegment);
}

public Map<String, Object> doGetRowStats()
Expand Down Expand Up @@ -1772,13 +1772,13 @@ public Response setEndOffsets(
return Response.ok(sequenceNumbers).build();
}

private Response updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
)
private Response updatePendingSegmentMapping(List<SegmentIdWithShardSpec> allVersionsOfPendingSegment)
{
try {
((StreamAppenderator) appenderator).updatePendingSegmentMapping(rootPendingSegment, versionsOfPendingSegment);
((StreamAppenderator) appenderator).updatePendingSegmentMapping(
allVersionsOfPendingSegment.get(0),
allVersionsOfPendingSegment.subList(1, allVersionsOfPendingSegment.size())
);
return Response.ok().build();
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegmen
taskClient.updatePendingSegmentMapping(
taskId,
rootPendingSegment,
indexerMetadataStorageCoordinator.getAllVersionsOfPendingSegment(rootPendingSegment)
indexerMetadataStorageCoordinator.findAllVersionsOfPendingSegment(rootPendingSegment)
);
}
}
Expand All @@ -1111,7 +1111,7 @@ public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegmen
taskClient.updatePendingSegmentMapping(
taskId,
rootPendingSegment,
indexerMetadataStorageCoordinator.getAllVersionsOfPendingSegment(rootPendingSegment)
indexerMetadataStorageCoordinator.findAllVersionsOfPendingSegment(rootPendingSegment)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,4 @@ SegmentPublishResult commitMetadataOnly(
* @return DataSegment used segment corresponding to given id
*/
DataSegment retrieveSegmentForId(String id, boolean includeUnused);

Set<SegmentIdWithShardSpec> getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment);

}
Original file line number Diff line number Diff line change
Expand Up @@ -2554,12 +2554,6 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
);
}

@Override
public Set<SegmentIdWithShardSpec> getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment)
{
return Collections.emptySet();
}

private static class PendingSegmentsRecord
{
private final String sequenceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

Expand All @@ -95,7 +96,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final Map<SegmentDescriptor, SegmentDescriptor> newIdToRootPendingSegment = new ConcurrentHashMap<>();
private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToRootPendingSegment
= new ConcurrentHashMap<>();

public SinkQuerySegmentWalker(
String dataSource,
Expand Down Expand Up @@ -304,7 +306,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final

public void updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
List<SegmentIdWithShardSpec> versionsOfPendingSegment
)
{
for (SegmentIdWithShardSpec versionOfPendingSegment : versionsOfPendingSegment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ private void unannounceRootSegmentAndUpgradedVersions(Sink sink) throws IOExcept

public void updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
List<SegmentIdWithShardSpec> versionsOfPendingSegment
) throws IOException
{
if (!sinks.containsKey(rootPendingSegment) || droppingSinks.contains(rootPendingSegment)) {
Expand Down

0 comments on commit 7274912

Please sign in to comment.