Skip to content

Commit

Permalink
Filter pending segments upgraded with transactional replace (apache#1…
Browse files Browse the repository at this point in the history
…5169)

* Filter pending segments upgraded with transactional replace

* Push sequence name filter to metadata query
  • Loading branch information
AmatyaAvadhanula authored Oct 23, 2023
1 parent 2e31cb2 commit 65b69cd
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ public LagStats computeLagStats()
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}

@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
throw new UnsupportedOperationException();
}

@Override
public int getActiveTaskGroupsCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,11 @@ public void testMaterializedViewSupervisorSpecCreated()
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
Assert.assertNull(autoscaler);

try {
supervisor.computeLagStats();
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}
Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.computeLagStats());

try {
int count = supervisor.getActiveTaskGroupsCount();
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}
Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount());

Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes());

Callable<Integer> noop = new Callable<Integer>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,11 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
return;
}

final Set<String> activeRealtimeSequencePrefixes
= supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
toolbox.getIndexerMetadataStorageCoordinator()
.upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -110,6 +111,15 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
return Optional.absent();
}

public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
{
if (supervisors.containsKey(activeSupervisorId)) {
return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
} else {
return Collections.emptySet();
}
}

public Optional<SupervisorSpec> getSupervisorSpec(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,28 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}

/**
* The base sequence name of a seekable stream task group is used as a prefix of the sequence names
* of pending segments published by it.
* This method can be used to identify the active pending segments for a datasource
* by checking if the sequence name begins with any of the active realtime sequence prefix returned by this method
* @return the set of base sequence names of both active and pending completion task gruops.
*/
@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
final Set<String> activeBaseSequences = new HashSet<>();
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
activeBaseSequences.add(taskGroup.baseSequenceName);
}
for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroupList) {
activeBaseSequences.add(taskGroup.baseSequenceName);
}
}
return activeBaseSequences;
}

public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,48 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
}

@Test
public void testGetActiveRealtimeSequencePrefixes()
{
EasyMock.expect(spec.isSuspended()).andReturn(false);

replayAll();

final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

// Spin off two active tasks with each task serving one partition.
supervisor.getIoConfig().setTaskCount(3);
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);

supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);

supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task3"),
ImmutableSet.of()
);

Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size());
}

@Test
public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ public SegmentIdWithShardSpec allocatePendingSegment(
}

@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<String> activeBaseSequenceNames
)
{
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups
* of the supervisor (if any) for this datasource
* @return Map from originally allocated pending segment to its new upgraded ID.
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments
Set<DataSegment> replaceSegments,
Set<String> activeRealtimeSequencePrefixes
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -185,6 +186,12 @@ public int getActiveTaskGroupsCount()
{
return -1;
}

@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
return Collections.emptySet();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;

public interface Supervisor
{
Expand Down Expand Up @@ -93,4 +94,9 @@ default Boolean isHealthy()
LagStats computeLagStats();

int getActiveTaskGroupsCount();

/**
* @return active sequence prefixes for reading and pending completion task groups of a seekable stream supervisor
*/
Set<String> getActiveRealtimeSequencePrefixes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) {
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) {
return ImmutableList.copyOf(iterator);
}
}
Expand All @@ -258,9 +258,62 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv

/**
* Fetches all the pending segments, whose interval overlaps with the given
* search interval from the metadata store. Returns a Map from the
* pending segment ID to the sequence name.
* search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
* from the metadata store. Returns a Map from the pending segment ID to the sequence name.
*/
@VisibleForTesting
Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval,
final Set<String> sequenceNamePrefixFilter
) throws IOException
{
if (sequenceNamePrefixFilter.isEmpty()) {
return Collections.emptyMap();
}

final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
final List<String> sequenceNamePrefixConditions = new ArrayList<>();
for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE :prefix%d)", i));
}

String sql = "SELECT sequence_name, payload"
+ " FROM " + dbTables.getPendingSegmentsTable()
+ " WHERE dataSource = :dataSource"
+ " AND start < :end"
+ StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString())
+ " AND ( " + String.join(" OR ", sequenceNamePrefixConditions) + " )";

Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());

for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i) + "%");
}

final ResultIterator<PendingSegmentsRecord> dbSegments =
query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
.iterator();

final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
while (dbSegments.hasNext()) {
PendingSegmentsRecord record = dbSegments.next();
final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);

if (interval.overlaps(identifier.getInterval())) {
pendingSegmentToSequenceName.put(identifier, record.sequenceName);
}
}

dbSegments.close();

return pendingSegmentToSequenceName;
}

private Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
Expand Down Expand Up @@ -620,7 +673,8 @@ public SegmentIdWithShardSpec allocatePendingSegment(

@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments
Set<DataSegment> replaceSegments,
Set<String> activeRealtimeSequencePrefixes
)
{
if (replaceSegments.isEmpty()) {
Expand All @@ -639,7 +693,7 @@ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegment

final String datasource = replaceSegments.iterator().next().getDataSource();
return connector.retryWithHandle(
handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId)
handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes)
);
}

Expand All @@ -658,7 +712,8 @@ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegment
private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
Handle handle,
String datasource,
Map<Interval, DataSegment> replaceIntervalToMaxId
Map<Interval, DataSegment> replaceIntervalToMaxId,
Set<String> activeRealtimeSequencePrefixes
) throws IOException
{
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> newPendingSegmentVersions = new HashMap<>();
Expand All @@ -673,12 +728,13 @@ private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmen
int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum();

final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
= getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval);
= getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes);

for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
: overlappingPendingSegments.entrySet()) {
final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
final String pendingSegmentSequence = overlappingPendingSegment.getValue();

if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) {
// Ensure unique sequence_name_prev_id_sha1 by setting
// sequence_prev_id -> pendingSegmentId
Expand Down Expand Up @@ -1281,9 +1337,9 @@ private Set<DataSegment> createNewIdsForAppendSegments(
final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new HashMap<>();
for (DataSegment segment : overlappingSegments) {
overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
.add(segment.getInterval());
.add(segment.getInterval());
overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
.add(segment);
.add(segment);
}

final Set<DataSegment> upgradedSegments = new HashSet<>();
Expand Down Expand Up @@ -2275,7 +2331,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
// Not in the desired start state.
return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
"Inconsistent metadata state. This can happen if you update input topic in a spec without changing " +
"the supervisor name. Stored state: [%s], Target state: [%s].",
"the supervisor name. Stored state: [%s], Target state: [%s].",
oldCommitMetadataFromDb,
startMetadata
));
Expand Down
Loading

0 comments on commit 65b69cd

Please sign in to comment.