
Commit

Merge branch 'main' into 13531-range-agg
bowenlan-amzn committed Jun 7, 2024
2 parents 54bfe92 + b06d0b9 · commit 6ae1a9b
Showing 13 changed files with 130 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -17,8 +17,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))

### Dependencies
@@ -44,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
- Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))
- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017))

### Deprecated

@@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.tasks.Task;

import java.util.Collections;
import java.util.HashMap;
@@ -138,6 +139,15 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());

Map<String, Object> labels = new HashMap<>();
// Retrieve the user-provided label if one exists
String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID);
if (userProvidedLabel != null) {
labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
}
attributes.put(Attribute.LABELS, labels);
// construct SearchQueryRecord from attributes and measurements
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
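For context, a minimal caller-side sketch of how a search can be tagged so the listener above picks up the label (not part of this commit; the method, index name, and label value are placeholders):

import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;

// Assumed sketch: put an X-Opaque-Id header on the thread context so the
// listener above copies it into Attribute.LABELS for top-n query records.
void searchWithLabel(Client client, ThreadPool threadPool) {
    ThreadContext threadContext = threadPool.getThreadContext();
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        // read back in the listener via context.getTask().getHeader(Task.X_OPAQUE_ID)
        threadContext.putHeader(Task.X_OPAQUE_ID, "dashboard-widget-42");
        client.search(new SearchRequest("logs-*")).actionGet();
    }
}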
@@ -43,7 +43,11 @@ public enum Attribute {
/**
* The node id for this request
*/
NODE_ID;
NODE_ID,
/**
* Custom search request labels
*/
LABELS;

/**
* Read an Attribute from a StreamInput
@@ -11,28 +11,39 @@
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -48,6 +59,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final SearchRequest searchRequest = mock(SearchRequest.class);
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
private final ThreadPool threadPool = mock(ThreadPool.class);
private ClusterService clusterService;

@Before
@@ -61,15 +73,21 @@ public void setup() {
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);

ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
threadContext.setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>()));
when(threadPool.getThreadContext()).thenReturn(threadContext);
}

@SuppressWarnings("unchecked")
public void testOnRequestEnd() throws InterruptedException {
Long timestamp = System.currentTimeMillis() - 100L;
SearchType searchType = SearchType.QUERY_THEN_FETCH;

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

@@ -89,10 +107,19 @@
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);
ArgumentCaptor<SearchQueryRecord> captor = ArgumentCaptor.forClass(SearchQueryRecord.class);

queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext);

verify(queryInsightsService, times(1)).addRecord(any());
verify(queryInsightsService, times(1)).addRecord(captor.capture());
SearchQueryRecord generatedRecord = captor.getValue();
assertEquals(timestamp.longValue(), generatedRecord.getTimestamp());
assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS));
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
}

public void testConcurrentOnRequestEnd() throws InterruptedException {
@@ -102,6 +129,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

@@ -121,6 +149,7 @@
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);

int numRequests = 50;
Thread[] threads = new Thread[numRequests];
@@ -273,6 +273,7 @@ initalMetadataVersion < internalCluster().client()
* After shard relocation completes, shuts down the docrep nodes and asserts remote
* index settings are applied even when the index is in YELLOW state
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13737")
public void testIndexSettingsUpdatedEvenForMisconfiguredReplicas() throws Exception {
internalCluster().startClusterManagerOnlyNode();

@@ -329,6 +330,7 @@ public void testIndexSettingsUpdatedEvenForMisconfiguredReplicas() throws Except
* After shard relocation completes, restarts the docrep node holding extra replica shard copy
* and asserts remote index settings are applied as soon as the docrep replica copy is unassigned
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13871")
public void testIndexSettingsUpdatedWhenDocrepNodeIsRestarted() throws Exception {
internalCluster().startClusterManagerOnlyNode();

@@ -469,6 +471,7 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E
* exclude docrep nodes, assert that remote index path file exists
* when shards start relocating to the remote nodes.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13939")
public void testRemoteIndexPathFileExistsAfterMigration() throws Exception {
String docrepClusterManager = internalCluster().startClusterManagerOnlyNode();

@@ -47,6 +47,7 @@ protected int minimumNumberOfReplicas() {
Brings up new replica copies on remote and docrep nodes, when primary is on a remote node
Live indexing is happening meanwhile
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13473")
public void testReplicaRecovery() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
String primaryNode = internalCluster().startNode();
@@ -107,6 +107,10 @@ String formattedShardStats() {
);
}
}

public SearchRequest getRequest() {
return searchRequest;
}
}

enum ShardStatsFieldNames {
@@ -41,11 +41,11 @@ protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

protected abstract void onPhaseStart(SearchPhaseContext context);
protected void onPhaseStart(SearchPhaseContext context) {};

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {};

protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {};

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

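With the abstract hooks replaced by no-op defaults above, a subclass now only overrides the callbacks it needs. A hypothetical sketch (not from this commit; assumes the class's no-arg constructor is available to subclasses):

import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;

// Hypothetical subclass: only onPhaseEnd is overridden; onPhaseStart and
// onPhaseFailure fall back to the new no-op defaults.
class PhaseTookLogger extends SearchRequestOperationsListener {
    @Override
    protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
        System.out.println("phase took: " + searchRequestContext.phaseTookMap());
    }
}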
53 changes: 46 additions & 7 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
@@ -36,9 +36,11 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.concurrent.ReleasableLock;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -396,7 +398,12 @@ private V get(K key, long now, Consumer<Entry<K, V>> onExpiration) {
if (entry == null) {
return null;
} else {
promote(entry, now);
List<RemovalNotification<K, V>> removalNotifications = promote(entry, now).v2();
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
return entry.value;
}
}
@@ -446,8 +453,14 @@ private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
removalNotifications = promote(ok, now).v2();
}
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
return ok.value;
} else {
@@ -512,16 +525,22 @@ private void put(K key, V value, long now) {
CacheSegment<K, V> segment = getCacheSegment(key);
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now);
boolean replaced = false;
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
if (unlink(tuple.v2())) {
replaced = true;
}
}
promote(tuple.v1(), now);
removalNotifications = promote(tuple.v1(), now).v2();
}
if (replaced) {
removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
}
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
}

@@ -767,8 +786,17 @@ public long getEvictions() {
}
}

private boolean promote(Entry<K, V> entry, long now) {
/**
* Promotes the given entry to the head of the LRU list and determines whether any entries need to be
* evicted because the cache has exceeded its size limit or entries have expired.
* @param entry the entry to be promoted
* @param now the current time
* @return a tuple: v1 signifies whether the entry was promoted, v2 is the list of removal
* notifications that the caller needs to handle.
*/
private Tuple<Boolean, List<RemovalNotification<K, V>>> promote(Entry<K, V> entry, long now) {
boolean promoted = true;
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
switch (entry.state) {
case DELETED:
Expand All @@ -782,10 +810,21 @@ private boolean promote(Entry<K, V> entry, long now) {
break;
}
if (promoted) {
evict(now);
while (tail != null && shouldPrune(tail, now)) {
Entry<K, V> entryToBeRemoved = tail;
CacheSegment<K, V> segment = getCacheSegment(entryToBeRemoved.key);
if (segment != null) {
segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {});
}
if (unlink(entryToBeRemoved)) {
removalNotifications.add(
new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED)
);
}
}
}
}
return promoted;
return new Tuple<>(promoted, removalNotifications);
}

private void evict(long now) {
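The refactor above follows a collect-then-notify pattern: mutate the LRU list while holding lruLock, buffer any evictions, and invoke the (potentially slow or re-entrant) removal listener only after the lock is released. A generic, self-contained sketch of the pattern using plain JDK types (not Cache's exact implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Generic sketch: callbacks never run while the lock is held, so a listener
// that re-enters the structure (or blocks) cannot deadlock other users.
class NotifyOutsideLockDemo {
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Consumer<String> listener = key -> System.out.println("removed " + key);
        List<String> pending = new ArrayList<>();
        lock.lock();
        try {
            pending.add("evicted-key"); // structural mutation + buffering under the lock
        } finally {
            lock.unlock();
        }
        pending.forEach(listener); // notifications fired lock-free
    }
}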
@@ -42,5 +42,10 @@
@ExperimentalApi
@FunctionalInterface
public interface RemovalListener<K, V> {

/**
* This may be called from multiple threads at once, so implementations must be thread-safe.
* @param notification removal notification for desired entry.
*/
void onRemoval(RemovalNotification<K, V> notification);
}
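A minimal sketch of a listener honoring this contract (hypothetical; the key/value types and counter are placeholders):

import java.util.concurrent.atomic.AtomicLong;
import org.opensearch.common.cache.RemovalListener;

// Hypothetical thread-safe listener: onRemoval may now fire concurrently from
// several threads, so the shared counter is an AtomicLong, not a plain long.
final AtomicLong evictions = new AtomicLong();
RemovalListener<String, String> countingListener = notification -> evictions.incrementAndGet();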
@@ -302,6 +302,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
@@ -239,6 +239,10 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
this::setInternalActionRetryTimeout
);

}

@@ -313,6 +317,10 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeou
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
}

public void setInternalActionRetryTimeout(TimeValue internalActionRetryTimeout) {
this.internalActionRetryTimeout = internalActionRetryTimeout;
}

private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
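Because the new retry timeout is registered as a dynamic setting, it can be changed cluster-wide at runtime. A hedged sketch of such an update (not from this commit; the method name and timeout value are placeholders):

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.recovery.RecoverySettings;

// Assumed sketch: update the dynamic retry timeout without a node restart.
void bumpRetryTimeout(Client client) {
    client.admin().cluster().prepareUpdateSettings()
        .setTransientSettings(Settings.builder()
            .put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.getKey(), "90s"))
        .get();
}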
@@ -118,4 +118,15 @@ public void testInternalLongActionTimeout() {
);
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}

public void testInternalActionRetryTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.getKey(), duration, timeUnit)
.build()
);
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionRetryTimeout());
}
}
