diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index a07558389fae..0abdf7f2c3c2 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -61,7 +61,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi |`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|`PT30M`| |`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No|| -|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. 
For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| +|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. To account for this, a running task moves its message rejection cutoffs forward by one task duration each time a task duration elapses.|No|| #### Task autoscaler diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java index 4c053d89455d..721e66f6f3af 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java @@ -53,7 +53,9 @@ public RabbitStreamIndexTaskIOConfig( @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, - @JsonProperty("uri") String uri) + @JsonProperty("uri") String uri, + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes + ) { super( 
taskGroupId, @@ -63,7 +65,9 @@ public RabbitStreamIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat); + inputFormat, + refreshRejectionPeriodsInMinutes + ); this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; this.uri = uri; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 222022e3e66f..5e9d3c3d2c6a 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -59,7 +59,6 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -202,7 +201,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - rabbitConfig.getUri()); + rabbitConfig.getUri(), + ioConfig.getTaskDuration().getStandardMinutes() + ); } @Override diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index e52ca2f29b64..b2ac927b17e1 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.JsonInputFormat; @@ -37,6 +38,7 @@ import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory; import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -366,4 +368,49 @@ public void testReportPayload() Assert.assertEquals(30 * 60, payload.getDurationSeconds()); } + @Test + public void testCreateTaskIOConfig() + { + supervisor = getSupervisor( + 1, + 1, + false, + "PT30M", + null, + null, + RabbitStreamSupervisorTest.dataSchema, + tuningConfig + ); + + SeekableStreamIndexTaskIOConfig ioConfig = supervisor.createTaskIoConfig( + 1, + ImmutableMap.of(), + ImmutableMap.of(), + "test", + null, + null, + ImmutableSet.of(), + new RabbitStreamSupervisorIOConfig( + STREAM, // stream + URI, // uri + INPUT_FORMAT, // inputFormat + 1, // replicas + 1, // taskCount + new Period("PT30M"), // taskDuration + null, // consumerProperties + null, // autoscalerConfig + 400L, // poll timeout + new Period("P1D"), // start delat + new Period("PT30M"), // period + new Period("PT30S"), // completiontimeout + false, // useearliest + null, // latemessagerejection + null, 
// early message rejection + null, // latemessagerejectionstartdatetime + 1 + ) + ); + + Assert.assertEquals(30L, ioConfig.getRefreshRejectionPeriodsInMinutes().longValue()); + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index 82c9ad71c973..07c0f80fbe83 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -63,7 +63,8 @@ public KafkaIndexTaskIOConfig( @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides, - @JsonProperty("multiTopic") @Nullable Boolean multiTopic + @JsonProperty("multiTopic") @Nullable Boolean multiTopic, + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes ) { super( @@ -76,7 +77,8 @@ public KafkaIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat + inputFormat, + refreshRejectionPeriodsInMinutes ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); @@ -107,7 +109,8 @@ public KafkaIndexTaskIOConfig( DateTime minimumMessageTime, DateTime maximumMessageTime, InputFormat inputFormat, - KafkaConfigOverrides configOverrides + KafkaConfigOverrides configOverrides, + Long refreshRejectionPeriodsInMinutes ) { this( @@ -124,7 +127,8 @@ public KafkaIndexTaskIOConfig( maximumMessageTime, inputFormat, configOverrides, - KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC + KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC, + refreshRejectionPeriodsInMinutes ); } diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index aebacecff662..2618c22495ea 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -218,7 +218,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( maximumMessageTime, ioConfig.getInputFormat(), kafkaIoConfig.getConfigOverrides(), - kafkaIoConfig.isMultiTopic() + kafkaIoConfig.isMultiTopic(), + ioConfig.getTaskDuration().getStandardMinutes() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index db8db1fdb38c..3bae10a1969b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -120,6 +120,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -357,7 +358,8 @@ public void testRunAfterDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -413,7 +415,8 @@ public void testIngestNullColumnAfterDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) 
); final ListenableFuture future = runTask(task); @@ -461,7 +464,8 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); @@ -496,7 +500,8 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception null, null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -537,7 +542,8 @@ public void testRunBeforeDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -588,7 +594,8 @@ public void testRunAfterDataInsertedLiveReport() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -665,7 +672,8 @@ public void testIncrementalHandOff() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -768,7 +776,8 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -894,7 +903,8 @@ public void testTimeBasedIncrementalHandOff() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -972,7 +982,8 @@ public void testCheckpointResetWithSameEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -1035,7 +1046,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask 
staleReplica = createTask( @@ -1051,7 +1063,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1117,7 +1130,8 @@ public void testRunWithMinimumMessageTime() throws Exception DateTimes.of("2010"), null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1165,7 +1179,8 @@ public void testRunWithMaximumMessageTime() throws Exception null, DateTimes.of("2010"), INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1222,7 +1237,8 @@ public void testRunWithTransformSpec() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1293,7 +1309,8 @@ public void testKafkaRecordEntityInputFormat() throws Exception null, null, new TestKafkaInputFormat(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -1365,7 +1382,8 @@ public void testKafkaInputFormat() throws Exception null, null, KAFKA_INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -1416,7 +1434,8 @@ public void testRunOnNothing() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1451,7 +1470,8 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1497,7 +1517,8 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1548,7 +1569,8 @@ public void testReportParseExceptions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1586,7 +1608,8 
@@ public void testMultipleParseExceptionsSuccess() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1678,7 +1701,8 @@ public void testMultipleParseExceptionsFailure() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1744,7 +1768,8 @@ public void testRunReplicas() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1760,7 +1785,8 @@ public void testRunReplicas() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1808,7 +1834,8 @@ public void testRunConflicting() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1824,7 +1851,8 @@ public void testRunConflicting() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1874,7 +1902,8 @@ public void testRunConflictingWithoutTransactions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1890,7 +1919,8 @@ public void testRunConflictingWithoutTransactions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1938,7 +1968,8 @@ public void testRunOneTaskTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1984,7 +2015,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -2000,7 +2032,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, null, 
INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2050,7 +2083,8 @@ public void testRestore() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2084,7 +2118,8 @@ public void testRestore() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2134,7 +2169,8 @@ public void testRestoreAfterPersistingSequences() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2177,7 +2213,8 @@ public void testRestoreAfterPersistingSequences() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2228,7 +2265,8 @@ public void testRunWithPauseAndResume() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2299,7 +2337,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2336,7 +2375,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2383,7 +2423,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ), context ); @@ -2427,7 +2468,8 @@ public void testRunWithDuplicateRequest() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2466,7 +2508,8 @@ public void testRunTransactionModeRollback() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2547,7 +2590,8 @@ public void testRunUnTransactionMode() throws Exception null, null, INPUT_FORMAT, - 
null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2607,7 +2651,8 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -2630,7 +2675,8 @@ public void testRunWithoutDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2676,7 +2722,8 @@ public void testSerde() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2708,7 +2755,8 @@ public void testCorrectInputSources() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2968,7 +3016,8 @@ public void testMultipleLinesJSONText() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3029,7 +3078,8 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3101,7 +3151,8 @@ public void testNoParseExceptionsTaskSucceeds() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3175,7 +3226,8 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3227,7 +3279,8 @@ public void testCompletionReportPartitionStats() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3281,7 +3334,8 @@ public void testCompletionReportMultiplePartitionStats() throws Exception null, null, INPUT_FORMAT, - 
null + null, + Duration.standardHours(2).getStandardMinutes() ) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7926a0568fd9..8f2f3167ecd9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -107,6 +107,7 @@ import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -496,7 +497,8 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ), new KafkaIndexTaskTuningConfig( null, @@ -5641,7 +5643,8 @@ private KafkaIndexTask createKafkaIndexTask( minimumMessageTime, maximumMessageTime, INPUT_FORMAT, - null + null, + Duration.standardHours(2).getStandardMinutes() ), Collections.emptyMap(), OBJECT_MAPPER diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 881d68ba8968..2df59bbca35f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -78,7 +78,8 @@ public KinesisIndexTaskIOConfig( @JsonProperty("endpoint") String endpoint, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, 
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, - @JsonProperty("awsExternalId") String awsExternalId + @JsonProperty("awsExternalId") String awsExternalId, + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes ) { super( @@ -89,7 +90,8 @@ public KinesisIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat + inputFormat, + refreshRejectionPeriodsInMinutes ); Preconditions.checkArgument( getEndSequenceNumbers().getPartitionSequenceNumberMap() @@ -117,7 +119,8 @@ public KinesisIndexTaskIOConfig( String endpoint, Integer fetchDelayMillis, String awsAssumedRoleArn, - String awsExternalId + String awsExternalId, + Long refreshRejectionPeriodsInMinutes ) { this( @@ -135,7 +138,8 @@ public KinesisIndexTaskIOConfig( endpoint, fetchDelayMillis, awsAssumedRoleArn, - awsExternalId + awsExternalId, + refreshRejectionPeriodsInMinutes ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 2f00c8c16cc9..2391b1265bab 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -145,7 +145,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getEndpoint(), ioConfig.getFetchDelayMillis(), ioConfig.getAwsAssumedRoleArn(), - ioConfig.getAwsExternalId() + ioConfig.getAwsExternalId(), + ioConfig.getTaskDuration().getStandardMinutes() ); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index 3162b2ea0eee..000be2830d6e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.indexing.IOConfig; import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -267,7 +268,8 @@ public void testDeserializeToOldIoConfig() throws IOException "endpoint", 2000, "awsAssumedRoleArn", - "awsExternalId" + "awsExternalId", + Duration.standardHours(2).getStandardMinutes() ); final byte[] json = mapper.writeValueAsBytes(currentConfig); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index e84581af6013..ea4431c212dc 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -40,6 +40,7 @@ import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -94,7 +95,8 @@ public class KinesisIndexTaskSerdeTest "endpoint", null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ); private static final String ACCESS_KEY = "test-access-key"; private static final String SECRET_KEY = "test-secret-key"; 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 088bb177d0e6..eb82040a8133 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -89,6 +89,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -785,7 +786,8 @@ public void testRunWithMinimumMessageTime() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -847,7 +849,8 @@ public void testRunWithMaximumMessageTime() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1946,7 +1949,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ), context ); @@ -2108,7 +2112,8 @@ public void testSequencesFromContext() throws IOException "awsEndpoint", null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ), context ); @@ -2309,7 +2314,8 @@ private KinesisIndexTask createTask( "awsEndpoint", null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ), null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 122a8e1c5ae8..dc10e348138a 
100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -94,6 +94,7 @@ import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -5563,7 +5564,8 @@ private KinesisIndexTask createKinesisIndexTask( "awsEndpoint", null, null, - null + null, + Duration.standardHours(2).getStandardMinutes() ), Collections.emptyMap(), false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 41cd084cd960..d43b83d78d07 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever; @@ -244,32 +243,6 @@ public StreamAppenderatorDriver newDriver( ); } - public boolean withinMinMaxRecordTime(final InputRow row) - { - final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent() - && ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp()); - - final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent() - && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp()); - - if 
(log.isDebugEnabled()) { - if (beforeMinimumMessageTime) { - log.debug( - "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", - row.getTimestamp(), - ioConfig.getMinimumMessageTime().get() - ); - } else if (afterMaximumMessageTime) { - log.debug( - "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", - row.getTimestamp(), - ioConfig.getMaximumMessageTime().get() - ); - } - } - return !beforeMinimumMessageTime && !afterMaximumMessageTime; - } - @Override public String getTaskAllocatorId() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 6526bb81b1e3..928149cdee44 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -42,6 +42,7 @@ public abstract class SeekableStreamIndexTaskIOConfig minimumMessageTime; private final Optional maximumMessageTime; private final InputFormat inputFormat; + private final Long refreshRejectionPeriodsInMinutes; public SeekableStreamIndexTaskIOConfig( @Nullable final Integer taskGroupId, // can be null for backward compabitility @@ -51,7 +52,8 @@ public SeekableStreamIndexTaskIOConfig( final Boolean useTransaction, final DateTime minimumMessageTime, final DateTime maximumMessageTime, - @Nullable final InputFormat inputFormat + @Nullable final InputFormat inputFormat, + @Nullable final Long refreshRejectionPeriodsInMinutes // can be null for backward compatibility ) { this.taskGroupId = taskGroupId; @@ -62,6 +64,7 @@ public SeekableStreamIndexTaskIOConfig( this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.inputFormat = inputFormat; + this.refreshRejectionPeriodsInMinutes 
= refreshRejectionPeriodsInMinutes; Preconditions.checkArgument( startSequenceNumbers.getStream().equals(endSequenceNumbers.getStream()), @@ -134,4 +137,11 @@ public InputFormat getInputFormat() { return inputFormat; } + + @Nullable + @JsonProperty + public Long getRefreshRejectionPeriodsInMinutes() + { + return refreshRejectionPeriodsInMinutes; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index ab437eb7a60a..42dcef39bc85 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -76,6 +76,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -246,6 +247,9 @@ public enum Status private final Map partitionsThroughput = new HashMap<>(); + private volatile DateTime minMessageTime; + private volatile DateTime maxMessageTime; + public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @Nullable final InputRowParser parser, @@ -267,6 +271,18 @@ public SeekableStreamIndexTaskRunner( this.ingestionState = IngestionState.NOT_STARTED; this.lockGranularityToUse = lockGranularityToUse; + minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN); + maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX); + + if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) { + Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") + 
.scheduleWithFixedDelay( + this::refreshMinMaxMessageTime, + ioConfig.getRefreshRejectionPeriodsInMinutes(), + ioConfig.getRefreshRejectionPeriodsInMinutes(), + TimeUnit.MINUTES + ); + } resetNextCheckpointTime(); } @@ -388,7 +404,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception inputRowSchema, task.getDataSchema().getTransformSpec(), toolbox.getIndexingTmpDir(), - row -> row != null && task.withinMinMaxRecordTime(row), + row -> row != null && withinMinMaxRecordTime(row), rowIngestionMeters, parseExceptionHandler ); @@ -2092,4 +2108,35 @@ protected abstract void possiblyResetDataSourceMetadata( protected abstract boolean isEndOffsetExclusive(); protected abstract TypeReference>> getSequenceMetadataTypeReference(); + + private void refreshMinMaxMessageTime() + { + minMessageTime = minMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); + maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); + + log.info(StringUtils.format("Updated min and max messsage times to %s and %s respectively.", minMessageTime, maxMessageTime)); + } + + public boolean withinMinMaxRecordTime(final InputRow row) + { + final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp()); + final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp()); + + if (log.isDebugEnabled()) { + if (beforeMinimumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", + row.getTimestamp(), + minMessageTime + ); + } else if (afterMaximumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", + row.getTimestamp(), + maxMessageTime + ); + } + } + return !beforeMinimumMessageTime && !afterMaximumMessageTime; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java index 8b1bd4fb0963..d3b69d438d5f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java @@ -44,6 +44,7 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ResourceType; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -380,7 +381,8 @@ public TestSeekableStreamIndexTaskIOConfig() false, DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), - new CsvInputFormat(null, null, true, null, 0, null) + new CsvInputFormat(null, null, true, null, 0, null), + Duration.standardHours(2).getStandardMinutes() ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java new file mode 100644 index 000000000000..f78fd680e36f --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.AuthorizerMapper; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; +import 
java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +@RunWith(MockitoJUnitRunner.class) +public class SeekableStreamIndexTaskRunnerTest +{ + @Mock + private InputRow row; + + @Mock + private SeekableStreamIndexTask task; + + @Test + public void testWithinMinMaxTime() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L); + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2))); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2))); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + 
Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Mockito.when(row.getTimestamp()).thenReturn(now); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + } + + @Test + public void testWithinMinMaxTimeNotPopulated() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null); + // min max time not populated. 
+ Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + } + + static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner + { + public TestasbleSeekableStreamIndexTaskRunner( + SeekableStreamIndexTask task, + @Nullable InputRowParser parser, + AuthorizerMapper authorizerMapper, + LockGranularity lockGranularityToUse + ) + { + super(task, parser, authorizerMapper, lockGranularityToUse); + } + + @Override + protected boolean isEndOfShard(Object seqNum) + { + return false; + } + + @Nullable + @Override + protected TreeMap getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) + { + return null; + } + + @Override + protected Object getNextStartOffset(Object sequenceNumber) + { + return null; + } + + @Override + protected 
SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata(ObjectMapper mapper, Object object) + { + return null; + } + + @Override + protected List getRecords(RecordSupplier recordSupplier, TaskToolbox toolbox) + { + return null; + } + + @Override + protected SeekableStreamDataSourceMetadata createDataSourceMetadata(SeekableStreamSequenceNumbers partitions) + { + return null; + } + + @Override + protected OrderedSequenceNumber createSequenceNumber(Object sequenceNumber) + { + return null; + } + + @Override + protected boolean isEndOffsetExclusive() + { + return false; + } + + @Override + protected TypeReference> getSequenceMetadataTypeReference() + { + return null; + } + + @Override + protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier recordSupplier, Set assignment) + { + + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 3281360f5806..5f5aeb0ca853 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -196,7 +196,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration().getStandardMinutes() ) { }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..8a3d36ea67fe 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2807,7 +2807,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration().getStandardMinutes() ) { }; @@ -3166,7 +3167,8 @@ private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration().getStandardMinutes() ) { };