Skip to content

Commit

Permalink
Fix NPE caused by realtime segment closing race, fix possible missing…
Browse files Browse the repository at this point in the history
…-segment retry bug.

Fixes apache#12168, by returning empty from FireHydrant when the segment is
swapped to null. This causes the SinkQuerySegmentWalker to use
ReportTimelineMissingSegmentQueryRunner, which causes the Broker to look
for the segment somewhere else.

In addition, this patch changes SinkQuerySegmentWalker to acquire references
to all hydrants (subsegments of a sink) at once, and return a
ReportTimelineMissingSegmentQueryRunner if *any* of them could not be acquired.
I suspect, although have not confirmed, that the prior behavior could lead to
segments being reported as missing even though results from some hydrants were
still included.
  • Loading branch information
gianm committed Oct 26, 2023
1 parent e7b8e65 commit 930c81e
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ public Optional<Pair<SegmentReference, Closeable>> getSegmentForQuery(
)
{
ReferenceCountingSegment sinkSegment = adapter.get();

if (sinkSegment == null) {
// adapter can be null if this segment is removed (swapped to null) while being queried.
return Optional.empty();
}

SegmentReference segment = segmentMapFn.apply(sinkSegment);
while (true) {
Optional<Closeable> reference = segment.acquireReferences();
Expand All @@ -186,7 +192,8 @@ public Optional<Pair<SegmentReference, Closeable>> getSegmentForQuery(
// segment swap, the new segment should already be visible.
ReferenceCountingSegment newSinkSegment = adapter.get();
if (newSinkSegment == null) {
throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
// adapter can be null if this segment is removed (swapped to null) while being queried.
return Optional.empty();
}
if (sinkSegment == newSinkSegment) {
if (newSinkSegment.isClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -169,17 +171,17 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

// Make sure this query type can handle the subquery, if present.
if ((dataSourceFromQuery instanceof QueryDataSource) && !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
if ((dataSourceFromQuery instanceof QueryDataSource)
&& !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}

// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<SegmentReference, SegmentReference> segmentMapFn = dataSourceFromQuery
.createSegmentMapFunction(
query,
cpuTimeAccumulator
);

final Function<SegmentReference, SegmentReference> segmentMapFn =
dataSourceFromQuery.createSegmentMapFunction(
query,
cpuTimeAccumulator
);

// We compute the join cache key here itself so it doesn't need to be re-computed for every segment
final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
Expand All @@ -200,44 +202,34 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final

final Sink theSink = chunk.getObject();
final SegmentId sinkSegmentId = theSink.getSegment().getId();
final List<SinkSegmentReference> segmentReferences =
theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);

Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
Iterables.transform(
theSink,
hydrant -> {
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();

if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new Pair<>(hydrant.getSegmentDataInterval(), new NoopQueryRunner<>());
}

// Prevent the underlying segment from swapping when its being iterated
final Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable =
hydrant.getSegmentForQuery(segmentMapFn);
if (segmentReferences == null) {
// We failed to acquire references for all subsegments. Bail and report the entire sink missing.
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
} else if (segmentReferences.isEmpty()) {
return new NoopQueryRunner<>();
}

// if optional isn't present, we failed to acquire reference to the segment or any joinables
if (!maybeSegmentAndCloseable.isPresent()) {
return new Pair<>(
hydrant.getSegmentDataInterval(),
new ReportTimelineMissingSegmentQueryRunner<>(descriptor)
);
}
final Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
try {
final Closeable releaser = () -> CloseableUtils.closeAll(segmentReferences);

QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);
try {
Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
Iterables.transform(
segmentReferences,
segmentReference -> {
QueryRunner<T> runner = factory.createRunner(segmentReference.getSegment());

// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
StorageAdapter storageAdapter = segmentAndCloseable.lhs.asStorageAdapter();
if (segmentReference.isImmutable() && cache.isLocal()) {
StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
long segmentMinTime = storageAdapter.getMinTime().getMillis();
long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
runner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
cacheKeyPrefix,
descriptor,
actualDataInterval,
Expand All @@ -254,35 +246,33 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
cacheConfig
);
}
// Make it always use Closeable to decrement()
runner = QueryRunnerHelper.makeClosingQueryRunner(
runner,
segmentAndCloseable.rhs
);
return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
}
catch (Throwable e) {
throw CloseableUtils.closeAndWrapInCatch(e, segmentAndCloseable.rhs);
return new Pair<>(segmentReference.getSegment().getDataInterval(), runner);
}
}
)
);
return new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentId,
descriptor.getInterval().getStart(),
factory.mergeRunners(
DirectQueryProcessingPool.INSTANCE,
perHydrantRunners
)
),
toolChest,
sinkSegmentId,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
);
)
);
return QueryRunnerHelper.makeClosingQueryRunner(
new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentId,
descriptor.getInterval().getStart(),
factory.mergeRunners(
DirectQueryProcessingPool.INSTANCE,
perHydrantRunners
)
),
toolChest,
sinkSegmentId,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
),
releaser
);
}
catch (Throwable e) {
throw CloseableUtils.closeAndWrapInCatch(e, releaser);
}
}
);
final QueryRunner<T> mergedRunner =
Expand Down Expand Up @@ -361,8 +351,16 @@ public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
return sinkTimeline;
}

