-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Workload Management] QueryGroupService that orchestrates Resource Tracking & Cancellation #1
base: kp/wlm-cancellation-1
Are you sure you want to change the base?
Changes from all commits
b189670
8a11078
a245583
eb7ad77
193dd49
06994fb
e7a8fa0
5b9bc90
495d9d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,32 +8,163 @@ | |
|
||
package org.opensearch.wlm; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.cluster.ClusterChangedEvent; | ||
import org.opensearch.cluster.ClusterStateApplier; | ||
import org.opensearch.cluster.metadata.Metadata; | ||
import org.opensearch.cluster.metadata.QueryGroup; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.common.lifecycle.AbstractLifecycleComponent; | ||
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; | ||
import org.opensearch.threadpool.Scheduler; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.wlm.cancellation.DefaultTaskCancellation; | ||
import org.opensearch.wlm.cancellation.DefaultTaskSelectionStrategy; | ||
import org.opensearch.wlm.stats.QueryGroupState; | ||
import org.opensearch.wlm.stats.QueryGroupStats; | ||
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder; | ||
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService; | ||
|
||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
/** | ||
* As of now this is a stub; the main implementation PR will be raised soon. The coming PR will collate these changes with the core QueryGroupService changes. | ||
*/ | ||
public class QueryGroupService { | ||
public class QueryGroupService extends AbstractLifecycleComponent implements ClusterStateApplier { | ||
// This map does not need to be concurrent since we will process the cluster state change serially and update | ||
// this map with new additions and deletions of entries. QueryGroupState is thread safe | ||
private final Map<String, QueryGroupState> queryGroupStateMap; | ||
private static final Logger logger = LogManager.getLogger(QueryGroupService.class); | ||
|
||
public QueryGroupService() { | ||
this(new HashMap<>()); | ||
private final QueryGroupResourceUsageTrackerService queryGroupUsageTracker; | ||
private volatile Scheduler.Cancellable scheduledFuture; | ||
private final ThreadPool threadPool; | ||
private final ClusterService clusterService; | ||
private final WorkloadManagementSettings workloadManagementSettings; | ||
private Set<QueryGroup> activeQueryGroups = new HashSet<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should initialize the query groups from the cluster state metadata, because in some cases the metadata might already contain query groups, since metadata is durable. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I'm doing this in protected Set<QueryGroup> getActiveQueryGroups() {
Map<String, QueryGroup> queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups();
return new HashSet<>(queryGroups.values());
} which is invoked from |
||
private Set<QueryGroup> deletedQueryGroups = new HashSet<>(); | ||
|
||
protected Set<QueryGroup> getDeletedQueryGroups() { | ||
return deletedQueryGroups; | ||
} | ||
|
||
protected Set<QueryGroup> getActiveQueryGroups() { | ||
return activeQueryGroups; | ||
} | ||
|
||
public QueryGroupService(Map<String, QueryGroupState> queryGroupStateMap) { | ||
/**
 * Guice managed constructor; stores the collaborators without starting any work.
 *
 * @param queryGroupUsageTracker     tracker service used to build the per-query-group resource usage views
 * @param clusterService             source of the cluster state from which query groups are read
 * @param threadPool                 thread pool used to schedule the periodic run of this service
 * @param workloadManagementSettings settings holding the enabled flag and the run interval
 * @param queryGroupStateMap         per-query-group state map (keys are presumably query group ids; confirm with callers)
 */
@Inject
public QueryGroupService(
    QueryGroupResourceUsageTrackerService queryGroupUsageTracker,
    ClusterService clusterService,
    ThreadPool threadPool,
    WorkloadManagementSettings workloadManagementSettings,
    Map<String, QueryGroupState> queryGroupStateMap
) {
    // Infrastructure handles.
    this.threadPool = threadPool;
    this.clusterService = clusterService;
    // Workload-management collaborators and state.
    this.workloadManagementSettings = workloadManagementSettings;
    this.queryGroupUsageTracker = queryGroupUsageTracker;
    this.queryGroupStateMap = queryGroupStateMap;
}
|
||
/** | ||
* run at regular interval | ||
*/ | ||
protected void doRun() { | ||
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews = queryGroupUsageTracker | ||
.constructQueryGroupLevelUsageViews(); | ||
this.activeQueryGroups = getActiveQueryGroupsFromClusterState(); | ||
DefaultTaskCancellation defaultTaskCancellation = new DefaultTaskCancellation( | ||
workloadManagementSettings, | ||
new DefaultTaskSelectionStrategy(), | ||
queryGroupLevelResourceUsageViews, | ||
activeQueryGroups, | ||
deletedQueryGroups | ||
); | ||
defaultTaskCancellation.cancelTasks(); | ||
} | ||
|
||
/** | ||
* {@link AbstractLifecycleComponent} lifecycle method | ||
*/ | ||
@Override | ||
protected void doStart() { | ||
if (!workloadManagementSettings.queryGroupServiceEnabled()) { | ||
return; | ||
} | ||
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { | ||
try { | ||
doRun(); | ||
} catch (Exception e) { | ||
logger.debug("Exception occurred in Query Sandbox service", e); | ||
} | ||
}, this.workloadManagementSettings.getQueryGroupServiceRunInterval(), ThreadPool.Names.GENERIC); | ||
} | ||
|
||
@Override | ||
protected void doStop() { | ||
if (workloadManagementSettings.queryGroupServiceEnabled()) { | ||
return; | ||
} | ||
if (scheduledFuture != null) { | ||
scheduledFuture.cancel(); | ||
} | ||
} | ||
|
||
@Override | ||
protected void doClose() throws IOException {} | ||
|
||
protected Set<QueryGroup> getActiveQueryGroupsFromClusterState() { | ||
Map<String, QueryGroup> queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups(); | ||
return new HashSet<>(queryGroups.values()); | ||
} | ||
|
||
@Override | ||
public void applyClusterState(ClusterChangedEvent event) { | ||
// Retrieve the current and previous cluster states | ||
Metadata previousMetadata = event.previousState().metadata(); | ||
Metadata currentMetadata = event.state().metadata(); | ||
|
||
// Extract the query groups from both the current and previous cluster states | ||
Map<String, QueryGroup> previousQueryGroups = previousMetadata.queryGroups(); | ||
Map<String, QueryGroup> currentQueryGroups = currentMetadata.queryGroups(); | ||
|
||
// Detect new query groups added in the current cluster state | ||
for (String queryGroupName : currentQueryGroups.keySet()) { | ||
if (!previousQueryGroups.containsKey(queryGroupName)) { | ||
// New query group detected | ||
QueryGroup newQueryGroup = currentQueryGroups.get(queryGroupName); | ||
// Perform any necessary actions with the new query group | ||
this.activeQueryGroups.add(newQueryGroup); | ||
} | ||
} | ||
|
||
// Detect query groups deleted in the current cluster state | ||
for (String queryGroupName : previousQueryGroups.keySet()) { | ||
if (!currentQueryGroups.containsKey(queryGroupName)) { | ||
// Query group deleted | ||
QueryGroup deletedQueryGroup = previousQueryGroups.get(queryGroupName); | ||
// Perform any necessary actions with the deleted query group | ||
this.deletedQueryGroups.add(deletedQueryGroup); | ||
} | ||
} | ||
} // tested | ||
|
||
/** | ||
* updates the failure stats for the query group | ||
* | ||
* @param queryGroupId query group identifier | ||
*/ | ||
public void incrementFailuresFor(final String queryGroupId) { | ||
|
@@ -47,7 +178,6 @@ public void incrementFailuresFor(final String queryGroupId) { | |
} | ||
|
||
/** | ||
* | ||
* @return node level query group stats | ||
*/ | ||
public QueryGroupStats nodeStats() { | ||
|
@@ -63,7 +193,6 @@ public QueryGroupStats nodeStats() { | |
} | ||
|
||
/** | ||
* | ||
* @param queryGroupId query group identifier | ||
*/ | ||
public void rejectIfNeeded(String queryGroupId) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also have a setting that controls whether the feature is enabled or not.