Commit

Add OpenSearchTable in flint core (opensearch-project#479)
Signed-off-by: Peng Huo <[email protected]>
penghuo authored Aug 6, 2024
1 parent d3e54d4 commit 7b52d81
Showing 32 changed files with 831 additions and 462 deletions.
1 change: 1 addition & 0 deletions docs/index.md
@@ -532,6 +532,7 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema info
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Requests and 502 Bad Gateway).
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
- `spark.datasource.flint.read.support_shard`: default is true. Set to false if the index does not support shards (e.g. an AWS OpenSearch Serverless collection). Do not use in production; this setting will be removed in a later version.
- `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
- `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance.
- `spark.flint.index.hybridscan.enabled`: default is false.
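For context, a minimal sketch of how these options can be supplied to Spark when the session is built; the application name and values below are illustrative only, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: pass Flint datasource options as ordinary Spark configs.
// The app name and option values are placeholders.
val spark = SparkSession.builder()
  .appName("flint-read-example")
  .config("spark.datasource.flint.retry.max_retries", "3")
  .config("spark.datasource.flint.retry.http_status_codes", "429,502")
  // Workaround for indexes without shard support (e.g. AOSS collections).
  .config("spark.datasource.flint.read.support_shard", "false")
  .getOrCreate()
```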
@@ -7,7 +7,6 @@

import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
@@ -30,6 +29,9 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
import org.opensearch.flint.core.logging.CustomLogging;
import org.opensearch.flint.core.logging.OperationMessage;
import org.opensearch.flint.core.metrics.MetricsUtil;
@@ -69,6 +71,8 @@ public interface IRestHighLevelClient extends Closeable {

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;

IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException;

CreatePitResponse createPit(CreatePitRequest request) throws IOException;

/**
@@ -30,13 +30,16 @@
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;

import static org.opensearch.flint.core.metrics.MetricConstants.*;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;

/**
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations
@@ -121,6 +124,17 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
}

@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX,
() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
});
}

@Override
public CreatePitResponse createPit(CreatePitRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request));
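As a rough illustration of the new stats call, a hedged sketch follows; the index name is a placeholder, and the `IndicesStatsRequest.Builder` usage and import paths are assumptions based on the opensearch-java client's standard request-builder pattern:

```scala
import org.opensearch.client.opensearch.indices.{IndicesStatsRequest, IndicesStatsResponse}
import org.opensearch.flint.core.IRestHighLevelClient

// Sketch only: fetch index stats through the new wrapper method.
// `client` is an already-constructed IRestHighLevelClient.
def fetchStats(client: IRestHighLevelClient): IndicesStatsResponse = {
  val request = new IndicesStatsRequest.Builder()
    .index("my_index") // placeholder index name
    .build()
  client.stats(request)
}
```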
@@ -8,7 +8,6 @@
import java.util.Map;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

/**
@@ -65,25 +64,6 @@ public interface FlintClient {
*/
void deleteIndex(String indexName);

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param query DSL query. DSL query is null means match_all
* @return {@link FlintReader}.
*/
FlintReader createReader(String indexName, String query);

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param shardId shard id.
* @param query DSL query. DSL query is null means match_all
* @return {@link FlintReader}.
*/
FlintReader createReader(String indexName, String shardId, String query);

/**
* Create {@link FlintWriter}.
*
@@ -7,6 +7,7 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;

import org.apache.spark.network.util.ByteUnit;
import org.opensearch.flint.core.http.FlintRetryOptions;
@@ -61,7 +62,7 @@ public class FlintOptions implements Serializable {
public static final String SYSTEM_INDEX_KEY_NAME = "spark.flint.job.requestIndex";

/**
* Used by {@link org.opensearch.flint.core.storage.OpenSearchScrollReader}
* The page size for OpenSearch Rest Request.
*/
public static final String SCROLL_SIZE = "read.scroll_size";
public static final int DEFAULT_SCROLL_SIZE = 100;
@@ -96,6 +97,10 @@ public class FlintOptions implements Serializable {

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";

public static final String SUPPORT_SHARD = "read.support_shard";

public static final String DEFAULT_SUPPORT_SHARD = "true";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
@@ -109,8 +114,12 @@ public int getPort() {
return Integer.parseInt(options.getOrDefault(PORT, "9200"));
}

public int getScrollSize() {
return Integer.parseInt(options.getOrDefault(SCROLL_SIZE, String.valueOf(DEFAULT_SCROLL_SIZE)));
public Optional<Integer> getScrollSize() {
if (options.containsKey(SCROLL_SIZE)) {
return Optional.of(Integer.parseInt(options.get(SCROLL_SIZE)));
} else {
return Optional.empty();
}
}

public int getScrollDuration() {
@@ -176,4 +185,16 @@ public int getBatchBytes() {
public String getCustomFlintMetadataLogServiceClass() {
return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
}

/**
* FIXME: This is a workaround for AWS OpenSearch Serverless (AOSS). AOSS does not support shard
* operations, but shard info is exposed in index settings. Remove this setting when AOSS fixes
* the bug.
*
* @return true if the index supports shards, otherwise false.
*/
public boolean supportShard() {
return options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD).equalsIgnoreCase(
DEFAULT_SUPPORT_SHARD);
}
}
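A small sketch of how the reworked options behave when a caller constructs `FlintOptions` directly; the option values and import path are illustrative assumptions:

```scala
import java.util.Collections
import org.opensearch.flint.core.FlintOptions

// Sketch: scroll size is now optional, and shard support can be switched off.
val options = new FlintOptions(Collections.singletonMap("read.support_shard", "false"))

assert(!options.getScrollSize.isPresent) // no read.scroll_size configured => Optional.empty()
assert(!options.supportShard())          // e.g. an AOSS collection
```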
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

/**
* Schema in OpenSearch index mapping format.
*
* @param jsonSchema
*/
case class JsonSchema(jsonSchema: String) extends Schema {
override def asJson(): String = jsonSchema
}
28 changes: 28 additions & 0 deletions flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

import org.opensearch.flint.core.metadata.FlintMetadata

/**
* OpenSearch Table metadata.
*
* @param name
*   index name
* @param properties
*   index mapping, in JSON format
* @param setting
*   index settings, in JSON format
*/
case class MetaData(name: String, properties: String, setting: String)

object MetaData {
def apply(name: String, flintMetadata: FlintMetadata): MetaData = {
val properties = flintMetadata.getContent
val setting = flintMetadata.indexSettings.getOrElse("")
MetaData(name, properties, setting)
}
}
20 changes: 20 additions & 0 deletions flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

/**
* Table Schema.
*/
trait Schema {

/**
* Return table schema as Json.
*
* @return
* schema.
*/
def asJson(): String
}
85 changes: 85 additions & 0 deletions flint-core/src/main/scala/org/opensearch/flint/core/Table.scala
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

import java.io.IOException
import java.util

import com.google.common.base.Strings
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentType}
import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS
import org.opensearch.flint.core.storage.FlintReader
import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder}
import org.opensearch.plugins.SearchPlugin
import org.opensearch.search.SearchModule

/**
* An OpenSearch Table.
*/
trait Table extends Serializable {

/**
* OpenSearch Table MetaData.
*
* @return
* {@link MetaData}
*/
def metaData(): MetaData

/**
* Is OpenSearch Table splittable.
*
* @return
* true if splittable, otherwise false.
*/
def isSplittable(): Boolean = false

/**
* Slice OpenSearch Table.
* @return
* a sequence of sliced OpenSearch Tables
*/
def slice(): Seq[Table]

/**
* Create Flint Reader from DSL query.
*
* @param query
* OpenSearch DSL query.
* @return
*   {@link FlintReader}
*/
def createReader(query: String): FlintReader

/**
* OpenSearch Table schema
*
* @return
* {@link Schema}
*/
def schema(): Schema
}

object Table {

/**
* {@link NamedXContentRegistry} from {@link SearchModule} used to construct {@link
* QueryBuilder} from DSL query string.
*/
val xContentRegistry = new NamedXContentRegistry(
new SearchModule(Settings.builder.build, new util.ArrayList[SearchPlugin]).getNamedXContents)

@throws[IOException]
def queryBuilder(query: String): QueryBuilder = {
if (!Strings.isNullOrEmpty(query)) {
val parser =
XContentType.JSON.xContent.createParser(xContentRegistry, IGNORE_DEPRECATIONS, query)
AbstractQueryBuilder.parseInnerQueryBuilder(parser)
} else {
new MatchAllQueryBuilder
}
}
}
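As a quick illustration of the companion object's behavior (a sketch; the DSL query string is an arbitrary example):

```scala
import org.opensearch.flint.core.Table
import org.opensearch.index.query.MatchAllQueryBuilder

// A non-empty DSL string is parsed into the matching QueryBuilder...
val rangeQuery = Table.queryBuilder("""{"range": {"year": {"gte": 2020}}}""")

// ...while a null or empty query falls back to match_all.
assert(Table.queryBuilder("").isInstanceOf[MatchAllQueryBuilder])
```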
@@ -14,11 +14,12 @@
import java.util.logging.Logger;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;

/**
* Flint options related to HTTP request retry.
*/
public class FlintRetryOptions {
public class FlintRetryOptions implements Serializable {

private static final Logger LOG = Logger.getLogger(FlintRetryOptions.class.getName());

@@ -151,6 +151,7 @@ object FlintMetadata {
}
case "properties" =>
builder.schema(parser.map())
case _ => // Ignore other fields, for instance, dynamic.
}
}
}