Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter pending segments upgraded with transactional replace #15169

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ public LagStats computeLagStats()
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}

@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Probably don't need an explicit message here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix this as the message is redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Missed this earlier

}

@Override
public int getActiveTaskGroupsCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,17 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
final Optional<String> activeSupervisorId =
supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
if (!activeSupervisorId.isPresent()) {
return;
}

final Set<String> activeRealtimeSequencePrefixes
= supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorId.get());
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
toolbox.getIndexerMetadataStorageCoordinator()
.upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -87,6 +88,14 @@ public Optional<String> getActiveSupervisorIdForDatasource(String datasource)
return Optional.absent();
}

public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
{
if (!supervisors.containsKey(activeSupervisorId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please invert the if condition:

if contains:
   return value
else
  return empty

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return Collections.emptySet();
}
return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
}

public Optional<SupervisorSpec> getSupervisorSpec(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,21 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}

@Override
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
public Set<String> getActiveRealtimeSequencePrefixes()
{
final Set<String> activeBaseSequences = new HashSet<>();
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
activeBaseSequences.add(taskGroup.baseSequenceName);
}
for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroupList) {
activeBaseSequences.add(taskGroup.baseSequenceName);
}
}
return activeBaseSequences;
}

public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ public SegmentIdWithShardSpec allocatePendingSegment(
}

@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<String> activeBaseSequenceNames
)
{
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,13 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the javadoc to say prefixes instead of sequence names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* of the supervisor (if any) for this datasource
* @return Map from originally allocated pending segment to its new upgraded ID.
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments
Set<DataSegment> replaceSegments,
Set<String> activeRealtimeSequencePrefixes
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -185,6 +186,12 @@ public int getActiveTaskGroupsCount()
{
return -1;
}

@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
return Collections.emptySet();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;

public interface Supervisor
{
Expand Down Expand Up @@ -93,4 +94,9 @@ default Boolean isHealthy()
LagStats computeLagStats();

int getActiveTaskGroupsCount();

/**
* @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: prefixes instead of sequence names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
Set<String> getActiveRealtimeSequencePrefixes();
}
Loading
Loading