Skip to content

Commit

Permalink
Add capability to disable source recovery_source for an index (opense…
Browse files Browse the repository at this point in the history
…arch-project#13590) (opensearch-project#14064)

Signed-off-by: Navneet Verma <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
navneet1v authored and kkewwei committed Jul 24, 2024
1 parent 4179ff6 commit c04d612
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.opensearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -45,6 +46,8 @@
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
Expand Down Expand Up @@ -253,4 +256,144 @@ public void testNoRebalanceOnRollingRestart() throws Exception {
);
}
}

public void testFullRollingRestart_withNoRecoveryPayloadAndSource() throws Exception {
internalCluster().startNode();
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.startObject("_source")
.field("enabled")
.value(false)
.field("recovery_source_enabled")
.value(false)
.endObject()
.endObject();
CreateIndexResponse response = prepareCreate("test").setMapping(builder).get();
logger.info("Create index response is : {}", response);

final String healthTimeout = "1m";

for (int i = 0; i < 1000; i++) {
client().prepareIndex("test")
.setId(Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map())
.execute()
.actionGet();
}

for (int i = 1000; i < 2000; i++) {
client().prepareIndex("test")
.setId(Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map())
.execute()
.actionGet();
}
// ensuring all docs are committed to file system
flush();

logger.info("--> now start adding nodes");
internalCluster().startNode();
internalCluster().startNode();

// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("3")
);

logger.info("--> add two more nodes");
internalCluster().startNode();
internalCluster().startNode();

// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("5")
);

logger.info("--> refreshing and checking data");
refreshAndWaitForReplication();
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
}

// now start shutting nodes down
internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("4")
);

internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("3")
);

logger.info("--> stopped two nodes, verifying data");
refreshAndWaitForReplication();
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
}

// closing the 3rd node
internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("2")
);

internalCluster().stopRandomDataNode();

// make sure the cluster state is yellow, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("1")
);

logger.info("--> one node left, verifying data");
refreshAndWaitForReplication();
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
}
}
}
117 changes: 105 additions & 12 deletions server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class SourceFieldMapper extends MetadataFieldMapper {

public static final String CONTENT_TYPE = "_source";
private final Function<Map<String, ?>, Map<String, Object>> filter;
private final Function<Map<String, ?>, Map<String, Object>> recoverySourceFilter;

/**
* Default parameters for source fields
Expand Down Expand Up @@ -119,21 +120,75 @@ public static class Builder extends MetadataFieldMapper.Builder {
Collections.emptyList()
);

/**
* A mapping parameter which define whether the recovery_source should be added or not. Default value is true.
* <p>
* Recovery source gets added if source is disabled or there are filters that are applied on _source using
* {@link #includes}/{@link #excludes}, which has the possibility to change the original document provided by
* customer. Recovery source is not a permanent field and gets removed during merges. Refer this merge
* policy: org.opensearch.index.engine.RecoverySourcePruneMergePolicy
* <p>
* The main reason for adding the _recovery_source was to ensure Peer to Peer recovery if segments
* are not flushed to the disk. If you are disabling the recovery source, then ensure that you are calling
* flush operation of Opensearch periodically to ensure that segments are flushed to the disk and if required
* Peer to Peer recovery can happen using segment files rather than replaying traffic by querying Lucene
* snapshot.
*
* <p>
* This is an expert mapping parameter.
*
*/
private final Parameter<Boolean> recoverySourceEnabled = Parameter.boolParam(
"recovery_source_enabled",
false,
m -> toType(m).recoverySourceEnabled,
Defaults.ENABLED
);

/**
* Provides capability to add specific fields in the recovery_source.
* <p>
* Refer {@link #recoverySourceEnabled} for more details
* This is an expert parameter.
*/
private final Parameter<List<String>> recoverySourceIncludes = Parameter.stringArrayParam(
"recovery_source_includes",
false,
m -> Arrays.asList(toType(m).recoverySourceIncludes),
Collections.emptyList()
);

/**
* Provides capability to remove specific fields in the recovery_source.
*
* Refer {@link #recoverySourceEnabled} for more details
* This is an expert parameter.
*/
private final Parameter<List<String>> recoverySourceExcludes = Parameter.stringArrayParam(
"recovery_source_excludes",
false,
m -> Arrays.asList(toType(m).recoverySourceExcludes),
Collections.emptyList()
);

public Builder() {
super(Defaults.NAME);
}

@Override
protected List<Parameter<?>> getParameters() {
return Arrays.asList(enabled, includes, excludes);
return Arrays.asList(enabled, includes, excludes, recoverySourceEnabled, recoverySourceIncludes, recoverySourceExcludes);
}

@Override
public SourceFieldMapper build(BuilderContext context) {
return new SourceFieldMapper(
enabled.getValue(),
includes.getValue().toArray(new String[0]),
excludes.getValue().toArray(new String[0])
excludes.getValue().toArray(new String[0]),
recoverySourceEnabled.getValue(),
recoverySourceIncludes.getValue().toArray(new String[0]),
recoverySourceExcludes.getValue().toArray(new String[0])
);
}
}
Expand Down Expand Up @@ -173,24 +228,44 @@ public Query termQuery(Object value, QueryShardContext context) {
}

