Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workload Management] Add QueryGroupServiceSettings #15028

Merged
merged 7 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709))
- [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
import org.opensearch.transport.SniffConnectionStrategy;
import org.opensearch.transport.TransportSettings;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.WorkloadManagementSettings;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -766,7 +767,13 @@ public void apply(Settings value, Settings current, Settings previous) {
// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,

SystemTemplatesService.SETTING_APPLICATION_BASED_CONFIGURATION_TEMPLATES_ENABLED
SystemTemplatesService.SETTING_APPLICATION_BASED_CONFIGURATION_TEMPLATES_ENABLED,

// WorkloadManagement settings
WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
)
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
* Main class to declare Workload Management related settings
*/
public class WorkloadManagementSettings {
private static final Double DEFAULT_NODE_LEVEL_MEMORY_REJECTION_THRESHOLD = 0.8;
private static final Double DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = 0.9;
private static final Double DEFAULT_NODE_LEVEL_CPU_REJECTION_THRESHOLD = 0.8;
private static final Double DEFAULT_NODE_LEVEL_CPU_CANCELLATION_THRESHOLD = 0.9;
public static final double NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95;
public static final double NODE_LEVEL_MEMORY_REJECTION_THRESHOLD_MAX_VALUE = 0.9;
public static final double NODE_LEVEL_CPU_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95;
public static final double NODE_LEVEL_CPU_REJECTION_THRESHOLD_MAX_VALUE = 0.9;

private Double nodeLevelMemoryCancellationThreshold;
private Double nodeLevelMemoryRejectionThreshold;
private Double nodeLevelCpuCancellationThreshold;
private Double nodeLevelCpuRejectionThreshold;

/**
* Setting name for node level memory based rejection threshold for QueryGroup service
*/
public static final String NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.memory_rejection_threshold";
/**
* Setting to control the memory based rejection threshold
*/
public static final Setting<Double> NODE_LEVEL_MEMORY_REJECTION_THRESHOLD = Setting.doubleSetting(
NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
DEFAULT_NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* Setting name for node level cpu based rejection threshold for QueryGroup service
*/
public static final String NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.cpu_rejection_threshold";
/**
* Setting to control the cpu based rejection threshold
*/
public static final Setting<Double> NODE_LEVEL_CPU_REJECTION_THRESHOLD = Setting.doubleSetting(
NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
DEFAULT_NODE_LEVEL_CPU_REJECTION_THRESHOLD,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* Setting name for node level memory based cancellation threshold for QueryGroup service
*/
public static final String NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.memory_cancellation_threshold";
/**
* Setting to control the memory based cancellation threshold
*/
public static final Setting<Double> NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = Setting.doubleSetting(
NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME,
DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* Setting name for node level cpu based cancellation threshold for QueryGroup service
*/
public static final String NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.cpu_cancellation_threshold";
/**
* Setting to control the cpu based cancellation threshold
*/
public static final Setting<Double> NODE_LEVEL_CPU_CANCELLATION_THRESHOLD = Setting.doubleSetting(
NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME,
DEFAULT_NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* QueryGroup service settings constructor
* @param settings - QueryGroup service settings
* @param clusterSettings - QueryGroup cluster settings
*/
public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSettings) {
nodeLevelMemoryCancellationThreshold = NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD.get(settings);
nodeLevelMemoryRejectionThreshold = NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.get(settings);
nodeLevelCpuCancellationThreshold = NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.get(settings);
nodeLevelCpuRejectionThreshold = NODE_LEVEL_CPU_REJECTION_THRESHOLD.get(settings);

ensureRejectionThresholdIsLessThanCancellation(
nodeLevelMemoryRejectionThreshold,
nodeLevelMemoryCancellationThreshold,
NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME
);
ensureRejectionThresholdIsLessThanCancellation(
nodeLevelCpuRejectionThreshold,
nodeLevelCpuCancellationThreshold,
NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME
);

clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD, this::setNodeLevelMemoryCancellationThreshold);
clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_MEMORY_REJECTION_THRESHOLD, this::setNodeLevelMemoryRejectionThreshold);
clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_CANCELLATION_THRESHOLD, this::setNodeLevelCpuCancellationThreshold);
clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_REJECTION_THRESHOLD, this::setNodeLevelCpuRejectionThreshold);
}

/**
* Method to get the node level memory based cancellation threshold
* @return current node level memory based cancellation threshold
*/
public Double getNodeLevelMemoryCancellationThreshold() {
return nodeLevelMemoryCancellationThreshold;
}

/**
* Method to set the node level memory based cancellation threshold
* @param nodeLevelMemoryCancellationThreshold sets the new node level memory based cancellation threshold
* @throws IllegalArgumentException if the value is &gt; 0.95 and cancellation &lt; rejection threshold
*/
public void setNodeLevelMemoryCancellationThreshold(Double nodeLevelMemoryCancellationThreshold) {
if (Double.compare(nodeLevelMemoryCancellationThreshold, NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) {
throw new IllegalArgumentException(
NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME + " value cannot be greater than 0.95 as it can result in a node drop"
);
}

ensureRejectionThresholdIsLessThanCancellation(
nodeLevelMemoryRejectionThreshold,
nodeLevelMemoryCancellationThreshold,
NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME
);

this.nodeLevelMemoryCancellationThreshold = nodeLevelMemoryCancellationThreshold;
}

/**
* Method to get the node level cpu based cancellation threshold
* @return current node level cpu based cancellation threshold
*/
public Double getNodeLevelCpuCancellationThreshold() {
return nodeLevelCpuCancellationThreshold;
}

/**
* Method to set the node level cpu based cancellation threshold
* @param nodeLevelCpuCancellationThreshold sets the new node level cpu based cancellation threshold
* @throws IllegalArgumentException if the value is &gt; 0.95 and cancellation &lt; rejection threshold
*/
public void setNodeLevelCpuCancellationThreshold(Double nodeLevelCpuCancellationThreshold) {
if (Double.compare(nodeLevelCpuCancellationThreshold, NODE_LEVEL_CPU_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) {
throw new IllegalArgumentException(
NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME + " value cannot be greater than 0.95 as it can result in a node drop"
);
}

ensureRejectionThresholdIsLessThanCancellation(
nodeLevelCpuRejectionThreshold,
nodeLevelCpuCancellationThreshold,
NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME
);

this.nodeLevelCpuCancellationThreshold = nodeLevelCpuCancellationThreshold;
}

/**
* Method to get the memory based node level rejection threshold
* @return the current memory based node level rejection threshold
*/
public Double getNodeLevelMemoryRejectionThreshold() {
return nodeLevelMemoryRejectionThreshold;
}

/**
* Method to set the node level memory based rejection threshold
* @param nodeLevelMemoryRejectionThreshold sets the new memory based rejection threshold
* @throws IllegalArgumentException if rejection &gt; 0.90 and rejection &lt; cancellation threshold
*/
public void setNodeLevelMemoryRejectionThreshold(Double nodeLevelMemoryRejectionThreshold) {
if (Double.compare(nodeLevelMemoryRejectionThreshold, NODE_LEVEL_MEMORY_REJECTION_THRESHOLD_MAX_VALUE) > 0) {
throw new IllegalArgumentException(
NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME + " value cannot be greater than 0.90 as it can result in a node drop"
);
}

ensureRejectionThresholdIsLessThanCancellation(
nodeLevelMemoryRejectionThreshold,
nodeLevelMemoryCancellationThreshold,
NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME
);

this.nodeLevelMemoryRejectionThreshold = nodeLevelMemoryRejectionThreshold;
}

/**
* Method to get the cpu based node level rejection threshold
* @return the current cpu based node level rejection threshold
*/
public Double getNodeLevelCpuRejectionThreshold() {
return nodeLevelCpuRejectionThreshold;
}

/**
* Method to set the node level cpu based rejection threshold
* @param nodeLevelCpuRejectionThreshold sets the new cpu based rejection threshold
* @throws IllegalArgumentException if rejection &gt; 0.90 and rejection &lt; cancellation threshold
*/
public void setNodeLevelCpuRejectionThreshold(Double nodeLevelCpuRejectionThreshold) {
if (Double.compare(nodeLevelCpuRejectionThreshold, NODE_LEVEL_CPU_REJECTION_THRESHOLD_MAX_VALUE) > 0) {
throw new IllegalArgumentException(
NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME + " value cannot be greater than 0.90 as it can result in a node drop"
);
}

ensureRejectionThresholdIsLessThanCancellation(
nodeLevelCpuRejectionThreshold,
nodeLevelCpuCancellationThreshold,
NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME
);

this.nodeLevelCpuRejectionThreshold = nodeLevelCpuRejectionThreshold;
}

/**
* Method to validate that the cancellation threshold is greater than or equal to rejection threshold
* @param nodeLevelRejectionThreshold rejection threshold to be compared
* @param nodeLevelCancellationThreshold cancellation threshold to be compared
* @param rejectionThresholdSettingName name of the rejection threshold setting
* @param cancellationThresholdSettingName name of the cancellation threshold setting
* @throws IllegalArgumentException if cancellation threshold is less than rejection threshold
*/
private void ensureRejectionThresholdIsLessThanCancellation(
Double nodeLevelRejectionThreshold,
Double nodeLevelCancellationThreshold,
String rejectionThresholdSettingName,
String cancellationThresholdSettingName
) {
if (Double.compare(nodeLevelCancellationThreshold, nodeLevelRejectionThreshold) < 0) {
throw new IllegalArgumentException(
cancellationThresholdSettingName + " value should not be less than " + rejectionThresholdSettingName
);
}
}
}
Loading
Loading