From 5ef94c9deebea6cc52dcc504b83f451c73d5c036 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Thu, 12 Sep 2024 13:30:28 -0400 Subject: [PATCH] Add support for selective loading of broadcast datasources in the task layer (#17027) Tasks control the loading of broadcast datasources via BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec(). By default, tasks download all broadcast datasources, unless there's an override as with the kill and MSQ controller tasks. The CliPeon command line option --loadBroadcastSegments is deprecated in favor of --loadBroadcastDatasourceMode. Broadcast datasources can be specified in SQL queries through JOIN and FROM clauses, or obtained from other sources such as lookups. To this effect, we have introduced a BroadcastDatasourceLoadingSpec. Finding the set of broadcast datasources during SQL planning will be done in a follow-up, which will apply only to MSQ tasks, so they load only required broadcast datasources. This PR primarily focuses on the skeletal changes around BroadcastDatasourceLoadingSpec and integrating it from the Task interface via CliPeon to SegmentBootstrapper. Currently, only kill tasks and MSQ controller tasks skip loading broadcast datasources.
--- .../overlord/common/DruidK8sConstants.java | 1 + .../overlord/taskadapter/K8sTaskAdapter.java | 6 +- .../taskadapter/PodTemplateTaskAdapter.java | 4 + .../PodTemplateTaskAdapterTest.java | 43 ++++- .../src/test/resources/expectedNoopJob.yaml | 2 + .../test/resources/expectedNoopJobBase.yaml | 2 + .../resources/expectedNoopJobLongIds.yaml | 2 + .../resources/expectedNoopJobNoTaskJson.yaml | 2 + .../expectedNoopJobTlsEnabledBase.yaml | 2 + .../druid/msq/indexing/MSQControllerTask.java | 7 + .../msq/indexing/MSQControllerTaskTest.java | 17 ++ .../common/task/KillUnusedSegmentsTask.java | 7 + .../druid/indexing/common/task/Task.java | 13 +- .../indexing/overlord/ForkingTaskRunner.java | 6 +- .../task/KillUnusedSegmentsTaskTest.java | 11 ++ .../BroadcastDatasourceLoadingSpec.java | 170 ++++++++++++++++++ .../coordination/SegmentBootstrapper.java | 32 +++- .../metrics/DataSourceTaskIdHolder.java | 15 +- .../BroadcastDatasourceLoadingSpecTest.java | 166 +++++++++++++++++ .../SegmentBootstrapperCacheTest.java | 10 +- .../coordination/SegmentBootstrapperTest.java | 145 ++++++++++++++- .../java/org/apache/druid/cli/CliPeon.java | 31 +++- 22 files changed, 673 insertions(+), 21 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java create mode 100644 server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java index 568f8ed5a117..644a7f109b25 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java @@ 
-37,6 +37,7 @@ public class DruidK8sConstants public static final String TASK_JSON_ENV = "TASK_JSON"; public static final String TASK_DIR_ENV = "TASK_DIR"; public static final String TASK_ID_ENV = "TASK_ID"; + public static final String LOAD_BROADCAST_DATASOURCE_MODE_ENV = "LOAD_BROADCAST_DATASOURCE_MODE"; public static final String LOAD_BROADCAST_SEGMENTS_ENV = "LOAD_BROADCAST_SEGMENTS"; public static final String JAVA_OPTS = "JAVA_OPTS"; public static final String DRUID_HOST_ENV = "druid_host"; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index c15698803d9f..cc689f925f4f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -444,12 +444,16 @@ private List generateCommand(Task task) } // If the task type is queryable, we need to load broadcast segments on the peon, used for - // join queries + // join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here + // for backwards compatibility and can be removed in a future release. 
if (task.supportsQueries()) { command.add("--loadBroadcastSegments"); command.add("true"); } + command.add("--loadBroadcastDatasourceMode"); + command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString()); + command.add("--taskId"); command.add(task.getId()); log.info( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index e8aaf1bbab35..321fe3fcb3e8 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -280,6 +280,10 @@ private Collection getEnv(Task task) throws IOException .withName(DruidK8sConstants.TASK_ID_ENV) .withValue(task.getId()) .build(), + new EnvVarBuilder() + .withName(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV) + .withValue(task.getBroadcastDatasourceLoadingSpec().getMode().toString()) + .build(), new EnvVarBuilder() .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV) .withValue(Boolean.toString(task.supportsQueries())) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index ac2aaa705581..b25f23a25ddc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -46,6 +46,7 @@ import 
org.apache.druid.k8s.overlord.execution.Selector; import org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.tasklogs.TaskLogs; import org.easymock.EasyMock; import org.junit.Assert; @@ -537,6 +538,7 @@ public void test_fromTask_taskSupportsQueries() throws IOException EasyMock.expect(task.getId()).andReturn("id").anyTimes(); EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); + EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); EasyMock.replay(task); Job actual = adapter.fromTask(task); @@ -550,7 +552,46 @@ public void test_fromTask_taskSupportsQueries() throws IOException } @Test - public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() throws IOException + public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOException + { + Path templatePath = Files.createFile(tempDir.resolve("noop.yaml")); + mapper.writeValue(templatePath.toFile(), podTemplateSpec); + + Properties props = new Properties(); + props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); + props.setProperty("druid.indexer.runner.k8s.podTemplate.queryable", templatePath.toString()); + + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + props, + taskLogs, + dynamicConfigRef + ); + + Task task = EasyMock.mock(Task.class); + EasyMock.expect(task.supportsQueries()).andReturn(true); + EasyMock.expect(task.getType()).andReturn("queryable").anyTimes(); + EasyMock.expect(task.getId()).andReturn("id").anyTimes(); + EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); + EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); + 
EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); + + EasyMock.replay(task); + Job actual = adapter.fromTask(task); + EasyMock.verify(task); + + Assertions.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL.toString(), actual.getSpec().getTemplate() + .getSpec().getContainers() + .get(0).getEnv().stream() + .filter(env -> env.getName().equals(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV)) + .collect(Collectors.toList()).get(0).getValue()); + } + + @Test + public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperties() throws IOException { Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml index ddae7c0567f2..ac539c5da154 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml @@ -45,6 +45,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml index 532c3dd53e82..f7c2ff958bbc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml @@ -45,6 +45,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: 
"false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml index d6c316dcdde8..3a3af1528b56 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml @@ -44,6 +44,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml index 90ae99709598..ec7f9a062481 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml @@ -43,6 +43,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: "LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" image: one diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml index 0e52beac9e32..84457fb3175c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml @@ -44,6 +44,8 @@ spec: value: "/tmp" - name: "TASK_ID" value: "id" + - name: 
"LOAD_BROADCAST_DATASOURCE_MODE" + value: "ALL" - name: "LOAD_BROADCAST_SEGMENTS" value: "false" - name: "TASK_JSON" diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index c3f6feaab245..4ddc8274b9d0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -59,6 +59,7 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; @@ -374,4 +375,10 @@ public LookupLoadingSpec getLookupLoadingSpec() { return LookupLoadingSpec.NONE; } + + @Override + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return BroadcastDatasourceLoadingSpec.NONE; + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 76586c1e1081..8d974285fb57 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -35,6 +35,7 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import 
org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; @@ -104,6 +105,22 @@ public void testGetDefaultLookupLoadingSpec() Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } + @Test + public void testGetDefaultBroadcastDatasourceLoadingSpec() + { + MSQControllerTask controllerTask = new MSQControllerTask( + null, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, controllerTask.getBroadcastDatasourceLoadingSpec()); + } + @Test public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index e1f6d2915eea..06082a988d98 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; @@ -412,6 +413,12 @@ public LookupLoadingSpec getLookupLoadingSpec() return LookupLoadingSpec.NONE; } + @Override + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return BroadcastDatasourceLoadingSpec.NONE; + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 003b39e606b0..cacdc47f520a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -41,13 +41,13 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -329,9 +329,18 @@ static TaskInfo toTaskIdentifierInfo(TaskInfo + *
  <li>{@link BroadcastDatasourceLoadingSpec#mode}: This mode defines whether broadcastDatasources need to be + * loaded for the given task, or not. It can take 3 values:</li> + *
    <ul> + *
    <li>ALL: Load all the broadcast datasources.</li> + *
    <li>NONE: Load no broadcast datasources.</li> + *
    <li>ONLY_REQUIRED: Load only the broadcast datasources defined in broadcastDatasourcesToLoad</li> + *
    </ul> + *
  <li>{@link BroadcastDatasourceLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcastDatasources to load when the broadcastDatasourceLoadingMode is set to ONLY_REQUIRED.
  • + * + */ +public class BroadcastDatasourceLoadingSpec +{ + + public static final String CTX_BROADCAST_DATASOURCE_LOADING_MODE = "broadcastDatasourceLoadingMode"; + public static final String CTX_BROADCAST_DATASOURCES_TO_LOAD = "broadcastDatasourcesToLoad"; + + public enum Mode + { + ALL, NONE, ONLY_REQUIRED + } + + private final Mode mode; + @Nullable + private final ImmutableSet broadcastDatasourcesToLoad; + + public static final BroadcastDatasourceLoadingSpec ALL = new BroadcastDatasourceLoadingSpec(Mode.ALL, null); + public static final BroadcastDatasourceLoadingSpec NONE = new BroadcastDatasourceLoadingSpec(Mode.NONE, null); + + private BroadcastDatasourceLoadingSpec(Mode mode, @Nullable Set broadcastDatasourcesToLoad) + { + this.mode = mode; + this.broadcastDatasourcesToLoad = broadcastDatasourcesToLoad == null ? null : ImmutableSet.copyOf(broadcastDatasourcesToLoad); + } + + /** + * Creates a BroadcastDatasourceLoadingSpec which loads only the broadcast datasources present in the given set. + */ + public static BroadcastDatasourceLoadingSpec loadOnly(Set broadcastDatasourcesToLoad) + { + if (broadcastDatasourcesToLoad == null) { + throw InvalidInput.exception("Expected non-null set of broadcast datasources to load."); + } + return new BroadcastDatasourceLoadingSpec(Mode.ONLY_REQUIRED, broadcastDatasourcesToLoad); + } + + public Mode getMode() + { + return mode; + } + + /** + * @return A non-null immutable set of broadcast datasource names when {@link BroadcastDatasourceLoadingSpec#mode} is ONLY_REQUIRED, null otherwise. 
+ */ + public ImmutableSet getBroadcastDatasourcesToLoad() + { + return broadcastDatasourcesToLoad; + } + + public static BroadcastDatasourceLoadingSpec createFromContext(Map context, BroadcastDatasourceLoadingSpec defaultSpec) + { + if (context == null) { + return defaultSpec; + } + + final Object broadcastDatasourceModeValue = context.get(CTX_BROADCAST_DATASOURCE_LOADING_MODE); + if (broadcastDatasourceModeValue == null) { + return defaultSpec; + } + + final BroadcastDatasourceLoadingSpec.Mode broadcastDatasourceLoadingMode; + try { + broadcastDatasourceLoadingMode = BroadcastDatasourceLoadingSpec.Mode.valueOf(broadcastDatasourceModeValue.toString()); + } + catch (IllegalArgumentException e) { + throw InvalidInput.exception( + "Invalid value of %s[%s]. Allowed values are %s", + CTX_BROADCAST_DATASOURCE_LOADING_MODE, broadcastDatasourceModeValue.toString(), + Arrays.asList(BroadcastDatasourceLoadingSpec.Mode.values()) + ); + } + + if (broadcastDatasourceLoadingMode == Mode.NONE) { + return NONE; + } else if (broadcastDatasourceLoadingMode == Mode.ALL) { + return ALL; + } else if (broadcastDatasourceLoadingMode == Mode.ONLY_REQUIRED) { + final Collection broadcastDatasourcesToLoad; + try { + broadcastDatasourcesToLoad = (Collection) context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD); + } + catch (ClassCastException e) { + throw InvalidInput.exception( + "Invalid value of %s[%s]. Please provide a comma-separated list of broadcast datasource names." 
+ + " For example: [\"datasourceName1\", \"datasourceName2\"]", + CTX_BROADCAST_DATASOURCES_TO_LOAD, context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD) + ); + } + + if (broadcastDatasourcesToLoad == null || broadcastDatasourcesToLoad.isEmpty()) { + throw InvalidInput.exception("Set of broadcast datasources to load cannot be %s for mode[ONLY_REQUIRED].", broadcastDatasourcesToLoad); + } + return BroadcastDatasourceLoadingSpec.loadOnly(new HashSet<>(broadcastDatasourcesToLoad)); + } else { + return defaultSpec; + } + } + + @Override + public String toString() + { + return "BroadcastDatasourceLoadingSpec{" + + "mode=" + mode + + ", broadcastDatasourcesToLoad=" + broadcastDatasourcesToLoad + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BroadcastDatasourceLoadingSpec that = (BroadcastDatasourceLoadingSpec) o; + return mode == that.mode && Objects.equals(broadcastDatasourcesToLoad, that.broadcastDatasourcesToLoad); + } + + @Override + public int hashCode() + { + return Objects.hash(mode, broadcastDatasourcesToLoad); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java index c5b71fbcddcf..7eec82e80b1c 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java @@ -39,12 +39,14 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; 
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -80,6 +82,8 @@ public class SegmentBootstrapper private static final EmittingLogger log = new EmittingLogger(SegmentBootstrapper.class); + private final DataSourceTaskIdHolder datasourceHolder; + @Inject public SegmentBootstrapper( SegmentLoadDropHandler loadDropHandler, @@ -89,7 +93,8 @@ public SegmentBootstrapper( SegmentManager segmentManager, ServerTypeConfig serverTypeConfig, CoordinatorClient coordinatorClient, - ServiceEmitter emitter + ServiceEmitter emitter, + DataSourceTaskIdHolder datasourceHolder ) { this.loadDropHandler = loadDropHandler; @@ -100,6 +105,7 @@ public SegmentBootstrapper( this.serverTypeConfig = serverTypeConfig; this.coordinatorClient = coordinatorClient; this.emitter = emitter; + this.datasourceHolder = datasourceHolder; } @LifecycleStart @@ -261,10 +267,18 @@ private void loadSegmentsOnStartup() throws IOException /** * @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned. + * The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link DataSourceTaskIdHolder#getBroadcastDatasourceLoadingSpec()} + * if applicable. 
*/ private List getBootstrapSegments() { - log.info("Fetching bootstrap segments from the coordinator."); + final BroadcastDatasourceLoadingSpec.Mode mode = datasourceHolder.getBroadcastDatasourceLoadingSpec().getMode(); + if (mode == BroadcastDatasourceLoadingSpec.Mode.NONE) { + log.info("Skipping fetch of bootstrap segments."); + return ImmutableList.of(); + } + + log.info("Fetching bootstrap segments from the coordinator with BroadcastDatasourceLoadingSpec mode[%s].", mode); final Stopwatch stopwatch = Stopwatch.createStarted(); List bootstrapSegments = new ArrayList<>(); @@ -272,7 +286,18 @@ private List getBootstrapSegments() try { final BootstrapSegmentsResponse response = FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true); - bootstrapSegments = ImmutableList.copyOf(response.getIterator()); + if (mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { + final Set broadcastDatasourcesToLoad = datasourceHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad(); + final List filteredBroadcast = new ArrayList<>(); + response.getIterator().forEachRemaining(segment -> { + if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) { + filteredBroadcast.add(segment); + } + }); + bootstrapSegments = filteredBroadcast; + } else { + bootstrapSegments = ImmutableList.copyOf(response.getIterator()); + } } catch (Exception e) { log.warn("Error fetching bootstrap segments from the coordinator: [%s]. 
", e.getMessage()); @@ -284,7 +309,6 @@ private List getBootstrapSegments() emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", bootstrapSegments.size())); log.info("Fetched [%d] bootstrap segments in [%d]ms.", bootstrapSegments.size(), fetchRunMillis); } - return bootstrapSegments; } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java index 6d2dafd31a55..87002a5157f8 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java +++ b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java @@ -21,15 +21,16 @@ import com.google.inject.Inject; import com.google.inject.name.Named; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import javax.annotation.Nullable; - public class DataSourceTaskIdHolder { public static final String DATA_SOURCE_BINDING = "druidDataSource"; public static final String TASK_ID_BINDING = "druidTaskId"; public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask"; + public static final String BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK = "broadcastDatasourcesToLoadForTask"; + @Named(DATA_SOURCE_BINDING) @Inject(optional = true) String dataSource = null; @@ -37,11 +38,14 @@ public class DataSourceTaskIdHolder @Inject(optional = true) String taskId = null; - @Nullable @Named(LOOKUPS_TO_LOAD_FOR_TASK) @Inject(optional = true) LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL; + @Named(BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) + @Inject(optional = true) + BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL; + public String getDataSource() { return dataSource; @@ -56,4 +60,9 @@ public LookupLoadingSpec getLookupLoadingSpec() { return lookupLoadingSpec; } + + public 
BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return broadcastDatasourceLoadingSpec; + } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java new file mode 100644 index 000000000000..ddec0901965d --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordination; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Set; + +@RunWith(JUnitParamsRunner.class) +public class BroadcastDatasourceLoadingSpecTest +{ + @Test + public void testLoadingAllBroadcastDatasources() + { + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.ALL; + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL, spec.getMode()); + Assert.assertNull(spec.getBroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingNoLookups() + { + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.NONE; + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, spec.getMode()); + Assert.assertNull(spec.getBroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookups() + { + final Set broadcastDatasourcesToLoad = ImmutableSet.of("ds1", "ds2"); + final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")); + Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode()); + Assert.assertEquals(broadcastDatasourcesToLoad, spec.getBroadcastDatasourcesToLoad()); + } + + @Test + public void testLoadingOnlyRequiredLookupsWithNullList() + { + DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.loadOnly(null)); + Assert.assertEquals("Expected non-null set of broadcast datasources to load.", exception.getMessage()); + } + + @Test + public void testCreateBroadcastLoadingSpecFromNullContext() + { + // Default spec is returned in the case of context=null. 
+ Assert.assertEquals( + BroadcastDatasourceLoadingSpec.NONE, + BroadcastDatasourceLoadingSpec.createFromContext( + null, + BroadcastDatasourceLoadingSpec.NONE + ) + ); + + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.ALL, + BroadcastDatasourceLoadingSpec.createFromContext( + null, + BroadcastDatasourceLoadingSpec.ALL + ) + ); + } + + @Test + public void testCreateBroadcastLoadingSpecFromContext() + { + // Only required broadcast datasources are returned in the case of context having the broadcast datasource keys. + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")), + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, Arrays.asList("ds1", "ds2"), + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED + ), + BroadcastDatasourceLoadingSpec.ALL + ) + ); + + // No broadcast datasources are returned in the case of context having mode=NONE, irrespective of the default spec. + Assert.assertEquals( + BroadcastDatasourceLoadingSpec.NONE, + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.NONE), + BroadcastDatasourceLoadingSpec.ALL + ) + ); + + // All broadcast datasources are returned in the case of context having mode=ALL, irrespective of the default spec. 
+ Assert.assertEquals( + BroadcastDatasourceLoadingSpec.ALL, + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ALL), + BroadcastDatasourceLoadingSpec.NONE + ) + ); + } + + @Test + @Parameters( + { + "NONE1", + "A", + "Random mode", + "all", + "only required", + "none" + } + ) + public void testSpecFromInvalidModeInContext(final String mode) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, mode), BroadcastDatasourceLoadingSpec.ALL)); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, mode), exception.getMessage()); + } + + + @Test + @Parameters( + { + "foo bar", + "foo]" + } + ) + public void testSpecFromInvalidBroadcastDatasourcesInContext(final Object lookupsToLoad) + { + final DruidException exception = Assert.assertThrows(DruidException.class, () -> + BroadcastDatasourceLoadingSpec.createFromContext( + ImmutableMap.of( + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad, + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED), + BroadcastDatasourceLoadingSpec.ALL) + ); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "broadcast datasource names. 
For example: [\"datasourceName1\", \"datasourceName2\"]", + BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, lookupsToLoad), exception.getMessage()); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java index 7629a6b875c8..187725317a21 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.TestSegmentUtils; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -137,7 +138,8 @@ public void testLoadStartStopWithEmptyLocations() throws IOException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new DataSourceTaskIdHolder() ); bootstrapper.start(); @@ -164,7 +166,8 @@ public void testLoadStartStop() throws IOException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new DataSourceTaskIdHolder() ); bootstrapper.start(); @@ -204,7 +207,8 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - emitter + emitter, + new DataSourceTaskIdHolder() ); bootstrapper.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index c41763f18245..fe1424e27005 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -20,13 +20,23 @@ package org.apache.druid.server.coordination; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Scopes; +import com.google.inject.name.Names; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.jackson.JacksonModule; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -125,7 +135,8 @@ public void testStartStop() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -184,7 +195,8 @@ public void testLoadCachedSegments() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -240,7 +252,8 @@ public void testLoadBootstrapSegments() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -267,6 +280,129 
@@ public void testLoadBootstrapSegments() throws Exception bootstrapper.stop(); } + @Test + public void testLoadNoBootstrapSegments() throws Exception + { + final Set<DataSegment> segments = new HashSet<>(); + for (int i = 0; i < COUNT; ++i) { + segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); + segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02"))); + segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01"))); + segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02"))); + } + + Injector injector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.NONE; + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + .toInstance(broadcastMode); + } + ); + + final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments); + final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(); + final SegmentManager segmentManager = new SegmentManager(cacheManager); + final SegmentLoadDropHandler handler = new SegmentLoadDropHandler( + segmentLoaderConfig, + segmentAnnouncer, + segmentManager + ); + final SegmentBootstrapper bootstrapper = new SegmentBootstrapper( + handler, + segmentLoaderConfig, + segmentAnnouncer, + serverAnnouncer, + segmentManager, + new ServerTypeConfig(ServerType.HISTORICAL), + coordinatorClient, + serviceEmitter, + injector.getInstance(DataSourceTaskIdHolder.class) + ); + + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + bootstrapper.start(); + + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.of(); + + 
Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + + bootstrapper.stop(); + } + + @Test + public void testLoadOnlyRequiredBootstrapSegments() throws Exception + { + final Set<DataSegment> segments = new HashSet<>(); + final DataSegment ds1Segment1 = makeSegment("test1", "1", Intervals.of("P1D/2011-04-01")); + final DataSegment ds1Segment2 = makeSegment("test1", "1", Intervals.of("P1D/2012-04-01")); + final DataSegment ds2Segment1 = makeSegment("test2", "1", Intervals.of("P1d/2011-04-01")); + final DataSegment ds2Segment2 = makeSegment("test2", "1", Intervals.of("P1d/2012-04-01")); + segments.add(ds1Segment1); + segments.add(ds1Segment2); + segments.add(ds2Segment1); + segments.add(ds2Segment2); + + Injector injector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + final BroadcastDatasourceLoadingSpec broadcastMode = BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("test1")); + binder.bind(Key.get(BroadcastDatasourceLoadingSpec.class, Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK))) + .toInstance(broadcastMode); + } + ); + + final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments); + final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(); + final SegmentManager segmentManager = new SegmentManager(cacheManager); + final SegmentLoadDropHandler handler = new SegmentLoadDropHandler( + segmentLoaderConfig, + segmentAnnouncer, + segmentManager + ); + final SegmentBootstrapper bootstrapper = new SegmentBootstrapper( + handler, + segmentLoaderConfig, + segmentAnnouncer, + serverAnnouncer, + segmentManager, + new ServerTypeConfig(ServerType.HISTORICAL), + 
coordinatorClient, + serviceEmitter, + injector.getInstance(DataSourceTaskIdHolder.class) + ); + + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + + bootstrapper.start(); + + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); + Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); + Assert.assertEquals(ImmutableSet.of("test1"), segmentManager.getDataSourceNames()); + + final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.of(ds1Segment2, ds1Segment1); + + Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); + serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); + + bootstrapper.stop(); + } + @Test public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception { @@ -285,7 +421,8 @@ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception segmentManager, new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, - serviceEmitter + serviceEmitter, + new DataSourceTaskIdHolder() ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 61a8ab7374e4..15374625d301 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -123,6 +123,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.coordination.SegmentBootstrapper; import 
org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ZkCoordinator; @@ -176,12 +177,26 @@ public class CliPeon extends GuiceRunnable private boolean isZkEnabled = true; /** + *

    This option is deprecated; use the {@link #loadBroadcastDatasourcesMode} option instead.

    + * * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for * queryable tasks, such as streaming ingestion tasks. + * */ - @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments") + @Deprecated + @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", + description = "Enable loading of broadcast segments. This option is deprecated and will be removed in a" + + " future release. Use --loadBroadcastDatasourceMode instead.") public String loadBroadcastSegments = "false"; + /** + * Broadcast datasource loading mode. The peon will bind the classes required for loading broadcast segments if + * the mode is {@link BroadcastDatasourceLoadingSpec.Mode#ALL} or {@link BroadcastDatasourceLoadingSpec.Mode#ONLY_REQUIRED}. + */ + @Option(name = "--loadBroadcastDatasourceMode", title = "loadBroadcastDatasourceMode", + description = "Specify the broadcast datasource loading mode for the peon. 
Supported values are ALL, NONE, ONLY_REQUIRED.") + public String loadBroadcastDatasourcesMode = BroadcastDatasourceLoadingSpec.Mode.ALL.toString(); + @Option(name = "--taskId", title = "taskId", description = "TaskId for fetching task.json remotely") public String taskId = ""; @@ -274,7 +289,11 @@ public void configure(Binder binder) binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType))); LifecycleModule.register(binder, Server.class); - if ("true".equals(loadBroadcastSegments)) { + final BroadcastDatasourceLoadingSpec.Mode mode = + BroadcastDatasourceLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode); + if ("true".equals(loadBroadcastSegments) + || mode == BroadcastDatasourceLoadingSpec.Mode.ALL + || mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) { binder.install(new BroadcastSegmentLoadingModule()); } } @@ -340,6 +359,14 @@ public LookupLoadingSpec getLookupsToLoad(final Task task) { return task.getLookupLoadingSpec(); } + + @Provides + @LazySingleton + @Named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK) + public BroadcastDatasourceLoadingSpec getBroadcastDatasourcesToLoad(final Task task) + { + return task.getBroadcastDatasourceLoadingSpec(); + } }, new QueryablePeonModule(), new IndexingServiceInputSourceModule(),