From af9217ab6a40c990323da465ccff1d0c1424f39c Mon Sep 17 00:00:00 2001
From: Kaituo Li
Date: Wed, 25 Oct 2023 22:26:07 -0700
Subject: [PATCH] address comments

Signed-off-by: Kaituo Li
---
 .../datasource/model/DataSourceMetadata.java  | 35 ++++++++---
 .../sql/spark/cluster/IndexCleanup.java       | 61 -------------------
 ...AsyncQueryExecutorServiceImplSpecTest.java |  6 +-
 3 files changed, 30 insertions(+), 72 deletions(-)

diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java
index 85f4bd137a..0fd0bd0db6 100644
--- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java
+++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java
@@ -67,22 +67,25 @@ public DataSourceMetadata(
       List<String> allowedRoles,
       Map<String, String> properties,
       String resultIndex) {
-    String errorMessage = validateCustomResultIndex(resultIndex);
-    this.name = name;
-    this.connector = connector;
-    this.description = description;
-    this.properties = properties;
-    this.allowedRoles = allowedRoles;
-
+    String errorMessage = validateCustomResultIndex(resultIndex);
     if (errorMessage != null) {
       throw new IllegalArgumentException(errorMessage);
     }
 
     if (resultIndex == null) {
-      this.resultIndex = DATASOURCE_TO_RESULT_INDEX.apply(name);
+      // since we are using datasource name to create result index, we need to make sure that the
+      // final
+      // name is valid
+      this.resultIndex =
+          convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase()));
     } else {
       this.resultIndex = resultIndex;
     }
+
+    this.connector = connector;
+    this.description = description;
+    this.properties = properties;
+    this.allowedRoles = allowedRoles;
   }
 
   public DataSourceMetadata() {
@@ -120,4 +123,20 @@ public String validateCustomResultIndex(String resultIndex) {
     }
     return null;
   }
+
+  public String convertToValidResultIndex(String resultIndex) {
+    // Limit Length
+    if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
+      resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE);
+    }
+
+    // Pattern Matching: Remove characters that don't match the pattern
+    StringBuilder validChars = new StringBuilder();
+    for (char c : resultIndex.toCharArray()) {
+      if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) {
+        validChars.append(c);
+      }
+    }
+    return validChars.toString();
+  }
 }
diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java
index 75cad4fed9..562f12b69e 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java
@@ -5,14 +5,8 @@
 
 package org.opensearch.sql.spark.cluster;
 
-import java.util.Arrays;
-import java.util.Objects;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.opensearch.action.admin.indices.stats.CommonStats;
-import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.opensearch.action.admin.indices.stats.ShardStats;
 import org.opensearch.action.support.IndicesOptions;
 import org.opensearch.client.Client;
 import org.opensearch.cluster.service.ClusterService;
@@ -21,7 +15,6 @@
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.reindex.DeleteByQueryAction;
 import org.opensearch.index.reindex.DeleteByQueryRequest;
-import org.opensearch.index.store.StoreStats;
 
 /** Clean up the old docs for indices. */
 public class IndexCleanup {
@@ -35,60 +28,6 @@ public IndexCleanup(Client client, ClusterService clusterService) {
     this.clusterService = clusterService;
   }
 
-  /**
-   * delete docs when shard size is bigger than max limitation.
-   *
-   * @param indexName index name
-   * @param maxShardSize max shard size
-   * @param queryForDeleteByQueryRequest query request
-   * @param listener action listener
-   */
-  public void deleteDocsBasedOnShardSize(
-      String indexName,
-      long maxShardSize,
-      QueryBuilder queryForDeleteByQueryRequest,
-      ActionListener<Boolean> listener) {
-
-    if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
-      LOG.debug("skip as the index:{} doesn't exist", indexName);
-      return;
-    }
-
-    ActionListener<IndicesStatsResponse> indicesStatsResponseListener =
-        ActionListener.wrap(
-            indicesStatsResponse -> {
-              // Check if any shard size is bigger than maxShardSize
-              boolean cleanupNeeded =
-                  Arrays.stream(indicesStatsResponse.getShards())
-                      .map(ShardStats::getStats)
-                      .filter(Objects::nonNull)
-                      .map(CommonStats::getStore)
-                      .filter(Objects::nonNull)
-                      .map(StoreStats::getSizeInBytes)
-                      .anyMatch(size -> size > maxShardSize);
-
-              if (cleanupNeeded) {
-                deleteDocsByQuery(
-                    indexName,
-                    queryForDeleteByQueryRequest,
-                    ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
-              } else {
-                listener.onResponse(false);
-              }
-            },
-            listener::onFailure);
-
-    getCheckpointShardStoreStats(indexName, indicesStatsResponseListener);
-  }
-
-  private void getCheckpointShardStoreStats(
-      String indexName, ActionListener<IndicesStatsResponse> listener) {
-    IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
-    indicesStatsRequest.store();
-    indicesStatsRequest.indices(indexName);
-    client.admin().indices().stats(indicesStatsRequest, listener);
-  }
-
   /**
    * Delete docs based on query request
    *
diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java
index 5d106e0582..be0b9558bb 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java
@@ -34,8 +34,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import joptsimple.internal.Strings;
 import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -112,7 +112,7 @@ public void setup() {
     dataSourceService.createDataSource(
         new DataSourceMetadata(
             DATASOURCE,
-            Strings.EMPTY,
+            StringUtils.EMPTY,
             DataSourceType.S3GLUE,
             ImmutableList.of(),
             ImmutableMap.of(
@@ -331,7 +331,7 @@ public void datasourceWithBasicAuth() {
     dataSourceService.createDataSource(
         new DataSourceMetadata(
             "mybasicauth",
-            Strings.EMPTY,
+            StringUtils.EMPTY,
             DataSourceType.S3GLUE,
             ImmutableList.of(),
             properties,
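
A minimal standalone sketch of the name-sanitization path added to
DataSourceMetadata above, for illustration only. The class name is
hypothetical, and the constant values and the "query_execution_result_"
prefix are assumptions; the real MAX_RESULT_INDEX_NAME_SIZE,
RESULT_INDEX_NAME_PATTERN, and DATASOURCE_TO_RESULT_INDEX are defined in
DataSourceMetadata.java:

    // ResultIndexNameSketch.java: mirrors convertToValidResultIndex() from the
    // patch so the truncate-then-filter behavior can be exercised in isolation.
    public class ResultIndexNameSketch {
      // Assumed values; the real constants live in DataSourceMetadata.
      static final int MAX_RESULT_INDEX_NAME_SIZE = 255;
      static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+";

      static String convertToValidResultIndex(String resultIndex) {
        // Truncate names longer than the maximum allowed index name length
        if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
          resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE);
        }
        // Drop every character that does not match the allowed pattern
        StringBuilder validChars = new StringBuilder();
        for (char c : resultIndex.toCharArray()) {
          if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) {
            validChars.append(c);
          }
        }
        return validChars.toString();
      }

      public static void main(String[] args) {
        // The constructor lower-cases the datasource name first; the '.' is
        // then stripped here, printing "query_execution_result_myglue".
        System.out.println(convertToValidResultIndex("query_execution_result_my.glue"));
      }
    }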