diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 557198f14244..10c775aa0a30 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -61,8 +61,10 @@
 import org.apache.druid.segment.realtime.sink.Sink;
 import org.apache.druid.segment.realtime.sink.SinkSegmentReference;
 import org.apache.druid.server.ResourceIdPopulatingQueryRunner;
+import org.apache.druid.timeline.Overshadowable;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.IntegerPartitionChunk;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
@@ -74,8 +76,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -90,7 +90,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker

   private final String dataSource;

-  private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
+  // Maintains a timeline of ids and Sinks for all segments, including both the base and the upgraded versions
+  private final VersionedIntervalTimeline<String, SinkHolder> upgradedSegmentsTimeline;
   private final ObjectMapper objectMapper;
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
@@ -98,12 +99,10 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
   private final Cache cache;
   private final CacheConfig cacheConfig;
   private final CachePopulatorStats cachePopulatorStats;
-  private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToBasePendingSegment
-      = new ConcurrentHashMap<>();

   public SinkQuerySegmentWalker(
       String dataSource,
-      VersionedIntervalTimeline<String, Sink> sinkTimeline,
+      VersionedIntervalTimeline<String, SinkHolder> upgradedSegmentsTimeline,
       ObjectMapper objectMapper,
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
@@ -114,7 +113,7 @@ public SinkQuerySegmentWalker(
   )
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
-    this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
+    this.upgradedSegmentsTimeline = upgradedSegmentsTimeline;
     this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
     this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
@@ -133,7 +132,7 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final
   {
     final Iterable<SegmentDescriptor> specs = FunctionalIterable
         .create(intervals)
-        .transformCat(sinkTimeline::lookup)
+        .transformCat(upgradedSegmentsTimeline::lookup)
         .transformCat(
             holder -> FunctionalIterable
                 .create(holder.getObject())
@@ -196,9 +195,8 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final

     final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
     try {
-      for (final SegmentDescriptor newDescriptor : specs) {
-        final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
-        final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
+      for (final SegmentDescriptor descriptor : specs) {
+        final PartitionChunk<SinkHolder> chunk = upgradedSegmentsTimeline.findChunk(
             descriptor.getInterval(),
             descriptor.getVersion(),
             descriptor.getPartitionNumber()
@@ -212,7 +210,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
           continue;
         }

-        final Sink theSink = chunk.getObject();
+        final Sink theSink = chunk.getObject().sink;
         final SegmentId sinkSegmentId = theSink.getSegment().getId();
         segmentIdMap.put(descriptor, sinkSegmentId);
         final List<SinkSegmentReference> sinkSegmentReferences =
@@ -361,26 +359,48 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
     }
   }

-  public void registerUpgradedPendingSegment(
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec upgradedPendingSegment
-  )
+  /**
+   * Must be called when a segment is announced by a task:
+   * either for the base segment upon allocation, or for any version upgraded by a concurrent replace.
+   */
+  public void registerUpgradedPendingSegment(SegmentIdWithShardSpec id, Sink sink)
   {
-    newIdToBasePendingSegment.put(
-        upgradedPendingSegment.asSegmentId().toDescriptor(),
-        basePendingSegment.asSegmentId().toDescriptor()
+    final SegmentDescriptor upgradedDescriptor = id.asSegmentId().toDescriptor();
+    upgradedSegmentsTimeline.add(
+        upgradedDescriptor.getInterval(),
+        upgradedDescriptor.getVersion(),
+        IntegerPartitionChunk.make(
+            null,
+            null,
+            upgradedDescriptor.getPartitionNumber(),
+            new SinkHolder(upgradedDescriptor, sink)
+        )
     );
   }

-  @VisibleForTesting
-  String getDataSource()
+  /**
+   * Must be called when dropping a sink from the timeline.
+   * It is the caller's responsibility to unregister all associated ids, including the base id.
+   */
+  public void unregisterUpgradedPendingSegment(SegmentIdWithShardSpec id, Sink sink)
   {
-    return dataSource;
+    final SegmentDescriptor upgradedDescriptor = id.asSegmentId().toDescriptor();
+    upgradedSegmentsTimeline.remove(
+        upgradedDescriptor.getInterval(),
+        upgradedDescriptor.getVersion(),
+        IntegerPartitionChunk.make(
+            null,
+            null,
+            upgradedDescriptor.getPartitionNumber(),
+            new SinkHolder(upgradedDescriptor, sink)
+        )
+    );
   }

-  public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
+  @VisibleForTesting
+  String getDataSource()
   {
-    return sinkTimeline;
+    return dataSource;
   }

   public static String makeHydrantCacheIdentifier(final FireHydrant hydrant)
@@ -395,4 +415,46 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
+
+  private static class SinkHolder implements Overshadowable<SinkHolder>
+  {
+    private final Sink sink;
+    private final SegmentDescriptor segmentDescriptor;
+
+    private SinkHolder(SegmentDescriptor segmentDescriptor, Sink sink)
+    {
+      this.segmentDescriptor = segmentDescriptor;
+      this.sink = sink;
+    }
+
+    @Override
+    public int getStartRootPartitionId()
+    {
+      return segmentDescriptor.getPartitionNumber();
+    }
+
+    @Override
+    public int getEndRootPartitionId()
+    {
+      return segmentDescriptor.getPartitionNumber() + 1;
+    }
+
+    @Override
+    public String getVersion()
+    {
+      return segmentDescriptor.getVersion();
+    }
+
+    @Override
+    public short getMinorVersion()
+    {
+      return 0;
+    }
+
+    @Override
+    public short getAtomicUpdateGroupSize()
+    {
+      return 1;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index b1cc63d2d565..bef8f0c82e77 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -82,7 +82,6 @@
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.Interval;

 import javax.annotation.Nullable;
@@ -95,6 +94,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -150,7 +150,6 @@ public class StreamAppenderator implements Appenderator
   private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, SegmentIdWithShardSpec> idToPendingSegment = new ConcurrentHashMap<>();
   private final Set<SegmentIdWithShardSpec> droppingSinks = Sets.newConcurrentHashSet();
-  private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
   private final long maxBytesTuningConfig;
   private final boolean skipBytesInMemoryOverheadCheck;
   private final boolean useMaxMemoryEstimates;
@@ -250,14 +249,6 @@ public class StreamAppenderator implements Appenderator
     this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
     this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");

-    if (sinkQuerySegmentWalker == null) {
-      this.sinkTimeline = new VersionedIntervalTimeline<>(
-          String.CASE_INSENSITIVE_ORDER
-      );
-    } else {
-      this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline();
-    }
-
     maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
     skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
@@ -612,12 +603,35 @@ public void clear() throws InterruptedException
     }
   }

+  /**
+   * Marks the given version of a segment as abandoned and returns its base segment id if the sink can be dropped.
+   * Returns null if there are other valid versions of the segment that are yet to be dropped.
+   */
+  private SegmentIdWithShardSpec abandonUpgradedIdentifier(final SegmentIdWithShardSpec identifier)
+  {
+    final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
+    synchronized (abandonedSegments) {
+      abandonedSegments.add(identifier);
+      if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {
+        Set<SegmentIdWithShardSpec> relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier));
+        relevantSegments.removeAll(abandonedSegments);
+        // If there are unabandoned segments associated with the sink, return early
+        // This may be the case if segments have been upgraded as the result of a concurrent replace
+        if (!relevantSegments.isEmpty()) {
+          return null;
+        }
+      }
+    }
+    return baseIdentifier;
+  }
+
   @Override
   public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
   {
-    final Sink sink = sinks.get(identifier);
+    final SegmentIdWithShardSpec baseIdentifier = abandonUpgradedIdentifier(identifier);
+    final Sink sink = baseIdentifier == null ? null : sinks.get(baseIdentifier);
     if (sink != null) {
-      return abandonSegment(identifier, sink, true);
+      return abandonSegment(baseIdentifier, sink, true);
     } else {
       return Futures.immediateFuture(null);
     }
@@ -1107,13 +1121,17 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer()

   /**
    * Unannounces the given base segment and all its upgraded versions.
+   *
+   * @param baseSegment base segment
+   * @param sink        sink corresponding to the base segment
+   * @return the set of all segment ids associated with the base segment containing the upgraded ids and itself.
    */
-  private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
+  private Set<SegmentIdWithShardSpec> unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
   {
     synchronized (sink) {
       final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment);
       if (!baseSegmentToUpgradedSegments.containsKey(baseId)) {
-        return;
+        return Collections.emptySet();
       }

       final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment = baseSegmentToUpgradedSegments.remove(baseId);
@@ -1132,6 +1150,7 @@ private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
         unannounceSegment(newSegment);
         upgradedSegmentToBaseSegment.remove(newId);
       }
+      return upgradedVersionsOfSegment;
     }
   }

@@ -1155,11 +1174,13 @@ public void registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRe
       return;
     }

+    final Sink sink = sinks.get(basePendingSegment);
+
     // Update query mapping with SinkQuerySegmentWalker
-    ((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(basePendingSegment, upgradedPendingSegment);
+    ((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(upgradedPendingSegment, sink);

     // Announce segments
-    final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
+    final DataSegment baseSegment = sink.getSegment();
     final DataSegment newSegment = getUpgradedSegment(baseSegment, upgradedPendingSegment);

     segmentAnnouncer.announceSegment(newSegment);
@@ -1434,11 +1455,7 @@ private void addSink(SegmentIdWithShardSpec identifier, Sink sink)
     baseSegmentToUpgradedSegments.put(identifier, new HashSet<>());
     baseSegmentToUpgradedSegments.get(identifier).add(identifier);

-    sinkTimeline.add(
-        sink.getInterval(),
-        sink.getVersion(),
-        identifier.getShardSpec().createChunk(sink)
-    );
+    ((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(identifier, sink);
   }

   private ListenableFuture<?> abandonSegment(
@@ -1447,19 +1464,6 @@ private ListenableFuture<?> abandonSegment(
       final SegmentIdWithShardSpec identifier,
       final Sink sink,
       final boolean removeOnDiskData
   )
   {
-    abandonedSegments.add(identifier);
-    final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
-    synchronized (sink) {
-      if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {
-        Set<SegmentIdWithShardSpec> relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier));
-        relevantSegments.removeAll(abandonedSegments);
-        // If there are unabandoned segments associated with the sink, return early
-        // This may be the case if segments have been upgraded as the result of a concurrent replace
-        if (!relevantSegments.isEmpty()) {
-          return Futures.immediateFuture(null);
-        }
-      }
-    }
     // Ensure no future writes will be made to this sink.
     if (sink.finishWriting()) {
       // Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement,
@@ -1477,7 +1481,7 @@ private ListenableFuture<?> abandonSegment(
       }

       // Mark this identifier as dropping, so no future push tasks will pick it up.
-      droppingSinks.add(baseIdentifier);
+      droppingSinks.add(identifier);

       // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
       return Futures.transform(
@@ -1488,8 +1492,8 @@
           @Override
           public Void apply(@Nullable Object input)
           {
-            if (!sinks.remove(baseIdentifier, sink)) {
-              log.error("Sink for segment[%s] no longer valid, not abandoning.", baseIdentifier);
+            if (!sinks.remove(identifier, sink)) {
+              log.error("Sink for segment[%s] no longer valid, not abandoning.", identifier);
               return null;
             }

@@ -1497,17 +1501,17 @@ public Void apply(@Nullable Object input)

             if (removeOnDiskData) {
               // Remove this segment from the committed list. This must be done from the persist thread.
-              log.debug("Removing commit metadata for segment[%s].", baseIdentifier);
+              log.debug("Removing commit metadata for segment[%s].", identifier);
               try {
                 commitLock.lock();
                 final Committed oldCommit = readCommit();
                 if (oldCommit != null) {
-                  writeCommit(oldCommit.without(baseIdentifier.toString()));
+                  writeCommit(oldCommit.without(identifier.toString()));
                 }
               }
               catch (Exception e) {
                 log.makeAlert(e, "Failed to update committed segments[%s]", schema.getDataSource())
-                    .addData("identifier", baseIdentifier.toString())
+                    .addData("identifier", identifier.toString())
                     .emit();
                 throw new RuntimeException(e);
               }
@@ -1516,15 +1520,14 @@ public Void apply(@Nullable Object input)
               }
             }

-            unannounceAllVersionsOfSegment(sink.getSegment(), sink);
+            final Set<SegmentIdWithShardSpec> allVersionIds = unannounceAllVersionsOfSegment(sink.getSegment(), sink);

             Runnable removeRunnable = () -> {
-              droppingSinks.remove(baseIdentifier);
-              sinkTimeline.remove(
-                  sink.getInterval(),
-                  sink.getVersion(),
-                  baseIdentifier.getShardSpec().createChunk(sink)
-              );
+              droppingSinks.remove(identifier);
+              for (SegmentIdWithShardSpec id : allVersionIds) {
+                // Update query mapping with SinkQuerySegmentWalker
+                ((SinkQuerySegmentWalker) texasRanger).unregisterUpgradedPendingSegment(id, sink);
+              }
               for (FireHydrant hydrant : sink) {
                 if (cache != null) {
                   cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
@@ -1533,7 +1536,7 @@ public Void apply(@Nullable Object input)
                 }
               }
               if (removeOnDiskData) {
-                removeDirectory(computePersistDir(baseIdentifier));
+                removeDirectory(computePersistDir(identifier));
               }

               log.info("Dropped segment[%s].", identifier);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 0088e33ca7a8..dc1c3b599a95 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -65,7 +65,6 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.realtime.SegmentGenerationMetrics; -import org.apache.druid.segment.realtime.sink.Sink; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -353,12 +352,9 @@ public DatasourceBundle( { this.taskAppenderatorMap = new HashMap<>(); - VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline<>( - String.CASE_INSENSITIVE_ORDER - ); this.walker = new SinkQuerySegmentWalker( dataSource, - sinkTimeline, + new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER), objectMapper, serviceEmitter, queryRunnerFactoryConglomerateProvider.get(), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 46580a5eeeed..eaa24af3d6c5 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.Druids; import org.apache.druid.query.Order; import org.apache.druid.query.QueryPlus; @@ -1132,6 +1133,730 @@ ScheduledFuture getLastScheduledFuture() } } + @Test + public void testQueryBySegments_withSegmentVersionUpgrades() throws Exception + { + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(2) + .basePersistDirectory(temporaryFolder.newFolder()) + .build()) { + + final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator(); + appenderator.startJob(); + + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil())); + // Segment0 for interval upgraded after appends + appenderator.registerUpgradedPendingSegment( + new PendingSegmentRecord( + si("2000/2001", "B", 1), + si("2000/2001", "B", 1).asSegmentId().toString(), + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(0).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + + // Segment1 allocated for version B + appenderator.add(si("2000/2001", "B", 2), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil())); + + appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil())); + // Concurrent replace registers a segment version upgrade for the second interval + 
appenderator.registerUpgradedPendingSegment( + new PendingSegmentRecord( + si("2001/2002", "B", 1), + si("2001/2002", "B", 1).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil())); + // Another Concurrent replace registers upgrade with version C for the second interval + appenderator.registerUpgradedPendingSegment( + new PendingSegmentRecord( + si("2001/2002", "C", 7), + si("2001/2002", "C", 7).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + + + // Query1: segment #2 + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + IDENTIFIERS.get(2).getInterval(), + IDENTIFIERS.get(2).getVersion(), + IDENTIFIERS.get(2).getShardSpec().getPartitionNum() + ) + ) + ) + ) + .build(); + final TimeseriesQuery query1_B = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + IDENTIFIERS.get(2).getInterval(), + "B", + 1 + ) + ) + ) + ) + .build(); + final TimeseriesQuery query1_C = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + IDENTIFIERS.get(2).getInterval(), + "C", + 7 + ) + ) + ) + ) + .build(); + + final List> results1 = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)) + ) + ), + results1 + ); + final List> results1_B = + QueryPlus.wrap(query1_B).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1_B", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)) + ) + ), + results1_B + ); + final List> results1_C = + QueryPlus.wrap(query1_C).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1_C", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)) + ) + ), + results1_C + ); + + // Query2: segment #2, partial + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new 
LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + IDENTIFIERS.get(2).getVersion(), + IDENTIFIERS.get(2).getShardSpec().getPartitionNum() + ) + ) + ) + ) + .build(); + final TimeseriesQuery query2_B = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + "B", + 1 + ) + ) + ) + ) + .build(); + final TimeseriesQuery query2_C = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + "C", + 7 + ) + ) + ) + ) + .build(); + + final List> results2 = + QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query2", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)) + ) + ), + results2 + ); + final List> results2_B = + QueryPlus.wrap(query2_B).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query2_B", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)) + ) + ), + results2_B + ); + final List> results2_C = + QueryPlus.wrap(query2_C).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query2_C", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)) + ) + ), + results2_C + ); + + // Query3: segment #2, two disjoint intervals + final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + IDENTIFIERS.get(2).getVersion(), + IDENTIFIERS.get(2).getShardSpec().getPartitionNum() + ), + new SegmentDescriptor( + Intervals.of("2001T03/PT1H"), + IDENTIFIERS.get(2).getVersion(), + IDENTIFIERS.get(2).getShardSpec().getPartitionNum() + ) + ) + ) + ) + .build(); + final TimeseriesQuery query3_B = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + "B", + 1 + ), + new SegmentDescriptor( + Intervals.of("2001T03/PT1H"), + "B", + 1 + ) + ) + ) + ) + .build(); + final TimeseriesQuery query3_C = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .aggregators( + Arrays.asList( + new 
LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + "C", + 7 + ), + new SegmentDescriptor( + Intervals.of("2001T03/PT1H"), + "C", + 7 + ) + ) + ) + ) + .build(); + + final List> results3 = + QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query3", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)) + ) + ), + results3 + ); + final List> results3_B = + QueryPlus.wrap(query3_B).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query3_B", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)) + ) + ), + results3_B + ); + final List> results3_C = + QueryPlus.wrap(query3_C).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query3_C", + ImmutableList.of( + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)) + ) + ), + results3_C + ); + + final ScanQuery query4 = Druids.newScanQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + IDENTIFIERS.get(2).getVersion(), + IDENTIFIERS.get(2).getShardSpec().getPartitionNum() + ), + new SegmentDescriptor( + Intervals.of("2001T03/PT1H"), + IDENTIFIERS.get(2).getVersion(), + IDENTIFIERS.get(2).getShardSpec().getPartitionNum() + ) + ) + ) + ) + .order(Order.ASCENDING) + .batchSize(10) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + final ScanQuery query4_B = Druids.newScanQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + "B", + 1 + ), + new SegmentDescriptor( + Intervals.of("2001T03/PT1H"), + "B", + 1 + ) + ) + ) + ) + .order(Order.ASCENDING) + .batchSize(10) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + final ScanQuery query4_C = Druids.newScanQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor( + Intervals.of("2001/PT1H"), + "C", + 7 + ), + new SegmentDescriptor( + Intervals.of("2001T03/PT1H"), + "C", + 7 + ) + ) + ) + ) + .order(Order.ASCENDING) + .batchSize(10) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + final List results4 = + QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals(2, results4.size()); // 2 segments, 1 row per segment + Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray()); + Assert.assertArrayEquals( + new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L}, + ((List) ((List) results4.get(0).getEvents()).get(0)).toArray() + ); + Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray()); + Assert.assertArrayEquals( + new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L}, + ((List) ((List) results4.get(1).getEvents()).get(0)).toArray() + ); + 
final List results4_B = + QueryPlus.wrap(query4_B).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals(2, results4_B.size()); // 2 segments, 1 row per segment + Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_B.get(0).getColumns().toArray()); + Assert.assertArrayEquals( + new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L}, + ((List) ((List) results4_B.get(0).getEvents()).get(0)).toArray() + ); + Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_B.get(0).getColumns().toArray()); + Assert.assertArrayEquals( + new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L}, + ((List) ((List) results4_B.get(1).getEvents()).get(0)).toArray() + ); + final List results4_C = + QueryPlus.wrap(query4_C).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals(2, results4_C.size()); // 2 segments, 1 row per segment + Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_C.get(0).getColumns().toArray()); + Assert.assertArrayEquals( + new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L}, + ((List) ((List) results4_C.get(0).getEvents()).get(0)).toArray() + ); + Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_C.get(0).getColumns().toArray()); + Assert.assertArrayEquals( + new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L}, + ((List) ((List) results4_C.get(1).getEvents()).get(0)).toArray() + ); + } + } + + @Test + public void testQueryByIntervals_withSegmentVersionUpgrades() throws Exception + { + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(2) + .basePersistDirectory(temporaryFolder.newFolder()) + .build()) { + final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator(); + + appenderator.startJob(); + + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil())); + // Segment0 for interval upgraded after appends + appenderator.registerUpgradedPendingSegment( + new PendingSegmentRecord( + si("2000/2001", "B", 1), + si("2000/2001", "B", 1).asSegmentId().toString(), + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(0).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + + // Segment1 allocated for version B + appenderator.add(si("2000/2001", "B", 2), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil())); + + appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil())); + // Concurrent replace registers a segment version upgrade for the second interval + appenderator.registerUpgradedPendingSegment( + new PendingSegmentRecord( + si("2001/2002", "B", 1), + si("2001/2002", "B", 1).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil())); + // Another Concurrent replace registers upgrade with version C for the second interval + appenderator.registerUpgradedPendingSegment( + new 
PendingSegmentRecord( + si("2001/2002", "C", 7), + si("2001/2002", "C", 7).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString(), + StreamAppenderatorTester.DATASOURCE + ) + ); + + // Query1: 2000/2001 + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2001"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + final List> results1 = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1", + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) + ) + ), + results1 + ); + + // Query2: 2000/2002 + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2002"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + final List> results2 = + QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query2", + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) + ), + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)) + ) + ), + results2 + ); + + // Query3: 2000/2001T01 + final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2001T01"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + final List> results3 = + QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) + ), + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)) + ) + ), + results3 + ); + + // Query4: 2000/2001T01, 2001T03/2001T04 + final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals( + ImmutableList.of( + Intervals.of("2000/2001T01"), + Intervals.of("2001T03/2001T04") + ) + ) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + final List> results4 = + QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) + ), + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)) + ) + ), + results4 + ); + + // Drop segment + appenderator.drop(IDENTIFIERS.get(0)).get(); + // Drop its upgraded version (Drop happens for each version on handoff) + appenderator.drop(si("2000/2001", 
"B", 1)).get(); + + final List> resultsAfterDrop1 = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1", + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ) + ), + resultsAfterDrop1 + ); + + final List> resultsAfterDrop2 = + QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query2", + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ), + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)) + ) + ), + resultsAfterDrop2 + ); + + final List> resultsAfterDrop3 = + QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ), + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)) + ) + ), + resultsAfterDrop3 + ); + + final List> resultsAfterDrop4 = + QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ), + new Result<>( + DateTimes.of("2001"), + new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)) + ) + ), + resultsAfterDrop4 + ); + } + } + @Test public void testQueryByIntervals() throws Exception {