Skip to content

Commit

Permalink
Address feedback comments
Browse files Browse the repository at this point in the history
Signed-off-by: Manasvini B S <[email protected]>
  • Loading branch information
manasvinibs committed Sep 20, 2024
1 parent e70dc4f commit edbc586
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package org.opensearch.sql.legacy.query.planner.physical.node;

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.sql.legacy.domain.Where;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder;
import org.opensearch.sql.legacy.query.maker.QueryMaker;
import org.opensearch.sql.legacy.query.planner.core.ExecuteParams;
import org.opensearch.sql.legacy.query.planner.core.PlanNode;
import org.opensearch.sql.legacy.query.planner.physical.Row;
import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost;
import org.opensearch.sql.legacy.query.planner.resource.ResourceManager;

public abstract class Paginate extends BatchPhysicalOperator<SearchHit> {

/** Request to submit to OpenSearch to scroll over */
protected final TableInJoinRequestBuilder request;

/** Page size to scroll over index */
protected final int pageSize;

/** Client connection to ElasticSearch */
protected Client client;

/** Currently undergoing scan */
protected SearchResponse searchResponse;

/** Time out */
protected Integer timeout;

/** Resource monitor manager */
protected ResourceManager resourceMgr;

public Paginate(TableInJoinRequestBuilder request, int pageSize) {
this.request = request;
this.pageSize = pageSize;
}

@Override
public PlanNode[] children() {
return new PlanNode[0];
}

@Override
public Cost estimate() {
return new Cost();
}

@Override
public void open(ExecuteParams params) throws Exception {
super.open(params);
client = params.get(ExecuteParams.ExecuteParamType.CLIENT);
timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT);
resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER);

Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER);
if (filter instanceof BoolQueryBuilder) {
request
.getRequestBuilder()
.setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter));

if (LOG.isDebugEnabled()) {
LOG.debug(
"Received extra query filter, re-build query: {}",
Strings.toString(
XContentType.JSON, request.getRequestBuilder().request().source(), true, true));
}
}
}

@Override
protected Collection<Row<SearchHit>> prefetch() {
Objects.requireNonNull(client, "Client connection is not ready");
Objects.requireNonNull(resourceMgr, "ResourceManager is not set");
Objects.requireNonNull(timeout, "Time out is not set");

if (searchResponse == null) {
loadFirstBatch();
updateMetaResult();
} else {
loadNextBatch();
}
return wrapRowForCurrentBatch();
}

protected abstract void loadFirstBatch();

protected abstract void loadNextBatch();

/**
* Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because
* OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added.
*/
protected QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter)
throws SqlParseException {
Where where = request.getOriginalSelect().getWhere();
BoolQueryBuilder newQuery;
if (where != null) {
newQuery = QueryMaker.explain(where, false);
newQuery.must(filter);
} else {
newQuery = filter;
}
return newQuery;
}

protected void updateMetaResult() {
resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards());
resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards());
resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards());
resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut());
}

@SuppressWarnings("unchecked")
protected Collection<Row<SearchHit>> wrapRowForCurrentBatch() {
SearchHit[] hits = searchResponse.getHits().getHits();
Row[] rows = new Row[hits.length];
for (int i = 0; i < hits.length; i++) {
rows[i] = new SearchHitRow(hits[i], request.getAlias());
}
return Arrays.asList(rows);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [ " + describeTable() + ", pageSize=" + pageSize + " ]";
}

protected String describeTable() {
return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias();
}

/*********************************************
* Getters for Explain
*********************************************/

public String getRequest() {
return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source());
}
}
Original file line number Diff line number Diff line change
@@ -1,141 +1,37 @@
package org.opensearch.sql.legacy.query.planner.physical.node.pointInTime;

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Where;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder;
import org.opensearch.sql.legacy.query.maker.QueryMaker;
import org.opensearch.sql.legacy.query.planner.core.ExecuteParams;
import org.opensearch.sql.legacy.query.planner.core.PlanNode;
import org.opensearch.sql.legacy.query.planner.physical.Row;
import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost;
import org.opensearch.sql.legacy.query.planner.physical.node.BatchPhysicalOperator;
import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow;
import org.opensearch.sql.legacy.query.planner.resource.ResourceManager;
import org.opensearch.sql.legacy.query.planner.physical.node.Paginate;

