Skip to content

Commit

Permalink
include SearchRequest in SearchContext and rename to SearchRequestOpe…
Browse files Browse the repository at this point in the history
…rationsCompositeListenerFactory

Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jan 11, 2024
1 parent bb64cd2 commit 96c8388
Show file tree
Hide file tree
Showing 17 changed files with 257 additions and 100 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678))
- Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582))
- Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732))
- Added Support for dynamically adding SearchRequestOperationsListeners ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

Expand All @@ -31,25 +29,14 @@ class SearchRequestContext {
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

private final boolean phaseTookEnabled;
private final SearchRequest searchRequest;

/**
* This constructor is for testing only
*/
SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), false);
}

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, boolean phaseTookEnabled) {
SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.phaseTookEnabled = phaseTookEnabled;
}

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
this(searchRequestOperationsListener, false);
this.searchRequest = searchRequest;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand All @@ -65,7 +52,7 @@ Map<String, Long> phaseTookMap() {
}

SearchResponse.PhaseTook getPhaseTook() {
if (phaseTookEnabled) {
if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) {
return new SearchResponse.PhaseTook(phaseTookMap);
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@
import java.util.stream.Stream;

/**
* SearchRequestOperationsListeners contains listeners registered to search requests,
* SearchRequestOperationsCompositeListenerFactory contains listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public class SearchRequestOperationsListeners {
public final class SearchRequestOperationsCompositeListenerFactory {
private final List<SearchRequestOperationsListener> searchRequestListenersList;

/**
* Create the SearchRequestOperationsListeners and add multiple {@link SearchRequestOperationsListener}
* Create the SearchRequestOperationsCompositeListenerFactory and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null.
*/
public SearchRequestOperationsListeners(SearchRequestOperationsListener... listeners) {
public SearchRequestOperationsCompositeListenerFactory(final SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
Expand All @@ -49,7 +49,6 @@ public SearchRequestOperationsListeners(SearchRequestOperationsListener... liste
* Get searchRequestListenersList,
*
* @return List of SearchRequestOperationsListener
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
*/
public List<SearchRequestOperationsListener> getListeners() {
return searchRequestListenersList;
Expand All @@ -65,9 +64,9 @@ public List<SearchRequestOperationsListener> getListeners() {
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
SearchRequest searchRequest,
Logger logger,
SearchRequestOperationsListener... perRequestListeners
final SearchRequest searchRequest,
final Logger logger,
final SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(
searchRequestListenersList.stream(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ protected SearchRequestOperationsListener() {
this.enabled = true;
}

protected SearchRequestOperationsListener(boolean enabled) {
protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

Expand All @@ -49,7 +49,7 @@ boolean isEnabled() {
return enabled;
}

protected void setEnabled(boolean enabled) {
protected void setEnabled(final boolean enabled) {
this.enabled = enabled;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final SearchRequestOperationsListeners searchRequestOperationsListeners;
private final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory;

private volatile boolean searchQueryMetricsEnabled;

Expand All @@ -195,7 +195,7 @@ public TransportSearchAction(
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
MetricsRegistry metricsRegistry,
SearchRequestOperationsListeners searchRequestOperationsListeners
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -212,7 +212,7 @@ public TransportSearchAction(
this.searchPipelineService = searchPipelineService;
this.metricsRegistry = metricsRegistry;
this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.searchRequestOperationsListeners = searchRequestOperationsListeners;
this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
}
Expand Down Expand Up @@ -428,15 +428,12 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
final boolean phaseTookEnabled;
if (originalSearchRequest.isPhaseTook() == null) {
phaseTookEnabled = clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED);
} else {
phaseTookEnabled = originalSearchRequest.isPhaseTook();
originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED));
}
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsListeners
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory
.buildCompositeListener(originalSearchRequest, logger);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, phaseTookEnabled);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
Expand Down
23 changes: 12 additions & 11 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.opensearch.action.search.SearchExecutionStatsCollector;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchRequestOperationsCompositeListenerFactory;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchRequestOperationsListeners;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.SearchTransportService;
Expand Down Expand Up @@ -881,15 +881,16 @@ protected Node(
)
.collect(Collectors.toList());

// register all standard SearchRequestOperationsListeners to the SearchRequestOperationsListeners
final SearchRequestOperationsListeners searchRequestOperationsListeners = new SearchRequestOperationsListeners(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
).toArray(SearchRequestOperationsListener[]::new)
);
// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
new SearchRequestOperationsCompositeListenerFactory(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
).toArray(SearchRequestOperationsListener[]::new)
);

ActionModule actionModule = new ActionModule(
settings,
Expand Down Expand Up @@ -1287,7 +1288,7 @@ protected Node(
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
b.bind(SearchRequestOperationsListeners.class).toInstance(searchRequestOperationsListeners);
b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory);
});
injector = modules.createInjector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -177,7 +178,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
) {
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
Expand Down Expand Up @@ -715,7 +716,10 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
null,
task,
SearchResponse.Clusters.EMPTY,
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger))
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
);
}

Expand Down Expand Up @@ -765,7 +769,10 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
null,
task,
SearchResponse.Clusters.EMPTY,
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger))
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
) {
@Override
ShardSearchFailure[] buildShardFailures() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.action.OriginalIndices;
Expand Down Expand Up @@ -137,7 +138,10 @@ public void run() throws IOException {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -229,7 +233,10 @@ public void run() throws IOException {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -320,7 +327,10 @@ public void sendCanMatch(
new ArraySearchPhaseResults<>(iter.size()),
randomIntBetween(1, 32),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
) {

@Override
Expand Down Expand Up @@ -348,7 +358,10 @@ protected void executePhaseOnShard(
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -433,7 +446,10 @@ public void run() {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -533,7 +549,10 @@ public void run() {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down
Loading

0 comments on commit 96c8388

Please sign in to comment.