Skip to content

Commit

Permalink
Support configurable CPU resources for Kubernetes jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
fectrain committed Feb 23, 2024
1 parent 91fd3be commit 01c9cd8
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public class KubernetesTaskRunnerConfig
@NotNull
private List<String> javaOptsArray = ImmutableList.of();

// CPU amount for the task container. Despite the "Micro" in the name, K8sTaskAdapter passes this
// value to a Quantity with the "m" suffix, i.e. it is interpreted as millicores (1000 = one core).
// A value that is not greater than 0 (including this default of 0) makes the adapter fall back to
// DruidK8sConstants.DEFAULT_CPU_MILLICORES.
// NOTE(review): @NotNull cannot fail on a primitive int, so the constraint is effectively a no-op here.
@JsonProperty
@NotNull
private int cpuCoreInMicro = 0;

@JsonProperty
@NotNull
private Map<String, String> labels = ImmutableMap.of();
Expand Down Expand Up @@ -133,6 +137,7 @@ private KubernetesTaskRunnerConfig(
Period k8sjobLaunchTimeout,
List<String> peonMonitors,
List<String> javaOptsArray,
int cpuCoreInMicro,
Map<String, String> labels,
Map<String, String> annotations,
Integer capacity
Expand Down Expand Up @@ -184,6 +189,10 @@ private KubernetesTaskRunnerConfig(
javaOptsArray,
this.javaOptsArray
);
this.cpuCoreInMicro = ObjectUtils.defaultIfNull(
cpuCoreInMicro,
this.cpuCoreInMicro
);
this.labels = ObjectUtils.defaultIfNull(
labels,
this.labels
Expand Down Expand Up @@ -264,6 +273,11 @@ public List<String> getJavaOptsArray()
return javaOptsArray;
}

/**
 * Returns the configured CPU amount for task containers.
 *
 * <p>K8sTaskAdapter forwards this value through {@code PeonCommandContext} and uses it as a
 * Kubernetes quantity with the {@code "m"} suffix (millicores), so 1000 corresponds to one core.
 * A value that is not greater than 0 is replaced there by
 * {@code DruidK8sConstants.DEFAULT_CPU_MILLICORES}.
 *
 * @return the configured CPU amount, or 0 when it was not set
 */
public int getCpuCoreInMicro()
{
return cpuCoreInMicro;
}

public Map<String, String> getLabels()
{
return labels;
Expand Down Expand Up @@ -299,6 +313,7 @@ public static class Builder
private Period k8sjobLaunchTimeout;
private List<String> peonMonitors;
private List<String> javaOptsArray;
private int cpuCoreInMicro;
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
Expand Down Expand Up @@ -379,6 +394,12 @@ public Builder withPeonMonitors(List<String> peonMonitors)
return this;
}

/**
 * Sets the CPU amount that the built config reports from {@code getCpuCoreInMicro()}.
 *
 * <p>Although the parameter is called {@code cpuCore}, the stored value is later used as
 * millicores (Kubernetes {@code "m"} suffix), so pass 2000 for two cores. Values that are not
 * greater than 0 cause the adapter to use {@code DruidK8sConstants.DEFAULT_CPU_MILLICORES}.
 *
 * @param cpuCore CPU amount in millicores
 * @return this builder
 */
public Builder withCpuCore(int cpuCore)
{
this.cpuCoreInMicro = cpuCore;
return this;
}

public Builder withJavaOptsArray(List<String> javaOptsArray)
{
this.javaOptsArray = javaOptsArray;
Expand All @@ -397,6 +418,7 @@ public Builder withAnnotations(Map<String, String> annotations)
return this;
}


public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
{
this.capacity = capacity;
Expand All @@ -419,6 +441,7 @@ public KubernetesTaskRunnerConfig build()
this.k8sjobLaunchTimeout,
this.peonMonitors,
this.javaOptsArray,
this.cpuCoreInMicro,
this.labels,
this.annotations,
this.capacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class DruidK8sConstants
public static final String TASK_DATASOURCE = "task.datasource";
public static final int PORT = 8100;
public static final int TLS_PORT = 8091;
// CPU quantity (in millicores, i.e. 1000 = one core) used by K8sTaskAdapter.getResourceRequirements
// when the configured CPU value is not greater than 0.
public static final int DEFAULT_CPU_MILLICORES = 1000;
// Heap size assumed by K8sTaskAdapter.getContainerMemory when the java opts contain no -Xmx value.
public static final String DEFAULT_JAVA_HEAP_SIZE = "1G";
public static final String TLS_ENABLED = "tls.enabled";
public static final String TASK_JSON_ENV = "TASK_JSON";
public static final String TASK_DIR_ENV = "TASK_DIR";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,25 @@ public class PeonCommandContext
private final List<String> javaOpts;
private final File taskDir;
private final boolean enableTls;
// CPU amount for the peon container; K8sTaskAdapter uses it as millicores ("m" suffix).
// NOTE(review): the name starts with an upper-case letter, unlike the other fields of this class.
private final int CpuMicroCore;

public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir)
public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, int CpuMicroCore)
{
this(comamnd, javaOpts, taskDir, false);
this(comamnd, javaOpts, taskDir, CpuMicroCore, false);
}

