diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index cc640110fcf..46a0839ddec 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -92,6 +92,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; +import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory; import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; @@ -259,6 +260,11 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption long requestId = _brokerIdGenerator.get(); requestContext.setRequestId(requestId); + if (httpHeaders != null) { + requestContext.setRequestHttpHeaders(httpHeaders.getRequestHeaders().entrySet().stream() + .filter(entry -> PinotBrokerQueryEventListenerFactory.getAllowlistQueryRequestHeaders() + .contains(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } // First-stage access control to prevent unauthenticated requests from using up resources. Secondary table-level // check comes later. diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java index 828f214a16b..4aa4add27cb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java @@ -20,19 +20,25 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Optional; import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME; +import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS; import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME; public class PinotBrokerQueryEventListenerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(PinotBrokerQueryEventListenerFactory.class); private static BrokerQueryEventListener _brokerQueryEventListener = null; + private static List _allowlistQueryRequestHeaders = new ArrayList<>(); private PinotBrokerQueryEventListenerFactory() { } @@ -44,6 +50,8 @@ private PinotBrokerQueryEventListenerFactory() { public synchronized static void init(PinotConfiguration eventListenerConfiguration) { // Initializes BrokerQueryEventListener. initializeBrokerQueryEventListener(eventListenerConfiguration); + // Initializes request headers + initializeAllowlistQueryRequestHeaders(eventListenerConfiguration); } /** @@ -78,6 +86,19 @@ private static void initializeBrokerQueryEventListener(PinotConfiguration eventL + "Please check if any pinot-event-listener related jar is actually added to the classpath."); } + /** + * Initializes allowlist request-headers to extract from query request. + * @param eventListenerConfiguration The subset of the configuration containing the event-listener-related keys + */ + private static void initializeAllowlistQueryRequestHeaders(PinotConfiguration eventListenerConfiguration) { + List allowlistQueryRequestHeaders = + Splitter.on(",").omitEmptyStrings().trimResults() + .splitToList(eventListenerConfiguration.getProperty(CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS, "")); + + LOGGER.info("{}: allowlist headers will be used for PinotBrokerQueryEventListener", allowlistQueryRequestHeaders); + registerAllowlistQueryRequestHeaders(allowlistQueryRequestHeaders); + } + /** * Registers a broker event listener. */ @@ -86,6 +107,14 @@ private static void registerBrokerEventListener(BrokerQueryEventListener brokerQ _brokerQueryEventListener = brokerQueryEventListener; } + /** + * Registers allowlist http headers for query-requests. + */ + private static void registerAllowlistQueryRequestHeaders(List allowlistQueryRequestHeaders) { + LOGGER.info("Registering query request headers allowlist : {}", allowlistQueryRequestHeaders); + _allowlistQueryRequestHeaders = ImmutableList.copyOf(allowlistQueryRequestHeaders); + } + /** * Returns the brokerQueryEventListener. If the BrokerQueryEventListener is null, * first creates and initializes the BrokerQueryEventListener. @@ -103,4 +132,9 @@ public static synchronized BrokerQueryEventListener getBrokerQueryEventListener( public static BrokerQueryEventListener getBrokerQueryEventListener() { return getBrokerQueryEventListener(new PinotConfiguration(Collections.emptyMap())); } + + @VisibleForTesting + public static List getAllowlistQueryRequestHeaders() { + return _allowlistQueryRequestHeaders; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java index 6ce063d2536..ec2b8a5a6b7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java @@ -80,6 +80,7 @@ public class DefaultRequestContext implements RequestScope { private long _explainPlanNumMatchAllFilterSegments; private Map _traceInfo = new HashMap<>(); private List _processingExceptions = new ArrayList<>(); + private Map> _requestHttpHeaders = new HashMap<>(); public DefaultRequestContext() { } @@ -562,6 +563,16 @@ public void setProcessingExceptions(List processingExceptions) { _processingExceptions.addAll(processingExceptions); } + @Override + public Map> getRequestHttpHeaders() { + return _requestHttpHeaders; + } + + @Override + public void setRequestHttpHeaders(Map> requestHttpHeaders) { + _requestHttpHeaders.putAll(requestHttpHeaders); + } + @Override public void close() { } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java index f8ec35a921d..d1ca85f997f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java @@ -217,6 +217,10 @@ default boolean isSampledRequest() { void setProcessingExceptions(List processingExceptions); + Map> getRequestHttpHeaders(); + + void setRequestHttpHeaders(Map> requestHttpHeaders); + enum FanoutType { OFFLINE, REALTIME, HYBRID } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 368e194a394..4a39e278abf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -44,6 +44,7 @@ private CommonConstants() { public static final String UNKNOWN = "unknown"; public static final String CONFIG_OF_METRICS_FACTORY_CLASS_NAME = "factory.className"; public static final String CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME = "factory.className"; + public static final String CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS = "request.context.tracked.header.keys"; public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME = "org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory"; public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME =