Skip to content

Commit

Permalink
Cleanup deprecated query options (apache#13040)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored May 1, 2024
1 parent 5d1dc73 commit 076cd40
Show file tree
Hide file tree
Showing 26 changed files with 277 additions and 500 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public class PinotClientRequest {
@ManualAuthorization
public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @QueryParam("sql") String query,
@ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled,
@ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions,
@Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
Expand All @@ -123,9 +122,6 @@ public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @Quer
if (traceEnabled != null) {
requestJson.put(Request.TRACE, traceEnabled);
}
if (debugOptions != null) {
requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
}
BrokerResponse brokerResponse = executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true, httpHeaders);
asyncResponse.resume(getPinotQueryResponse(brokerResponse));
} catch (WebApplicationException wae) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,21 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId));

try {
long compilationStartTimeNs;
// Parse the query if needed
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(query, request);
} catch (Exception e) {
// Do not log or emit metric here because it is pure user error
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
}

// Compile the request into PinotQuery
long compilationStartTimeNs = System.nanoTime();
PinotQuery pinotQuery;
try {
// Parse the request
sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request);
// Compile the request into PinotQuery
compilationStartTimeNs = System.nanoTime();
pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
} catch (Exception e) {
LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,24 +84,22 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception {
requestContext.setBrokerId(_brokerId);

// Parse the query if needed
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(), request);
sqlNodeAndOptions = RequestUtils.parseQuery(request.get(Request.SQL).asText(), request);
} catch (Exception e) {
LOGGER.info("Caught exception while compiling SQL: {}, {}", request, e.getMessage());
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
// Do not log or emit metric here because it is pure user error
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
}
if (request.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) {
sqlNodeAndOptions.setExtraOptions(
RequestUtils.getOptionsFromJson(request, CommonConstants.Broker.Request.QUERY_OPTIONS));
}

if (_multiStageBrokerRequestHandler != null && Boolean.parseBoolean(
sqlNodeAndOptions.getOptions().get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
return _multiStageBrokerRequestHandler.handleRequest(request, requesterIdentity, requestContext, httpHeaders);
sqlNodeAndOptions.getOptions().get(Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
return _multiStageBrokerRequestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity,
requestContext, httpHeaders);
} else {
return _singleStageBrokerRequestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity,
requestContext, httpHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,26 +109,25 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
HttpHeaders httpHeaders) {
LOGGER.debug("SQL query for request {}: {}", requestId, query);

long compilationStartTimeNs;
// Parse the query if needed
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(query, request);
} catch (Exception e) {
// Do not log or emit metric here because it is pure user error
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
}

// Compile the request
long compilationStartTimeNs = System.nanoTime();
long queryTimeoutMs;
QueryEnvironment.QueryPlannerResult queryPlanResult;
try {
// Parse the request
sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request);
} catch (RuntimeException e) {
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(e);
LOGGER.info("Caught exception parsing request {}: {}, {}", requestId, query, consolidatedMessage);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(
QueryException.getException(QueryException.SQL_PARSING_ERROR, consolidatedMessage));
}
try {
Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
queryTimeoutMs = timeoutMsFromQueryOption == null ? _brokerTimeoutMs : timeoutMsFromQueryOption;
String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders);
// Compile the request
compilationStartTimeNs = System.nanoTime();
QueryEnvironment queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(database, _tableCache), database), _workerManager,
_tableCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,68 +26,40 @@
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;


/**
* Tests the various options set in the broker request
*/
public class BrokerRequestOptionsTest {
// TODO: remove this legacy option size checker after 0.11 release cut.
private static final int LEGACY_PQL_QUERY_OPTION_SIZE = 2;

@Test
public void testSetOptions() {
long requestId = 1;
String query = "select * from testTable";

// None of the options
ObjectNode jsonRequest = JsonUtils.newObjectNode();
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);;
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 0 + LEGACY_PQL_QUERY_OPTION_SIZE);
assertTrue(sqlNodeAndOptions.getOptions().isEmpty());

// TRACE
// Has trace false
jsonRequest.put(Request.TRACE, false);
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 0 + LEGACY_PQL_QUERY_OPTION_SIZE);
assertTrue(sqlNodeAndOptions.getOptions().isEmpty());