public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, boolean enableTls)
/**
 * Creates a context holding everything the adapter needs to launch a peon container.
 *
 * @param comamnd      command list stored for the peon container
 * @param javaOpts     JVM options; K8sTaskAdapter.getContainerMemory reads {@code -Xmx} from these
 * @param taskDir      task directory stored for the peon
 * @param CpuMicroCore CPU amount; K8sTaskAdapter uses it as millicores and substitutes
 *                     {@code DruidK8sConstants.DEFAULT_CPU_MILLICORES} when it is not greater than 0
 * @param enableTls    TLS flag exposed through {@code isEnableTls()}
 */
public PeonCommandContext(
List<String> comamnd,
List<String> javaOpts,
File taskDir,
int CpuMicroCore,
boolean enableTls
)
{
this.comamnd = comamnd;
this.javaOpts = javaOpts;
this.taskDir = taskDir;
this.CpuMicroCore = CpuMicroCore;
this.enableTls = enableTls;
}

Expand All @@ -66,6 +74,11 @@ public File getTaskDir()
return taskDir;
}

/**
 * Returns the CPU amount given to this context at construction time.
 *
 * <p>K8sTaskAdapter passes the value to {@code getResourceRequirements}, where it is used as a
 * quantity with the {@code "m"} suffix (millicores) and replaced by the default when it is not
 * greater than 0.
 *
 * @return the CPU amount as supplied to the constructor
 */
public int getCpuMicroCore()
{
return CpuMicroCore;
}

