-
Notifications
You must be signed in to change notification settings - Fork 141
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* Adding PIT for pagination queries in new SQL engine code paths * Fix legacy code using scroll API instead of PIT for batch physical operator * Fix local debugger issue * Refactor integ-tests data for PIT and fix unit tests * Address feedback comments * Adding test coverage --------- Signed-off-by: Manasvini B S <[email protected]> Signed-off-by: Simeon Widdis <[email protected]> Co-authored-by: Manasvini B Suryanarayana <[email protected]> (cherry picked from commit f6ca54c) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
a214ab7
commit 4aec575
Showing
33 changed files
with
1,470 additions
and
302 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
144 changes: 144 additions & 0 deletions
144
legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/Paginate.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package org.opensearch.sql.legacy.query.planner.physical.node; | ||
|
||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Objects; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.common.Strings; | ||
import org.opensearch.index.query.BoolQueryBuilder; | ||
import org.opensearch.index.query.QueryBuilder; | ||
import org.opensearch.search.SearchHit; | ||
import org.opensearch.sql.legacy.domain.Where; | ||
import org.opensearch.sql.legacy.exception.SqlParseException; | ||
import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder; | ||
import org.opensearch.sql.legacy.query.maker.QueryMaker; | ||
import org.opensearch.sql.legacy.query.planner.core.ExecuteParams; | ||
import org.opensearch.sql.legacy.query.planner.core.PlanNode; | ||
import org.opensearch.sql.legacy.query.planner.physical.Row; | ||
import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; | ||
import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; | ||
|
||
public abstract class Paginate extends BatchPhysicalOperator<SearchHit> { | ||
|
||
/** Request to submit to OpenSearch to scan over */ | ||
protected final TableInJoinRequestBuilder request; | ||
|
||
protected final int pageSize; | ||
|
||
protected Client client; | ||
|
||
protected SearchResponse searchResponse; | ||
|
||
protected Integer timeout; | ||
|
||
protected ResourceManager resourceMgr; | ||
|
||
public Paginate(TableInJoinRequestBuilder request, int pageSize) { | ||
this.request = request; | ||
this.pageSize = pageSize; | ||
} | ||
|
||
@Override | ||
public PlanNode[] children() { | ||
return new PlanNode[0]; | ||
} | ||
|
||
@Override | ||
public Cost estimate() { | ||
return new Cost(); | ||
} | ||
|
||
@Override | ||
public void open(ExecuteParams params) throws Exception { | ||
super.open(params); | ||
client = params.get(ExecuteParams.ExecuteParamType.CLIENT); | ||
timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT); | ||
resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER); | ||
|
||
Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER); | ||
if (filter instanceof BoolQueryBuilder) { | ||
request | ||
.getRequestBuilder() | ||
.setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter)); | ||
|
||
if (LOG.isDebugEnabled()) { | ||
LOG.debug( | ||
"Received extra query filter, re-build query: {}", | ||
Strings.toString( | ||
XContentType.JSON, request.getRequestBuilder().request().source(), true, true)); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
protected Collection<Row<SearchHit>> prefetch() { | ||
Objects.requireNonNull(client, "Client connection is not ready"); | ||
Objects.requireNonNull(resourceMgr, "ResourceManager is not set"); | ||
Objects.requireNonNull(timeout, "Time out is not set"); | ||
|
||
if (searchResponse == null) { | ||
loadFirstBatch(); | ||
updateMetaResult(); | ||
} else { | ||
loadNextBatch(); | ||
} | ||
return wrapRowForCurrentBatch(); | ||
} | ||
|
||
protected abstract void loadFirstBatch(); | ||
|
||
protected abstract void loadNextBatch(); | ||
|
||
/** | ||
* Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because | ||
* OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added. | ||
*/ | ||
protected QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter) | ||
throws SqlParseException { | ||
Where where = request.getOriginalSelect().getWhere(); | ||
BoolQueryBuilder newQuery; | ||
if (where != null) { | ||
newQuery = QueryMaker.explain(where, false); | ||
newQuery.must(filter); | ||
} else { | ||
newQuery = filter; | ||
} | ||
return newQuery; | ||
} | ||
|
||
protected void updateMetaResult() { | ||
resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards()); | ||
resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards()); | ||
resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards()); | ||
resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut()); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
protected Collection<Row<SearchHit>> wrapRowForCurrentBatch() { | ||
SearchHit[] hits = searchResponse.getHits().getHits(); | ||
Row[] rows = new Row[hits.length]; | ||
for (int i = 0; i < hits.length; i++) { | ||
rows[i] = new SearchHitRow(hits[i], request.getAlias()); | ||
} | ||
return Arrays.asList(rows); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return getClass().getSimpleName() + " [ " + describeTable() + ", pageSize=" + pageSize + " ]"; | ||
} | ||
|
||
protected String describeTable() { | ||
return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias(); | ||
} | ||
|
||
/********************************************* | ||
* Getters for Explain | ||
*********************************************/ | ||
|
||
public String getRequest() { | ||
return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.