Skip to content

Commit

Permalink
Create fewer temporary maps when querying sys.segments. (apache#16981)
Browse files Browse the repository at this point in the history
Eliminates two map creations (availableSegmentMetadata, partialSegmentDataMap).
The segmentsAlreadySeen set remains.
  • Loading branch information
gianm authored Sep 4, 2024
1 parent 57bf053 commit 76b8c20
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -413,13 +414,28 @@ public Set<String> getDatasourceNames()
*/
public Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
{
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments);
for (ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
segmentMetadata.putAll(val);
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = Maps.newHashMapWithExpectedSize(getTotalSegments());
final Iterator<AvailableSegmentMetadata> it = iterateSegmentMetadata();
while (it.hasNext()) {
final AvailableSegmentMetadata availableSegmentMetadata = it.next();
segmentMetadata.put(availableSegmentMetadata.getSegment().getId(), availableSegmentMetadata);
}
return segmentMetadata;
}

/**
 * Iterates over metadata for every cached segment (RowSignature, realtime flag, numRows, etc.)
 * without materializing an intermediate map, which makes it cheaper than
 * {@link #getSegmentMetadataSnapshot()}.
 *
 * @return a lazy iterator over all cached segment metadata
 */
public Iterator<AvailableSegmentMetadata> iterateSegmentMetadata()
{
  // Flatten the per-datasource maps into a single lazy sequence of metadata values.
  final FluentIterable<AvailableSegmentMetadata> flattened =
      FluentIterable.from(segmentMetadataInfo.values())
                    .transformAndConcat(dataSourceSegments -> dataSourceSegments.values());
  return flattened.iterator();
}

/**
* Get metadata for the specified segment, which includes information like RowSignature, realtime & numRows.
*
Expand All @@ -431,10 +447,14 @@ public Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
@Nullable
public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, SegmentId segmentId)
{
if (!segmentMetadataInfo.containsKey(datasource)) {
final ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> dataSourceMap =
segmentMetadataInfo.get(datasource);

if (dataSourceMap == null) {
return null;
} else {
return dataSourceMap.get(segmentId);
}
return segmentMetadataInfo.get(datasource).get(segmentId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.druid.client.CoordinatorServerView;
Expand Down Expand Up @@ -64,6 +64,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -357,27 +358,29 @@ protected boolean segmentMetadataQueryResultHandler(
}

@Override
public Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
public Iterator<AvailableSegmentMetadata> iterateSegmentMetadata()
{
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = Maps.newHashMapWithExpectedSize(getTotalSegments());
for (ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : val.entrySet()) {
Optional<SchemaPayloadPlus> metadata = segmentSchemaCache.getSchemaForSegment(entry.getKey());
AvailableSegmentMetadata availableSegmentMetadata = entry.getValue();
if (metadata.isPresent()) {
availableSegmentMetadata = AvailableSegmentMetadata.from(entry.getValue())
.withRowSignature(metadata.get().getSchemaPayload().getRowSignature())
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(entry.getKey());
log.debug("SchemaMetadata for segmentId [%s] is absent.", entry.getKey());
}
segmentMetadata.put(entry.getKey(), availableSegmentMetadata);
}
}
return segmentMetadata;
return FluentIterable
.from(segmentMetadataInfo.values())
.transformAndConcat(Map::values)
.transform(
availableSegmentMetadata -> {
final SegmentId segmentId = availableSegmentMetadata.getSegment().getId();
final Optional<SchemaPayloadPlus> metadata = segmentSchemaCache.getSchemaForSegment(segmentId);
if (metadata.isPresent()) {
return AvailableSegmentMetadata.from(availableSegmentMetadata)
.withRowSignature(metadata.get().getSchemaPayload().getRowSignature())
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId[%s] is absent.", segmentId);
return availableSegmentMetadata;
}
}
)
.iterator();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
Expand Down Expand Up @@ -100,7 +99,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -304,38 +302,29 @@ public Enumerable<Object[]> scan(
)
{
// get available segments from druidSchema
final Map<SegmentId, AvailableSegmentMetadata> availableSegmentMetadata =
druidSchema.cache().getSegmentMetadataSnapshot();
final Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries =
availableSegmentMetadata.entrySet().iterator();

// in memory map to store segment data from available segments
final Map<SegmentId, PartialSegmentData> partialSegmentDataMap =
Maps.newHashMapWithExpectedSize(druidSchema.cache().getTotalSegments());
for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
PartialSegmentData partialSegmentData =
new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
partialSegmentDataMap.put(h.getSegment().getId(), partialSegmentData);
}
final BrokerSegmentMetadataCache availableMetadataCache = druidSchema.cache();

// Keep track of which segments we emitted from the publishedSegments iterator, so we don't emit them again
// from the availableSegments iterator.
final Set<SegmentId> segmentsAlreadySeen =
Sets.newHashSetWithExpectedSize(availableMetadataCache.getTotalSegments());

// Get segments from metadata segment cache (if enabled in SQL planner config), else directly from
// Coordinator. This may include both published and realtime segments.
final Iterator<SegmentStatusInCluster> metadataStoreSegments = metadataView.getSegments();

final Set<SegmentId> segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments());

final FluentIterable<Object[]> publishedSegments = FluentIterable
.from(() -> getAuthorizedPublishedSegments(metadataStoreSegments, root))
.transform(val -> {
final DataSegment segment = val.getDataSegment();
final AvailableSegmentMetadata availableSegmentMetadata =
availableMetadataCache.getAvailableSegmentMetadata(segment.getDataSource(), segment.getId());
segmentsAlreadySeen.add(segment.getId());
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
long numReplicas = 0L, numRows = 0L, isRealtime, isAvailable = 0L;

if (partialSegmentData != null) {
numReplicas = partialSegmentData.getNumReplicas();
isAvailable = partialSegmentData.isAvailable();
numRows = partialSegmentData.getNumRows();
if (availableSegmentMetadata != null) {
numReplicas = availableSegmentMetadata.getNumReplicas();
isAvailable = availableSegmentMetadata.getNumReplicas() > 0 ? IS_AVAILABLE_TRUE : IS_ACTIVE_FALSE;
numRows = availableSegmentMetadata.getNumRows();
}

// If druid.centralizedDatasourceSchema.enabled is set on the Coordinator, SegmentMetadataCache on the
Expand Down Expand Up @@ -383,34 +372,29 @@ public Enumerable<Object[]> scan(
// If druid.centralizedDatasourceSchema.enabled is set on the Coordinator, all the segments in this loop
// would be covered in the previous iteration since Coordinator would return realtime segments as well.
final FluentIterable<Object[]> availableSegments = FluentIterable
.from(() -> getAuthorizedAvailableSegments(
availableSegmentEntries,
root
))
.from(() -> getAuthorizedAvailableSegments(availableMetadataCache.iterateSegmentMetadata(), root))
.transform(val -> {
if (segmentsAlreadySeen.contains(val.getKey())) {
final DataSegment segment = val.getSegment();
if (segmentsAlreadySeen.contains(segment.getId())) {
return null;
}
final DataSegment segment = val.getValue().getSegment();
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
return new Object[]{
val.getKey(),
val.getKey().getDataSource(),
val.getKey().getInterval().getStart(),
val.getKey().getInterval().getEnd(),
segment.getId(),
segment.getDataSource(),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getSize(),
val.getKey().getVersion(),
segment.getVersion(),
(long) segment.getShardSpec().getPartitionNum(),
numReplicas,
val.getValue().getNumRows(),
val.getNumReplicas(),
val.getNumRows(),
// is_active is true for unpublished segments iff they are realtime
val.getValue().isRealtime() /* is_active */,
val.isRealtime() /* is_active */,
// is_published is false for unpublished segments
IS_PUBLISHED_FALSE,
// is_available is assumed to be always true for segments announced by historicals or realtime tasks
IS_AVAILABLE_TRUE,
val.getValue().isRealtime(),
val.isRealtime(),
IS_OVERSHADOWED_FALSE,
// there is an assumption here that unpublished segments are never overshadowed
segment.getShardSpec(),
Expand Down Expand Up @@ -450,8 +434,8 @@ private Iterator<SegmentStatusInCluster> getAuthorizedPublishedSegments(
return authorizedSegments.iterator();
}

private Iterator<Entry<SegmentId, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries,
private Iterator<AvailableSegmentMetadata> getAuthorizedAvailableSegments(
Iterator<AvailableSegmentMetadata> availableSegmentEntries,
DataContext root
)
{
Expand All @@ -460,12 +444,12 @@ private Iterator<Entry<SegmentId, AvailableSegmentMetadata>> getAuthorizedAvaila
"authenticationResult in dataContext"
);

Function<Entry<SegmentId, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment ->
Function<AvailableSegmentMetadata, Iterable<ResourceAction>> raGenerator = segment ->
Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getKey().getDataSource())
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getSegment().getDataSource())
);

final Iterable<Entry<SegmentId, AvailableSegmentMetadata>> authorizedSegments =
final Iterable<AvailableSegmentMetadata> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
() -> availableSegmentEntries,
Expand Down

0 comments on commit 76b8c20

Please sign in to comment.