public boolean isEnableTls()
{
return enableTls;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public Job fromTask(Task task) throws IOException
generateCommand(task),
javaOpts(task),
taskConfig.getBaseTaskDir(),
taskRunnerConfig.getCpuCoreInMicro(),
node.isEnableTlsPort()
);
PodSpec podSpec = pod.getSpec();
Expand Down Expand Up @@ -179,7 +180,7 @@ static long getContainerMemory(PeonCommandContext context)
{
List<String> javaOpts = context.getJavaOpts();
Optional<Long> optionalXmx = getJavaOptValueBytes("-Xmx", javaOpts);
long heapSize = HumanReadableBytes.parse("1g");
long heapSize = HumanReadableBytes.parse(DruidK8sConstants.DEFAULT_JAVA_HEAP_SIZE);
if (optionalXmx.isPresent()) {
heapSize = optionalXmx.get();
}
Expand Down Expand Up @@ -276,7 +277,8 @@ protected Container setupMainContainer(
mainContainer.setName("main");
ResourceRequirements requirements = getResourceRequirements(
mainContainer.getResources(),
containerSize
containerSize,
context.getCpuMicroCore()
);
mainContainer.setResources(requirements);
return mainContainer;
Expand Down Expand Up @@ -411,10 +413,13 @@ private List<String> generateCommand(Task task)
}

@VisibleForTesting
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize)
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize, int cpuMicroCore)
{
Map<String, Quantity> resourceMap = new HashMap<>();
resourceMap.put("cpu", new Quantity("1000", "m"));
resourceMap.put(
"cpu",
new Quantity(String.valueOf(cpuMicroCore > 0 ? cpuMicroCore : DruidK8sConstants.DEFAULT_CPU_MILLICORES), "m")
);
resourceMap.put("memory", new Quantity(String.valueOf(containerSize)));
ResourceRequirementsBuilder result = new ResourceRequirementsBuilder();
if (requirements != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,12 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception
jsonMapper
);
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new PeonCommandContext(Collections.singletonList(
"sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"
), new ArrayList<>(), new File(taskBasePath));
PeonCommandContext context = new PeonCommandContext(
Collections.singletonList("sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"),
new ArrayList<>(),
new File(taskBasePath),
config.getCpuCoreInMicro()
);

Job job = adapter.createJobFromPodSpec(podSpec, task, context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void serializingAndDeserializingATask() throws IOException
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
task,
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"))
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"), config.getCpuCoreInMicro())
);
client.batch().v1().jobs().inNamespace("test").create(jobFromSpec);
JobList jobList = client.batch().v1().jobs().inNamespace("test").list();
Expand Down Expand Up @@ -210,15 +210,17 @@ void testGettingContainerSize()
PeonCommandContext context = new PeonCommandContext(
new ArrayList<>(),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
0
);
assertEquals(expected, K8sTaskAdapter.getContainerMemory(context));

context = new PeonCommandContext(
new ArrayList<>(),
Collections.singletonList(
"-server -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.io.tmpdir=/druid/data -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"),
new File("/tmp")
new File("/tmp"),
0
);
expected = (long) ((HumanReadableBytes.parse("512m") + HumanReadableBytes.parse("1g")) * 1.2);
assertEquals(expected, K8sTaskAdapter.getContainerMemory(context));
Expand Down Expand Up @@ -271,7 +273,8 @@ void testAddingMonitors() throws IOException
PeonCommandContext context = new PeonCommandContext(
new ArrayList<>(),
new ArrayList<>(),
new File("/tmp/")
new File("/tmp/"),
0
);
KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("test")
Expand Down Expand Up @@ -361,7 +364,8 @@ void testEphemeralStorageIsRespected() throws IOException
new PeonCommandContext(
Collections.singletonList("foo && bar"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedEphemeralOutput.yaml", Job.class);
Expand All @@ -385,14 +389,72 @@ void testEphemeralStorageIsRespected() throws IOException
Assertions.assertEquals(expected, actual);
}

/**
 * Builds a job from {@code ephemeralPodSpec.yaml} using a runner config that sets explicit JVM
 * memory options and a CPU value of 2000, then checks the result against
 * {@code expectedEphemeralOutputResource.yaml}.
 *
 * <p>NOTE(review): "IsEspected" in the method name looks like a typo for "IsRespected"; confirm
 * before renaming.
 */
@Test
void testEphemeralResourceIsEspected() throws IOException
{
  TestKubernetesClient k8sClient = new TestKubernetesClient(client);
  Pod ephemeralPod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);

  List<String> javaOpts = new ArrayList<>();
  javaOpts.add("-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G");

  KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
                                                                .withNamespace("test")
                                                                .withJavaOptsArray(javaOpts)
                                                                .withCpuCore(2000)
                                                                .build();

  SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
      k8sClient,
      config,
      taskConfig,
      startupLoggingConfig,
      node,
      jsonMapper,
      taskLogs
  );

  NoopTask task = K8sTestUtils.createTask("id", 1);
  PeonCommandContext commandContext = new PeonCommandContext(
      Collections.singletonList("foo && bar"),
      javaOpts,
      new File("/tmp"),
      config.getCpuCoreInMicro()
  );

  Job actual = adapter.createJobFromPodSpec(ephemeralPod.getSpec(), task, commandContext);
  Job expected = K8sTestUtils.fileToResource("expectedEphemeralOutputResource.yaml", Job.class);

  // The compressed TASK_JSON env var is dropped from both jobs before comparing: payloads
  // compressed on a JDK older than 17 do not round-trip identically on JDK 17. This would not
  // happen in production, but it breaks the JDK 17 test runs.
  // Possibly related to https://bugs.openjdk.org/browse/JDK-8081450
  for (Job job : new Job[]{actual, expected}) {
    job.getSpec()
       .getTemplate()
       .getSpec()
       .getContainers()
       .get(0)
       .getEnv()
       .removeIf(envVar -> envVar.getName().equals("TASK_JSON"));
  }

  Assertions.assertEquals(expected, actual);
}


@Test
void testEphemeralStorage()
{
// no resources set.
Container container = new ContainerBuilder().build();
ResourceRequirements result = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
// requests and limits will only have 2 items, cpu / memory
assertEquals(2, result.getLimits().size());
Expand All @@ -404,7 +466,8 @@ void testEphemeralStorage()
container.setResources(new ResourceRequirementsBuilder().withRequests(requestMap).withLimits(limitMap).build());
ResourceRequirements ephemeralResult = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
// you will have ephemeral storage as well.
assertEquals(3, ephemeralResult.getLimits().size());
Expand All @@ -422,7 +485,8 @@ void testEphemeralStorage()
container.getResources().setAdditionalProperty("additional", "some-value");
ResourceRequirements additionalProperties = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
assertEquals(1, additionalProperties.getAdditionalProperties().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ public void testMultiContainerSupport() throws IOException
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutput.yaml", Job.class);
Expand Down Expand Up @@ -154,9 +157,12 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException
Job actual = adapter.createJobFromPodSpec(
spec,
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutputOrder.yaml", Job.class);
Expand Down Expand Up @@ -203,9 +209,12 @@ public void testOverridingPeonMonitors() throws IOException
Job actual = adapter.createJobFromPodSpec(
spec,
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedPodSpec.yaml", Job.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public void testSingleContainerSupport() throws IOException
new PeonCommandContext(
Collections.singletonList("foo && bar"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);

Expand Down
Loading

0 comments on commit 01c9cd8

Please sign in to comment.