From 8c6dc0c508cfa9d5f722ff56922de0ff818cb36a Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Wed, 10 Jul 2024 09:10:37 -0700 Subject: [PATCH] Add pit for multi query (#2753) * Add search after for join Signed-off-by: Rupal Mahajan * Enable search after by default Signed-off-by: Rupal Mahajan * Add pit Signed-off-by: Rupal Mahajan * nit Signed-off-by: Rupal Mahajan * Fix tests Signed-off-by: Rupal Mahajan * ignore joinWithGeoIntersectNL Signed-off-by: Rupal Mahajan * Rerun CI with scroll Signed-off-by: Rupal Mahajan * draft Signed-off-by: Rupal Mahajan * Remove unused code and retrigger CI with search_after true Signed-off-by: Rupal Mahajan * Address comments Signed-off-by: Rupal Mahajan * Remove unused code change Signed-off-by: Rupal Mahajan * Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE Signed-off-by: Rupal Mahajan * Fix scroll condition Signed-off-by: Rupal Mahajan * nit Signed-off-by: Rupal Mahajan * Add pit before query execution Signed-off-by: Rupal Mahajan * Refactor get response with pit method Signed-off-by: Rupal Mahajan * Update remaining scroll search calls Signed-off-by: Rupal Mahajan * Fix integ test failures Signed-off-by: Rupal Mahajan * nit Signed-off-by: Rupal Mahajan * Move pit from join request builder to executor Signed-off-by: Rupal Mahajan * Remove unused methods Signed-off-by: Rupal Mahajan * Move pit from request to executor Signed-off-by: Rupal Mahajan * Fix pit.delete call missed while merge Signed-off-by: Rupal Mahajan * Move getResponseWithHits method to util class Signed-off-by: Rupal Mahajan * add try catch for create delete pit in minus executor Signed-off-by: Rupal Mahajan * move all common fields to ElasticHitsExecutor Signed-off-by: Rupal Mahajan * add javadoc for ElasticHitsExecutor Signed-off-by: Rupal Mahajan * Add missing javadoc Signed-off-by: Rupal Mahajan * Forcing an empty commit as last commit is stuck processing updates Signed-off-by: Rupal Mahajan --------- Signed-off-by: Rupal Mahajan --- .../legacy/executor/ElasticHitsExecutor.java | 91 ++++++++- .../executor/join/ElasticJoinExecutor.java | 51 +---- .../legacy/executor/multi/MinusExecutor.java | 181 ++++++++++-------- .../legacy/executor/multi/UnionExecutor.java | 3 +- 4 files changed, 199 insertions(+), 127 deletions(-) diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java index 62a6d63ef7..2b80575e1e 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java @@ -5,13 +5,96 @@ package org.opensearch.sql.legacy.executor; +import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; +import static org.opensearch.search.sort.SortOrder.ASC; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + import java.io.IOException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.sql.legacy.domain.Select; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; +import org.opensearch.sql.legacy.pit.PointInTimeHandler; + +/** Executor for search requests with pagination. */ +public abstract class ElasticHitsExecutor { + protected static final Logger LOG = LogManager.getLogger(); + protected PointInTimeHandler pit; + protected Client client; + + /** + * Executes search request + * + * @throws IOException If an input or output exception occurred + * @throws SqlParseException If parsing exception occurred + */ + protected abstract void run() throws IOException, SqlParseException; + + /** + * Get search hits after execution + * + * @return Search hits + */ + protected abstract SearchHits getHits(); + + /** + * Get response for search request with pit/scroll + * + * @param request search request + * @param select sql select + * @param size fetch size + * @param previousResponse response for previous request + * @param pit point in time + * @return search response for subsequent request + */ + public SearchResponse getResponseWithHits( + SearchRequestBuilder request, + Select select, + int size, + SearchResponse previousResponse, + PointInTimeHandler pit) { + // Set Size + request.setSize(size); + SearchResponse responseWithHits; -/** Created by Eliran on 21/8/2016. */ -public interface ElasticHitsExecutor { - void run() throws IOException, SqlParseException; + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + // Set sort field for search_after + boolean ordered = select.isOrderdSelect(); + if (!ordered) { + request.addSort(DOC_FIELD_NAME, ASC); + } + // Set PIT + request.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + // from and size is alternate method to paginate result. + // If select has from clause, search after is not required. + if (previousResponse != null && select.getFrom().isEmpty()) { + request.searchAfter(previousResponse.getHits().getSortFields()); + } + responseWithHits = request.get(); + } else { + // Set scroll + TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE); + if (previousResponse != null) { + responseWithHits = + client + .prepareSearchScroll(previousResponse.getScrollId()) + .setScroll(keepAlive) + .execute() + .actionGet(); + } else { + request.setScroll(keepAlive); + responseWithHits = request.get(); + } + } - SearchHits getHits(); + return responseWithHits; + } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java index 05c7af2bda..061868c9b5 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java @@ -5,7 +5,6 @@ package org.opensearch.sql.legacy.executor.join; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import java.io.IOException; @@ -16,15 +15,11 @@ import java.util.Map; import java.util.Set; import java.util.stream.Stream; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; -import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.document.DocumentField; -import org.opensearch.common.unit.TimeValue; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.mapper.MapperService; @@ -32,14 +27,10 @@ import org.opensearch.rest.RestChannel; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.search.builder.PointInTimeBuilder; -import org.opensearch.search.sort.FieldSortBuilder; -import org.opensearch.search.sort.SortOrder; import org.opensearch.sql.legacy.domain.Field; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.ElasticHitsExecutor; -import org.opensearch.sql.legacy.pit.PointInTimeHandler; import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder; import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder; @@ -49,17 +40,14 @@ import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder; /** Created by Eliran on 15/9/2015. */ -public abstract class ElasticJoinExecutor implements ElasticHitsExecutor { - private static final Logger LOG = LogManager.getLogger(); +public abstract class ElasticJoinExecutor extends ElasticHitsExecutor { protected List results; // Keep list to avoid copy to new array in SearchHits protected MetaSearchResult metaResults; protected final int MAX_RESULTS_ON_ONE_FETCH = 10000; private Set aliasesOnReturn; private boolean allFieldsReturn; - protected Client client; protected String[] indices; - protected PointInTimeHandler pit; protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) { metaResults = new MetaSearchResult(); @@ -283,38 +271,13 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) { public SearchResponse getResponseWithHits( TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) { - // Set Size - SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size); - SearchResponse responseWithHits; - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - // Set sort field for search_after - boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect(); - if (!ordered) { - request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); - } - // Set PIT - request.setPointInTime(new PointInTimeBuilder(pit.getPitId())); - if (previousResponse != null) { - request.searchAfter(previousResponse.getHits().getSortFields()); - } - responseWithHits = request.get(); - } else { - // Set scroll - TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE); - if (previousResponse != null) { - responseWithHits = - client - .prepareSearchScroll(previousResponse.getScrollId()) - .setScroll(keepAlive) - .execute() - .actionGet(); - } else { - request.setScroll(keepAlive); - responseWithHits = request.get(); - } - } - return responseWithHits; + return getResponseWithHits( + tableRequest.getRequestBuilder(), + tableRequest.getOriginalSelect(), + size, + previousResponse, + pit); } public String[] getIndices(JoinRequestBuilder joinRequestBuilder) { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java index 03e16424e7..f58b25e821 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.executor.multi; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -18,7 +20,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.document.DocumentField; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.ArrayUtils; import org.opensearch.index.mapper.MapperService; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -28,16 +30,16 @@ import org.opensearch.sql.legacy.domain.Where; import org.opensearch.sql.legacy.domain.hints.Hint; import org.opensearch.sql.legacy.domain.hints.HintType; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.ElasticHitsExecutor; -import org.opensearch.sql.legacy.executor.join.ElasticUtils; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.DefaultQueryAction; import org.opensearch.sql.legacy.query.multi.MultiQueryRequestBuilder; import org.opensearch.sql.legacy.utils.Util; /** Created by Eliran on 26/8/2016. */ -public class MinusExecutor implements ElasticHitsExecutor { - private Client client; +public class MinusExecutor extends ElasticHitsExecutor { private MultiQueryRequestBuilder builder; private SearchHits minusHits; private boolean useTermsOptimization; @@ -63,45 +65,63 @@ public MinusExecutor(Client client, MultiQueryRequestBuilder builder) { @Override public void run() throws SqlParseException { - if (this.useTermsOptimization && this.fieldsOrderFirstTable.length != 1) { - throw new SqlParseException( - "Terms optimization failed: terms optimization for minus execution is supported with one" - + " field"); - } - if (this.useTermsOptimization && !this.useScrolling) { - throw new SqlParseException( - "Terms optimization failed: using scrolling is required for terms optimization"); - } - if (!this.useScrolling || !this.useTermsOptimization) { - Set comperableHitResults; - if (!this.useScrolling) { - // 1. get results from first search , put in set - // 2. get reults from second search - // 2.1 for each result remove from set - comperableHitResults = simpleOneTimeQueryEach(); + try { + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + pit = + new PointInTimeHandlerImpl( + client, + ArrayUtils.concat( + builder.getOriginalSelect(true).getIndexArr(), + builder.getOriginalSelect(false).getIndexArr())); + pit.create(); + } + + if (this.useTermsOptimization && this.fieldsOrderFirstTable.length != 1) { + throw new SqlParseException( + "Terms optimization failed: terms optimization for minus execution is supported with" + + " one field"); + } + if (this.useTermsOptimization && !this.useScrolling) { + throw new SqlParseException( + "Terms optimization failed: using scrolling is required for terms optimization"); + } + if (!this.useScrolling || !this.useTermsOptimization) { + Set comperableHitResults; + if (!this.useScrolling) { + // 1. get results from first search , put in set + // 2. get reults from second search + // 2.1 for each result remove from set + comperableHitResults = simpleOneTimeQueryEach(); + } else { + // if scrolling + // 1. get all results in scrolls (till some limit) . put on set + // 2. scroll on second table + // 3. on each scroll result remove items from set + comperableHitResults = runWithScrollings(); + } + fillMinusHitsFromResults(comperableHitResults); + return; } else { - // if scrolling - // 1. get all results in scrolls (till some limit) . put on set - // 2. scroll on second table - // 3. on each scroll result remove items from set - comperableHitResults = runWithScrollings(); + // if scrolling and optimization + // 0. save the original second table where , init set + // 1. on each scroll on first table , create miniSet + // 1.1 build where from all results (terms filter) , and run query + // 1.1.1 on each result remove from miniSet + // 1.1.2 add all results left from miniset to bigset + Select firstSelect = this.builder.getOriginalSelect(true); + MinusOneFieldAndOptimizationResult optimizationResult = + runWithScrollingAndAddFilter(fieldsOrderFirstTable[0], fieldsOrderSecondTable[0]); + String fieldName = getFieldName(firstSelect.getFields().get(0)); + Set results = optimizationResult.getFieldValues(); + SearchHit someHit = optimizationResult.getSomeHit(); + fillMinusHitsFromOneField(fieldName, results, someHit); + } + } catch (Exception e) { + LOG.error("Failed during multi query run.", e); + } finally { + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + pit.delete(); } - fillMinusHitsFromResults(comperableHitResults); - return; - } else { - // if scrolling and optimization - // 0. save the original second table where , init set - // 1. on each scroll on first table , create miniSet - // 1.1 build where from all results (terms filter) , and run query - // 1.1.1 on each result remove from miniSet - // 1.1.2 add all results left from miniset to bigset - Select firstSelect = this.builder.getOriginalSelect(true); - MinusOneFieldAndOptimizationResult optimizationResult = - runWithScrollingAndAddFilter(fieldsOrderFirstTable[0], fieldsOrderSecondTable[0]); - String fieldName = getFieldName(firstSelect.getFields().get(0)); - Set results = optimizationResult.getFieldValues(); - SearchHit someHit = optimizationResult.getSomeHit(); - fillMinusHitsFromOneField(fieldName, results, someHit); } } @@ -187,11 +207,12 @@ private void fillMinusHitsFromResults(Set comperableHitResu private Set runWithScrollings() { SearchResponse scrollResp = - ElasticUtils.scrollOneTimeWithHits( - this.client, - this.builder.getFirstSearchRequest(), + getResponseWithHits( + builder.getFirstSearchRequest(), builder.getOriginalSelect(true), - this.maxDocsToFetchOnEachScrollShard); + maxDocsToFetchOnEachScrollShard, + null, + pit); Set results = new HashSet<>(); SearchHit[] hits = scrollResp.getHits().getHits(); @@ -199,7 +220,6 @@ private Set runWithScrollings() { return new HashSet<>(); } int totalDocsFetchedFromFirstTable = 0; - // fetch from first table . fill set. while (hits != null && hits.length != 0) { totalDocsFetchedFromFirstTable += hits.length; @@ -208,19 +228,21 @@ private Set runWithScrollings() { break; } scrollResp = - client - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + builder.getFirstSearchRequest(), + builder.getOriginalSelect(true), + maxDocsToFetchOnEachScrollShard, + scrollResp, + pit); hits = scrollResp.getHits().getHits(); } scrollResp = - ElasticUtils.scrollOneTimeWithHits( - this.client, + getResponseWithHits( this.builder.getSecondSearchRequest(), builder.getOriginalSelect(false), - this.maxDocsToFetchOnEachScrollShard); + this.maxDocsToFetchOnEachScrollShard, + null, + pit); hits = scrollResp.getHits().getHits(); if (hits == null || hits.length == 0) { @@ -234,11 +256,12 @@ private Set runWithScrollings() { break; } scrollResp = - client - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + builder.getSecondSearchRequest(), + builder.getOriginalSelect(false), + maxDocsToFetchOnEachScrollShard, + scrollResp, + pit); hits = scrollResp.getHits().getHits(); } @@ -303,11 +326,12 @@ private boolean checkIfOnlyOneField(Select firstSelect, Select secondSelect) { private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( String firstFieldName, String secondFieldName) throws SqlParseException { SearchResponse scrollResp = - ElasticUtils.scrollOneTimeWithHits( - this.client, - this.builder.getFirstSearchRequest(), + getResponseWithHits( + builder.getFirstSearchRequest(), builder.getOriginalSelect(true), - this.maxDocsToFetchOnEachScrollShard); + maxDocsToFetchOnEachScrollShard, + null, + pit); Set results = new HashSet<>(); int currentNumOfResults = 0; SearchHit[] hits = scrollResp.getHits().getHits(); @@ -335,14 +359,16 @@ private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( break; } SearchResponse responseForSecondTable = - ElasticUtils.scrollOneTimeWithHits( - this.client, + getResponseWithHits( queryAction.getRequestBuilder(), secondQuerySelect, - this.maxDocsToFetchOnEachScrollShard); + this.maxDocsToFetchOnEachScrollShard, + null, + pit); SearchHits secondQuerySearchHits = responseForSecondTable.getHits(); SearchHit[] secondQueryHits = secondQuerySearchHits.getHits(); + while (secondQueryHits.length > 0) { totalDocsFetchedFromSecondTable += secondQueryHits.length; removeValuesFromSetAccordingToHits(secondFieldName, currentSetFromResults, secondQueryHits); @@ -350,11 +376,12 @@ private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( break; } responseForSecondTable = - client - .prepareSearchScroll(responseForSecondTable.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + queryAction.getRequestBuilder(), + secondQuerySelect, + maxDocsToFetchOnEachScrollShard, + responseForSecondTable, + pit); secondQueryHits = responseForSecondTable.getHits().getHits(); } results.addAll(currentSetFromResults); @@ -363,13 +390,13 @@ private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( "too many results for first table, stoping at:" + totalDocsFetchedFromFirstTable); break; } - scrollResp = - client - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + builder.getFirstSearchRequest(), + builder.getOriginalSelect(true), + maxDocsToFetchOnEachScrollShard, + scrollResp, + pit); hits = scrollResp.getHits().getHits(); } return new MinusOneFieldAndOptimizationResult(results, someHit); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java index 6b8b64c4e8..375c40a5c1 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java @@ -23,11 +23,10 @@ import org.opensearch.sql.legacy.utils.Util; /** Created by Eliran on 21/8/2016. */ -public class UnionExecutor implements ElasticHitsExecutor { +public class UnionExecutor extends ElasticHitsExecutor { private MultiQueryRequestBuilder multiQueryBuilder; private SearchHits results; - private Client client; private int currentId; public UnionExecutor(Client client, MultiQueryRequestBuilder builder) {