private final boolean enabled;
private final boolean recoverySourceEnabled;
/** indicates whether the source will always exist and be complete, for use by features like the update API */
private final boolean complete;

private final String[] includes;
private final String[] excludes;
private final String[] recoverySourceIncludes;
private final String[] recoverySourceExcludes;

private SourceFieldMapper() {
this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);
this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY, Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);
}

private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes) {
private SourceFieldMapper(
boolean enabled,
String[] includes,
String[] excludes,
boolean recoverySourceEnabled,
String[] recoverySourceIncludes,
String[] recoverySourceExcludes
) {
super(new SourceFieldType(enabled));
this.enabled = enabled;
this.includes = includes;
this.excludes = excludes;
final boolean filtered = CollectionUtils.isEmpty(includes) == false || CollectionUtils.isEmpty(excludes) == false;
this.filter = enabled && filtered ? XContentMapValues.filter(includes, excludes) : null;
this.complete = enabled && CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes);

// Set parameters for recovery source
this.recoverySourceEnabled = recoverySourceEnabled;
this.recoverySourceIncludes = recoverySourceIncludes;
this.recoverySourceExcludes = recoverySourceExcludes;
final boolean recoverySourcefiltered = CollectionUtils.isEmpty(recoverySourceIncludes) == false
|| CollectionUtils.isEmpty(recoverySourceExcludes) == false;
this.recoverySourceFilter = this.recoverySourceEnabled && recoverySourcefiltered
? XContentMapValues.filter(recoverySourceIncludes, recoverySourceExcludes)
: null;
}

public boolean enabled() {
Expand All @@ -212,22 +287,40 @@ public void preParse(ParseContext context) throws IOException {
context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
if (recoverySourceEnabled) {
if (originalSource != null && adaptedSource != originalSource) {
final BytesReference adaptedRecoverySource = applyFilters(
originalSource,
contentType,
recoverySourceEnabled,
recoverySourceFilter
);
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = adaptedRecoverySource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
}
}

@Nullable
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable MediaType contentType) throws IOException {
if (enabled && originalSource != null) {
return applyFilters(originalSource, contentType, enabled, filter);
}

@Nullable
private BytesReference applyFilters(
@Nullable BytesReference originalSource,
@Nullable MediaType contentType,
boolean isProvidedSourceEnabled,
@Nullable final Function<Map<String, ?>, Map<String, Object>> filters
) throws IOException {
if (isProvidedSourceEnabled && originalSource != null) {
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
if (filter != null) {
if (filters != null) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType);
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
Map<String, Object> filteredSource = filters.apply(mapTuple.v2());
BytesStreamOutput bStream = new BytesStreamOutput();
MediaType actualContentType = mapTuple.v1();
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource);
Expand Down
Loading

0 comments on commit c04d612

Please sign in to comment.