From ddfd62d9a981e14ba197404eaf6c08646e92daa2 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 15 May 2024 11:39:23 +0530 Subject: [PATCH] Disable loading lookups by default in CompactionTask (#16420) This PR updates CompactionTask to not load any lookups by default, unless transformSpec is present. If transformSpec is present, we will make the decision based on context values, loading all lookups by default. This is done to ensure backward compatibility, since transformSpec can reference lookups. If transformSpec is not present and no context value is passed, we do not load any lookups. This behavior can be overridden by supplying lookupLoadingMode and lookupsToLoad in the task context. --- .../indexing/IndexerControllerContext.java | 14 +- .../druid/msq/indexing/MSQWorkerTask.java | 27 -- .../druid/msq/sql/MSQTaskQueryMaker.java | 4 +- .../msq/indexing/MSQControllerTaskTest.java | 5 +- .../druid/msq/indexing/MSQWorkerTaskTest.java | 13 +- .../apache/druid/msq/test/MSQTestBase.java | 17 +- .../indexing/common/task/CompactionTask.java | 9 + .../druid/indexing/common/task/Task.java | 7 +- .../ClientCompactionTaskQuerySerdeTest.java | 317 +++++++++--------- .../common/task/CompactionTaskTest.java | 30 ++ .../druid/indexing/common/task/TaskTest.java | 7 + .../lookup/cache/LookupLoadingSpec.java | 72 ++++ .../lookup/cache/LookupLoadingSpecTest.java | 124 +++++++ .../sql/calcite/planner/PlannerContext.java | 4 +- 14 files changed, 429 insertions(+), 221 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index e8fba09ddc51..17ac82d736ba 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -55,7 +55,7 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.server.DruidNode; -import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -271,16 +271,16 @@ public static Map<String, Object> makeTaskContext( .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()); // Put the lookup loading info in the task context to facilitate selective loading of lookups.
- if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != null) { + if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) { taskContextOverridesBuilder.put( - PlannerContext.CTX_LOOKUP_LOADING_MODE, - controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, + controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) ); } - if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { + if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD) != null) { taskContextOverridesBuilder.put( - PlannerContext.CTX_LOOKUPS_TO_LOAD, - controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, + controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index a23c62881a00..b4d18ea390e9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -38,13 +37,9 @@ import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerImpl; -import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; -import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nonnull; -import java.util.Collection; -import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -190,26 +185,4 @@ public int hashCode() { return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, retryCount, worker); } - - @Override - public LookupLoadingSpec getLookupLoadingSpec() - { - final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE); - if (lookupModeValue == null) { - return LookupLoadingSpec.ALL; - } - - final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); - if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) { - return LookupLoadingSpec.NONE; - } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { - Collection<String> lookupsToLoad = (Collection<String>) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { - throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); - } - return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); - } else { - return LookupLoadingSpec.ALL; - } - } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 533010c30575..4debe4d9d43e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -285,9 +285,9 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery) MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec); final Map<String, Object> context = new HashMap<>(); - context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode()); + context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode()); if (plannerContext.getLookupLoadingSpec().getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) { - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); + context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); } final MSQControllerTask controllerTask = new MSQControllerTask( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index a5001fb58ebd..9de14610f19c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -38,7 +38,6 @@ import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -114,8 +113,8 @@ public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() .dataSource("target") .context( ImmutableMap.of( - PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), - PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED) + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED) ) .build() ) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 5e79b129f3bd..3672e9d1c299 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Assert; import org.junit.Test; @@ -125,7 +124,7 @@ public void testGetDefaultLookupLoadingSpec() @Test public void testGetLookupLoadingWithModeNoneInContext() { - final ImmutableMap<String, Object> context = ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); + final ImmutableMap<String, Object> context = ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec()); } @@ -134,8 +133,8 @@ public void
testGetLookupLoadingWithModeNoneInContext() public void testGetLookupLoadingSpecWithLookupListInContext() { final ImmutableMap<String, Object> context = ImmutableMap.of( - PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), - PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode()); Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad()); @@ -145,10 +144,10 @@ public void testGetLookupLoadingSpecWithInvalidInput() { final HashMap<String, Object> context = new HashMap<>(); - context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); // Setting CTX_LOOKUPS_TO_LOAD as null - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); + context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, null); MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); DruidException exception = Assert.assertThrows( @@ -160,7 +159,7 @@ public void testGetLookupLoadingSpecWithInvalidInput() exception.getMessage()); // Setting CTX_LOOKUPS_TO_LOAD as empty list - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); + context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); exception = Assert.assertThrows( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index d9b6e28b32b5..e6fb74877d48 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -855,7 +855,7 @@ public abstract class MSQTester<Builder extends MSQTester<Builder>> protected CompactionState expectedLastCompactionState = null; protected Set<Interval> expectedTombstoneIntervals = null; protected List<Object[]> expectedResultRows = null; - protected LookupLoadingSpec expectedLookupLoadingSpec = null; + protected LookupLoadingSpec expectedLookupLoadingSpec = LookupLoadingSpec.NONE; protected Matcher<Throwable> expectedValidationErrorMatcher = null; protected List<Pair<Predicate<MSQTaskReportPayload>, String>> adhocReportAssertionAndReasons = new ArrayList<>(); protected Matcher<Throwable> expectedExecutionErrorMatcher = null; @@ -1021,19 +1021,8 @@ public void verifyPlanningErrors() protected void verifyLookupLoadingInfoInTaskContext(Map<String, Object> context) { - String lookupLoadingMode = context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString(); - List<String> lookupsToLoad = (List<String>) context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - if (expectedLookupLoadingSpec != null) { - Assert.assertEquals(expectedLookupLoadingSpec.getMode().toString(), lookupLoadingMode); - if (expectedLookupLoadingSpec.getMode().equals(LookupLoadingSpec.Mode.ONLY_REQUIRED)) { - Assert.assertEquals(new
ArrayList<>(expectedLookupLoadingSpec.getLookupsToLoad()), lookupsToLoad); - } else { - Assert.assertNull(lookupsToLoad); - } - } else { - Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), lookupLoadingMode); - Assert.assertNull(lookupsToLoad); - } + LookupLoadingSpec specFromContext = LookupLoadingSpec.createFromContext(context, LookupLoadingSpec.ALL); + Assert.assertEquals(expectedLookupLoadingSpec, specFromContext); } protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 833bcdd2fed9..81447f3fd5eb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -92,6 +92,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.coordinator.duty.CompactSegments; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; @@ -249,6 +250,14 @@ public CompactionTask( this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec()); this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig); this.segmentCacheManagerFactory = segmentCacheManagerFactory; + + // Do not load any lookups in sub-tasks launched by compaction task, unless transformSpec is present. + // If transformSpec is present, we will not modify the context so that the sub-tasks can make the + // decision based on context values, loading all lookups by default. + // This is done to ensure backward compatibility since transformSpec can reference lookups. 
+ if (transformSpec == null) { + addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + } } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cdf7cea7e3f2..18d2ac7d6960 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -334,9 +334,14 @@ static TaskInfo toTaskIdentifierInfo(TaskInfo { + binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); + binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); + binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY); + binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); + binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper)); + binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); + binder.bind(OverlordClient.class).toInstance(new NoopOverlordClient()); + } + ) + ) + ); + objectMapper.setInjectableValues(injectableValues); + objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel")); + return objectMapper; + } - final byte[] json = mapper.writeValueAsBytes(query); - final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class); - + private void assertQueryToTask(ClientCompactionTaskQuery query, CompactionTask task) + { Assert.assertEquals(query.getId(), task.getId()); Assert.assertEquals(query.getDataSource(), task.getDataSource()); Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec); @@ -226,8 +283,8 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException task.getDimensionsSpec().getDimensions() ); Assert.assertEquals( - query.getTransformSpec().getFilter(), - task.getTransformSpec().getFilter() + query.getTransformSpec(), + task.getTransformSpec() ); Assert.assertArrayEquals( query.getMetricsSpec(), @@ -235,16 +292,53 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException ); } - @Test - public void testCompactionTaskToClientCompactionTaskQuery() throws IOException + private ClientCompactionTaskQuery createCompactionTaskQuery(String id, ClientCompactionTaskTransformSpec transformSpec) { - final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); - final CompactionTask.Builder builder = new CompactionTask.Builder( + Map<String, Object> context = new HashMap<>(); + context.put("key", "value"); + return new ClientCompactionTaskQuery( + id, "datasource", - new SegmentCacheManagerFactory(mapper), - new RetryPolicyFactory(new RetryPolicyConfig()) + new ClientCompactionIOConfig( + new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true + ), + new ClientCompactionTaskQueryTuningConfig( + 100, + new OnheapIncrementalIndex.Spec(true), + 40000, + 2000L, + 30000L, + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + 2, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100, + 2 + ), + CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, + new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + METRICS_SPEC,
transformSpec, + context ); - final CompactionTask task = builder + } + + private CompactionTask createCompactionTask(ClientCompactionTaskTransformSpec transformSpec) + { + CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder( + "datasource", + new SegmentCacheManagerFactory(MAPPER), + new RetryPolicyFactory(new RetryPolicyConfig()) + ) .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true) .tuningConfig( new ParallelIndexTuningConfig( @@ -256,18 +350,10 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException null, null, null, - new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10), - new DynamicPartitionsSpec(100, 30000L), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.LZF) - .withLongEncoding(LongEncodingStrategy.LONGS) - .build(), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.UNCOMPRESSED) - .withLongEncoding(LongEncodingStrategy.AUTO) - .build(), + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, 2, null, null, @@ -290,100 +376,17 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException null ) ) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true)) + .granularitySpec(CLIENT_COMPACTION_TASK_GRANULARITY_SPEC) .dimensionsSpec( DimensionsSpec.builder() .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))) .setDimensionExclusions(ImmutableList.of("__time", "val")) .build() ) - .metricsSpec(new AggregatorFactory[] {new CountAggregatorFactory("cnt")}) - .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null))) - .build(); + .metricsSpec(METRICS_SPEC) + .transformSpec(transformSpec) + .context(ImmutableMap.of("key", "value")); - final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( - task.getId(), - "datasource", - new ClientCompactionIOConfig( - new ClientCompactionIntervalSpec( - Intervals.of("2019/2020"), - "testSha256OfSortedSegmentIds" - ), - true - ), - new ClientCompactionTaskQueryTuningConfig( - 100, - new OnheapIncrementalIndex.Spec(true), - 40000, - 2000L, - 30000L, - new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10), - new DynamicPartitionsSpec(100, 30000L), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.LZF) - .withLongEncoding(LongEncodingStrategy.LONGS) - .build(), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.UNCOMPRESSED) - .withLongEncoding(LongEncodingStrategy.AUTO) - .build(), - 2, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - 2 - ), - new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true), - new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), - new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, - new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)), - new HashMap<>() - ); - - final byte[] json = mapper.writeValueAsBytes(task); - final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) mapper.readValue(json, ClientTaskQuery.class); - - 
Assert.assertEquals(expected, actual); } - - private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) - { - final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); - objectMapper.setAnnotationIntrospectors( - new AnnotationIntrospectorPair( - guiceIntrospector, - objectMapper.getSerializationConfig().getAnnotationIntrospector() - ), - new AnnotationIntrospectorPair( - guiceIntrospector, - objectMapper.getDeserializationConfig().getAnnotationIntrospector() - ) - ); - GuiceInjectableValues injectableValues = new GuiceInjectableValues( - GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - binder -> { - binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); - binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); - binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY); - binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); - binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper)); - binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); - binder.bind(OverlordClient.class).toInstance(new NoopOverlordClient()); - } - ) - ) - ); - objectMapper.setInjectableValues(injectableValues); - objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel")); - return objectMapper; + return compactionTaskBuilder.build(); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 4ccd3f498118..7a39b46631c0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -133,6 +133,7 @@ import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; @@ -1738,6 +1739,35 @@ public void testChooseFinestGranularityAllNulls() Assert.assertNull(chooseFinestGranularityHelper(input)); } + @Test + public void testGetDefaultLookupLoadingSpec() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentCacheManagerFactory, + RETRY_POLICY_FACTORY + ); + final CompactionTask task = builder + .interval(Intervals.of("2000-01-01/2000-01-02")) + .build(); + Assert.assertEquals(LookupLoadingSpec.NONE, task.getLookupLoadingSpec()); + } + + @Test + public void testGetDefaultLookupLoadingSpecWithTransformSpec() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentCacheManagerFactory, + RETRY_POLICY_FACTORY + ); + final CompactionTask task = builder + .interval(Intervals.of("2000-01-01/2000-01-02")) + .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null))) + .build(); + Assert.assertEquals(LookupLoadingSpec.ALL, task.getLookupLoadingSpec()); + } + private Granularity chooseFinestGranularityHelper(List<Granularity> granularities) { SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>(); diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java index c2957f6688c7..33502ecb3fb7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.junit.Assert; import org.junit.Test; @@ -128,4 +129,10 @@ public void testGetInputSourceResources() TASK::getInputSourceResources ); } + + @Test + public void testGetLookupLoadingSpec() + { + Assert.assertEquals(LookupLoadingSpec.ALL, TASK.getLookupLoadingSpec()); + } } diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java index 88524fe27f96..4665bdd18cf4 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java @@ -22,6 +22,11 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.InvalidInput; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -39,6 +44,10 @@ */ public class LookupLoadingSpec { + + public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode"; + public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; + public enum Mode { ALL, NONE, ONLY_REQUIRED } @@ -80,6 +89,50 @@ public ImmutableSet<String> getLookupsToLoad() return lookupsToLoad; } + public static LookupLoadingSpec createFromContext(Map<String, Object> context, LookupLoadingSpec defaultSpec) + { + if (context == null) { + return defaultSpec; + } + + final Object lookupModeValue = context.get(CTX_LOOKUP_LOADING_MODE); + if (lookupModeValue == null) { + return defaultSpec; + } + + final LookupLoadingSpec.Mode lookupLoadingMode; + try { + lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); + } + catch (IllegalArgumentException e) { + throw InvalidInput.exception("Invalid value of %s[%s]. Allowed values are %s", + CTX_LOOKUP_LOADING_MODE, lookupModeValue.toString(), Arrays.asList(LookupLoadingSpec.Mode.values())); + } + + if (lookupLoadingMode == Mode.NONE) { + return NONE; + } else if (lookupLoadingMode == Mode.ALL) { + return ALL; + } else if (lookupLoadingMode == Mode.ONLY_REQUIRED) { + Collection<String> lookupsToLoad; + try { + lookupsToLoad = (Collection<String>) context.get(CTX_LOOKUPS_TO_LOAD); + } + catch (ClassCastException e) { + throw InvalidInput.exception("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "lookup names.
For example: [\"lookupName1\", \"lookupName2\"]", + CTX_LOOKUPS_TO_LOAD, context.get(CTX_LOOKUPS_TO_LOAD)); + } + + if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { + throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); + } + return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + } else { + return defaultSpec; + } + } + @Override public String toString() { @@ -88,4 +141,23 @@ public String toString() ", lookupsToLoad=" + lookupsToLoad + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LookupLoadingSpec that = (LookupLoadingSpec) o; + return mode == that.mode && Objects.equals(lookupsToLoad, that.lookupsToLoad); + } + + @Override + public int hashCode() + { + return Objects.hash(mode, lookupsToLoad); + } } diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index 8d0a7a5518a3..d36aff6914cb 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -19,13 +19,20 @@ package org.apache.druid.server.lookup.cache; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import java.util.Arrays; import java.util.Set; +@RunWith(JUnitParamsRunner.class) public class LookupLoadingSpecTest { @Test @@ -59,4 +66,121 @@ public void testLoadingOnlyRequiredLookupsWithNullList() DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.loadOnly(null)); Assert.assertEquals("Expected non-null set of lookups to load.", exception.getMessage()); } + + @Test + public void testCreateLookupLoadingSpecFromEmptyContext() + { + // Default spec is returned in the case of context not having the lookup keys. + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(), + LookupLoadingSpec.ALL + ) + ); + + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(), + LookupLoadingSpec.NONE + ) + ); + } + + @Test + public void testCreateLookupLoadingSpecFromNullContext() + { + // Default spec is returned in the case of context=null. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + null, + LookupLoadingSpec.NONE + ) + ); + + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + null, + LookupLoadingSpec.ALL + ) + ); + } + + @Test + public void testCreateLookupLoadingSpecFromContext() + { + // Only required lookups are returned in the case of context having the lookup keys. 
+ Assert.assertEquals( + LookupLoadingSpec.loadOnly(ImmutableSet.of("lookup1", "lookup2")), + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookup1", "lookup2"), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED + ), + LookupLoadingSpec.ALL + ) + ); + + // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE), + LookupLoadingSpec.ALL + ) + ); + + // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ALL), + LookupLoadingSpec.NONE + ) + ); + } + + @Test + @Parameters( + { + "NONE1", + "A", + "Random mode", + "all", + "only required", + "none" + } + ) + public void testCreateLookupLoadingSpecFromInvalidModeInContext(String mode) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createFromContext( + ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), LookupLoadingSpec.ALL)); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), exception.getMessage()); + } + + @Test + @Parameters( + { + "foo bar", + "foo]" + } + ) + public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookupsToLoad) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad, + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED), + LookupLoadingSpec.ALL) + ); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "lookup names. For example: [\"lookupName1\", \"lookupName2\"]", + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad), exception.getMessage()); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 99f721bffaa7..b7e2de3e66b2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -80,8 +80,6 @@ public class PlannerContext public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp"; public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone"; public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm"; - public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode"; - public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = JoinAlgorithm.BROADCAST; /** @@ -357,7 +355,7 @@ public void addLookupToLoad(String lookupName) } /** - * Returns the lookup to load for a given task. + * Lookup loading spec used if this context corresponds to an MSQ task. */ public LookupLoadingSpec getLookupLoadingSpec() {
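
Usage sketch (illustrative, not part of the patch): the snippet below shows how the LookupLoadingSpec.createFromContext API added above resolves the lookupLoadingMode and lookupsToLoad task-context keys into a spec, falling back to the supplied default only when lookupLoadingMode is absent; this is consistent with a plain Task defaulting to ALL (see TaskTest) while CompactionTask injects mode NONE into its own context. The lookup name "countryLookup" is hypothetical.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;

import java.util.Map;

public class LookupLoadingSpecSketch
{
  public static void main(String[] args)
  {
    // Equivalent to supplying lookupLoadingMode and lookupsToLoad in a task context.
    final Map<String, Object> context = ImmutableMap.of(
        LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED.toString(),
        LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, ImmutableList.of("countryLookup") // hypothetical lookup name
    );

    // The default spec (second argument) is returned only when the context
    // carries no lookupLoadingMode key.
    final LookupLoadingSpec spec = LookupLoadingSpec.createFromContext(context, LookupLoadingSpec.NONE);

    // Prints: LookupLoadingSpec{mode=ONLY_REQUIRED, lookupsToLoad=[countryLookup]}
    System.out.println(spec);
  }
}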