public static String makeHydrantCacheIdentifier(FireHydrant input)
public static String makeHydrantCacheIdentifier(final FireHydrant hydrant)
{
return makeHydrantCacheIdentifier(hydrant.getSegmentId(), hydrant.getCount());
}

public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final int hydrantNumber)
{
return input.getSegmentId() + "_" + input.getCount();
// Cache ID like segmentId_H0, etc. The 'H' disambiguates subsegment [foo_x_y_z partition 0 hydrant 1]
// from full segment [foo_x_y_z partition 1], and is therefore useful if we ever want the cache to mix full segments
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
Expand All @@ -40,8 +44,12 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -50,14 +58,18 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
{
private static final IncrementalIndexAddResult ALREADY_SWAPPED =
new IncrementalIndexAddResult(-1, -1, "write after index swapped");
private static final Logger log = new Logger(Sink.class);

private final Object hydrantLock = new Object();
private final Interval interval;
Expand Down Expand Up @@ -228,6 +240,7 @@ public boolean finished()

/**
* Marks sink as 'finished', preventing further writes.
*
* @return 'true' if sink was sucessfully finished, 'false' if sink was already finished
*/
public boolean finishWriting()
Expand Down Expand Up @@ -288,6 +301,57 @@ public long getBytesInMemory()
}
}

/**
* Acquire references to all {@link FireHydrant} that represent this sink. Returns null if they cannot all be
* acquired, possibly because they were closed (swapped to null) concurrently with this method being called.
*
* @param segmentMapFn from {@link org.apache.druid.query.DataSource#createSegmentMapFunction(Query, AtomicLong)}
* @param skipIncrementalSegment whether in-memory {@link IncrementalIndex} segments should be skipped
*/
@Nullable
public List<SinkSegmentReference> acquireSegmentReferences(
final Function<SegmentReference, SegmentReference> segmentMapFn,
final boolean skipIncrementalSegment
)
{
final List<SinkSegmentReference> retVal = new ArrayList<>(hydrants.size());

try {
for (final FireHydrant hydrant : hydrants) {
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();

if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
continue;
}

final Optional<Pair<SegmentReference, Closeable>> maybeHolder = hydrant.getSegmentForQuery(segmentMapFn);
if (maybeHolder.isPresent()) {
final Pair<SegmentReference, Closeable> holder = maybeHolder.get();
retVal.add(new SinkSegmentReference(hydrant.getCount(), holder.lhs, hydrantDefinitelySwapped, holder.rhs));
} else {
// Cannot acquire this hydrant. Release all others previously acquired and return null.
for (final SinkSegmentReference reference : retVal) {
reference.close();
}

return null;
}
}

return retVal;
}
catch (Throwable e) {
// Release all references previously acquired and throw the error.
for (final SinkSegmentReference reference : retVal) {
CloseableUtils.closeAndSuppressExceptions(reference, e::addSuppressed);
}

throw e;
}
}

private boolean checkInDedupSet(InputRow row)
{
if (dedupColumn != null) {
Expand Down Expand Up @@ -335,7 +399,8 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
.build();

// Build the incremental-index according to the spec that was chosen by the user
final IncrementalIndex newIndex = appendableIndexSpec.builder()
final IncrementalIndex newIndex = appendableIndexSpec
.builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(maxRowsInMemory)
.setMaxBytesInMemory(maxBytesInMemory)
Expand Down
Loading

0 comments on commit 930c81e

Please sign in to comment.