Skip to content

Commit

Permalink
Add missing refactoring of Scroll to PIT API calls for Joins and Pagi…
Browse files Browse the repository at this point in the history
…nation query (opensearch-project#2981)

* Adding PIT for pagination queries in new SQL engine code paths

Signed-off-by: Manasvini B S <[email protected]>

* Fix legacy code using scroll API instead of PIT for batch physical operator

Signed-off-by: Manasvini B S <[email protected]>

* Fix local debugger issue

Signed-off-by: Manasvini B S <[email protected]>

* Refactor integ-tests data for PIT and fix unit tests

Signed-off-by: Manasvini B S <[email protected]>

* Address feedback comments

Signed-off-by: Manasvini B S <[email protected]>

* Adding test coverage

Signed-off-by: Manasvini B S <[email protected]>

---------

Signed-off-by: Manasvini B S <[email protected]>
Signed-off-by: Simeon Widdis <[email protected]>
  • Loading branch information
manasvinibs authored and Swiddis committed Oct 23, 2024
1 parent 6274b88 commit f702281
Show file tree
Hide file tree
Showing 33 changed files with 1,470 additions and 302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void validTotalResultWithAndWithoutPaginationOrderBy() throws IOException
String selectQuery =
StringUtils.format(
"SELECT firstname, state FROM %s ORDER BY balance DESC ", TEST_INDEX_ACCOUNT);
verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 26, false);
verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 25, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,11 @@ public void onFailure(Exception e) {

private Settings defaultSettings() {
return new Settings() {
private final Map<Key, Integer> defaultSettings =
new ImmutableMap.Builder<Key, Integer>().put(Key.QUERY_SIZE_LIMIT, 200).build();
private final Map<Key, Object> defaultSettings =
new ImmutableMap.Builder<Key, Object>()
.put(Key.QUERY_SIZE_LIMIT, 200)
.put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true)
.build();

@Override
public <T> T getSettingValue(Key key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,30 @@ public class PaginationFilterIT extends SQLIntegTestCase {
*/
private static final Map<String, Integer> STATEMENT_TO_NUM_OF_PAGES =
Map.of(
"SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT, 1000,
"SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT,
1000,
"SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT + " WHERE match(address, 'street')",
385,
385,
"SELECT * FROM "
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(city, 'Ola')",
1,
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(city, 'Ola')",
1,
"SELECT firstname, lastname, highlight(address) FROM "
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
"SELECT firstname, lastname, highlight('*') FROM "
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE true", 60,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id=10", 1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id + 5=15", 1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BANK, 7);
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE true",
60,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id=10",
1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id + 5=15",
1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BANK,
7);

private final String sqlStatement;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ private Settings defaultSettings() {
new ImmutableMap.Builder<Key, Object>()
.put(Key.QUERY_SIZE_LIMIT, 200)
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
.put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true)
.build();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ private void populateDefaultCursor(DefaultCursor cursor) {
Integer limit = cursor.getLimit();
long rowsLeft = rowsLeft(cursor.getFetchSize(), cursor.getLimit());
if (rowsLeft <= 0) {
// close the cursor
// Delete Point In Time ID
if (LocalClusterState.state().getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

package org.opensearch.sql.legacy.query.planner.logical.node;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Map;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder;
import org.opensearch.sql.legacy.query.planner.core.PlanNode;
import org.opensearch.sql.legacy.query.planner.logical.LogicalOperator;
import org.opensearch.sql.legacy.query.planner.physical.PhysicalOperator;
import org.opensearch.sql.legacy.query.planner.physical.node.pointInTime.PointInTime;
import org.opensearch.sql.legacy.query.planner.physical.node.scroll.Scroll;

/** Table scan */
Expand All @@ -33,6 +37,9 @@ public PlanNode[] children() {

/**
 * Builds the physical scan operator for this table scan.
 *
 * <p>When the {@code SQL_PAGINATION_API_SEARCH_AFTER} setting is enabled, a {@link PointInTime}
 * operator is used; otherwise a {@link Scroll} operator is used. The {@code optimalOps} argument
 * is not consulted here.
 *
 * @param optimalOps physical operators already chosen for other logical operators (unused)
 * @return a single-element array holding the selected scan operator
 */
@Override
public <T> PhysicalOperator[] toPhysical(Map<LogicalOperator, PhysicalOperator<T>> optimalOps) {
  final PhysicalOperator scan;
  if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
    scan = new PointInTime(request, pageSize);
  } else {
    scan = new Scroll(request, pageSize);
  }
  return new PhysicalOperator[] {scan};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.opensearch.sql.legacy.query.planner.physical.node;

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.sql.legacy.domain.Where;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder;
import org.opensearch.sql.legacy.query.maker.QueryMaker;
import org.opensearch.sql.legacy.query.planner.core.ExecuteParams;
import org.opensearch.sql.legacy.query.planner.core.PlanNode;
import org.opensearch.sql.legacy.query.planner.physical.Row;
import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost;
import org.opensearch.sql.legacy.query.planner.resource.ResourceManager;

/**
 * Base class for physical operators that read a table from OpenSearch one page at a time.
 *
 * <p>This class handles the parts visible here: collecting execution parameters in {@link
 * #open(ExecuteParams)}, merging an extra query filter into the request, recording shard/timeout
 * metadata after the first page, and wrapping search hits into rows. Subclasses decide how a page
 * is actually fetched by implementing {@link #loadFirstBatch()} and {@link #loadNextBatch()}.
 */
public abstract class Paginate extends BatchPhysicalOperator<SearchHit> {

  /** Request to submit to OpenSearch to scan over. */
  protected final TableInJoinRequestBuilder request;

  /** Page size handed to the constructor. */
  protected final int pageSize;

  /** Client taken from the execution parameters in {@link #open(ExecuteParams)}. */
  protected Client client;

  /** Most recent search response; {@code null} until the first batch has been loaded. */
  protected SearchResponse searchResponse;

  /** Timeout taken from the execution parameters in {@link #open(ExecuteParams)}. */
  protected Integer timeout;

  /** Resource manager taken from the execution parameters in {@link #open(ExecuteParams)}. */
  protected ResourceManager resourceMgr;

  /**
   * Creates a paginating operator over the given request.
   *
   * @param request request builder describing the table to scan
   * @param pageSize page size for this operator
   */
  public Paginate(TableInJoinRequestBuilder request, int pageSize) {
    this.request = request;
    this.pageSize = pageSize;
  }

  /** Returns an empty array: this operator has no child plan nodes. */
  @Override
  public PlanNode[] children() {
    return new PlanNode[0];
  }

  /** Returns a freshly constructed {@link Cost}. */
  @Override
  public Cost estimate() {
    return new Cost();
  }

  /**
   * Reads the client, timeout and resource manager from {@code params}. If an extra query filter
   * of type {@link BoolQueryBuilder} is present, the request's query is rebuilt to include it.
   *
   * @param params execution parameters for this run
   * @throws Exception if the parent {@code open} fails or the query cannot be rebuilt
   */
  @Override
  public void open(ExecuteParams params) throws Exception {
    super.open(params);
    client = params.get(ExecuteParams.ExecuteParamType.CLIENT);
    timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT);
    resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER);

    Object extraFilter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER);
    if (!(extraFilter instanceof BoolQueryBuilder)) {
      // Nothing was pushed down (or it is of an unexpected type): keep the query as it is.
      return;
    }

    QueryBuilder rebuiltQuery = generateNewQueryWithExtraFilter((BoolQueryBuilder) extraFilter);
    request.getRequestBuilder().setQuery(rebuiltQuery);

    if (LOG.isDebugEnabled()) {
      String rebuiltSource =
          Strings.toString(
              XContentType.JSON, request.getRequestBuilder().request().source(), true, true);
      LOG.debug("Received extra query filter, re-build query: {}", rebuiltSource);
    }
  }

  /**
   * Fetches the next page of rows. The first call loads the first batch and records its
   * metadata; later calls load the following batch.
   *
   * @return rows wrapping the hits of the current search response
   * @throws NullPointerException if the client, resource manager or timeout has not been set
   */
  @Override
  protected Collection<Row<SearchHit>> prefetch() {
    Objects.requireNonNull(client, "Client connection is not ready");
    Objects.requireNonNull(resourceMgr, "ResourceManager is not set");
    Objects.requireNonNull(timeout, "Time out is not set");

    if (searchResponse != null) {
      loadNextBatch();
    } else {
      loadFirstBatch();
      updateMetaResult();
    }
    return wrapRowForCurrentBatch();
  }

  /** Loads the first page; called from {@link #prefetch()} while no response exists yet. */
  protected abstract void loadFirstBatch();

  /** Loads a subsequent page; called from {@link #prefetch()} once a response exists. */
  protected abstract void loadNextBatch();

  /**
   * Combines the extra filter pushed down from upstream with the original WHERE clause. The WHERE
   * clause is re-parsed because the OpenSearch RequestBuilder does not allow a QueryBuilder to be
   * changed once it has been added.
   *
   * @param filter extra filter to apply
   * @return the re-parsed WHERE query with {@code filter} added as a {@code must} clause, or
   *     {@code filter} itself when the original select has no WHERE clause
   * @throws SqlParseException declared for failures while re-parsing the WHERE clause
   */
  protected QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter)
      throws SqlParseException {
    Where originalWhere = request.getOriginalSelect().getWhere();
    if (originalWhere == null) {
      return filter;
    }
    BoolQueryBuilder combined = QueryMaker.explain(originalWhere, false);
    combined.must(filter);
    return combined;
  }

  /** Copies shard counts and the timed-out flag of the current response into the meta result. */
  protected void updateMetaResult() {
    resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards());
    resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards());
    resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards());
    resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut());
  }

  /**
   * Wraps every hit of the current search response into a {@link SearchHitRow} tagged with the
   * request's alias.
   *
   * @return fixed-size list of rows, one per hit, in hit order
   */
  @SuppressWarnings("unchecked")
  protected Collection<Row<SearchHit>> wrapRowForCurrentBatch() {
    SearchHit[] batch = searchResponse.getHits().getHits();
    Row[] wrapped = new Row[batch.length];
    int next = 0;
    for (SearchHit hit : batch) {
      wrapped[next++] = new SearchHitRow(hit, request.getAlias());
    }
    return Arrays.asList(wrapped);
  }

  /** Renders the operator as {@code <SimpleClassName> [ <table description>, pageSize=<n> ]}. */
  @Override
  public String toString() {
    return new StringBuilder()
        .append(getClass().getSimpleName())
        .append(" [ ")
        .append(describeTable())
        .append(", pageSize=")
        .append(pageSize)
        .append(" ]")
        .toString();
  }

  /** Describes the scanned table as {@code <index of first FROM entry> as <alias>}. */
  protected String describeTable() {
    return new StringBuilder()
        .append(request.getOriginalSelect().getFrom().get(0).getIndex())
        .append(" as ")
        .append(request.getAlias())
        .toString();
  }

  /*********************************************
   *          Getters for Explain
   *********************************************/

  /** Returns the source of the underlying search request rendered as JSON. */
  public String getRequest() {
    return Strings.toString(
        XContentType.JSON, request.getRequestBuilder().request().source());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.legacy.query.planner.physical.node.scroll;
package org.opensearch.sql.legacy.query.planner.physical.node;

import com.google.common.base.Strings;
import java.util.HashMap;
Expand Down Expand Up @@ -32,7 +32,7 @@
* retain() in Project | {"firstName": "Allen", "age": 30 } | "" | retain("e.name.first", "e.age")
* ----------------------------------------------------------------------------------------------------------------------
*/
class SearchHitRow implements Row<SearchHit> {
public class SearchHitRow implements Row<SearchHit> {

/** Native OpenSearch data object for each row */
private final SearchHit hit;
Expand All @@ -43,7 +43,7 @@ class SearchHitRow implements Row<SearchHit> {
/** Table alias owned the row. Empty if this row comes from combination of two other rows */
private final String tableAlias;

SearchHitRow(SearchHit hit, String tableAlias) {
public SearchHitRow(SearchHit hit, String tableAlias) {
this.hit = hit;
this.source = hit.getSourceAsMap();
this.tableAlias = tableAlias;
Expand Down
Loading

0 comments on commit f702281

Please sign in to comment.