Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter pending segments upgraded with transactional replace #15169

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ public LagStats computeLagStats()
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}

@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Probably don't need an explicit message here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix this as the message is redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Missed this earlier

}

@Override
public int getActiveTaskGroupsCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ public void testMaterializedViewSupervisorSpecCreated()
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Assert.assertThrows instead and update the existing assertions in this method too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

supervisor.getActiveRealtimeSequencePrefixes();
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

Callable<Integer> noop = new Callable<Integer>() {
@Override
public Integer call()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,11 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
return;
}

final Set<String> activeRealtimeSequencePrefixes
= supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
toolbox.getIndexerMetadataStorageCoordinator()
.upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -110,6 +111,14 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
return Optional.absent();
}

public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
{
if (!supervisors.containsKey(activeSupervisorId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please invert the if condition:

if contains:
   return value
else
  return empty

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return Collections.emptySet();
}
return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
}

public Optional<SupervisorSpec> getSupervisorSpec(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,28 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}

/**
* The base sequence name of a seekable stream task group is used as a prefix of the sequence names
* of pending segments published by it.
* This method can be used to identify the active pending segments for a datasource
* by checking if the sequence name begins with any of the active realtime sequence prefixes returned by this method.
* @return the set of base sequence names of both active and pending completion task groups.
*/
@Override
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
public Set<String> getActiveRealtimeSequencePrefixes()
{
  // Gather the base sequence name of every task group that is either actively
  // reading or pending completion; the set de-duplicates repeated names.
  final Set<String> prefixes = new HashSet<>();

  // Task groups that are currently reading.
  activelyReadingTaskGroups.values().forEach(
      readingGroup -> prefixes.add(readingGroup.baseSequenceName)
  );

  // Each entry of pendingCompletionTaskGroups holds a list of task groups.
  pendingCompletionTaskGroups.values().forEach(
      pendingGroups -> pendingGroups.forEach(
          pendingGroup -> prefixes.add(pendingGroup.baseSequenceName)
      )
  );

  return prefixes;
}

public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,48 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
}

/**
 * Verifies that {@code getActiveRealtimeSequencePrefixes()} returns one entry for each
 * task group registered below: two actively reading groups and one pending completion group.
 */
@Test
public void testGetActiveRealtimeSequencePrefixes()
{
EasyMock.expect(spec.isSuspended()).andReturn(false);

replayAll();

final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

// Configure three tasks, then register two actively reading task groups (partitions "0" and "1")
// and one pending completion task group (partition "2").
// NOTE(review): presumably a task count of 3 makes the three partitions map to distinct
// task group ids - confirm against getTaskGroupIdForPartition.
supervisor.getIoConfig().setTaskCount(3);
supervisor.start();
// Actively reading group for partition "0".
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);

// Actively reading group for partition "1".
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);

// Pending completion group for partition "2".
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task3"),
ImmutableSet.of()
);

// Both the actively reading and the pending completion groups must contribute a prefix.
Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size());
}

@Test
public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ public SegmentIdWithShardSpec allocatePendingSegment(
}

@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<String> activeBaseSequenceNames
)
{
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the javadoc to say prefixes instead of sequence names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* of the supervisor (if any) for this datasource
* @return Map from originally allocated pending segment to its new upgraded ID.
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments
Set<DataSegment> replaceSegments,
Set<String> activeRealtimeSequencePrefixes
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -185,6 +186,12 @@ public int getActiveTaskGroupsCount()
{
return -1;
}

/**
 * Reports the active realtime sequence prefixes of this supervisor.
 *
 * @return always an empty, immutable set
 */
@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
  return Collections.<String>emptySet();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;

public interface Supervisor
{
Expand Down Expand Up @@ -93,4 +94,9 @@ default Boolean isHealthy()
LagStats computeLagStats();

int getActiveTaskGroupsCount();

/**
* @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: prefixes instead of sequence names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
Set<String> getActiveRealtimeSequencePrefixes();
}
Loading