diff --git a/docs/index.md b/docs/index.md
index 249e7a770..981ed16f0 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -532,6 +532,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
 - `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway).
 - `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
+- `spark.datasource.flint.read.support_shard`: default is true. Set to false if the index does not support shards (e.g. an AWS OpenSearch Serverless collection). Do not use in production; this setting will be removed in a later version.
 - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
 - `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance.
 - `spark.flint.index.hybridscan.enabled`: default is false.
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
index e81508078..04ef216c4 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
@@ -7,7 +7,6 @@
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.DocWriteResponse;
-import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.action.bulk.BulkRequest;
 import org.opensearch.action.bulk.BulkResponse;
 import org.opensearch.action.delete.DeleteRequest;
@@ -30,6 +29,9 @@
 import org.opensearch.client.indices.GetIndexRequest;
 import org.opensearch.client.indices.GetIndexResponse;
 import org.opensearch.client.indices.PutMappingRequest;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
+import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
 import org.opensearch.flint.core.logging.CustomLogging;
 import org.opensearch.flint.core.logging.OperationMessage;
 import org.opensearch.flint.core.metrics.MetricsUtil;
@@ -69,6 +71,8 @@ public interface IRestHighLevelClient extends Closeable {
   DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;
 
+  IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException;
+
   CreatePitResponse createPit(CreatePitRequest request) throws IOException;
 
   /**
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java
index 272380e59..f26a6c158 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java
@@ -30,13 +30,16 @@
 import org.opensearch.client.indices.PutMappingRequest;
 import org.opensearch.client.json.jackson.JacksonJsonpMapper;
 import org.opensearch.client.opensearch.OpenSearchClient;
-import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
 import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
+import 
org.opensearch.client.opensearch.core.pit.CreatePitResponse; +import org.opensearch.client.opensearch.indices.IndicesStatsRequest; +import org.opensearch.client.opensearch.indices.IndicesStatsResponse; import org.opensearch.client.transport.rest_client.RestClientTransport; import java.io.IOException; -import static org.opensearch.flint.core.metrics.MetricConstants.*; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX; /** * A wrapper class for RestHighLevelClient to facilitate OpenSearch operations @@ -121,6 +124,17 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options)); } + @Override + public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException { + return execute(OS_READ_OP_METRIC_PREFIX, + () -> { + OpenSearchClient openSearchClient = + new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(), + new JacksonJsonpMapper())); + return openSearchClient.indices().stats(request); + }); + } + @Override public CreatePitResponse createPit(CreatePitRequest request) throws IOException { return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request)); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 0e9cc57b9..1a3775f0b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -8,7 +8,6 @@ import java.util.Map; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.flint.core.storage.FlintReader; import org.opensearch.flint.core.storage.FlintWriter; /** @@ -65,25 +64,6 @@ public interface FlintClient { */ void deleteIndex(String indexName); - /** - * Create {@link FlintReader}. - * - * @param indexName index name. - * @param query DSL query. DSL query is null means match_all - * @return {@link FlintReader}. - */ - FlintReader createReader(String indexName, String query); - - /** - * Create {@link FlintReader}. - * - * @param indexName index name. - * @param shardId shard id. - * @param query DSL query. DSL query is null means match_all - * @return {@link FlintReader}. - */ - FlintReader createReader(String indexName, String shardId, String query); - /** * Create {@link FlintWriter}. * diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 9be01737c..2678a8f67 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -7,6 +7,7 @@ import java.io.Serializable; import java.util.Map; +import java.util.Optional; import org.apache.spark.network.util.ByteUnit; import org.opensearch.flint.core.http.FlintRetryOptions; @@ -61,7 +62,7 @@ public class FlintOptions implements Serializable { public static final String SYSTEM_INDEX_KEY_NAME = "spark.flint.job.requestIndex"; /** - * Used by {@link org.opensearch.flint.core.storage.OpenSearchScrollReader} + * The page size for OpenSearch Rest Request. 
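+ * Illustrative note: this maps to the Spark option
+ * {@code spark.datasource.flint.read.scroll_size}; when it is absent, the page size is
+ * derived from index stats instead (see OpenSearchIndexTable.pageSize in this change).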
 */
  public static final String SCROLL_SIZE = "read.scroll_size";
  public static final int DEFAULT_SCROLL_SIZE = 100;
@@ -96,6 +97,10 @@ public class FlintOptions implements Serializable {
 
   public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";
 
+  public static final String SUPPORT_SHARD = "read.support_shard";
+
+  public static final String DEFAULT_SUPPORT_SHARD = "true";
+
   public FlintOptions(Map<String, String> options) {
     this.options = options;
     this.retryOptions = new FlintRetryOptions(options);
@@ -109,8 +114,12 @@ public int getPort() {
     return Integer.parseInt(options.getOrDefault(PORT, "9200"));
   }
 
-  public int getScrollSize() {
-    return Integer.parseInt(options.getOrDefault(SCROLL_SIZE, String.valueOf(DEFAULT_SCROLL_SIZE)));
+  public Optional<Integer> getScrollSize() {
+    if (options.containsKey(SCROLL_SIZE)) {
+      return Optional.of(Integer.parseInt(options.get(SCROLL_SIZE)));
+    } else {
+      return Optional.empty();
+    }
   }
 
   public int getScrollDuration() {
@@ -176,4 +185,16 @@ public int getBatchBytes() {
   public String getCustomFlintMetadataLogServiceClass() {
     return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
   }
+
+  /**
+   * FIXME: This is a workaround for AWS OpenSearch Serverless (AOSS). AOSS does not support shard
+   * operations, but shard info is exposed in index settings. Remove this setting when AOSS fixes
+   * the bug.
+   *
+   * @return true if the index supports shard operations, otherwise false.
+   */
+  public boolean supportShard() {
+    return options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD).equalsIgnoreCase(
+        DEFAULT_SUPPORT_SHARD);
+  }
 }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/JsonSchema.scala b/flint-core/src/main/scala/org/opensearch/flint/core/JsonSchema.scala
new file mode 100644
index 000000000..f631dda0c
--- /dev/null
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/JsonSchema.scala
@@ -0,0 +1,15 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core
+
+/**
+ * Schema in OpenSearch index mapping format.
+ *
+ * @param jsonSchema schema in JSON format
+ */
+case class JsonSchema(jsonSchema: String) extends Schema {
+  override def asJson(): String = jsonSchema
+}
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala b/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala
new file mode 100644
index 000000000..98b7f8960
--- /dev/null
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core
+
+import org.opensearch.flint.core.metadata.FlintMetadata
+
+/**
+ * OpenSearch Table metadata.
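+ * For example (illustrative values only):
+ * {{{
+ *   MetaData(
+ *     "my-index",
+ *     """{"properties": {"id": {"type": "integer"}}}""",
+ *     """{"index.number_of_shards": "2"}""")
+ * }}}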
+ *
+ * @param name
+ *   index name
+ * @param properties
+ *   index mapping in JSON format
+ * @param setting
+ *   index settings in JSON format
+ */
+case class MetaData(name: String, properties: String, setting: String)
+
+object MetaData {
+  def apply(name: String, flintMetadata: FlintMetadata): MetaData = {
+    val properties = flintMetadata.getContent
+    val setting = flintMetadata.indexSettings.getOrElse("")
+    MetaData(name, properties, setting)
+  }
+}
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala b/flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala
new file mode 100644
index 000000000..37cb2204f
--- /dev/null
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/Schema.scala
@@ -0,0 +1,20 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core
+
+/**
+ * Table Schema.
+ */
+trait Schema {
+
+  /**
+   * Return the table schema as JSON.
+   *
+   * @return
+   *   schema.
+   */
+  def asJson(): String
+}
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala b/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala
new file mode 100644
index 000000000..a714542e7
--- /dev/null
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/Table.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core
+
+import java.io.IOException
+import java.util
+
+import com.google.common.base.Strings
+import org.opensearch.common.settings.Settings
+import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentType}
+import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS
+import org.opensearch.flint.core.storage.FlintReader
+import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder}
+import org.opensearch.plugins.SearchPlugin
+import org.opensearch.search.SearchModule
+
+/**
+ * An OpenSearch Table.
+ */
+trait Table extends Serializable {
+
+  /**
+   * OpenSearch Table MetaData.
+   *
+   * @return
+   *   {@link MetaData}
+   */
+  def metaData(): MetaData
+
+  /**
+   * Is OpenSearch Table splittable.
+   *
+   * @return
+   *   true if splittable, otherwise false.
+   */
+  def isSplittable(): Boolean = false
+
+  /**
+   * Slice OpenSearch Table.
+   * @return
+   *   a sequence of sliced OpenSearch tables
+   */
+  def slice(): Seq[Table]
+
+  /**
+   * Create Flint Reader from DSL query.
+   *
+   * @param query
+   *   OpenSearch DSL query.
+   * @return a {@link FlintReader}.
+   */
+  def createReader(query: String): FlintReader
+
+  /**
+   * OpenSearch Table schema
+   *
+   * @return
+   *   {@link Schema}
+   */
+  def schema(): Schema
+}
+
+object Table {
+
+  /**
+   * {@link NamedXContentRegistry} from {@link SearchModule} used to construct {@link
+   * QueryBuilder} from DSL query string.
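+ * For instance (illustrative), a DSL string such as {{{ {"term": {"id": 1}} }}} parses into the
+ * corresponding QueryBuilder, while a null or empty query falls back to match_all (see
+ * queryBuilder below).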
+ */ + val xContentRegistry = new NamedXContentRegistry( + new SearchModule(Settings.builder.build, new util.ArrayList[SearchPlugin]).getNamedXContents) + + @throws[IOException] + def queryBuilder(query: String): QueryBuilder = { + if (!Strings.isNullOrEmpty(query)) { + val parser = + XContentType.JSON.xContent.createParser(xContentRegistry, IGNORE_DEPRECATIONS, query) + AbstractQueryBuilder.parseInnerQueryBuilder(parser) + } else { + new MatchAllQueryBuilder + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 7b6139014..bf5352f3b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -14,11 +14,12 @@ import java.util.logging.Logger; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; +import java.io.Serializable; /** * Flint options related to HTTP request retry. */ -public class FlintRetryOptions { +public class FlintRetryOptions implements Serializable { private static final Logger LOG = Logger.getLogger(FlintRetryOptions.class.getName()); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index f432af0d0..e4e94cc8c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -151,6 +151,7 @@ object FlintMetadata { } case "properties" => builder.schema(parser.map()) + case _ => // Ignore other fields, for instance, dynamic. 
} } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index e60080e54..1a7c976c2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -5,7 +5,6 @@ package org.opensearch.flint.core.storage; -import com.google.common.base.Strings; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.CreateIndexRequest; @@ -14,33 +13,22 @@ import org.opensearch.client.indices.PutMappingRequest; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.NamedXContentRegistry; -import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.index.query.AbstractQueryBuilder; -import org.opensearch.index.query.MatchAllQueryBuilder; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.search.SearchModule; -import org.opensearch.search.builder.SearchSourceBuilder; import scala.Option; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.function.Function; import java.util.logging.Logger; import java.util.stream.Collectors; -import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; - /** * Flint client implementation for OpenSearch storage. */ @@ -48,15 +36,6 @@ public class FlintOpenSearchClient implements FlintClient { private static final Logger LOG = Logger.getLogger(FlintOpenSearchClient.class.getName()); - - /** - * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string. - */ - private final static NamedXContentRegistry - xContentRegistry = - new NamedXContentRegistry(new SearchModule(Settings.builder().build(), - new ArrayList<>()).getNamedXContents()); - /** * Invalid index name characters to percent-encode, * excluding '*' because it's reserved for pattern matching. @@ -64,9 +43,6 @@ public class FlintOpenSearchClient implements FlintClient { private final static Set INVALID_INDEX_NAME_CHARS = Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); - private final static Function SHARD_ID_PREFERENCE = - shardId -> shardId == null ? shardId : "_shards:"+shardId; - private final FlintOptions options; public FlintOpenSearchClient(FlintOptions options) { @@ -169,47 +145,6 @@ public void deleteIndex(String indexName) { } } - /** - * Create {@link FlintReader}. - * - * @param indexName index name. - * @param query DSL query. DSL query is null means match_all. - * @return {@link FlintReader}. - */ - @Override - public FlintReader createReader(String indexName, String query) { - return createReader(indexName, query, null); - } - - /** - * Create {@link FlintReader}. - * - * @param indexName index name. - * @param query DSL query. 
DSL query is null means match_all - * @param shardId shardId - * @return - */ - @Override - public FlintReader createReader(String indexName, String query, String shardId) { - LOG.info("Creating Flint index reader for " + indexName + " with query " + query + " shardId " + shardId); - try { - QueryBuilder queryBuilder = new MatchAllQueryBuilder(); - if (!Strings.isNullOrEmpty(query)) { - XContentParser - parser = - XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query); - queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); - } - return new OpenSearchScrollReader(createClient(), - sanitizeIndexName(indexName), - new SearchSourceBuilder().query(queryBuilder), - options, - SHARD_ID_PREFERENCE.apply(shardId)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - public FlintWriter createWriter(String indexName) { LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " + "batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes())); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java index e2e831bd0..d5fb45f99 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java @@ -5,6 +5,7 @@ package org.opensearch.flint.core.storage; +import com.google.common.annotations.VisibleForTesting; import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -22,8 +23,9 @@ */ public abstract class OpenSearchReader implements FlintReader { + @VisibleForTesting /** Search request source builder. */ - private final SearchRequest searchRequest; + public final SearchRequest searchRequest; protected final IRestHighLevelClient client; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java deleted file mode 100644 index bfcf15636..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.storage; - -import org.opensearch.OpenSearchStatusException; -import org.opensearch.action.search.ClearScrollRequest; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequest; -import org.opensearch.client.RequestOptions; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.Strings; -import org.opensearch.flint.core.FlintOptions; -import org.opensearch.flint.core.IRestHighLevelClient; -import org.opensearch.search.builder.SearchSourceBuilder; - -import java.io.IOException; -import java.util.Optional; -import java.util.function.Function; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * {@link OpenSearchReader} using scroll search. 
https://opensearch.org/docs/latest/api-reference/scroll/ - */ -public class OpenSearchScrollReader extends OpenSearchReader { - - private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName()); - - private final FlintOptions options; - - private final TimeValue scrollDuration; - - private String scrollId = null; - - public OpenSearchScrollReader( - IRestHighLevelClient client, - String indexName, - SearchSourceBuilder searchSourceBuilder, - FlintOptions options) { - this(client, indexName, searchSourceBuilder, options, null); - } - - public OpenSearchScrollReader( - IRestHighLevelClient client, - String indexName, - SearchSourceBuilder searchSourceBuilder, - FlintOptions options, - String preference) { - super(client, - applyPreference(preference).apply(new SearchRequest().indices(indexName) - .source(searchSourceBuilder.size(options.getScrollSize())))); - this.options = options; - this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration()); - } - - /** - * search. - */ - Optional search(SearchRequest request) throws IOException { - if (Strings.isNullOrEmpty(scrollId)) { - request.scroll(scrollDuration); - SearchResponse response = client.search(request, RequestOptions.DEFAULT); - scrollId = response.getScrollId(); - return Optional.of(response); - } else { - try { - return Optional - .of(client.scroll(new SearchScrollRequest().scroll(scrollDuration).scrollId(scrollId), - RequestOptions.DEFAULT)); - } catch (OpenSearchStatusException e) { - LOG.log(Level.WARNING, "scroll context not exist", e); - scrollId = null; - return Optional.empty(); - } - } - } - - /** - * clean the scroll context. - */ - void clean() throws IOException { - try { - if (!Strings.isNullOrEmpty(scrollId)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(scrollId); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); - } - } catch (OpenSearchStatusException e) { - // OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121 - LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" + - ".com/opensearch-project/OpenSearch/issues/11121.", e); - } finally { - scrollId = null; - } - } - - /** - * Public for testing. 
- */ - public String getScrollId() { - return scrollId; - } - - static private Function applyPreference(String preference) { - if (Strings.isNullOrEmpty(preference)) { - return searchRequest -> searchRequest; - } else { - return searchRequest -> searchRequest.preference(preference); - } - } -} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchSearchAfterQueryReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchSearchAfterQueryReader.java new file mode 100644 index 000000000..69ad748fc --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchSearchAfterQueryReader.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.common.Strings; +import org.opensearch.flint.core.IRestHighLevelClient; +import org.opensearch.search.builder.SearchSourceBuilder; + +import java.util.Arrays; +import java.util.Optional; +import java.util.function.Function; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Read OpenSearch Index using PIT with search_after. + */ +public class OpenSearchSearchAfterQueryReader extends OpenSearchReader { + + private static final Logger LOG = + Logger.getLogger(OpenSearchSearchAfterQueryReader.class.getName()); + + /** + * current search_after value, init value is null + */ + private Object[] search_after = null; + public OpenSearchSearchAfterQueryReader(IRestHighLevelClient client, SearchRequest request) { + super(client, request); + } + + /** + * search. 
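+   * Pages forward through results: the first call executes the request as-is; each later call
+   * re-runs it with search_after set to the sort values of the previous page's last hit, e.g.
+   * {@code request.source().searchAfter(lastHitSortValues)} (name assumed for illustration),
+   * until an empty page signals the end.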
+ */ + Optional search(SearchRequest request) { + try { + Optional response; + if (search_after != null) { + request.source().searchAfter(search_after); + } + response = Optional.of(client.search(request, RequestOptions.DEFAULT)); + int length = response.get().getHits().getHits().length; + if (length == 0) { + search_after = null; + return Optional.empty(); + } + // update search_after + search_after = response.get().getHits().getAt(length - 1).getSortValues(); + LOG.info("update search_after " + Arrays.stream(search_after) + .map(Object::toString) + .collect(Collectors.joining(","))); + return response; + } catch (Exception e) { + LOG.warning(e.getMessage()); + throw new RuntimeException(e); + } + } + + /** + * nothing to clean + */ + void clean() {} + + static private Function applyPreference(String preference) { + if (Strings.isNullOrEmpty(preference)) { + return searchRequest -> searchRequest; + } else { + return searchRequest -> searchRequest.preference(preference); + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala new file mode 100644 index 000000000..594db748b --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexShardTable.scala @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.table + +import org.opensearch.action.search.SearchRequest +import org.opensearch.flint.core.{FlintOptions, MetaData, Table} +import org.opensearch.flint.core.storage.{FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.sort.SortOrder + +/** + * Represents an OpenSearch index shard. + * + * @param metaData + * MetaData containing information about the OpenSearch index. + * @param option + * FlintOptions containing configuration options for the Flint client. + * @param shardId + * Shard Id. 
+ */
+class OpenSearchIndexShardTable(metaData: MetaData, option: FlintOptions, shardId: Int)
+    extends OpenSearchIndexTable(metaData, option) {
+
+  override def slice(): Seq[Table] = {
+    throw new UnsupportedOperationException("Can't slice OpenSearchIndexShardTable")
+  }
+
+  override def createReader(query: String): FlintReader = {
+    new OpenSearchSearchAfterQueryReader(
+      OpenSearchClientUtils.createClient(option),
+      new SearchRequest()
+        .indices(name)
+        .source(
+          new SearchSourceBuilder()
+            .query(Table.queryBuilder(query))
+            .size(pageSize)
+            .sort("_doc", SortOrder.ASC))
+        .preference(s"_shards:$shardId"))
+  }
+}
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala
new file mode 100644
index 000000000..783163687
--- /dev/null
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala
@@ -0,0 +1,173 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.table
+
+import scala.collection.JavaConverters._
+
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.JsonAST.JString
+import org.json4s.jackson.JsonMethods
+import org.json4s.native.Serialization
+import org.opensearch.action.search.SearchRequest
+import org.opensearch.client.opensearch.indices.IndicesStatsRequest
+import org.opensearch.client.opensearch.indices.stats.IndicesStats
+import org.opensearch.flint.core._
+import org.opensearch.flint.core.storage.{FlintReader, OpenSearchClientUtils, OpenSearchSearchAfterQueryReader}
+import org.opensearch.flint.core.table.OpenSearchIndexTable.maxSplitSizeBytes
+import org.opensearch.search.builder.SearchSourceBuilder
+import org.opensearch.search.sort.SortOrder
+
+/**
+ * Represents an OpenSearch index.
+ *
+ * @param metaData
+ *   MetaData containing information about the OpenSearch index.
+ * @param option
+ *   FlintOptions containing configuration options for the Flint client.
+ */
+class OpenSearchIndexTable(metaData: MetaData, option: FlintOptions) extends Table {
+  @transient implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+  /**
+   * The name of the index.
+   */
+  lazy val name: String = metaData.name
+
+  /**
+   * The index stats.
+   */
+  lazy val indexStats: IndicesStats =
+    OpenSearchClientUtils
+      .createClient(option)
+      .stats(new IndicesStatsRequest.Builder().index(name).build())
+      .indices()
+      .get(name)
+
+  /**
+   * The page size for OpenSearch Rest Request.
+   */
+  lazy val pageSize: Int = {
+    if (option.getScrollSize.isPresent) {
+      option.getScrollSize.get()
+    } else {
+      val docCount = indexStats.primaries().docs().count()
+      if (docCount == 0) {
+        maxResultWindow
+      } else {
+        val totalSizeBytes = indexStats.primaries().store().sizeInBytes
+        val docSize = Math.ceil(totalSizeBytes.toDouble / docCount).toLong
+        Math.max(Math.min(maxSplitSizeBytes / docSize, maxResultWindow), 1).toInt
+      }
+    }
+  }
+
+  /**
+   * The number of shards in the index.
+   */
+  lazy val numberOfShards: Int = {
+    if (option.supportShard()) {
+      (JsonMethods.parse(metaData.setting) \ "index.number_of_shards").extract[String].toInt
+    } else {
+      1
+    }
+  }
+
+  /**
+   * The maximum result window for the index.
+   */
+  lazy val maxResultWindow: Int = {
+    (JsonMethods.parse(metaData.setting) \ "index.max_result_window") match {
+      case JString(value) => value.toInt
+      case _ => 10000
+    }
+  }
+
+  /**
+   * Slices the table.
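+   * For example, an index with three shards yields three OpenSearchIndexShardTable instances
+   * (shard ids 0, 1 and 2), which can be scanned in parallel.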
+   */
+  override def slice(): Seq[Table] = {
+    Range(0, numberOfShards).map(shardId =>
+      new OpenSearchIndexShardTable(metaData, option, shardId))
+  }
+
+  /**
+   * Creates a reader for the table.
+   *
+   * @param query
+   *   The query string.
+   * @return
+   *   A FlintReader instance.
+   */
+  override def createReader(query: String): FlintReader = {
+    new OpenSearchSearchAfterQueryReader(
+      OpenSearchClientUtils.createClient(option),
+      new SearchRequest()
+        .indices(name)
+        .source(
+          new SearchSourceBuilder()
+            .query(Table.queryBuilder(query))
+            .size(pageSize)
+            .sort("_doc", SortOrder.ASC)
+            .sort("_id", SortOrder.ASC)))
+  }
+
+  /**
+   * Returns the schema of the table.
+   *
+   * @return
+   *   A Schema instance.
+   */
+  override def schema(): Schema = JsonSchema(metaData.properties)
+
+  /**
+   * Returns the metadata of the table.
+   *
+   * @return
+   *   A MetaData instance.
+   */
+  override def metaData(): MetaData = metaData
+
+  /**
+   * Is OpenSearch Table splittable.
+   *
+   * @return
+   *   true if splittable, otherwise false.
+   */
+  override def isSplittable(): Boolean = numberOfShards > 1
+}
+
+object OpenSearchIndexTable {
+
+  /**
+   * Max OpenSearch request page size is 10MB.
+   */
+  val maxSplitSizeBytes = 10 * 1024 * 1024
+}
+
+object OpenSearchCluster {
+
+  /**
+   * Creates a list of OpenSearchIndexTable instances for the indices in an OpenSearch domain.
+   *
+   * @param indexName
+   *   index name; supports (1) a single index name, (2) a wildcard index name, (3) comma-separated index names.
+   * @param options
+   *   The options for Flint.
+   * @return
+   *   A list of OpenSearchIndexTable instances.
+   */
+  def apply(indexName: String, options: FlintOptions): Seq[OpenSearchIndexTable] = {
+    val client = FlintClientBuilder.build(options)
+    client
+      .getAllIndexMetadata(indexName.split(","): _*)
+      .asScala
+      .toMap
+      .map(entry => {
+        new OpenSearchIndexTable(MetaData.apply(entry._1, entry._2), options)
+      })
+      .toSeq
+  }
+}
diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala
index dc2f5fe6a..01b4e266c 100644
--- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala
+++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala
@@ -37,20 +37,44 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers {
      | }
      |""".stripMargin
 
+  val testDynamic: String = s"""
+    | {
+    |   "dynamic": "strict",
+    |   "_meta": {
+    |     "version": "${current()}",
+    |     "name": "test_index",
+    |     "kind": "test_kind",
+    |     "source": "test_source_table",
+    |     "indexedColumns": [
+    |     {
+    |       "test_field": "spark_type"
+    |     }],
+    |     "options": {},
+    |     "properties": {}
+    |   },
+    |   "properties": {
+    |     "test_field": {
+    |       "type": "os_type"
+    |     }
+    |   }
+    | }
+    |""".stripMargin
+
   val testIndexSettingsJson: String =
     """
      | { "number_of_shards": 3 }
      |""".stripMargin
 
   "constructor" should "deserialize the given JSON and assign parsed value to field" in {
-    val metadata = FlintMetadata(testMetadataJson, testIndexSettingsJson)
-
-    metadata.version shouldBe current()
-    metadata.name shouldBe "test_index"
-    metadata.kind shouldBe "test_kind"
-    metadata.source shouldBe "test_source_table"
-    metadata.indexedColumns shouldBe Array(Map("test_field" -> "spark_type").asJava)
-    metadata.schema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava
+    Seq(testMetadataJson, testDynamic).foreach(mapping => {
+      val metadata = FlintMetadata(mapping,
testIndexSettingsJson) + metadata.version shouldBe current() + metadata.name shouldBe "test_index" + metadata.kind shouldBe "test_kind" + metadata.source shouldBe "test_source_table" + metadata.indexedColumns shouldBe Array(Map("test_field" -> "spark_type").asJava) + metadata.schema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava + }) } "getContent" should "serialize all fields to JSON" in { diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/table/OpenSearchIndexTableSpec.scala b/flint-core/src/test/scala/org/opensearch/flint/core/table/OpenSearchIndexTableSpec.scala new file mode 100644 index 000000000..9f6d045fc --- /dev/null +++ b/flint-core/src/test/scala/org/opensearch/flint/core/table/OpenSearchIndexTableSpec.scala @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.table + +import java.util.Optional + +import org.mockito.ArgumentMatchers._ +import org.mockito.Mockito._ +import org.opensearch.client.opensearch.indices.{IndicesStatsRequest, IndicesStatsResponse} +import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient, JsonSchema, MetaData} +import org.opensearch.flint.core.storage.{OpenSearchClientUtils, OpenSearchSearchAfterQueryReader} +import org.opensearch.search.builder.SearchSourceBuilder +import org.scalatest.BeforeAndAfter +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar + +class OpenSearchIndexTableSpec + extends AnyFlatSpec + with BeforeAndAfter + with Matchers + with MockitoSugar { + + private val clientUtils = mockStatic(classOf[OpenSearchClientUtils]) + private val openSearchClient = mock[IRestHighLevelClient](RETURNS_DEEP_STUBS) + + before { + clientUtils + .when(() => OpenSearchClientUtils.createClient(any(classOf[FlintOptions]))) + .thenReturn(openSearchClient) + } + + def mockTable( + scrollSize: Option[Int], + docCount: Long, + storeSizeInBytes: Long, + supportShard: Boolean = true, + numberOfShards: Int = 1): OpenSearchIndexTable = { + val metaData = mock[MetaData] + val options = mock[FlintOptions] + val mockIndicesStatsResp = mock[IndicesStatsResponse](RETURNS_DEEP_STUBS) + + when(metaData.name).thenReturn("test-index") + when(metaData.setting).thenReturn(s"""{"index.number_of_shards":"$numberOfShards"}""") + scrollSize match { + case Some(size) => + when(options.getScrollSize).thenReturn(Optional.of(Integer.valueOf(size))) + case None => when(options.getScrollSize).thenReturn(Optional.empty[Integer]()) + } + when(options.supportShard()).thenReturn(supportShard) + + when(openSearchClient.stats(any[IndicesStatsRequest])).thenReturn(mockIndicesStatsResp) + when(mockIndicesStatsResp.indices().get(any[String]).primaries().docs().count()) + .thenReturn(docCount) + when(mockIndicesStatsResp.indices().get(any[String]).primaries().store().sizeInBytes) + .thenReturn(storeSizeInBytes) + + new OpenSearchIndexTable(metaData, options) { + override lazy val maxResultWindow: Int = 10000 + } + } + + "OpenSearchIndexTable" should "return the correct pageSize when scroll size is present" in { + val table = mockTable(Some(100), 1000L, 10000000L) + table.pageSize shouldBe 100 + } + + it should "return the maxResultWindow when getScrollSize is not configured and no documents are present" in { + val table = mockTable(None, 0L, 0L) + table.pageSize shouldBe table.maxResultWindow + } + + it should "return the correct pageSize when getScrollSize is not configured 
and docSize is less than 10MB" in { + val docCount = 128L + val docSize = 1 * 1024 * 1024 // 1MB + val table = mockTable(None, docCount, docSize * docCount) + table.pageSize shouldBe 10 + } + + it should "return 1 when getScrollSize is not configured and docSize is equal to 10MB" in { + val docSize = 10 * 1024 * 1024 // 10MB + val table = mockTable(None, 1L, docSize) + table.pageSize shouldBe 1 + } + + it should "return 1 when getScrollSize is not configured and docSize is greater than 10MB" in { + val docSize = 20 * 1024 * 1024 // 20MB + val table = mockTable(None, 1L, docSize) + table.pageSize shouldBe 1 + } + + it should "return the correct schema" in { + val metaData = mock[MetaData] + val options = mock[FlintOptions] + + when(metaData.properties).thenReturn("properties") + + val table = new OpenSearchIndexTable(metaData, options) + + table.schema() shouldBe JsonSchema("properties") + } + + it should "return the correct metadata" in { + val metaData = mock[MetaData] + val options = mock[FlintOptions] + + val table = new OpenSearchIndexTable(metaData, options) + + table.metaData() shouldBe metaData + } + + it should "return true for isSplittable when there are multiple shards" in { + val table = mockTable(None, 1000L, 10000000L, numberOfShards = 3) + table.isSplittable() shouldBe true + } + + it should "return false for isSplittable when there is only one shard" in { + val table = mockTable(None, 1000L, 10000000L, supportShard = true, numberOfShards = 1) + table.isSplittable() shouldBe false + } + + it should "return false for isSplittable when index does not support shard" in { + val table = mockTable(None, 1000L, 10000000L, supportShard = false, numberOfShards = 1) + table.isSplittable() shouldBe false + } + + it should "return the correct number of shard tables when sliced" in { + val numberOfShards = 3 + val table = mockTable(None, 1000L, 10000000L, numberOfShards = numberOfShards) + val slicedTables = table.slice() + slicedTables.size shouldBe numberOfShards + slicedTables.foreach(_ shouldBe a[OpenSearchIndexShardTable]) + } + + it should "create a reader for the table" in { + val query = "" + val table = mockTable(None, 1000L, 10000000L, numberOfShards = 1) + val reader = table.createReader(query) + reader shouldBe a[OpenSearchSearchAfterQueryReader] + + val searchRequest = reader.asInstanceOf[OpenSearchSearchAfterQueryReader].searchRequest + searchRequest.indices() should contain("test-index") + + val sourceBuilder = searchRequest.source().asInstanceOf[SearchSourceBuilder] + sourceBuilder.query() should not be null + sourceBuilder.size() shouldBe table.pageSize + + val sorts = sourceBuilder.sorts() + sorts.size() shouldBe 2 + sorts.get(0).toString should include("{\n \"_doc\" : {\n \"order\" : \"asc\"\n }\n}") + sorts.get(1).toString should include("{\n \"_id\" : {\n \"order\" : \"asc\"\n }\n}") + } + + "OpenSearchIndexShardTable" should "create reader correctly" in { + val query = "" + val indexTable = mockTable(None, 1000L, 10000000L, numberOfShards = 3) + val table = indexTable.slice().head + val reader = table.createReader(query) + reader shouldBe a[OpenSearchSearchAfterQueryReader] + + val searchRequest = reader.asInstanceOf[OpenSearchSearchAfterQueryReader].searchRequest + searchRequest.indices() should contain("test-index") + + searchRequest.preference() shouldBe "_shards:0" + + val sourceBuilder = searchRequest.source() + sourceBuilder.query() should not be null + sourceBuilder.size() shouldBe indexTable.pageSize + + val sorts = sourceBuilder.sorts() + sorts.size() 
shouldBe 1 + sorts.get(0).toString should include("{\n \"_doc\" : {\n \"order\" : \"asc\"\n }\n}") + } +} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/OpenSearchTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/OpenSearchTable.scala deleted file mode 100644 index 80eab850f..000000000 --- a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/OpenSearchTable.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.apache.spark.opensearch.table - -import scala.collection.JavaConverters._ - -import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions} -import org.opensearch.flint.core.metadata.FlintMetadata - -import org.apache.spark.sql.flint.datatype.FlintDataType -import org.apache.spark.sql.types.StructType - -/** - * Represents an OpenSearch table. - * - * @param tableName - * The name of the table. - * @param metadata - * Metadata of the table. - */ -case class OpenSearchTable(tableName: String, metadata: Map[String, FlintMetadata]) { - /* - * FIXME. we use first index schema in multiple indices. we should merge StructType to widen type - */ - lazy val schema: StructType = { - metadata.values.headOption - .map(m => FlintDataType.deserialize(m.getContent)) - .getOrElse(StructType(Nil)) - } - - lazy val partitions: Array[PartitionInfo] = { - metadata.map { case (partitionName, metadata) => - PartitionInfo.apply(partitionName, metadata.indexSettings.get) - }.toArray - } -} - -object OpenSearchTable { - - /** - * Creates an OpenSearchTable instance. - * - * @param tableName - * tableName support (1) single index name. (2) wildcard index name. (3) comma sep index name. - * @param options - * The options for Flint. - * @return - * An instance of OpenSearchTable. - */ - def apply(tableName: String, options: FlintOptions): OpenSearchTable = { - OpenSearchTable( - tableName, - FlintClientBuilder - .build(options) - .getAllIndexMetadata(tableName.split(","): _*) - .asScala - .toMap) - } -} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/PartitionInfo.scala b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/PartitionInfo.scala deleted file mode 100644 index 0b11c2104..000000000 --- a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/PartitionInfo.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.apache.spark.opensearch.table - -import org.json4s.{Formats, NoTypeHints} -import org.json4s.jackson.JsonMethods -import org.json4s.native.Serialization - -/** - * Represents information about a partition in OpenSearch. Partition is backed by OpenSearch - * Index. Each partition contain a list of Shards - * - * @param partitionName - * partition name. - * @param shards - * shards. - */ -case class PartitionInfo(partitionName: String, shards: Array[ShardInfo]) {} - -object PartitionInfo { - implicit val formats: Formats = Serialization.formats(NoTypeHints) - - /** - * Creates a PartitionInfo instance. - * - * @param partitionName - * The name of the partition. - * @param settings - * The settings of the partition. - * @return - * An instance of PartitionInfo. 
- */ - def apply(partitionName: String, settings: String): PartitionInfo = { - val shards = - Range.apply(0, numberOfShards(settings)).map(id => ShardInfo(partitionName, id)).toArray - PartitionInfo(partitionName, shards) - } - - /** - * Extracts the number of shards from the settings string. - * - * @param settingStr - * The settings string. - * @return - * The number of shards. - */ - def numberOfShards(settingStr: String): Int = { - val setting = JsonMethods.parse(settingStr) - (setting \ "index.number_of_shards").extract[String].toInt - } -} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/ShardInfo.scala b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/ShardInfo.scala deleted file mode 100644 index 7946bf1cb..000000000 --- a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/ShardInfo.scala +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.apache.spark.opensearch.table - -/** - * Represents information about a shard in OpenSearch. - * - * @param indexName - * The name of the index. - * @param id - * The ID of the shard. - */ -case class ShardInfo(indexName: String, id: Int) diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala index eebad81c8..80538dd5f 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala @@ -5,8 +5,6 @@ package org.apache.spark.sql.flint -import org.opensearch.flint.core.FlintClientBuilder - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} @@ -20,14 +18,10 @@ case class FlintPartitionReaderFactory( pushedPredicates: Array[Predicate]) extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - partition match { - case OpenSearchSplit(shardInfo) => - val query = FlintQueryCompiler(schema).compile(pushedPredicates) - val flintClient = FlintClientBuilder.build(options.flintOptions()) - new FlintPartitionReader( - flintClient.createReader(shardInfo.indexName, query, shardInfo.id.toString), - schema, - options) - } + val query = FlintQueryCompiler(schema).compile(pushedPredicates) + new FlintPartitionReader( + partition.asInstanceOf[OpenSearchSplit].table.createReader(query), + schema, + options) } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala index b1ec83cae..ed6902841 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala @@ -7,15 +7,11 @@ package org.apache.spark.sql.flint import java.util -import scala.collection.JavaConverters._ +import org.opensearch.flint.core.table.OpenSearchCluster -import org.opensearch.flint.core.FlintClientBuilder - -import org.apache.spark.opensearch.catalog.OpenSearchCatalog -import org.apache.spark.opensearch.table.OpenSearchTable import 
org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} -import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE} +import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.datatype.FlintDataType @@ -42,17 +38,21 @@ class FlintReadOnlyTable( lazy val name: String = flintSparkConf.tableName() - lazy val openSearchTable: OpenSearchTable = - OpenSearchTable.apply(name, flintSparkConf.flintOptions()) + lazy val tables: Seq[org.opensearch.flint.core.Table] = + OpenSearchCluster.apply(name, flintSparkConf.flintOptions()) + + lazy val resolvedTablesSchema: StructType = tables.headOption + .map(tbl => FlintDataType.deserialize(tbl.schema().asJson())) + .getOrElse(StructType(Nil)) lazy val schema: StructType = { - userSpecifiedSchema.getOrElse { openSearchTable.schema } + userSpecifiedSchema.getOrElse { resolvedTablesSchema } } override def capabilities(): util.Set[TableCapability] = util.EnumSet.of(BATCH_READ) override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - FlintScanBuilder(openSearchTable, schema, flintSparkConf) + FlintScanBuilder(tables, schema, flintSparkConf) } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala index c6e03e858..201e7c748 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala @@ -5,24 +5,33 @@ package org.apache.spark.sql.flint -import org.apache.spark.opensearch.table.{OpenSearchTable, ShardInfo} +import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.types.StructType case class FlintScan( - table: OpenSearchTable, + tables: Seq[org.opensearch.flint.core.Table], schema: StructType, options: FlintSparkConf, pushedPredicates: Array[Predicate]) extends Scan - with Batch { + with Batch + with Logging { override def readSchema(): StructType = schema override def planInputPartitions(): Array[InputPartition] = { - table.partitions.flatMap(p => p.shards.map(s => OpenSearchSplit(s))).toArray + tables + .flatMap(table => { + if (table.isSplittable()) { + table.slice().map(table => OpenSearchSplit(table)) + } else { + Seq(OpenSearchSplit(table)) + } + }) + .toArray } override def createReaderFactory(): PartitionReaderFactory = { @@ -42,9 +51,10 @@ case class FlintScan( } /** - * Each OpenSearchSplit is backed by an OpenSearch shard. + * Each OpenSearchSplit is backed by an OpenSearch index table. 
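+ * A splittable table contributes one split per sliced shard table, while a non-splittable
+ * table contributes a single split (see planInputPartitions above).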
 *
- * @param shardInfo
- *   shardInfo
+ * @param table
+ *   {@link org.opensearch.flint.core.Table}
 */
-private[spark] case class OpenSearchSplit(shardInfo: ShardInfo) extends InputPartition {}
+private[spark] case class OpenSearchSplit(table: org.opensearch.flint.core.Table)
+    extends InputPartition {}
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
index 8d8d02c02..0c6f7d700 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
@@ -6,14 +6,16 @@
 package org.apache.spark.sql.flint
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.opensearch.table.OpenSearchTable
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.storage.FlintQueryCompiler
 import org.apache.spark.sql.types.StructType
 
-case class FlintScanBuilder(table: OpenSearchTable, schema: StructType, options: FlintSparkConf)
+case class FlintScanBuilder(
+    tables: Seq[org.opensearch.flint.core.Table],
+    schema: StructType,
+    options: FlintSparkConf)
     extends ScanBuilder
     with SupportsPushDownV2Filters
     with Logging {
@@ -21,7 +23,7 @@ case class FlintScanBuilder(table: OpenSearchTable, schema: StructType, options:
   private var pushedPredicate = Array.empty[Predicate]
 
   override def build(): Scan = {
-    FlintScan(table, schema, options, pushedPredicate)
+    FlintScan(tables, schema, options, pushedPredicate)
   }
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 7ea284959..dc110afb9 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -118,14 +118,19 @@ object FlintSparkConf {
 
   val SCROLL_SIZE = FlintConfig("spark.datasource.flint.read.scroll_size")
     .datasourceOption()
-    .doc("scroll read size")
-    .createWithDefault("100")
+    .doc("scroll read page size")
+    .createOptional()
 
   val SCROLL_DURATION = FlintConfig(s"spark.datasource.flint.${FlintOptions.SCROLL_DURATION}")
     .datasourceOption()
     .doc("scroll duration in minutes")
     .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SCROLL_DURATION))
 
+  val SUPPORT_SHARD = FlintConfig(s"spark.datasource.flint.${FlintOptions.SUPPORT_SHARD}")
+    .datasourceOption()
+    .doc("indicates whether the index supports shards or not")
+    .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SUPPORT_SHARD))
+
   val MAX_RETRIES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.MAX_RETRIES}")
     .datasourceOption()
     .doc("max retries on failed HTTP request, 0 means retry is disabled, default is 3")
    .createWithDefault
@@ -265,7 +270,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
       HOST_ENDPOINT,
       HOST_PORT,
       REFRESH_POLICY,
-      SCROLL_SIZE,
       SCROLL_DURATION,
       SCHEME,
       AUTH,
@@ -290,7 +294,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
       SESSION_ID,
       REQUEST_INDEX,
       METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
-      
EXCLUDE_JOB_IDS) + EXCLUDE_JOB_IDS, + SCROLL_SIZE) .map(conf => (conf.optionKey, conf.readFrom(reader))) .flatMap { case (_, None) => None diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 4203d5b86..ca8349cd5 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -23,7 +23,7 @@ class FlintSparkConfSuite extends FlintSuite { val flintOptions = FlintSparkConf().flintOptions() assert(flintOptions.getHost == "127.0.0.1") - assert(flintOptions.getScrollSize == 10) + assert(flintOptions.getScrollSize.get() == 10) // default value assert(flintOptions.getPort == 9200) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 8ede40f86..a590eccb1 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -11,7 +11,8 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} -import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} +import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient} +import org.opensearch.flint.core.storage.OpenSearchClientUtils import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.{Matcher, MatchResult} @@ -33,6 +34,10 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { private val clientBuilder = mockStatic(classOf[FlintClientBuilder]) private val client = mock[FlintClient](RETURNS_DEEP_STUBS) + /** Mock IRestHighLevelClient to avoid looking for real OpenSearch cluster */ + private val clientUtils = mockStatic(classOf[OpenSearchClientUtils]) + private val openSearchClient = mock[IRestHighLevelClient](RETURNS_DEEP_STUBS) + /** Mock FlintSpark which is required by the rule. Deep stub required to replace spark val. 
diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala
index 47bedb69c..e1e967ded 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala
@@ -70,6 +70,11 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
     |  "number_of_replicas": "0"
     |}""".stripMargin

+  val multipleShardSetting = """{
+    |  "number_of_shards": "2",
+    |  "number_of_replicas": "0"
+    |}""".stripMargin
+
   def simpleIndex(indexName: String): Unit = {
     val mappings = """{
       |  "properties": {
@@ -101,6 +106,19 @@
       |  }
       |}""".stripMargin

     val docs = for (n <- 1 to N) yield s"""{"id": $n}""".stripMargin
     index(indexName, oneNodeSetting, mappings, docs)
   }
+
+  def multipleShardAndDocIndex(indexName: String, N: Int): Unit = {
+    val mappings = """{
+      |  "properties": {
+      |    "id": {
+      |      "type": "integer"
+      |    }
+      |  }
+      |}""".stripMargin
+
+    val docs = for (n <- 1 to N) yield s"""{"id": $n}""".stripMargin
+    index(indexName, multipleShardSetting, mappings, docs)
+  }
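The two integration tests added below exercise this helper through the suite's `createTable` helper and `options` value. A condensed sketch of what they verify; the one-slice-per-shard expectation is my reading of the new `slice()` API, not an assertion the PR itself makes:

```scala
// 2-shard index with 5 docs; slice() is expected to yield one shard-level
// Table per shard, and the per-shard readers together see every doc once.
multipleShardAndDocIndex("t0001", 5)
val shardTables = createTable("t0001", options).slice() // assumption: 2 slices here
var total = 0
shardTables.foreach { shardTable =>
  val reader = shardTable.createReader(null) // null query means match_all
  while (reader.hasNext) { reader.next(); total += 1 }
  reader.close()
}
assert(total == 5)
```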
diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
index 1373654aa..53188fb5a 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
@@ -11,20 +11,20 @@
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
 import org.mockito.Mockito.when
-import org.opensearch.client.json.jackson.JacksonJsonpMapper
-import org.opensearch.client.opensearch.OpenSearchClient
-import org.opensearch.client.transport.rest_client.RestClientTransport
 import org.opensearch.flint.OpenSearchSuite
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.core.storage.{FlintOpenSearchClient, OpenSearchScrollReader}
+import org.opensearch.flint.core.storage.FlintOpenSearchClient
+import org.opensearch.flint.core.table.OpenSearchCluster
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
 import org.scalatestplus.mockito.MockitoSugar.mock

-import org.apache.spark.sql.flint.config.FlintSparkConf.{DATA_SOURCE_NAME, REFRESH_POLICY, SCROLL_DURATION, SCROLL_SIZE}
+import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY

 class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers {

+  lazy val options = new FlintOptions(openSearchOptions.asJava)
+
+  /** Lazy initialize after container started. */
   lazy val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

@@ -146,14 +146,14 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
     writer.write("\n")
     writer.flush()
     writer.close()
-    flintClient.createReader(indexName, "").hasNext shouldBe true
+    createTable(indexName, options).createReader("").hasNext shouldBe true

     flintClient.deleteIndex(indexName)
     flintClient.exists(indexName) shouldBe false
   }

   it should "percent-encode invalid index name characters" in {
-    val indexName = "test ,:\"+/\\|?#><"
+    val indexName = "test :\"+/\\|?#><"
     flintClient.createIndex(
       indexName,
       FlintMetadata("""{"properties": {"test": { "type": "integer" } } }"""))
@@ -170,7 +170,7 @@
     writer.write("\n")
     writer.flush()
     writer.close()
-    flintClient.createReader(indexName, "").hasNext shouldBe true
+    createTable(indexName, options).createReader("").hasNext shouldBe true

     flintClient.deleteIndex(indexName)
     flintClient.exists(indexName) shouldBe false
@@ -185,7 +185,7 @@
     withIndexName(indexName) {
       simpleIndex(indexName)
       val match_all = null
-      val reader = flintClient.createReader(indexName, match_all)
+      val reader = createTable(indexName, options).createReader(match_all)

       reader.hasNext shouldBe true
       reader.next shouldBe """{"accountId":"123","eventName":"event","eventSource":"source"}"""
@@ -194,6 +194,46 @@
     }
   }

+  it should "read docs from index with multiple shards successfully" in {
+    val indexName = "t0001"
+    val expectedCount = 5
+    withIndexName(indexName) {
+      multipleShardAndDocIndex(indexName, expectedCount)
+      val match_all = null
+      val reader = createTable(indexName, options).createReader(match_all)
+
+      var totalCount = 0
+      while (reader.hasNext) {
+        reader.next()
+        totalCount += 1
+      }
+      totalCount shouldBe expectedCount
+    }
+  }
+
+  it should "read docs from shard table successfully" in {
+    val indexName = "t0001"
+    val expectedCount = 5
+    withIndexName(indexName) {
+      multipleShardAndDocIndex(indexName, expectedCount)
+      val match_all = null
+      val totalCount = createTable(indexName, options)
+        .slice()
+        .map(shardTable => {
+          val reader = shardTable.createReader(match_all)
+          var count = 0
+          while (reader.hasNext) {
+            reader.next()
+            count += 1
+          }
+          count
+        })
+        .sum
+
+      totalCount shouldBe expectedCount
+    }
+  }
+
   it should "write docs to index successfully " in {
     val indexName = "t0001"
     withIndexName(indexName) {
@@ -218,7 +258,8 @@
       writer.close()

       val match_all = null
-      val reader = flintClient.createReader(indexName, match_all)
+      val reader =
+        createTable(indexName, new FlintOptions(options.asJava)).createReader(match_all)
       reader.hasNext shouldBe true
       reader.next shouldBe """{"aInt":1}"""
       reader.hasNext shouldBe false
@@ -226,53 +267,7 @@
     }
   }

-  it should "scroll context close properly after read" in {
-    val indexName = "t0001"
-    withIndexName(indexName) {
-      simpleIndex(indexName)
-      val match_all = null
-      val reader = flintClient.createReader(indexName, match_all)
-
-      reader.hasNext shouldBe true
-      reader.next shouldBe """{"accountId":"123","eventName":"event","eventSource":"source"}"""
-      reader.hasNext shouldBe false
-      reader.close()
-
-      reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null
-      scrollShouldClosed()
-    }
-  }
-
-  it should "no item return after scroll timeout" in {
-    val indexName = "t0001"
-    withIndexName(indexName) {
-      multipleDocIndex(indexName, 2)
-
-      val options =
-        openSearchOptions + (s"${SCROLL_DURATION.optionKey}" -> "1", s"${SCROLL_SIZE.optionKey}" -> "1")
-      val flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava))
-      val match_all = null
-      val reader = flintClient.createReader(indexName, match_all)
-
-      reader.hasNext shouldBe true
-      reader.next
-      // scroll context expired after 1 minutes
-      Thread.sleep(60 * 1000 * 2)
-      reader.hasNext shouldBe false
-      reader.close()
-
-      reader.asInstanceOf[OpenSearchScrollReader].getScrollId shouldBe null
-      scrollShouldClosed()
-    }
-  }
-
-  def scrollShouldClosed(): Unit = {
-    val transport =
-      new RestClientTransport(openSearchClient.getLowLevelClient, new JacksonJsonpMapper)
-    val client = new OpenSearchClient(transport)
-
-    val response = client.nodes().stats()
-    response.nodes().size() should be > 0
-    response.nodes().forEach((_, stats) => stats.indices().search().scrollCurrent() shouldBe 0)
+  def createTable(indexName: String, options: FlintOptions): Table = {
+    OpenSearchCluster.apply(indexName, options).head
   }
 }
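One subtlety above: the percent-encoding test dropped `,` from the invalid-character list. A plausible reading (my inference, not stated in the PR) is that the comma is now meaningful to the new `OpenSearchCluster` entry point as an index-list separator, which would also explain why it returns a sequence of `Table`s and why the helper takes `.head`:

```scala
// Hypothetical illustration only: a comma-separated index expression
// mapping to one Table per index, so "," can no longer be percent-encoded
// as part of a single index name.
val tables = OpenSearchCluster.apply("logs-2023,logs-2024", options)
assert(tables.size == 2) // assumption: one Table per listed index
```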
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala
index ec9901b73..422cfc947 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala
@@ -20,7 +20,7 @@ import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentType}
 import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient}
 import org.opensearch.flint.core.metrics.MetricConstants
-import org.opensearch.flint.core.storage.{FlintReader, OpenSearchQueryReader, OpenSearchScrollReader, OpenSearchUpdater}
+import org.opensearch.flint.core.storage.{FlintReader, OpenSearchQueryReader, OpenSearchUpdater}
 import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder}
 import org.opensearch.plugins.SearchPlugin
 import org.opensearch.search.SearchModule
@@ -137,23 +137,6 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
     }
   }

-  def createScrollReader(indexName: String, query: String, sort: String): FlintReader = try {
-    var queryBuilder: QueryBuilder = new MatchAllQueryBuilder
-    if (!Strings.isNullOrEmpty(query)) {
-      val parser =
-        XContentType.JSON.xContent.createParser(xContentRegistry, IGNORE_DEPRECATIONS, query)
-      queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser)
-    }
-    new OpenSearchScrollReader(
-      flintClient.createClient(),
-      indexName,
-      new SearchSourceBuilder().query(queryBuilder).sort(sort, SortOrder.ASC),
-      flintOptions)
-  } catch {
-    case e: IOException =>
-      throw new RuntimeException(e)
-  }
-
   def doesIndexExist(indexName: String): Boolean = {
     using(flintClient.createClient()) { client =>
       try {