Add support for selective loading of broadcast datasources in the task layer (#17027)

Tasks control the loading of broadcast datasources via BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec(). By default, tasks download all broadcast datasources, unless there is an override, as with the kill and MSQ controller tasks.
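For instance, a task type that never runs broadcast joins can override the default outright, as the kill task and MSQ controller task do in this diff (a sketch of the override only; the enclosing class is omitted):

  @Override
  public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
  {
    // Kill tasks and MSQ controller tasks run no broadcast joins, so the peon
    // skips downloading broadcast segments for them.
    return BroadcastDatasourceLoadingSpec.NONE;
  }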

The CliPeon command-line option --loadBroadcastSegments is deprecated in favor of --loadBroadcastDatasourceMode.

Broadcast datasources can be specified in SQL queries through JOIN and FROM clauses, or obtained from other sources such as lookups. To this end, we have introduced a BroadcastDatasourceLoadingSpec. Finding the set of broadcast datasources during SQL planning will be done in a follow-up, which will apply only to MSQ tasks so that they load only the required broadcast datasources. This PR primarily focuses on the skeletal changes around BroadcastDatasourceLoadingSpec and on integrating it from the Task interface, via CliPeon, down to SegmentBootstrapper.

Currently, only kill tasks and MSQ controller tasks skip loading broadcast datasources.
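The plumbing mirrors LookupLoadingSpec: the spec is derived from the task context with an explicit default. A minimal sketch of that pattern, using the createFromContext method and Mode enum that appear in the diff below (illustrative, not the full wiring):

  // Sketch only: resolve the spec for a task, defaulting to ALL when the
  // context carries no override.
  final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.createFromContext(
      task.getContext(),
      BroadcastDatasourceLoadingSpec.ALL
  );
  if (spec.getMode() == BroadcastDatasourceLoadingSpec.Mode.NONE) {
    // The peon can skip bootstrapping broadcast segments entirely.
  }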
abhishekrb19 authored Sep 12, 2024
1 parent 6ef8d5d commit 5ef94c9
Showing 22 changed files with 673 additions and 21 deletions.
@@ -37,6 +37,7 @@ public class DruidK8sConstants
public static final String TASK_JSON_ENV = "TASK_JSON";
public static final String TASK_DIR_ENV = "TASK_DIR";
public static final String TASK_ID_ENV = "TASK_ID";
public static final String LOAD_BROADCAST_DATASOURCE_MODE_ENV = "LOAD_BROADCAST_DATASOURCE_MODE";
public static final String LOAD_BROADCAST_SEGMENTS_ENV = "LOAD_BROADCAST_SEGMENTS";
public static final String JAVA_OPTS = "JAVA_OPTS";
public static final String DRUID_HOST_ENV = "druid_host";
@@ -444,12 +444,16 @@ private List<String> generateCommand(Task task)
}

// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
// join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a future release.
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--loadBroadcastDatasourceMode");
command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());

command.add("--taskId");
command.add(task.getId());
log.info(
@@ -280,6 +280,10 @@ private Collection<EnvVar> getEnv(Task task) throws IOException
.withName(DruidK8sConstants.TASK_ID_ENV)
.withValue(task.getId())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV)
.withValue(task.getBroadcastDatasourceLoadingSpec().getMode().toString())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
.withValue(Boolean.toString(task.supportsQueries()))
@@ -46,6 +46,7 @@
import org.apache.druid.k8s.overlord.execution.Selector;
import org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.EasyMock;
import org.junit.Assert;
@@ -537,6 +538,7 @@ public void test_fromTask_taskSupportsQueries() throws IOException
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes();
EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes();

EasyMock.replay(task);
Job actual = adapter.fromTask(task);
@@ -550,7 +552,46 @@
}

@Test
public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() throws IOException
public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);

Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.queryable", templatePath.toString());

PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
taskRunnerConfig,
taskConfig,
node,
mapper,
props,
taskLogs,
dynamicConfigRef
);

Task task = EasyMock.mock(Task.class);
EasyMock.expect(task.supportsQueries()).andReturn(true);
EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes();
EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes();

EasyMock.replay(task);
Job actual = adapter.fromTask(task);
EasyMock.verify(task);

Assertions.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL.toString(), actual.getSpec().getTemplate()
.getSpec().getContainers()
.get(0).getEnv().stream()
.filter(env -> env.getName().equals(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV))
.collect(Collectors.toList()).get(0).getValue());
}

@Test
public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperties() throws IOException
{
Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
@@ -45,6 +45,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
@@ -45,6 +45,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
@@ -44,6 +44,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
@@ -43,6 +43,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
@@ -44,6 +44,8 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
@@ -59,6 +59,7 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
@@ -374,4 +375,10 @@ public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}

@Override
public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.NONE;
}
}
@@ -35,6 +35,7 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
@@ -104,6 +105,22 @@ public void testGetDefaultLookupLoadingSpec()
Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec());
}

@Test
public void testGetDefaultBroadcastDatasourceLoadingSpec()
{
MSQControllerTask controllerTask = new MSQControllerTask(
null,
MSQ_SPEC,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, controllerTask.getBroadcastDatasourceLoadingSpec());
}

@Test
public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
{
@@ -47,6 +47,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
@@ -412,6 +413,12 @@ public LookupLoadingSpec getLookupLoadingSpec()
return LookupLoadingSpec.NONE;
}

@Override
public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.NONE;
}

@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
@@ -41,13 +41,13 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -329,9 +329,18 @@ static TaskInfo<TaskIdentifier, TaskStatus> toTaskIdentifierInfo(TaskInfo<Task,
* This behaviour can be overridden by passing parameters {@link LookupLoadingSpec#CTX_LOOKUP_LOADING_MODE}
* and {@link LookupLoadingSpec#CTX_LOOKUPS_TO_LOAD} in the task context.
*/
@Nullable
default LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.createFromContext(getContext(), LookupLoadingSpec.ALL);
}

/**
* Specifies the list of broadcast datasources to load for this task. Tasks load ALL broadcast datasources by default.
* This behavior can be overridden by passing parameters {@link BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCE_LOADING_MODE}
* and {@link BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCES_TO_LOAD} in the task context.
*/
default BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.createFromContext(getContext(), BroadcastDatasourceLoadingSpec.ALL);
}
}
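A hedged example of forcing the override through the task context, using the CTX_* constants referenced in the javadoc above (the constants come from this diff; the "NONE" value string is an assumption mirroring Mode.NONE, and java.util.Map/HashMap imports are assumed):

  // Hypothetical usage: a context entry that turns broadcast loading off.
  final Map<String, Object> context = new HashMap<>();
  context.put(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, "NONE");
  final BroadcastDatasourceLoadingSpec spec =
      BroadcastDatasourceLoadingSpec.createFromContext(context, BroadcastDatasourceLoadingSpec.ALL);
  // spec.getMode() is now Mode.NONE, so no broadcast segments are downloaded.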
@@ -376,12 +376,16 @@ public TaskStatus call()
}

// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
// join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a future release.
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}

command.add("--loadBroadcastDatasourceMode");
command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());

if (!taskFile.exists()) {
jsonMapper.writeValue(taskFile, task);
}
@@ -37,6 +37,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
@@ -601,6 +602,16 @@ public void testGetLookupsToLoad()
Assert.assertEquals(LookupLoadingSpec.Mode.NONE, task.getLookupLoadingSpec().getMode());
}

@Test
public void testGetBroadcastDatasourcesToLoad()
{
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.of("2019-03-01/2019-04-01"))
.build();
Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, task.getBroadcastDatasourceLoadingSpec().getMode());
}

@Test
public void testKillBatchSizeOneAndLimit4() throws Exception
{