Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workload Management] QueryGroupService that orchestrates Resource Tracking & Cancellation #1

Open
wants to merge 9 commits into
base: kp/wlm-cancellation-1
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709))
- [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15151](https://github.com/opensearch-project/OpenSearch/pull/15151))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15151](https://github.com/opensearch-project/OpenSearch/pull/15151))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ public Builder remove(final QueryGroup queryGroup) {
return queryGroups(existing);
}

private Map<String, QueryGroup> getQueryGroups() {
public Map<String, QueryGroup> getQueryGroups() {
return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE))
.map(o -> (QueryGroupMetadata) o)
.map(QueryGroupMetadata::queryGroups)
Expand Down
27 changes: 24 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@
import org.opensearch.usage.UsageService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1020,8 +1022,21 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(
taskResourceTrackingService
);
WorkloadManagementSettings workloadManagementSettings = new WorkloadManagementSettings(
settings,
settingsModule.getClusterSettings()
);
final QueryGroupService queryGroupService = new QueryGroupService(
queryGroupResourceUsageTrackerService,
clusterService,
threadPool,
workloadManagementSettings,
new HashMap<>()
); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
Expand Down Expand Up @@ -1087,7 +1102,13 @@ protected Node(

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool,
new QueryGroupService() // We will need to replace this with actual implementation
new QueryGroupService(
queryGroupResourceUsageTrackerService,
clusterService,
threadPool,
workloadManagementSettings,
new HashMap<>()
) // We will need to replace this with actual implementation
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Expand Down
141 changes: 135 additions & 6 deletions server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,163 @@

package org.opensearch.wlm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.cancellation.DefaultTaskCancellation;
import org.opensearch.wlm.cancellation.DefaultTaskSelectionStrategy;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* As of now this is a stub and main implementation PR will be raised soon.Coming PR will collate these changes with core QueryGroupService changes
*/
public class QueryGroupService {
public class QueryGroupService extends AbstractLifecycleComponent implements ClusterStateApplier {
// This map does not need to be concurrent since we will process the cluster state change serially and update
// this map with new additions and deletions of entries. QueryGroupState is thread safe
private final Map<String, QueryGroupState> queryGroupStateMap;
private static final Logger logger = LogManager.getLogger(QueryGroupService.class);

public QueryGroupService() {
this(new HashMap<>());
private final QueryGroupResourceUsageTrackerService queryGroupUsageTracker;
private volatile Scheduler.Cancellable scheduledFuture;
private final ThreadPool threadPool;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also have setting which controls whether the feature is enabled or not

private final ClusterService clusterService;
private final WorkloadManagementSettings workloadManagementSettings;
private Set<QueryGroup> activeQueryGroups = new HashSet<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should init the query groups from the state metadata because in come cases metadata might have some querygroups since metadata is durable.

Copy link
Owner Author

@kiranprakash154 kiranprakash154 Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm doing this in

protected Set<QueryGroup> getActiveQueryGroups() {
        Map<String, QueryGroup> queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups();
        return new HashSet<>(queryGroups.values());
    }

which is invoked from doRun()

private Set<QueryGroup> deletedQueryGroups = new HashSet<>();

protected Set<QueryGroup> getDeletedQueryGroups() {
return deletedQueryGroups;
}

protected Set<QueryGroup> getActiveQueryGroups() {
return activeQueryGroups;
}

public QueryGroupService(Map<String, QueryGroupState> queryGroupStateMap) {
/**
* Guice managed constructor
*
* @param queryGroupUsageTracker tracker service
* @param threadPool threadPool this will be used to schedule the service
*/
@Inject
public QueryGroupService(
QueryGroupResourceUsageTrackerService queryGroupUsageTracker,
ClusterService clusterService,
ThreadPool threadPool,
WorkloadManagementSettings workloadManagementSettings,
Map<String, QueryGroupState> queryGroupStateMap
) {
this.queryGroupUsageTracker = queryGroupUsageTracker;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.workloadManagementSettings = workloadManagementSettings;
this.queryGroupStateMap = queryGroupStateMap;
}

/**
* run at regular interval
*/
protected void doRun() {
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews = queryGroupUsageTracker
.constructQueryGroupLevelUsageViews();
this.activeQueryGroups = getActiveQueryGroupsFromClusterState();
DefaultTaskCancellation defaultTaskCancellation = new DefaultTaskCancellation(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelResourceUsageViews,
activeQueryGroups,
deletedQueryGroups
);
defaultTaskCancellation.cancelTasks();
}

/**
* {@link AbstractLifecycleComponent} lifecycle method
*/
@Override
protected void doStart() {
if (!workloadManagementSettings.queryGroupServiceEnabled()) {
return;
}
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
doRun();
} catch (Exception e) {
logger.debug("Exception occurred in Query Sandbox service", e);
}
}, this.workloadManagementSettings.getQueryGroupServiceRunInterval(), ThreadPool.Names.GENERIC);
}

@Override
protected void doStop() {
if (workloadManagementSettings.queryGroupServiceEnabled()) {
return;
}
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
}

@Override
protected void doClose() throws IOException {}

protected Set<QueryGroup> getActiveQueryGroupsFromClusterState() {
Map<String, QueryGroup> queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups();
return new HashSet<>(queryGroups.values());
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
// Retrieve the current and previous cluster states
Metadata previousMetadata = event.previousState().metadata();
Metadata currentMetadata = event.state().metadata();

// Extract the query groups from both the current and previous cluster states
Map<String, QueryGroup> previousQueryGroups = previousMetadata.queryGroups();
Map<String, QueryGroup> currentQueryGroups = currentMetadata.queryGroups();

// Detect new query groups added in the current cluster state
for (String queryGroupName : currentQueryGroups.keySet()) {
if (!previousQueryGroups.containsKey(queryGroupName)) {
// New query group detected
QueryGroup newQueryGroup = currentQueryGroups.get(queryGroupName);
// Perform any necessary actions with the new query group
this.activeQueryGroups.add(newQueryGroup);
}
}

// Detect query groups deleted in the current cluster state
for (String queryGroupName : previousQueryGroups.keySet()) {
if (!currentQueryGroups.containsKey(queryGroupName)) {
// Query group deleted
QueryGroup deletedQueryGroup = previousQueryGroups.get(queryGroupName);
// Perform any necessary actions with the deleted query group
this.deletedQueryGroups.add(deletedQueryGroup);
}
}
} // tested

/**
* updates the failure stats for the query group
*
* @param queryGroupId query group identifier
*/
public void incrementFailuresFor(final String queryGroupId) {
Expand All @@ -47,7 +178,6 @@ public void incrementFailuresFor(final String queryGroupId) {
}

/**
*
* @return node level query group stats
*/
public QueryGroupStats nodeStats() {
Expand All @@ -63,7 +193,6 @@ public QueryGroupStats nodeStats() {
}

/**
*
* @param queryGroupId query group identifier
*/
public void rejectIfNeeded(String queryGroupId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.trackers.NodeDuressTrackers;

import java.util.EnumMap;

/**
* Main class to declare Workload Management related settings
Expand All @@ -20,6 +27,8 @@ public class WorkloadManagementSettings {
private static final Double DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = 0.9;
private static final Double DEFAULT_NODE_LEVEL_CPU_REJECTION_THRESHOLD = 0.8;
private static final Double DEFAULT_NODE_LEVEL_CPU_CANCELLATION_THRESHOLD = 0.9;
private static final Long DEFAULT_QUERYGROUP_SERVICE_RUN_INTERVAL_MILLIS = 1000L;

public static final double NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95;
public static final double NODE_LEVEL_MEMORY_REJECTION_THRESHOLD_MAX_VALUE = 0.9;
public static final double NODE_LEVEL_CPU_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95;
Expand All @@ -29,6 +38,9 @@ public class WorkloadManagementSettings {
private Double nodeLevelMemoryRejectionThreshold;
private Double nodeLevelCpuCancellationThreshold;
private Double nodeLevelCpuRejectionThreshold;
private TimeValue queryGroupServiceRunIntervalInMillis;
private NodeDuressTrackers nodeDuressTrackers;
private Boolean queryGroupServiceEnabled;

/**
* Setting name for node level memory based rejection threshold for QueryGroup service
Expand Down Expand Up @@ -82,6 +94,27 @@ public class WorkloadManagementSettings {
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final String QUERYGROUP_SERVICE_ENABLED_SETTING_NAME = "wlm.query_group.service.enabled";

public static final Setting<Boolean> QUERYGROUP_SERVICE_ENABLED_SETTING = Setting.boolSetting(
QUERYGROUP_SERVICE_ENABLED_SETTING_NAME,
false,
Setting.Property.Dynamic,
Setting.Property.IndexScope
);
/**
* Setting name for Query Group Service run interval
*/
public static final String QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING_NAME = "wlm.query_group.service.run_interval";
/**
* Setting to control the run interval of Query Group Service
*/
public static final Setting<Long> QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING = Setting.longSetting(
QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING_NAME,
DEFAULT_QUERYGROUP_SERVICE_RUN_INTERVAL_MILLIS,
1000,
Setting.Property.NodeScope
);

/**
* QueryGroup service settings constructor
Expand All @@ -93,6 +126,9 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett
nodeLevelMemoryRejectionThreshold = NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.get(settings);
nodeLevelCpuCancellationThreshold = NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.get(settings);
nodeLevelCpuRejectionThreshold = NODE_LEVEL_CPU_REJECTION_THRESHOLD.get(settings);
queryGroupServiceRunIntervalInMillis = TimeValue.timeValueMillis(QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING.get(settings));
nodeDuressTrackers = setupNodeDuressTracker(settings, clusterSettings);
queryGroupServiceEnabled = QUERYGROUP_SERVICE_ENABLED_SETTING.get(settings);

ensureRejectionThresholdIsLessThanCancellation(
nodeLevelMemoryRejectionThreshold,
Expand All @@ -113,6 +149,50 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett
clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_REJECTION_THRESHOLD, this::setNodeLevelCpuRejectionThreshold);
}

/**
* Gets the interval at which the Query Group Service runs.
*
* @return the interval as a \`TimeValue\` object.
*/
public TimeValue getQueryGroupServiceRunInterval() {
return queryGroupServiceRunIntervalInMillis;
}

/**
* Gets the \`NodeDuressTrackers\` instance which tracks the node duress state.
*
* @return the \`NodeDuressTrackers\` instance.
*/
public NodeDuressTrackers getNodeDuressTrackers() {
return nodeDuressTrackers;
}

public Boolean queryGroupServiceEnabled() {
return queryGroupServiceEnabled;
}

private NodeDuressTrackers setupNodeDuressTracker(Settings settings, ClusterSettings clusterSettings) {
NodeDuressSettings nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
return new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
{
put(
ResourceType.CPU,
new NodeDuressTrackers.NodeDuressTracker(
() -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= nodeDuressSettings.getCpuThreshold(),
nodeDuressSettings::getNumSuccessiveBreaches
)
);
put(
ResourceType.MEMORY,
new NodeDuressTrackers.NodeDuressTracker(
() -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= nodeDuressSettings.getHeapThreshold(),
nodeDuressSettings::getNumSuccessiveBreaches
)
);
}
});
}

/**
* Method to get the node level memory based cancellation threshold
* @return current node level memory based cancellation threshold
Expand Down
Loading
Loading