diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index e8fba09ddc51..17ac82d736ba 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -55,7 +55,7 @@ import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -271,16 +271,16 @@ public static Map<String, Object> makeTaskContext(
         .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages());
 
     // Put the lookup loading info in the task context to facilitate selective loading of lookups.
-    if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != null) {
+    if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) {
       taskContextOverridesBuilder.put(
-          PlannerContext.CTX_LOOKUP_LOADING_MODE,
-          controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE)
+          LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE,
+          controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)
       );
     }
-    if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) {
+    if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD) != null) {
       taskContextOverridesBuilder.put(
-          PlannerContext.CTX_LOOKUPS_TO_LOAD,
-          controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD)
+          LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD,
+          controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD)
      );
    }
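The hunk above forwards the two lookup keys from the controller task's context into each worker task's context only when the controller actually has them, so workers otherwise fall back to their default spec. A minimal sketch of that handoff, assuming a plain context map on both sides (the key names come from the patch; the helper and class here are illustrative, not Druid API):

    // Illustrative only: forward lookup-loading hints from a controller context
    // into the context overrides handed to worker tasks.
    import com.google.common.collect.ImmutableMap;
    import java.util.HashMap;
    import java.util.Map;

    public class LookupContextForwardingSketch
    {
      public static Map<String, Object> forward(Map<String, Object> controllerTaskContext)
      {
        final ImmutableMap.Builder<String, Object> overrides = ImmutableMap.builder();
        for (String key : new String[]{"lookupLoadingMode", "lookupsToLoad"}) {
          final Object value = controllerTaskContext.get(key);
          if (value != null) {
            overrides.put(key, value); // absent keys stay absent, so the worker uses its default
          }
        }
        return overrides.build();
      }

      public static void main(String[] args)
      {
        final Map<String, Object> controllerContext = new HashMap<>();
        controllerContext.put("lookupLoadingMode", "NONE");
        System.out.println(forward(controllerContext)); // {lookupLoadingMode=NONE}
      }
    }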
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index a23c62881a00..b4d18ea390e9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -27,7 +27,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
-import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -38,13 +37,9 @@
 import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerContext;
 import org.apache.druid.msq.exec.WorkerImpl;
-import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 
 import javax.annotation.Nonnull;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -190,26 +185,4 @@ public int hashCode()
   {
     return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, retryCount, worker);
   }
-
-  @Override
-  public LookupLoadingSpec getLookupLoadingSpec()
-  {
-    final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
-    if (lookupModeValue == null) {
-      return LookupLoadingSpec.ALL;
-    }
-
-    final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
-    if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) {
-      return LookupLoadingSpec.NONE;
-    } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
-      Collection<String> lookupsToLoad = (Collection<String>) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
-      if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
-        throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad);
-      }
-      return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
-    } else {
-      return LookupLoadingSpec.ALL;
-    }
-  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 533010c30575..4debe4d9d43e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -285,9 +285,9 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
     MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
 
     final Map<String, Object> context = new HashMap<>();
-    context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode());
+    context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode());
     if (plannerContext.getLookupLoadingSpec().getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
-      context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad());
+      context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad());
     }
 
     final MSQControllerTask controllerTask = new MSQControllerTask(
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index a5001fb58ebd..9de14610f19c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -38,7 +38,6 @@
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -114,8 +113,8 @@ public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
             .dataSource("target")
             .context(
                 ImmutableMap.of(
-                    PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
-                    PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED)
+                    LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
+                    LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED)
             )
             .build()
     )
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 5e79b129f3bd..3672e9d1c299 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -125,7 +124,7 @@ public void testGetDefaultLookupLoadingSpec()
   @Test
   public void testGetLookupLoadingWithModeNoneInContext()
   {
-    final ImmutableMap<String, Object> context = ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE);
+    final ImmutableMap<String, Object> context = ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE);
     MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
     Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec());
   }
@@ -134,8 +133,8 @@ public void testGetLookupLoadingWithModeNoneInContext()
   public void testGetLookupLoadingSpecWithLookupListInContext()
   {
     final ImmutableMap<String, Object> context = ImmutableMap.of(
-        PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
-        PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);
+        LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"),
+        LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);
     MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
     Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode());
     Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad());
@@ -145,10 +144,10 @@ public void testGetLookupLoadingSpecWithLookupListInContext()
   public void testGetLookupLoadingSpecWithInvalidInput()
   {
     final HashMap<String, Object> context = new HashMap<>();
-    context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);
+    context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);
 
     // Setting CTX_LOOKUPS_TO_LOAD as null
-    context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null);
+    context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, null);
 
     MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
     DruidException exception = Assert.assertThrows(
@@ -160,7 +159,7 @@ public void testGetLookupLoadingSpecWithInvalidInput()
         exception.getMessage());
 
     // Setting CTX_LOOKUPS_TO_LOAD as empty list
-    context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList());
+    context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Collections.emptyList());
 
     MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
     exception = Assert.assertThrows(
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index d9b6e28b32b5..e6fb74877d48 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -855,7 +855,7 @@ public abstract class MSQTester<Builder extends MSQTester<Builder>>
     protected CompactionState expectedLastCompactionState = null;
     protected Set<Interval> expectedTombstoneIntervals = null;
     protected List<Object[]> expectedResultRows = null;
-    protected LookupLoadingSpec expectedLookupLoadingSpec = null;
+    protected LookupLoadingSpec expectedLookupLoadingSpec = LookupLoadingSpec.NONE;
     protected Matcher<Throwable> expectedValidationErrorMatcher = null;
     protected List<Pair<Predicate<MSQTaskReportPayload>, String>> adhocReportAssertionAndReasons = new ArrayList<>();
     protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
@@ -1021,19 +1021,8 @@ public void verifyPlanningErrors()
 
   protected void verifyLookupLoadingInfoInTaskContext(Map<String, Object> context)
   {
-    String lookupLoadingMode = context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString();
-    List<String> lookupsToLoad = (List<String>) context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
-    if (expectedLookupLoadingSpec != null) {
-      Assert.assertEquals(expectedLookupLoadingSpec.getMode().toString(), lookupLoadingMode);
-      if (expectedLookupLoadingSpec.getMode().equals(LookupLoadingSpec.Mode.ONLY_REQUIRED)) {
-        Assert.assertEquals(new ArrayList<>(expectedLookupLoadingSpec.getLookupsToLoad()), lookupsToLoad);
-      } else {
-        Assert.assertNull(lookupsToLoad);
-      }
-    } else {
-      Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), lookupLoadingMode);
-      Assert.assertNull(lookupsToLoad);
-    }
+    LookupLoadingSpec specFromContext = LookupLoadingSpec.createFromContext(context, LookupLoadingSpec.ALL);
+    Assert.assertEquals(expectedLookupLoadingSpec, specFromContext);
   }
 
   protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 833bcdd2fed9..81447f3fd5eb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -92,6 +92,7 @@
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
@@ -249,6 +250,14 @@ public CompactionTask(
     this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
     this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
     this.segmentCacheManagerFactory = segmentCacheManagerFactory;
+
+    // Do not load any lookups in sub-tasks launched by a compaction task, unless a transformSpec is present.
+    // If a transformSpec is present, leave the context unmodified so that sub-tasks can decide based on the
+    // context values, loading all lookups by default. This preserves backward compatibility, since a
+    // transformSpec can reference lookups.
+    if (transformSpec == null) {
+      addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString());
+    }
   }
 
   @VisibleForTesting
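The constructor change above pins lookupLoadingMode=NONE into the task context only when no transformSpec is given. A minimal sketch of the resulting behaviour, using only createFromContext and the context keys introduced by this patch (the local variables and the putIfAbsent stand-in for addToContextIfAbsent are illustrative):

    // Illustrative only: how a compaction task's context plays out for its sub-tasks.
    Map<String, Object> context = new HashMap<>();
    ClientCompactionTaskTransformSpec transformSpec = null; // compaction without a transformSpec

    if (transformSpec == null) {
      // mirrors addToContextIfAbsent(...) in the constructor above
      context.putIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString());
    }

    // Sub-tasks resolve their spec from the inherited context; the default is ALL when nothing was pinned.
    LookupLoadingSpec effective = LookupLoadingSpec.createFromContext(context, LookupLoadingSpec.ALL);
    // effective == LookupLoadingSpec.NONE here; with a transformSpec present it would be LookupLoadingSpec.ALL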
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index cdf7cea7e3f2..18d2ac7d6960 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -334,9 +334,14 @@ static TaskInfo<TaskIdentifier, TaskStatus> toTaskIdentifierInfo(TaskInfo<Task, TaskStatus> taskInfo)
   }
 
   /**
-   * Specifies the list of lookups to load for this task. Tasks load ALL lookups by default.
+   * Specifies the lookups to load for this task. Tasks load ALL lookups by default. This behaviour can
+   * be overridden by passing {@link LookupLoadingSpec#CTX_LOOKUP_LOADING_MODE} and
+   * {@link LookupLoadingSpec#CTX_LOOKUPS_TO_LOAD} in the task context.
    */
   default LookupLoadingSpec getLookupLoadingSpec()
   {
-    return LookupLoadingSpec.ALL;
+    return LookupLoadingSpec.createFromContext(getContext(), LookupLoadingSpec.ALL);
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskQuerySerdeTest.java
@@ ... @@
+  private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
+  {
+    final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
+    objectMapper.setAnnotationIntrospectors(
+        new AnnotationIntrospectorPair(
+            guiceIntrospector,
+            objectMapper.getSerializationConfig().getAnnotationIntrospector()
+        ),
+        new AnnotationIntrospectorPair(
+            guiceIntrospector,
+            objectMapper.getDeserializationConfig().getAnnotationIntrospector()
+        )
+    );
+    GuiceInjectableValues injectableValues = new GuiceInjectableValues(
+        GuiceInjectors.makeStartupInjectorWithModules(
+            ImmutableList.of(
+                binder -> {
+                  binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+                  binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
+                  binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
+                  binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
+                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper));
+                  binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
+                  binder.bind(OverlordClient.class).toInstance(new NoopOverlordClient());
+                }
+            )
+        )
+    );
+    objectMapper.setInjectableValues(injectableValues);
+    objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel"));
+    return objectMapper;
+  }
 
-    final byte[] json = mapper.writeValueAsBytes(query);
-    final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class);
-
+  private void assertQueryToTask(ClientCompactionTaskQuery query, CompactionTask task)
+  {
     Assert.assertEquals(query.getId(), task.getId());
     Assert.assertEquals(query.getDataSource(), task.getDataSource());
     Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec);
@@ -226,8 +283,8 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
         task.getDimensionsSpec().getDimensions()
     );
     Assert.assertEquals(
-        query.getTransformSpec().getFilter(),
-        task.getTransformSpec().getFilter()
+        query.getTransformSpec(),
+        task.getTransformSpec()
     );
     Assert.assertArrayEquals(
         query.getMetricsSpec(),
@@ -235,16 +292,53 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
     );
   }
 
-  @Test
-  public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
+  private ClientCompactionTaskQuery createCompactionTaskQuery(String id, ClientCompactionTaskTransformSpec transformSpec)
   {
-    final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
-    final CompactionTask.Builder builder = new CompactionTask.Builder(
+    Map<String, Object> context = new HashMap<>();
+    context.put("key", "value");
+    return new ClientCompactionTaskQuery(
+        id,
         "datasource",
-        new SegmentCacheManagerFactory(mapper),
-        new RetryPolicyFactory(new RetryPolicyConfig())
+        new ClientCompactionIOConfig(
+            new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true
+        ),
+        new ClientCompactionTaskQueryTuningConfig(
+            100,
+            new OnheapIncrementalIndex.Spec(true),
+            40000,
+            2000L,
+            30000L,
+            SEGMENTS_SPLIT_HINT_SPEC,
+            DYNAMIC_PARTITIONS_SPEC,
+            INDEX_SPEC,
+            INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS,
+            2,
+            1000L,
+            TmpFileSegmentWriteOutMediumFactory.instance(),
+            100,
+            5,
+            1000L,
+            new Duration(3000L),
+            7,
+            1000,
+            100,
+            2
+        ),
+        CLIENT_COMPACTION_TASK_GRANULARITY_SPEC,
+        new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
+        METRICS_SPEC,
+        transformSpec,
+        context
     );
-    final CompactionTask task = builder
+  }
+
+  private CompactionTask createCompactionTask(ClientCompactionTaskTransformSpec transformSpec)
+  {
+    CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder(
+        "datasource",
+        new SegmentCacheManagerFactory(MAPPER),
+        new RetryPolicyFactory(new RetryPolicyConfig())
+    )
         .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true)
         .tuningConfig(
             new ParallelIndexTuningConfig(
                 null,
                 null,
                 null,
                 null,
                 null,
                 null,
                 null,
                 null,
-                new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
-                new DynamicPartitionsSpec(100, 30000L),
-                IndexSpec.builder()
-                         .withDimensionCompression(CompressionStrategy.LZ4)
-                         .withMetricCompression(CompressionStrategy.LZF)
-                         .withLongEncoding(LongEncodingStrategy.LONGS)
-                         .build(),
-                IndexSpec.builder()
-                         .withDimensionCompression(CompressionStrategy.LZ4)
-                         .withMetricCompression(CompressionStrategy.UNCOMPRESSED)
-                         .withLongEncoding(LongEncodingStrategy.AUTO)
-                         .build(),
+                SEGMENTS_SPLIT_HINT_SPEC,
+                DYNAMIC_PARTITIONS_SPEC,
+                INDEX_SPEC,
+                INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS,
                 2,
                 null,
                 null,
@@ -290,100 +376,17 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
                 null
             )
         )
-        .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true))
+        .granularitySpec(CLIENT_COMPACTION_TASK_GRANULARITY_SPEC)
         .dimensionsSpec(
             DimensionsSpec.builder()
                           .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")))
                           .setDimensionExclusions(ImmutableList.of("__time", "val"))
                           .build()
         )
-        .metricsSpec(new AggregatorFactory[] {new CountAggregatorFactory("cnt")})
-        .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)))
-        .build();
+        .metricsSpec(METRICS_SPEC)
+        .transformSpec(transformSpec)
+        .context(ImmutableMap.of("key", "value"));
 
-    final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
-        task.getId(),
-        "datasource",
-        new ClientCompactionIOConfig(
-            new ClientCompactionIntervalSpec(
-                Intervals.of("2019/2020"),
-                "testSha256OfSortedSegmentIds"
-            ),
-            true
-        ),
-        new ClientCompactionTaskQueryTuningConfig(
-            100,
-            new OnheapIncrementalIndex.Spec(true),
-            40000,
-            2000L,
-            30000L,
-            new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
-            new DynamicPartitionsSpec(100, 30000L),
-            IndexSpec.builder()
-                     .withDimensionCompression(CompressionStrategy.LZ4)
-                     .withMetricCompression(CompressionStrategy.LZF)
-                     .withLongEncoding(LongEncodingStrategy.LONGS)
-                     .build(),
-            IndexSpec.builder()
-                     .withDimensionCompression(CompressionStrategy.LZ4)
-                     .withMetricCompression(CompressionStrategy.UNCOMPRESSED)
-                     .withLongEncoding(LongEncodingStrategy.AUTO)
-                     .build(),
-            2,
-            1000L,
-            TmpFileSegmentWriteOutMediumFactory.instance(),
-            100,
-            5,
-            1000L,
-            new Duration(3000L),
-            7,
-            1000,
-            100,
-            2
-        ),
-        new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
-        new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
-        new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
-        new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)),
-        new HashMap<>()
-    );
-
-    final byte[] json = mapper.writeValueAsBytes(task);
-    final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) mapper.readValue(json, ClientTaskQuery.class);
-
-    Assert.assertEquals(expected, actual);
-  }
-
-  private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
-  {
-    final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
-    objectMapper.setAnnotationIntrospectors(
-        new AnnotationIntrospectorPair(
-            guiceIntrospector,
-            objectMapper.getSerializationConfig().getAnnotationIntrospector()
-        ),
-        new AnnotationIntrospectorPair(
-            guiceIntrospector,
-            objectMapper.getDeserializationConfig().getAnnotationIntrospector()
-        )
-    );
-    GuiceInjectableValues injectableValues = new GuiceInjectableValues(
-        GuiceInjectors.makeStartupInjectorWithModules(
-            ImmutableList.of(
-                binder -> {
-                  binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
-                  binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
-                  binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
-                  binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
-                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper));
-                  binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
-                  binder.bind(OverlordClient.class).toInstance(new NoopOverlordClient());
-                }
-            )
-        )
-    );
-    objectMapper.setInjectableValues(injectableValues);
-    objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel"));
-    return objectMapper;
+    return compactionTaskBuilder.build();
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 4ccd3f498118..7a39b46631c0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -133,6 +133,7 @@
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceAction;
@@ -1738,6 +1739,35 @@ public void testChooseFinestGranularityAllNulls()
     Assert.assertNull(chooseFinestGranularityHelper(input));
   }
 
+  @Test
+  public void testGetDefaultLookupLoadingSpec()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask task = builder
+        .interval(Intervals.of("2000-01-01/2000-01-02"))
+        .build();
+    Assert.assertEquals(LookupLoadingSpec.NONE, task.getLookupLoadingSpec());
+  }
+
+  @Test
+  public void testGetDefaultLookupLoadingSpecWithTransformSpec()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask task = builder
+        .interval(Intervals.of("2000-01-01/2000-01-02"))
+        .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)))
+        .build();
+    Assert.assertEquals(LookupLoadingSpec.ALL, task.getLookupLoadingSpec());
+  }
+
   private Granularity chooseFinestGranularityHelper(List<Granularity> granularities)
   {
     SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
index c2957f6688c7..33502ecb3fb7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
@@ -26,6 +26,7 @@
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -128,4 +129,10 @@ public void testGetInputSourceResources()
         TASK::getInputSourceResources
     );
   }
+
+  @Test
+  public void testGetLookupLoadingSpec()
+  {
+    Assert.assertEquals(LookupLoadingSpec.ALL, TASK.getLookupLoadingSpec());
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java
index 88524fe27f96..4665bdd18cf4 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java
@@ -22,6 +22,11 @@
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.error.InvalidInput;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -39,6 +44,10 @@
  */
 public class LookupLoadingSpec
 {
+
+  public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode";
+  public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad";
+
   public enum Mode
   {
     ALL, NONE, ONLY_REQUIRED
@@ -80,6 +89,50 @@ public ImmutableSet<String> getLookupsToLoad()
     return lookupsToLoad;
   }
 
+  public static LookupLoadingSpec createFromContext(Map<String, Object> context, LookupLoadingSpec defaultSpec)
+  {
+    if (context == null) {
+      return defaultSpec;
+    }
+
+    final Object lookupModeValue = context.get(CTX_LOOKUP_LOADING_MODE);
+    if (lookupModeValue == null) {
+      return defaultSpec;
+    }
+
+    final LookupLoadingSpec.Mode lookupLoadingMode;
+    try {
+      lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
+    }
+    catch (IllegalArgumentException e) {
+      throw InvalidInput.exception("Invalid value of %s[%s]. Allowed values are %s",
+          CTX_LOOKUP_LOADING_MODE, lookupModeValue.toString(), Arrays.asList(LookupLoadingSpec.Mode.values()));
+    }
+
+    if (lookupLoadingMode == Mode.NONE) {
+      return NONE;
+    } else if (lookupLoadingMode == Mode.ALL) {
+      return ALL;
+    } else if (lookupLoadingMode == Mode.ONLY_REQUIRED) {
+      Collection<String> lookupsToLoad;
+      try {
+        lookupsToLoad = (Collection<String>) context.get(CTX_LOOKUPS_TO_LOAD);
+      }
+      catch (ClassCastException e) {
+        throw InvalidInput.exception("Invalid value of %s[%s]. Please provide a comma-separated list of "
+            + "lookup names. For example: [\"lookupName1\", \"lookupName2\"]",
+            CTX_LOOKUPS_TO_LOAD, context.get(CTX_LOOKUPS_TO_LOAD));
+      }
+
+      if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
+        throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad);
+      }
+      return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
+    } else {
+      return defaultSpec;
+    }
+  }
+
   @Override
   public String toString()
   {
@@ -88,4 +141,23 @@ public String toString()
            ", lookupsToLoad=" + lookupsToLoad +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LookupLoadingSpec that = (LookupLoadingSpec) o;
+    return mode == that.mode && Objects.equals(lookupsToLoad, that.lookupsToLoad);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(mode, lookupsToLoad);
+  }
 }
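createFromContext above is now the single place where the two context keys are interpreted. A short usage sketch of the helper exactly as defined in this patch (the lookup names and contexts are illustrative):

    Map<String, Object> context = ImmutableMap.of(
        LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, "ONLY_REQUIRED",
        LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2")
    );

    LookupLoadingSpec spec = LookupLoadingSpec.createFromContext(context, LookupLoadingSpec.ALL);
    // spec.getMode() == Mode.ONLY_REQUIRED; spec.getLookupsToLoad() == {"lookupName1", "lookupName2"}

    // A context without the mode key falls back to whatever default the caller supplies:
    LookupLoadingSpec fallback = LookupLoadingSpec.createFromContext(ImmutableMap.of(), LookupLoadingSpec.NONE);
    // fallback == LookupLoadingSpec.NONE

    // mode=ONLY_REQUIRED with a null or empty lookup list, or an unparseable mode string,
    // surfaces as an InvalidInput DruidException, as exercised by the tests below.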
For example: [\"lookupName1\", \"lookupName2\"]", + CTX_LOOKUPS_TO_LOAD, context.get(CTX_LOOKUPS_TO_LOAD)); + } + + if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { + throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); + } + return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + } else { + return defaultSpec; + } + } + @Override public String toString() { @@ -88,4 +141,23 @@ public String toString() ", lookupsToLoad=" + lookupsToLoad + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LookupLoadingSpec that = (LookupLoadingSpec) o; + return mode == that.mode && Objects.equals(lookupsToLoad, that.lookupsToLoad); + } + + @Override + public int hashCode() + { + return Objects.hash(mode, lookupsToLoad); + } } diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index 8d0a7a5518a3..d36aff6914cb 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -19,13 +19,20 @@ package org.apache.druid.server.lookup.cache; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import java.util.Arrays; import java.util.Set; +@RunWith(JUnitParamsRunner.class) public class LookupLoadingSpecTest { @Test @@ -59,4 +66,121 @@ public void testLoadingOnlyRequiredLookupsWithNullList() DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.loadOnly(null)); Assert.assertEquals("Expected non-null set of lookups to load.", exception.getMessage()); } + + @Test + public void testCreateLookupLoadingSpecFromEmptyContext() + { + // Default spec is returned in the case of context not having the lookup keys. + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(), + LookupLoadingSpec.ALL + ) + ); + + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(), + LookupLoadingSpec.NONE + ) + ); + } + + @Test + public void testCreateLookupLoadingSpecFromNullContext() + { + // Default spec is returned in the case of context=null. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + null, + LookupLoadingSpec.NONE + ) + ); + + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + null, + LookupLoadingSpec.ALL + ) + ); + } + + @Test + public void testCreateLookupLoadingSpecFromContext() + { + // Only required lookups are returned in the case of context having the lookup keys. 
+ Assert.assertEquals( + LookupLoadingSpec.loadOnly(ImmutableSet.of("lookup1", "lookup2")), + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookup1", "lookup2"), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED + ), + LookupLoadingSpec.ALL + ) + ); + + // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE), + LookupLoadingSpec.ALL + ) + ); + + // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ALL), + LookupLoadingSpec.NONE + ) + ); + } + + @Test + @Parameters( + { + "NONE1", + "A", + "Random mode", + "all", + "only required", + "none" + } + ) + public void testCreateLookupLoadingSpecFromInvalidModeInContext(String mode) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createFromContext( + ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), LookupLoadingSpec.ALL)); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), exception.getMessage()); + } + + @Test + @Parameters( + { + "foo bar", + "foo]" + } + ) + public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookupsToLoad) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad, + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED), + LookupLoadingSpec.ALL) + ); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "lookup names. For example: [\"lookupName1\", \"lookupName2\"]", + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad), exception.getMessage()); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 99f721bffaa7..b7e2de3e66b2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -80,8 +80,6 @@ public class PlannerContext public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp"; public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone"; public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm"; - public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode"; - public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = JoinAlgorithm.BROADCAST; /** @@ -357,7 +355,7 @@ public void addLookupToLoad(String lookupName) } /** - * Returns the lookup to load for a given task. + * Lookup loading spec used if this context corresponds to an MSQ task. */ public LookupLoadingSpec getLookupLoadingSpec() {
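Taken together, the pieces above form a single round trip. A condensed sketch of that flow under simplified wiring (only createFromContext, the CTX_* keys, and the default specs come from this patch; the lookup name and the direct map copy standing in for task submission are illustrative):

    // 1. SQL planning: MSQTaskQueryMaker serializes the planner's spec into the controller context.
    Map<String, Object> controllerContext = new HashMap<>();
    controllerContext.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED);
    controllerContext.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, ImmutableSet.of("countryName"));

    // 2. Controller: IndexerControllerContext.makeTaskContext forwards both keys to each worker task.
    Map<String, Object> workerContext = new HashMap<>(controllerContext);

    // 3. Worker: the default Task.getLookupLoadingSpec (see the Task.java hunk above) resolves the
    //    context, falling back to ALL when the keys are absent.
    LookupLoadingSpec spec = LookupLoadingSpec.createFromContext(workerContext, LookupLoadingSpec.ALL);
    // spec.getMode() == Mode.ONLY_REQUIRED; only "countryName" is loaded on the worker.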