Skip to content

Commit

Permalink
[Multi-stage] Reduce PinotQuery copy (apache#14669)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Dec 20, 2024
1 parent 22c491f commit 07998f4
Showing 1 changed file with 62 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
Expand Down Expand Up @@ -98,15 +97,20 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
long queryArrivalTimeMs = System.currentTimeMillis();
ServerPlanRequestContext serverContext = new ServerPlanRequestContext(stagePlan, leafQueryExecutor, executorService,
executionContext.getPipelineBreakerResult());
// 1. compile the PinotQuery
// 1. Compile the PinotQuery
constructPinotQueryPlan(serverContext, executionContext.getOpChainMetadata());
// 2. convert PinotQuery into InstanceRequest list (one for each physical table)
List<InstanceRequest> instanceRequestList =
constructServerQueryRequests(executionContext, serverContext, helixManager.getHelixPropertyStore(), explain);
serverContext.setServerQueryRequests(instanceRequestList.stream()
.map(instanceRequest -> new ServerQueryRequest(instanceRequest, serverMetrics, queryArrivalTimeMs, true))
.collect(Collectors.toList()));
// compile the OpChain
// 2. Convert PinotQuery into InstanceRequest list (one for each physical table)
PinotQuery pinotQuery = serverContext.getPinotQuery();
pinotQuery.setExplain(explain);
List<InstanceRequest> instanceRequests =
constructServerQueryRequests(executionContext, pinotQuery, helixManager.getHelixPropertyStore());
int numRequests = instanceRequests.size();
List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(numRequests);
for (InstanceRequest instanceRequest : instanceRequests) {
serverQueryRequests.add(new ServerQueryRequest(instanceRequest, serverMetrics, queryArrivalTimeMs, true));
}
serverContext.setServerQueryRequests(serverQueryRequests);
// 3. Compile the OpChain
executionContext.setLeafStageContext(serverContext);
return PlanNodeToOpChain.convert(stagePlan.getRootNode(), executionContext, relationConsumer);
}
Expand All @@ -131,85 +135,85 @@ private static void constructPinotQueryPlan(ServerPlanRequestContext serverConte

/**
* Entry point to construct a list of {@link InstanceRequest}s for executing leaf-stage v1 runner.
*
* @param serverContext the server opChain execution context of the stage.
* @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution.
* @return a list of server instance request to be run.
*/
public static List<InstanceRequest> constructServerQueryRequests(OpChainExecutionContext executionContext,
ServerPlanRequestContext serverContext, ZkHelixPropertyStore<ZNRecord> helixPropertyStore, boolean explain) {
int stageId = executionContext.getStageId();
PinotQuery pinotQuery, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
StageMetadata stageMetadata = executionContext.getStageMetadata();
String rawTableName = stageMetadata.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(stageMetadata.getTableName());
// ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box
// network traffic, but there's a chance to improve this:
// TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
Schema schema = ZKMetadataProvider.getSchema(helixPropertyStore, rawTableName);
Map<String, List<String>> tableSegmentsMap = executionContext.getWorkerMetadata().getTableSegmentsMap();
assert tableSegmentsMap != null;
List<InstanceRequest> requests = new ArrayList<>(tableSegmentsMap.size());
for (Map.Entry<String, List<String>> entry : tableSegmentsMap.entrySet()) {
TimeBoundaryInfo timeBoundary = stageMetadata.getTimeBoundary();
int numRequests = tableSegmentsMap.size();
if (numRequests == 1) {
Map.Entry<String, List<String>> entry = tableSegmentsMap.entrySet().iterator().next();
String tableType = entry.getKey();
List<String> segments = entry.getValue();
// ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box
// network traffic, but there's a chance to improve this:
// TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
if (TableType.OFFLINE.name().equals(tableType)) {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
requests.add(compileInstanceRequest(executionContext, serverContext, stageId, tableConfig, schema,
stageMetadata.getTimeBoundary(), TableType.OFFLINE, segments, explain));
} else if (TableType.REALTIME.name().equals(tableType)) {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
requests.add(compileInstanceRequest(executionContext, serverContext, stageId, tableConfig, schema,
stageMetadata.getTimeBoundary(), TableType.REALTIME, segments, explain));
if (tableType.equals(TableType.OFFLINE.name())) {
String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, offlineTableName);
return List.of(
compileInstanceRequest(executionContext, pinotQuery, offlineTableName, tableConfig, schema, timeBoundary,
TableType.OFFLINE, segments));
} else {
throw new IllegalArgumentException("Unsupported table type key: " + tableType);
assert tableType.equals(TableType.REALTIME.name());
String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, realtimeTableName);
return List.of(
compileInstanceRequest(executionContext, pinotQuery, realtimeTableName, tableConfig, schema, timeBoundary,
TableType.REALTIME, segments));
}
} else {
assert numRequests == 2;
List<String> offlineSegments = tableSegmentsMap.get(TableType.OFFLINE.name());
List<String> realtimeSegments = tableSegmentsMap.get(TableType.REALTIME.name());
assert offlineSegments != null && realtimeSegments != null;
String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
TableConfig offlineTableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, offlineTableName);
TableConfig realtimeTableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, realtimeTableName);
// NOTE: Make a deep copy of PinotQuery for OFFLINE request.
return List.of(
compileInstanceRequest(executionContext, new PinotQuery(pinotQuery), offlineTableName, offlineTableConfig,
schema, timeBoundary, TableType.OFFLINE, offlineSegments),
compileInstanceRequest(executionContext, pinotQuery, realtimeTableName, realtimeTableConfig, schema,
timeBoundary, TableType.REALTIME, realtimeSegments));
}
return requests;
}