// Has trace true
jsonRequest.put(Request.TRACE, true);
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get(Request.TRACE), "true");

// DEBUG_OPTIONS (debug options will also be included as query options)
// Has debugOptions
jsonRequest = JsonUtils.newObjectNode();
jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo");
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), "foo");

// Has multiple debugOptions
jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo;debugOption2=bar");
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 2 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), "foo");
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption2"), "bar");

// Invalid debug options
jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1");
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
try {
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.fail();
} catch (Exception e) {
// Expected
}
assertEquals(sqlNodeAndOptions.getOptions().size(), 1);
assertEquals(sqlNodeAndOptions.getOptions().get(Request.TRACE), "true");

// QUERY_OPTIONS
jsonRequest = JsonUtils.newObjectNode();
Expand All @@ -97,41 +69,28 @@ public void testSetOptions() {
queryOptions.put("queryOption1", "foo");
sqlNodeAndOptions.getOptions().putAll(queryOptions);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");
assertEquals(sqlNodeAndOptions.getOptions().size(), 1);
assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");

// Has queryOptions in query
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions("SET queryOption1='foo'; select * from testTable");
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");
assertEquals(sqlNodeAndOptions.getOptions().size(), 1);
assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");

// Has query options in json payload
jsonRequest.put(Request.QUERY_OPTIONS, "queryOption1=foo");
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");
assertEquals(sqlNodeAndOptions.getOptions().size(), 1);
assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");

// Has query options in both json payload and sqlNodeAndOptions, sqlNodeAndOptions takes priority
jsonRequest.put(Request.QUERY_OPTIONS, "queryOption1=bar;queryOption2=moo");
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions("SET queryOption1='foo'; select * from testTable;");
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 2 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption2"), "moo");

// Has all 3
jsonRequest = JsonUtils.newObjectNode();
jsonRequest.put(Request.TRACE, true);
jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo");
jsonRequest.put(Request.QUERY_OPTIONS, "queryOption1=bar;queryOption2=moo");
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 4 + LEGACY_PQL_QUERY_OPTION_SIZE);
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "bar");
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption2"), "moo");
Assert.assertEquals(sqlNodeAndOptions.getOptions().get(Request.TRACE), "true");
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), "foo");
assertEquals(sqlNodeAndOptions.getOptions().size(), 2);
assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo");
assertEquals(sqlNodeAndOptions.getOptions().get("queryOption2"), "moo");
}
}
9 changes: 5 additions & 4 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@
<id>generate-sources</id>
<phase>generate-sources</phase>
<configuration>
<tasks>
<target>
<delete dir="target/generated-sources/gen-javabean"/>
<mkdir dir="target/generated-sources"/>
<exec executable="/usr/local/bin/thrift">
Expand All @@ -387,10 +387,11 @@
<arg value="target/generated-sources"/>
<arg value="src/thrift/response.thrift"/>
</exec>
<copy todir="src/main/java/">
<move todir="src/main/java/">
<fileset dir="target/generated-sources/gen-javabean"/>
</copy>
</tasks>
</move>
<delete dir="target/generated-sources/gen-javabean"/>
</target>
</configuration>
<goals>
<goal>run</goal>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package org.apache.pinot.common.request;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-12-07")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2024-05-01")
public class BrokerRequest implements org.apache.thrift.TBase<BrokerRequest, BrokerRequest._Fields>, java.io.Serializable, Cloneable, Comparable<BrokerRequest> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BrokerRequest");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package org.apache.pinot.common.request;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-12-07")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2024-05-01")
public class DataSource implements org.apache.thrift.TBase<DataSource, DataSource._Fields>, java.io.Serializable, Cloneable, Comparable<DataSource> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("DataSource");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package org.apache.pinot.common.request;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-12-07")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2024-05-01")
public class Expression implements org.apache.thrift.TBase<Expression, Expression._Fields>, java.io.Serializable, Cloneable, Comparable<Expression> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Expression");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package org.apache.pinot.common.request;


@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-12-07")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2024-05-01")
public enum ExpressionType implements org.apache.thrift.TEnum {
LITERAL(0),
IDENTIFIER(1),
Expand Down
Loading

0 comments on commit 076cd40

Please sign in to comment.