/** OpenSearch Search API with Point in time as physical implementation of TableScan */
public class PointInTime extends BatchPhysicalOperator<SearchHit> {

/** Request to submit to OpenSearch to scroll over */
private final TableInJoinRequestBuilder request;

/** Page size to scroll over index */
private final int pageSize;

/** Client connection to ElasticSearch */
private Client client;

/** Currently undergoing search request */
private SearchResponse searchResponse;

/** Time out */
private Integer timeout;
public class PointInTime extends Paginate {

private String pitId;

private PointInTimeHandlerImpl pit;

/** Resource monitor manager */
private ResourceManager resourceMgr;

public PointInTime(TableInJoinRequestBuilder request, int pageSize) {
this.request = request;
this.pageSize = pageSize;
}

@Override
public PlanNode[] children() {
return new PlanNode[0];
}

@Override
public Cost estimate() {
return new Cost();
}

@Override
public void open(ExecuteParams params) throws Exception {
super.open(params);
client = params.get(ExecuteParams.ExecuteParamType.CLIENT);
timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT);
resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER);

Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER);
if (filter instanceof BoolQueryBuilder) {
request
.getRequestBuilder()
.setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter));

if (LOG.isDebugEnabled()) {
LOG.debug(
"Received extra query filter, re-build query: {}",
Strings.toString(
XContentType.JSON, request.getRequestBuilder().request().source(), true, true));
}
}
super(request, pageSize);
}

@Override
public void close() {
if (searchResponse != null) {
LOG.debug("Closing Point In Time (PIT) context");

// Delete the Point In Time context
pit.delete();

searchResponse = null;
} else {
LOG.debug("PIT context is already closed or was never opened");
}
}

@Override
protected Collection<Row<SearchHit>> prefetch() {
Objects.requireNonNull(client, "Client connection is not ready");
Objects.requireNonNull(resourceMgr, "ResourceManager is not set");
Objects.requireNonNull(timeout, "Time out is not set");

if (searchResponse == null) {
loadFirstBatch();
updateMetaResult();
} else {
loadNextBatchByPitId();
}
return wrapRowForCurrentBatch();
}

/**
* Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because
* OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added.
*/
private QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter)
throws SqlParseException {
Where where = request.getOriginalSelect().getWhere();
BoolQueryBuilder newQuery;
if (where != null) {
newQuery = QueryMaker.explain(where, false);
newQuery.must(filter);
} else {
newQuery = filter;
}
return newQuery;
}

private void loadFirstBatch() {
protected void loadFirstBatch() {
// Create PIT and set to request object
pit = new PointInTimeHandlerImpl(client, request.getOriginalSelect().getIndexArr());
pit.create();
Expand All @@ -151,14 +47,8 @@ private void loadFirstBatch() {
LOG.info("Loading first batch of response using Point In Time");
}

private void updateMetaResult() {
resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards());
resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards());
resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards());
resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut());
}

private void loadNextBatchByPitId() {
@Override
protected void loadNextBatch() {
// Add PIT with search after to fetch next batch of data
if (searchResponse.getHits().getHits() != null
&& searchResponse.getHits().getHits().length > 0) {
Expand All @@ -178,31 +68,4 @@ private void loadNextBatchByPitId() {
.get();
}
}

@SuppressWarnings("unchecked")
private Collection<Row<SearchHit>> wrapRowForCurrentBatch() {
SearchHit[] hits = searchResponse.getHits().getHits();
Row[] rows = new Row[hits.length];
for (int i = 0; i < hits.length; i++) {
rows[i] = new SearchHitRow(hits[i], request.getAlias());
}
return Arrays.asList(rows);
}

@Override
public String toString() {
return "PointInTime [ " + describeTable() + ", pageSize=" + pageSize + " ]";
}

private String describeTable() {
return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias();
}

/*********************************************
* Getters for Explain
*********************************************/

public String getRequest() {
return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source());
}
}
Loading

0 comments on commit edbc586

Please sign in to comment.