diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json index 4654cd085f..8035822357 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json index e645e7193b..3e92a17b97 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json index cf3ad5e2f4..0a0b58f17d 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json @@ -16,7 +16,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" + "request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json index 6503e51996..bd7310810e 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json @@ -31,7 +31,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json index 3777c6319b..e2630e24f9 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/Paginate.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/Paginate.java new file mode 100644 index 0000000000..d9bf12d916 --- /dev/null +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/Paginate.java @@ -0,0 +1,149 @@ +package org.opensearch.sql.legacy.query.planner.physical.node; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.Strings; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.search.SearchHit; +import org.opensearch.sql.legacy.domain.Where; +import org.opensearch.sql.legacy.exception.SqlParseException; +import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder; +import org.opensearch.sql.legacy.query.maker.QueryMaker; +import org.opensearch.sql.legacy.query.planner.core.ExecuteParams; +import org.opensearch.sql.legacy.query.planner.core.PlanNode; +import org.opensearch.sql.legacy.query.planner.physical.Row; +import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; +import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; + +public abstract class Paginate extends BatchPhysicalOperator { + + /** Request to submit to OpenSearch to scroll over */ + protected final TableInJoinRequestBuilder request; + + /** Page size to scroll over index */ + protected final int pageSize; + + /** Client connection to ElasticSearch */ + protected Client client; + + /** Currently undergoing scan */ + protected SearchResponse searchResponse; + + /** Time out */ + protected Integer timeout; + + /** Resource monitor manager */ + protected ResourceManager resourceMgr; + + public Paginate(TableInJoinRequestBuilder request, int pageSize) { + this.request = request; + this.pageSize = pageSize; + } + + @Override + public PlanNode[] children() { + return new PlanNode[0]; + } + + @Override + public Cost estimate() { + return new Cost(); + } + + @Override + public void open(ExecuteParams params) throws Exception { + super.open(params); + client = params.get(ExecuteParams.ExecuteParamType.CLIENT); + timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT); + resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER); + + Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER); + if (filter instanceof BoolQueryBuilder) { + request + .getRequestBuilder() + .setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter)); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Received extra query filter, re-build query: {}", + Strings.toString( + XContentType.JSON, request.getRequestBuilder().request().source(), true, true)); + } + } + } + + @Override + protected Collection> prefetch() { + Objects.requireNonNull(client, "Client connection is not ready"); + Objects.requireNonNull(resourceMgr, "ResourceManager is not set"); + Objects.requireNonNull(timeout, "Time out is not set"); + + if (searchResponse == null) { + loadFirstBatch(); + updateMetaResult(); + } else { + loadNextBatch(); + } + return wrapRowForCurrentBatch(); + } + + protected abstract void loadFirstBatch(); + + protected abstract void loadNextBatch(); + + /** + * Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because + * OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added. + */ + protected QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter) + throws SqlParseException { + Where where = request.getOriginalSelect().getWhere(); + BoolQueryBuilder newQuery; + if (where != null) { + newQuery = QueryMaker.explain(where, false); + newQuery.must(filter); + } else { + newQuery = filter; + } + return newQuery; + } + + protected void updateMetaResult() { + resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards()); + resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards()); + resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards()); + resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut()); + } + + @SuppressWarnings("unchecked") + protected Collection> wrapRowForCurrentBatch() { + SearchHit[] hits = searchResponse.getHits().getHits(); + Row[] rows = new Row[hits.length]; + for (int i = 0; i < hits.length; i++) { + rows[i] = new SearchHitRow(hits[i], request.getAlias()); + } + return Arrays.asList(rows); + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [ " + describeTable() + ", pageSize=" + pageSize + " ]"; + } + + protected String describeTable() { + return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias(); + } + + /********************************************* + * Getters for Explain + *********************************************/ + + public String getRequest() { + return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source()); + } +} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java index efc8fd9ddd..d91df4b273 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java @@ -1,102 +1,29 @@ package org.opensearch.sql.legacy.query.planner.physical.node.pointInTime; -import java.util.Arrays; -import java.util.Collection; -import java.util.Objects; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.Client; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.common.Strings; -import org.opensearch.index.query.BoolQueryBuilder; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.search.SearchHit; import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.SortOrder; -import org.opensearch.sql.legacy.domain.Where; -import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder; -import org.opensearch.sql.legacy.query.maker.QueryMaker; -import org.opensearch.sql.legacy.query.planner.core.ExecuteParams; -import org.opensearch.sql.legacy.query.planner.core.PlanNode; -import org.opensearch.sql.legacy.query.planner.physical.Row; -import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; -import org.opensearch.sql.legacy.query.planner.physical.node.BatchPhysicalOperator; -import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; -import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; +import org.opensearch.sql.legacy.query.planner.physical.node.Paginate; /** OpenSearch Search API with Point in time as physical implementation of TableScan */ -public class PointInTime extends BatchPhysicalOperator { - - /** Request to submit to OpenSearch to scroll over */ - private final TableInJoinRequestBuilder request; - - /** Page size to scroll over index */ - private final int pageSize; - - /** Client connection to ElasticSearch */ - private Client client; - - /** Currently undergoing search request */ - private SearchResponse searchResponse; - - /** Time out */ - private Integer timeout; +public class PointInTime extends Paginate { private String pitId; - private PointInTimeHandlerImpl pit; - /** Resource monitor manager */ - private ResourceManager resourceMgr; - public PointInTime(TableInJoinRequestBuilder request, int pageSize) { - this.request = request; - this.pageSize = pageSize; - } - - @Override - public PlanNode[] children() { - return new PlanNode[0]; - } - - @Override - public Cost estimate() { - return new Cost(); - } - - @Override - public void open(ExecuteParams params) throws Exception { - super.open(params); - client = params.get(ExecuteParams.ExecuteParamType.CLIENT); - timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT); - resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER); - - Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER); - if (filter instanceof BoolQueryBuilder) { - request - .getRequestBuilder() - .setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter)); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Received extra query filter, re-build query: {}", - Strings.toString( - XContentType.JSON, request.getRequestBuilder().request().source(), true, true)); - } - } + super(request, pageSize); } @Override public void close() { if (searchResponse != null) { LOG.debug("Closing Point In Time (PIT) context"); - // Delete the Point In Time context pit.delete(); - searchResponse = null; } else { LOG.debug("PIT context is already closed or was never opened"); @@ -104,38 +31,7 @@ public void close() { } @Override - protected Collection> prefetch() { - Objects.requireNonNull(client, "Client connection is not ready"); - Objects.requireNonNull(resourceMgr, "ResourceManager is not set"); - Objects.requireNonNull(timeout, "Time out is not set"); - - if (searchResponse == null) { - loadFirstBatch(); - updateMetaResult(); - } else { - loadNextBatchByPitId(); - } - return wrapRowForCurrentBatch(); - } - - /** - * Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because - * OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added. - */ - private QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter) - throws SqlParseException { - Where where = request.getOriginalSelect().getWhere(); - BoolQueryBuilder newQuery; - if (where != null) { - newQuery = QueryMaker.explain(where, false); - newQuery.must(filter); - } else { - newQuery = filter; - } - return newQuery; - } - - private void loadFirstBatch() { + protected void loadFirstBatch() { // Create PIT and set to request object pit = new PointInTimeHandlerImpl(client, request.getOriginalSelect().getIndexArr()); pit.create(); @@ -151,14 +47,8 @@ private void loadFirstBatch() { LOG.info("Loading first batch of response using Point In Time"); } - private void updateMetaResult() { - resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards()); - resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards()); - resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards()); - resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut()); - } - - private void loadNextBatchByPitId() { + @Override + protected void loadNextBatch() { // Add PIT with search after to fetch next batch of data if (searchResponse.getHits().getHits() != null && searchResponse.getHits().getHits().length > 0) { @@ -178,31 +68,4 @@ private void loadNextBatchByPitId() { .get(); } } - - @SuppressWarnings("unchecked") - private Collection> wrapRowForCurrentBatch() { - SearchHit[] hits = searchResponse.getHits().getHits(); - Row[] rows = new Row[hits.length]; - for (int i = 0; i < hits.length; i++) { - rows[i] = new SearchHitRow(hits[i], request.getAlias()); - } - return Arrays.asList(rows); - } - - @Override - public String toString() { - return "PointInTime [ " + describeTable() + ", pageSize=" + pageSize + " ]"; - } - - private String describeTable() { - return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias(); - } - - /********************************************* - * Getters for Explain - *********************************************/ - - public String getRequest() { - return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source()); - } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java index 145fc277a8..5019e9cde8 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java @@ -5,139 +5,38 @@ package org.opensearch.sql.legacy.query.planner.physical.node.scroll; -import java.util.Arrays; -import java.util.Collection; -import java.util.Objects; import org.opensearch.action.search.ClearScrollResponse; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.Client; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.common.Strings; -import org.opensearch.index.query.BoolQueryBuilder; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.search.SearchHit; import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.SortOrder; -import org.opensearch.sql.legacy.domain.Where; -import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder; -import org.opensearch.sql.legacy.query.maker.QueryMaker; -import org.opensearch.sql.legacy.query.planner.core.ExecuteParams; -import org.opensearch.sql.legacy.query.planner.core.PlanNode; -import org.opensearch.sql.legacy.query.planner.physical.Row; -import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; -import org.opensearch.sql.legacy.query.planner.physical.node.BatchPhysicalOperator; -import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; -import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; +import org.opensearch.sql.legacy.query.planner.physical.node.Paginate; /** OpenSearch Scroll API as physical implementation of TableScan */ -public class Scroll extends BatchPhysicalOperator { - - /** Request to submit to OpenSearch to scroll over */ - private final TableInJoinRequestBuilder request; - - /** Page size to scroll over index */ - private final int pageSize; - - /** Client connection to ElasticSearch */ - private Client client; - - /** Currently undergoing Scroll */ - private SearchResponse scrollResponse; - - /** Time out */ - private Integer timeout; - - /** Resource monitor manager */ - private ResourceManager resourceMgr; +public class Scroll extends Paginate { public Scroll(TableInJoinRequestBuilder request, int pageSize) { - this.request = request; - this.pageSize = pageSize; - } - - @Override - public PlanNode[] children() { - return new PlanNode[0]; - } - - @Override - public Cost estimate() { - return new Cost(); - } - - @Override - public void open(ExecuteParams params) throws Exception { - super.open(params); - client = params.get(ExecuteParams.ExecuteParamType.CLIENT); - timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT); - resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER); - - Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER); - if (filter instanceof BoolQueryBuilder) { - request - .getRequestBuilder() - .setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter)); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Received extra query filter, re-build query: {}", - Strings.toString( - XContentType.JSON, request.getRequestBuilder().request().source(), true, true)); - } - } + super(request, pageSize); } @Override public void close() { - if (scrollResponse != null) { + if (searchResponse != null) { LOG.debug("Closing all scroll resources"); ClearScrollResponse clearScrollResponse = - client.prepareClearScroll().addScrollId(scrollResponse.getScrollId()).get(); + client.prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); if (!clearScrollResponse.isSucceeded()) { LOG.warn("Failed to close scroll: {}", clearScrollResponse.status()); } - scrollResponse = null; + searchResponse = null; } else { LOG.debug("Scroll already be closed"); } } @Override - protected Collection> prefetch() { - Objects.requireNonNull(client, "Client connection is not ready"); - Objects.requireNonNull(resourceMgr, "ResourceManager is not set"); - Objects.requireNonNull(timeout, "Time out is not set"); - - if (scrollResponse == null) { - loadFirstBatch(); - updateMetaResult(); - } else { - loadNextBatchByScrollId(); - } - return wrapRowForCurrentBatch(); - } - - /** - * Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because - * OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added. - */ - private QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter) - throws SqlParseException { - Where where = request.getOriginalSelect().getWhere(); - BoolQueryBuilder newQuery; - if (where != null) { - newQuery = QueryMaker.explain(where, false); - newQuery.must(filter); - } else { - newQuery = filter; - } - return newQuery; - } - - private void loadFirstBatch() { - scrollResponse = + protected void loadFirstBatch() { + searchResponse = request .getRequestBuilder() .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC) @@ -146,45 +45,12 @@ private void loadFirstBatch() { .get(); } - private void updateMetaResult() { - resourceMgr.getMetaResult().addTotalNumOfShards(scrollResponse.getTotalShards()); - resourceMgr.getMetaResult().addSuccessfulShards(scrollResponse.getSuccessfulShards()); - resourceMgr.getMetaResult().addFailedShards(scrollResponse.getFailedShards()); - resourceMgr.getMetaResult().updateTimeOut(scrollResponse.isTimedOut()); - } - - private void loadNextBatchByScrollId() { - scrollResponse = + @Override + protected void loadNextBatch() { + searchResponse = client - .prepareSearchScroll(scrollResponse.getScrollId()) + .prepareSearchScroll(searchResponse.getScrollId()) .setScroll(TimeValue.timeValueSeconds(timeout)) .get(); } - - @SuppressWarnings("unchecked") - private Collection> wrapRowForCurrentBatch() { - SearchHit[] hits = scrollResponse.getHits().getHits(); - Row[] rows = new Row[hits.length]; - for (int i = 0; i < hits.length; i++) { - rows[i] = new SearchHitRow(hits[i], request.getAlias()); - } - return Arrays.asList(rows); - } - - @Override - public String toString() { - return "Scroll [ " + describeTable() + ", pageSize=" + pageSize + " ]"; - } - - private String describeTable() { - return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias(); - } - - /********************************************* - * Getters for Explain - *********************************************/ - - public String getRequest() { - return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source()); - } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index c7b5589ced..071ee2fd45 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -180,7 +180,11 @@ public void cleanup(OpenSearchRequest request) { } }); } else if (request instanceof OpenSearchQueryRequest) { - request.clean(pitId -> {}); + request.clean( + pitId -> { + DeletePitRequest deletePitRequest = new DeletePitRequest(pitId); + deletePit(deletePitRequest); + }); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index 1e2d8b4107..e6ccf283a5 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -191,7 +191,11 @@ public void cleanup(OpenSearchRequest request) { } }); } else if (request instanceof OpenSearchQueryRequest) { - request.clean(pitId -> {}); + request.clean( + pitId -> { + DeletePitRequest deletePitRequest = new DeletePitRequest(pitId); + deletePit(deletePitRequest); + }); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 04c5dad3e0..0bc71e6d46 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -29,7 +29,6 @@ import org.opensearch.search.SearchModule; import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.response.OpenSearchResponse; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; @@ -66,8 +65,6 @@ public class OpenSearchQueryRequest implements OpenSearchRequest { private String pitId; - private OpenSearchClient client; - private TimeValue cursorKeepAlive; private Object[] searchAfter; @@ -111,14 +108,13 @@ public OpenSearchQueryRequest( OpenSearchExprValueFactory factory, List includes, TimeValue cursorKeepAlive, - OpenSearchClient client) { + String pitId) { this.indexName = indexName; this.sourceBuilder = sourceBuilder; this.exprValueFactory = factory; this.includes = includes; this.cursorKeepAlive = cursorKeepAlive; - this.client = client; - this.pitId = createPIT(); + this.pitId = pitId; } @Override @@ -179,23 +175,17 @@ public OpenSearchResponse searchWithPIT(Function return openSearchResponse; } - public String createPIT() { - CreatePitRequest createPitRequest = - new CreatePitRequest(this.cursorKeepAlive, false, indexName.getIndexNames()); - return this.client.createPit(createPitRequest); - } - - public void deletePit() { - DeletePitRequest deletePitRequest = new DeletePitRequest(this.pitId); - this.client.deletePit(deletePitRequest); - searchDone = true; - this.pitId = null; - } - @Override public void clean(Consumer cleanAction) { - if (needClean && this.pitId != null) { - deletePit(); + try { + // clean on the last page only, to prevent deleting the PitId in the middle of paging. + if (needClean && this.pitId != null) { + cleanAction.accept(this.pitId); + searchDone = true; + this.pitId = null; + } + } finally { + this.pitId = null; } } @@ -255,8 +245,6 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) th includes = in.readStringList(); indexName = new IndexName(in); - this.client = engine.getClient(); - int length = in.readVInt(); this.searchAfter = new Object[length]; for (int i = 0; i < length; i++) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index 26a470d235..6fa9b17697 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -26,6 +26,7 @@ import lombok.ToString; import org.apache.commons.lang3.tuple.Pair; import org.apache.lucene.search.join.ScoreMode; +import org.opensearch.action.search.CreatePitRequest; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.InnerHitBuilder; @@ -114,8 +115,9 @@ private OpenSearchRequest buildRequestWithPit( if (startFrom + size > maxResultWindow) { sourceBuilder.size(maxResultWindow - startFrom); // Search with PIT request + String pitId = createPit(indexName, cursorKeepAlive, client); return new OpenSearchQueryRequest( - indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); + indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId); } else { sourceBuilder.from(startFrom); sourceBuilder.size(requestedTotalSize); @@ -128,8 +130,9 @@ private OpenSearchRequest buildRequestWithPit( } sourceBuilder.size(pageSize); // Search with PIT request + String pitId = createPit(indexName, cursorKeepAlive, client); return new OpenSearchQueryRequest( - indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); + indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId); } } @@ -159,6 +162,14 @@ private OpenSearchRequest buildRequestWithScroll( } } + private String createPit( + OpenSearchRequest.IndexName indexName, TimeValue cursorKeepAlive, OpenSearchClient client) { + // Create PIT ID for request + CreatePitRequest createPitRequest = + new CreatePitRequest(cursorKeepAlive, false, indexName.getIndexNames()); + return client.createPit(createPitRequest); + } + boolean isBoolFilterQuery(QueryBuilder current) { return (current instanceof BoolQueryBuilder); }