Skip to content

Commit

Permalink
improve support for multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
ansjcy committed Dec 12, 2023
1 parent 4c11760 commit 8f89365
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;


/**
Expand Down Expand Up @@ -47,20 +48,22 @@ public SearchRequestListenerManager(
}

/**
* Add a {@link SearchRequestOperationsListener} to the searchRequestListenersList,
* which will be executed during each search request.
* Add multiple {@link SearchRequestOperationsListener} to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listener A SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null or already exists in the list.
*/
public void addListener(SearchRequestOperationsListener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
public void addListeners(SearchRequestOperationsListener... listeners) {
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
searchRequestListenersList.add(listener);
}
searchRequestListenersList.add(listener);
}

/**
Expand Down Expand Up @@ -104,9 +107,9 @@ public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
Logger logger,
SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = new ArrayList<>(searchRequestListenersList);
final List<SearchRequestOperationsListener> searchListenersList = searchRequestListenersList.stream().filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList());

Arrays.stream(perRequestListeners).parallel().forEach((listener) -> {
Arrays.stream(perRequestListeners).forEach((listener) -> {
if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) {
TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener;
// phase_took is enabled with request param and/or cluster setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
* @opensearch.internal
*/
@InternalApi
abstract class SearchRequestOperationsListener {
protected SearchRequestListenerManager searchRequestListenerManager;
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;

protected SearchRequestOperationsListener() {
this.enabled = false;
}
protected SearchRequestOperationsListener(boolean enabled) {
this.enabled = enabled;
}

abstract void onPhaseStart(SearchPhaseContext context);

Expand All @@ -33,33 +40,13 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

public void setEnabled(boolean enabled) {
if (enabled) {
register();
} else {
deregister();
}
}


/**
* Handler function to register this listener to certain components
* This function will be called when the listener is enabled.
*/
protected void register() {
if (this.searchRequestListenerManager != null) {
this.searchRequestListenerManager.addListener(this);
}
boolean getEnabled() {
return enabled;
}

/**
* Handler function to deregister this listener from certain components
* This function will be called when the listener is disabled.
*/
protected void deregister() {
if (this.searchRequestListenerManager != null) {
this.searchRequestListenerManager.removeListener(this);
}
void setEnabled(boolean enabled) {
this.enabled = enabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,13 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener

@Inject
public SearchRequestSlowLog(
ClusterService clusterService,
SearchRequestListenerManager searchRequestListenerManager
ClusterService clusterService
) {
this(clusterService, searchRequestListenerManager, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
}

@Inject
SearchRequestSlowLog(ClusterService clusterService, SearchRequestListenerManager searchRequestListenerManager, Logger logger) {
SearchRequestSlowLog(ClusterService clusterService, Logger logger) {
this.logger = logger;
this.searchRequestListenerManager = searchRequestListenerManager;
Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name());

this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {

@Inject
public SearchRequestStats(
ClusterService clusterService,
SearchRequestListenerManager searchRequestListenerManager
ClusterService clusterService
) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
this.searchRequestListenerManager = searchRequestListenerManager;
}

public long getPhaseCurrent(SearchPhaseName searchPhaseName) {
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,10 +783,16 @@ protected Node(
repositoriesServiceReference::get,
threadPool
);
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService);

final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService, searchRequestListenerManager);
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, searchRequestListenerManager);
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService);
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);

// register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService);
searchRequestListenerManager.addListeners(
searchRequestStats,
searchRequestSlowLog
);

remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
final IndicesService indicesService = new IndicesService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.search.SearchRequestListenerManager;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.cluster.coordination.PendingClusterStateStats;
import org.opensearch.cluster.coordination.PersistedStateStats;
Expand Down Expand Up @@ -970,8 +969,7 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats(clusterService, listenerManager));
indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats(clusterService));
RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats();
remoteSegmentStats.addUploadBytesStarted(10L);
remoteSegmentStats.addUploadBytesSucceeded(10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,7 @@ public void testOnPhaseFailureAndVerifyListeners() {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager);
SearchRequestStats testListener = new SearchRequestStats(clusterService);

final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
Expand Down Expand Up @@ -605,8 +604,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

long delay = (randomIntBetween(1, 5));
Expand Down Expand Up @@ -660,8 +658,7 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void testAddAndGetListeners() {
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener();
listenerManager.addListener(testListener);
listenerManager.addListeners(testListener);
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener, listenerManager.getListeners().get(0));
}
Expand All @@ -42,28 +42,55 @@ public void testRemoveListeners() {
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener();
listenerManager.addListener(testListener1);
listenerManager.addListener(testListener2);
listenerManager.addListeners(testListener1, testListener2);
assertEquals(2, listenerManager.getListeners().size());
listenerManager.removeListener(testListener2);
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
}

public void testBuildCompositeListenersWithTimeProvider() throws NoSuchFieldException, IllegalAccessException {
public void testStandardListenersEnabled() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener();
testListener2.setEnabled(true);
listenerManager.addListeners(testListener1, testListener2);
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest().source(source);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
searchRequest,
logger
);
Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners");
listenersField.setAccessible(true);
List<SearchRequestOperationsListener> listeners = (List<SearchRequestOperationsListener>) listenersField.get(compositeListener);
assertEquals(1, listeners.size());
assertEquals(testListener2, listeners.get(0));
assertEquals(2, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertEquals(testListener2, listenerManager.getListeners().get(1));
}

public void testStandardListenersAndTimeProvider() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
testListener1.setEnabled(true);
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
System::nanoTime
);
listenerManager.addListener(testListener1);
listenerManager.addListeners(testListener1);
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
Expand All @@ -82,20 +109,53 @@ public void testBuildCompositeListenersWithTimeProvider() throws NoSuchFieldExce
assertEquals(testListener1, listenerManager.getListeners().get(0));
}

public void testBuildCompositeListenersWithPhaseTookDisabled() throws NoSuchFieldException, IllegalAccessException {
public void testStandardListenersDisabledAndTimeProvider() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
System::nanoTime
);
listenerManager.addListeners(testListener1);
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
searchRequest,
logger,
timeProviderListener
);
Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners");
listenersField.setAccessible(true);
List<SearchRequestOperationsListener> listeners = (List<SearchRequestOperationsListener>) listenersField.get(compositeListener);
assertEquals(1, listeners.size());
assertEquals(timeProviderListener, listeners.get(0));
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertFalse(listenerManager.getListeners().get(0).getEnabled());
}

public void testStandardListenerAndTimeProviderDisabled() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
testListener1.setEnabled(true);
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
System::nanoTime
);
listenerManager.addListener(testListener1);
listenerManager.addListeners(testListener1);
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(false);
Expand Down
Loading

0 comments on commit 8f89365

Please sign in to comment.