diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java index 85f4bd137a..9e47f9b37e 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java @@ -67,22 +67,21 @@ public DataSourceMetadata( List allowedRoles, Map properties, String resultIndex) { - String errorMessage = validateCustomResultIndex(resultIndex); - this.name = name; - this.connector = connector; - this.description = description; - this.properties = properties; - this.allowedRoles = allowedRoles; - + String errorMessage = validateCustomResultIndex(resultIndex); if (errorMessage != null) { throw new IllegalArgumentException(errorMessage); } if (resultIndex == null) { - this.resultIndex = DATASOURCE_TO_RESULT_INDEX.apply(name); + this.resultIndex = fromNameToCustomResultIndex(); } else { this.resultIndex = resultIndex; } + + this.connector = connector; + this.description = description; + this.properties = properties; + this.allowedRoles = allowedRoles; } public DataSourceMetadata() { @@ -120,4 +119,34 @@ public String validateCustomResultIndex(String resultIndex) { } return null; } + + /** + * Since we are using datasource name to create result index, we need to make sure that the final + * name is valid + * + * @param resultIndex result index name + * @return valid result index name + */ + private String convertToValidResultIndex(String resultIndex) { + // Limit Length + if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { + resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE); + } + + // Pattern Matching: Remove characters that don't match the pattern + StringBuilder validChars = new StringBuilder(); + for (char c : resultIndex.toCharArray()) { + if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) { + validChars.append(c); + } + } + return validChars.toString(); + } + + public String fromNameToCustomResultIndex() { + if (name == null) { + throw new IllegalArgumentException("Datasource name cannot be null"); + } + return convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase())); + } } diff --git a/spark/build.gradle b/spark/build.gradle index 6d3bb17c62..ed91b9820b 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -124,7 +124,7 @@ jacocoTestCoverageVerification { 'org.opensearch.sql.spark.execution.session.SessionModel', 'org.opensearch.sql.spark.execution.statement.StatementModel', // TODO: add tests for purging flint indices - 'org.opensearch.sql.spark.cluster.ClusterManagerEventListener', + 'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*', 'org.opensearch.sql.spark.cluster.FlintIndexRetention', 'org.opensearch.sql.spark.cluster.IndexCleanup' ] diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java index 75cad4fed9..562f12b69e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java @@ -5,14 +5,8 @@ package org.opensearch.sql.spark.cluster; -import java.util.Arrays; -import java.util.Objects; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.indices.stats.CommonStats; -import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; -import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; -import org.opensearch.action.admin.indices.stats.ShardStats; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; @@ -21,7 +15,6 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.reindex.DeleteByQueryAction; import org.opensearch.index.reindex.DeleteByQueryRequest; -import org.opensearch.index.store.StoreStats; /** Clean up the old docs for indices. */ public class IndexCleanup { @@ -35,60 +28,6 @@ public IndexCleanup(Client client, ClusterService clusterService) { this.clusterService = clusterService; } - /** - * delete docs when shard size is bigger than max limitation. - * - * @param indexName index name - * @param maxShardSize max shard size - * @param queryForDeleteByQueryRequest query request - * @param listener action listener - */ - public void deleteDocsBasedOnShardSize( - String indexName, - long maxShardSize, - QueryBuilder queryForDeleteByQueryRequest, - ActionListener listener) { - - if (!clusterService.state().getRoutingTable().hasIndex(indexName)) { - LOG.debug("skip as the index:{} doesn't exist", indexName); - return; - } - - ActionListener indicesStatsResponseListener = - ActionListener.wrap( - indicesStatsResponse -> { - // Check if any shard size is bigger than maxShardSize - boolean cleanupNeeded = - Arrays.stream(indicesStatsResponse.getShards()) - .map(ShardStats::getStats) - .filter(Objects::nonNull) - .map(CommonStats::getStore) - .filter(Objects::nonNull) - .map(StoreStats::getSizeInBytes) - .anyMatch(size -> size > maxShardSize); - - if (cleanupNeeded) { - deleteDocsByQuery( - indexName, - queryForDeleteByQueryRequest, - ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); - } else { - listener.onResponse(false); - } - }, - listener::onFailure); - - getCheckpointShardStoreStats(indexName, indicesStatsResponseListener); - } - - private void getCheckpointShardStoreStats( - String indexName, ActionListener listener) { - IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); - indicesStatsRequest.store(); - indicesStatsRequest.indices(indexName); - client.admin().indices().stats(indicesStatsRequest, listener); - } - /** * Delete docs based on query request * diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 5d106e0582..772f19571c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -5,7 +5,6 @@ package org.opensearch.sql.spark.asyncquery; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; @@ -34,8 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import joptsimple.internal.Strings; import lombok.Getter; +import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -109,10 +108,10 @@ public void setup() { pluginSettings = new OpenSearchSettings(clusterSettings); client = (NodeClient) cluster().client(); dataSourceService = createDataSourceService(); - dataSourceService.createDataSource( + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( DATASOURCE, - Strings.EMPTY, + StringUtils.EMPTY, DataSourceType.S3GLUE, ImmutableList.of(), ImmutableMap.of( @@ -124,9 +123,10 @@ public void setup() { "http://localhost:9200", "glue.indexstore.opensearch.auth", "noauth"), - null)); + null); + dataSourceService.createDataSource(dataSourceMetadata); stateStore = new StateStore(client, clusterService); - createIndex(DEFAULT_RESULT_INDEX); + createIndex(dataSourceMetadata.fromNameToCustomResultIndex()); } @After @@ -331,7 +331,7 @@ public void datasourceWithBasicAuth() { dataSourceService.createDataSource( new DataSourceMetadata( "mybasicauth", - Strings.EMPTY, + StringUtils.EMPTY, DataSourceType.S3GLUE, ImmutableList.of(), properties, @@ -372,7 +372,7 @@ public void withSessionCreateAsyncQueryFailed() { assertTrue(statementModel.isPresent()); assertEquals(StatementState.WAITING, statementModel.get().getStatementState()); - // 2. fetch async query result. not result write to SPARK_RESPONSE_BUFFER_INDEX_NAME yet. + // 2. fetch async query result. not result write to DEFAULT_RESULT_INDEX yet. // mock failed statement. StatementModel submitted = statementModel.get(); StatementModel mocked =