/**
* Convert {@link PinotQuery} into an {@link InstanceRequest}.
*/
private static InstanceRequest compileInstanceRequest(OpChainExecutionContext executionContext,
ServerPlanRequestContext serverContext, int stageId, TableConfig tableConfig, Schema schema,
TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList, boolean explain) {
private static InstanceRequest compileInstanceRequest(OpChainExecutionContext executionContext, PinotQuery pinotQuery,
String tableNameWithType, @Nullable TableConfig tableConfig, @Nullable Schema schema,
@Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) {
// Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing.
long requestId =
(executionContext.getRequestId() << 16) + ((long) stageId << 8) + (tableType == TableType.REALTIME ? 1 : 0);
// 1. make a deep copy of the pinotQuery and modify the PinotQuery accordingly
PinotQuery pinotQuery = new PinotQuery(serverContext.getPinotQuery());
pinotQuery.setExplain(explain);
// - attach table type
DataSource dataSource = pinotQuery.getDataSource();
String rawTableName = dataSource.getTableName();
String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
dataSource.setTableName(tableNameWithType);
pinotQuery.setDataSource(dataSource);
// - attach time boundary.
long requestId = (executionContext.getRequestId() << 16) + ((long) executionContext.getStageId() << 8) + (
tableType == TableType.REALTIME ? 1 : 0);
// 1. Modify the PinotQuery
pinotQuery.getDataSource().setTableName(tableNameWithType);
if (timeBoundaryInfo != null) {
attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == TableType.OFFLINE);
}
// - perform global rewrite/optimize
for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
pinotQuery = queryRewriter.rewrite(pinotQuery);
}
QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);

// 2. set pinot query options according to requestMetadataMap
// 2. Update query options according to requestMetadataMap
updateQueryOptions(pinotQuery, executionContext);

// 3. wrapped around in broker request and replace with actual table name with type.
// 3. Wrap PinotQuery into BrokerRequest
BrokerRequest brokerRequest = new BrokerRequest();
brokerRequest.setPinotQuery(pinotQuery);
QuerySource querySource = new QuerySource();
querySource.setTableName(dataSource.getTableName());
querySource.setTableName(tableNameWithType);
brokerRequest.setQuerySource(querySource);

// 3. create instance request with segmentList
// 4. Create InstanceRequest with segmentList
InstanceRequest instanceRequest = new InstanceRequest();
instanceRequest.setRequestId(requestId);
instanceRequest.setBrokerId("unknown");
Expand Down

0 comments on commit 07998f4

Please sign in to comment.