From 403790b88f9e2f1f92adec903ec0d2ea572406b7 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Wed, 1 Nov 2023 14:45:00 -0700 Subject: [PATCH] Support auth for queryRunner (#11897) --- .../pinot/common/auth/AuthProviderUtils.java | 55 ++++++++++++++++ .../command/AbstractBaseAdminCommand.java | 50 --------------- .../tools/admin/command/AddSchemaCommand.java | 4 +- .../tools/admin/command/AddTableCommand.java | 7 ++- .../tools/admin/command/AddTenantCommand.java | 4 +- .../admin/command/BootstrapTableCommand.java | 3 +- .../tools/admin/command/ChangeTableState.java | 4 +- .../admin/command/DeleteSchemaCommand.java | 3 +- .../admin/command/DeleteTableCommand.java | 3 +- .../admin/command/ImportDataCommand.java | 4 +- .../LaunchDataIngestionJobCommand.java | 4 +- .../command/OperateClusterConfigCommand.java | 4 +- .../tools/admin/command/PostQueryCommand.java | 5 +- .../admin/command/UploadSegmentCommand.java | 4 +- .../pinot/tools/perf/PerfBenchmarkDriver.java | 8 ++- .../tools/perf/PerfBenchmarkDriverConf.java | 36 +++++++++++ .../pinot/tools/perf/PerfBenchmarkRunner.java | 20 ++++++ .../apache/pinot/tools/perf/QueryRunner.java | 62 ++++++++----------- 18 files changed, 177 insertions(+), 103 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java index 48b4fc94bba..513bbf0929e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java @@ -168,4 +168,59 @@ static String getOrDefault(AuthConfig config, String key, String defaultValue) { } throw new IllegalArgumentException("Expected String but got " + config.getProperties().get(key).getClass()); } + + /** + * Generate an (optional) HTTP Authorization header given an auth config + * + * @param authProvider auth provider + * @return list of headers + */ + public static List
makeAuthHeaders(AuthProvider authProvider) { + return toRequestHeaders(authProvider); + } + + /** + * Generate an (optional) HTTP Authorization header given an auth config + * + * @param authProvider auth provider + * @return Map of headers + */ + public static Map makeAuthHeadersMap(AuthProvider authProvider) { + if (authProvider == null) { + return Collections.emptyMap(); + } + return authProvider.getRequestHeaders().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); + } + + /** + * Generate auth token from pass-thru token or generate basic auth from user/password pair + * + * @param provider optional provider + * @param tokenUrl optional token url + * @param authToken optional pass-thru token + * @param user optional username + * @param password optional password + * @return auth provider, or NullauthProvider if neither pass-thru token nor user info available + */ + public static AuthProvider makeAuthProvider(@Nullable AuthProvider provider, String tokenUrl, String authToken, + String user, String password) { + if (provider != null) { + return provider; + } + + if (StringUtils.isNotBlank(tokenUrl)) { + return new UrlAuthProvider(tokenUrl); + } + + if (StringUtils.isNotBlank(authToken)) { + return new StaticTokenAuthProvider(authToken); + } + + if (StringUtils.isNotBlank(user)) { + return new StaticTokenAuthProvider(BasicAuthUtils.toBasicAuthToken(user, password)); + } + + return new NullAuthProvider(); + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java index a22b97c4dea..a9daa01a016 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AbstractBaseAdminCommand.java @@ -31,16 +31,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; import org.apache.commons.configuration2.ex.ConfigurationException; -import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; -import org.apache.pinot.common.auth.AuthProviderUtils; -import org.apache.pinot.common.auth.BasicAuthUtils; -import org.apache.pinot.common.auth.NullAuthProvider; -import org.apache.pinot.common.auth.StaticTokenAuthProvider; -import org.apache.pinot.common.auth.UrlAuthProvider; -import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.tools.AbstractBaseCommand; import org.apache.pinot.tools.utils.PinotConfigUtils; @@ -130,46 +122,4 @@ Map readConfigFromFile(String configFileName) throws ConfigurationException { return PinotConfigUtils.readConfigFromFile(configFileName); } - - /** - * Generate an (optional) HTTP Authorization header given an auth config - * - * @param authProvider auth provider - * @return list of headers - */ - static List
makeAuthHeaders(AuthProvider authProvider) { - return AuthProviderUtils.toRequestHeaders(authProvider); - } - - /** - * Generate auth token from pass-thru token or generate basic auth from user/password pair - * - * @param provider optional provider - * @param tokenUrl optional token url - * @param authToken optional pass-thru token - * @param user optional username - * @param password optional password - * @return auth provider, or NullauthProvider if neither pass-thru token nor user info available - */ - @Nullable - static AuthProvider makeAuthProvider(AuthProvider provider, String tokenUrl, String authToken, String user, - String password) { - if (provider != null) { - return provider; - } - - if (StringUtils.isNotBlank(tokenUrl)) { - return new UrlAuthProvider(tokenUrl); - } - - if (StringUtils.isNotBlank(authToken)) { - return new StaticTokenAuthProvider(authToken); - } - - if (StringUtils.isNotBlank(user)) { - return new StaticTokenAuthProvider(BasicAuthUtils.toBasicAuthToken(user, password)); - } - - return new NullAuthProvider(); - } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java index 7ddc21b88de..7643628113c 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddSchemaCommand.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.net.URI; import java.util.Collections; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.data.Schema; @@ -178,7 +179,8 @@ public boolean execute() .getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort)); schemaURI = new URI(schemaURI + "?override=" + _override + "?force=" + _force); fileUploadDownloadClient.addSchema(schemaURI, - schema.getSchemaName(), schemaFile, makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, + schema.getSchemaName(), schemaFile, AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)), Collections.emptyList()); } catch (Exception e) { LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java index 76bdfada058..898ef86e86c 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTableCommand.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.Callable; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.TableConfigs; import org.apache.pinot.spi.config.table.TableConfig; @@ -188,7 +189,8 @@ public boolean sendTableCreationRequest(JsonNode node) throws IOException { String res = AbstractBaseAdminCommand.sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableConfigsCreate(), node.toString(), - makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))); + AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))); LOGGER.info(res); return res.contains("successfully added"); } @@ -197,7 +199,8 @@ public boolean sendTableUpdateRequest(JsonNode node, String tableName) throws IOException { String res = AbstractBaseAdminCommand.sendRequest("PUT", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableConfigsUpdate(tableName), node.toString(), - makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))); + AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))); LOGGER.info(res); return res.contains("TableConfigs updated"); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java index 53203ad21f6..b8cc9a29cf3 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/AddTenantCommand.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.tools.admin.command; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.tenant.TenantRole; @@ -153,7 +154,8 @@ public boolean execute() Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount); String res = AbstractBaseAdminCommand .sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(), - tenant.toJsonString(), makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, + tenant.toJsonString(), AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))); LOGGER.info(res); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java index 2e6b95b9c70..59fdc70a7e8 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BootstrapTableCommand.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.tools.admin.command; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.CommonConstants; @@ -139,6 +140,6 @@ public boolean execute() _controllerHost = NetUtils.getHostAddress(); } return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir, - makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)).execute(); + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)).execute(); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java index e6007b42bd8..188cdfc7892 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ChangeTableState.java @@ -25,6 +25,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.NetUtils; @@ -93,7 +94,8 @@ public boolean execute() URI_TABLES_PATH + _tableName, "state=" + stateValue, null); HttpGet httpGet = new HttpGet(uri); - makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)) + AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)) .forEach(header -> httpGet.addHeader(header.getName(), header.getValue())); HttpResponse response = httpClient.execute(httpGet); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteSchemaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteSchemaCommand.java index 1378a63f8f3..823edfef938 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteSchemaCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteSchemaCommand.java @@ -19,6 +19,7 @@ package org.apache.pinot.tools.admin.command; import java.util.Collections; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.CommonConstants; @@ -150,7 +151,7 @@ public boolean execute() fileUploadDownloadClient.getHttpClient().sendDeleteRequest( FileUploadDownloadClient.getDeleteSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _schemaName), Collections.emptyMap(), - makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)); + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)); } catch (Exception e) { LOGGER.error("Got Exception while deleting Pinot Schema: " + _schemaName, e); return false; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteTableCommand.java index 54c8709d6c3..fa4a3c53bc8 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteTableCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DeleteTableCommand.java @@ -19,6 +19,7 @@ package org.apache.pinot.tools.admin.command; import java.util.Collections; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.CommonConstants; @@ -172,7 +173,7 @@ public boolean execute() try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { fileUploadDownloadClient.getHttpClient().sendDeleteRequest(FileUploadDownloadClient .getDeleteTableURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), - _tableName, _type, _retention), Collections.emptyMap(), makeAuthProvider(_authProvider, + _tableName, _type, _retention), Collections.emptyMap(), AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)); } catch (Exception e) { LOGGER.error("Got Exception while deleting Pinot Table: " + _tableName, e); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java index 696058ef223..7b85106060a 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.filesystem.PinotFSFactory; @@ -259,7 +260,8 @@ private SegmentGenerationJobSpec generateSegmentGenerationJobSpec() { spec.setCleanUpOutputDir(true); spec.setOverwriteOutput(true); spec.setJobType("SegmentCreationAndTarPush"); - spec.setAuthToken(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password).getTaskToken()); + spec.setAuthToken( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password).getTaskToken()); // set ExecutionFrameworkSpec ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java index a56b820a5fb..ff129c91ce4 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.TlsUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; @@ -123,7 +124,8 @@ public boolean execute() } if (StringUtils.isBlank(spec.getAuthToken())) { - spec.setAuthToken(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password).getTaskToken()); + spec.setAuthToken(AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password) + .getTaskToken()); } try { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java index 37868d85d57..409f0c7d960 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OperateClusterConfigCommand.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.http.Header; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -154,7 +155,8 @@ public String run() } String clusterConfigUrl = _controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs"; - List
headers = makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, + List
headers = AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)); switch (_operation.toUpperCase()) { case "ADD": diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java index aad2e05430b..44b7f80d3bf 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request; @@ -146,8 +147,8 @@ public String run() payload.putAll(_additionalOptions); } String request = JsonUtils.objectToString(payload); - return sendRequest("POST", url, request, makeAuthHeaders(makeAuthProvider(_authProvider, - _authTokenUrl, _authToken, _user, _password))); + return sendRequest("POST", url, request, AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))); } @Override diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java index 1e610b6f182..c174046617d 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.http.Header; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.spi.auth.AuthProvider; @@ -187,7 +188,8 @@ public boolean execute() LOGGER.info("Uploading segment tar file: {}", segmentTarFile); List
headerList = - makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)); + AuthProviderUtils.makeAuthHeaders( + AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)); FileInputStream fileInputStream = new FileInputStream(segmentTarFile); fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java index 1f0e5d17baf..4328cec7465 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java @@ -31,7 +31,6 @@ import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -45,6 +44,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.ControllerStarter; @@ -102,6 +102,7 @@ public class PerfBenchmarkDriver { // updates ZKSegmentMetadata only, is not exposed from controller API so we need to update segments directly via // PinotHelixResourceManager. private PinotHelixResourceManager _helixResourceManager; + private Map _headers; public PerfBenchmarkDriver(PerfBenchmarkDriverConf conf) { this(conf, "/tmp/", "HEAP", null, conf.isVerbose()); @@ -120,6 +121,9 @@ public PerfBenchmarkDriver(PerfBenchmarkDriverConf conf, String tempDir, String _loadMode = loadMode; _segmentFormatVersion = segmentFormatVersion; _verbose = verbose; + _headers = AuthProviderUtils.makeAuthHeadersMap( + AuthProviderUtils.makeAuthProvider(null, _conf.getAuthTokenUrl(), _conf.getAuthToken(), _conf.getUser(), + _conf.getPassword())); init(); } @@ -407,7 +411,7 @@ private void postQueries() public JsonNode postQuery(String query) throws Exception { - return postQuery(query, Collections.emptyMap()); + return postQuery(query, _headers); } public JsonNode postQuery(String query, Map headers) diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java index 2c7b0a4e61a..3376fd9ba09 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java @@ -71,6 +71,10 @@ public class PerfBenchmarkDriverConf { String _resultsOutputDirectory; boolean _verbose = false; + private String _user; + private String _password; + private String _authToken; + private String _authTokenUrl; public String getClusterName() { return _clusterName; @@ -279,4 +283,36 @@ public boolean isVerbose() { public void setVerbose(boolean verbose) { _verbose = verbose; } + + public void setUser(String user) { + _user = user; + } + + public String getUser() { + return _user; + } + + public void setPassword(String password) { + _password = password; + } + + public String getPassword() { + return _password; + } + + public void setAuthToken(String authToken) { + _authToken = authToken; + } + + public String getAuthToken() { + return _authToken; + } + + public void setAuthTokenUrl(String authTokenUrl) { + _authTokenUrl = authTokenUrl; + } + + public String getAuthTokenUrl() { + return _authTokenUrl; + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java index c816b1eaea3..05a3b60379d 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java @@ -80,6 +80,18 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command description = "Comma separated bloom filter columns to be created (non-batch load).") private String _bloomFilterColumns; + @CommandLine.Option(names = {"-user"}, required = false, description = "Username for basic auth.") + private String _user; + + @CommandLine.Option(names = {"-password"}, required = false, description = "Password for basic auth.") + private String _password; + + @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") + private String _authToken; + + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, usageHelp = true, description = "Print this message.") private boolean _help = false; @@ -115,6 +127,10 @@ public void startAllButServer() throws Exception { PerfBenchmarkDriverConf perfBenchmarkDriverConf = new PerfBenchmarkDriverConf(); perfBenchmarkDriverConf.setStartServer(false); + perfBenchmarkDriverConf.setUser(_user); + perfBenchmarkDriverConf.setPassword(_password); + perfBenchmarkDriverConf.setAuthToken(_authToken); + perfBenchmarkDriverConf.setAuthTokenUrl(_authTokenUrl); PerfBenchmarkDriver driver = new PerfBenchmarkDriver(perfBenchmarkDriverConf, _tempDir, _loadMode, _segmentFormatVersion, false); driver.run(); @@ -127,6 +143,10 @@ private void startServerWithPreLoadedSegments() perfBenchmarkDriverConf.setStartController(false); perfBenchmarkDriverConf.setStartBroker(false); perfBenchmarkDriverConf.setServerInstanceDataDir(_dataDir); + perfBenchmarkDriverConf.setUser(_user); + perfBenchmarkDriverConf.setPassword(_password); + perfBenchmarkDriverConf.setAuthToken(_authToken); + perfBenchmarkDriverConf.setAuthTokenUrl(_authTokenUrl); final PerfBenchmarkDriver driver = new PerfBenchmarkDriver(perfBenchmarkDriverConf, _tempDir, _loadMode, _segmentFormatVersion, false); driver.run(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java index 8307a2152d3..a588d370979 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java @@ -36,6 +36,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.io.IOUtils; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.tools.AbstractBaseCommand; import org.apache.pinot.tools.Command; import org.slf4j.Logger; @@ -99,6 +100,14 @@ public class QueryRunner extends AbstractBaseCommand implements Command { @CommandLine.Option(names = {"-verbose"}, required = false, description = "Enable verbose query logging (default: " + "false).") private boolean _verbose = false; + @CommandLine.Option(names = {"-user"}, required = false, description = "Username for basic auth.") + private String _user; + @CommandLine.Option(names = {"-password"}, required = false, description = "Password for basic auth.") + private String _password; + @CommandLine.Option(names = {"-authToken"}, required = false, description = "Http auth token.") + private String _authToken; + @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description = "Http auth token url.") + private String _authTokenUrl; @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print " + "this message.") private boolean _help; @@ -168,6 +177,10 @@ public boolean execute() conf.setStartBroker(false); conf.setStartServer(false); conf.setVerbose(_verbose); + conf.setUser(_user); + conf.setPassword(_password); + conf.setAuthToken(_authToken); + conf.setAuthTokenUrl(_authTokenUrl); List queries = makeQueries(IOUtils.readLines(new FileInputStream(_queryFile)), QueryMode.valueOf(_queryMode.toUpperCase()), @@ -179,7 +192,9 @@ public boolean execute() + "numIntervalsToReportAndClearStatistics: {}, timeout: {}", _queryFile, _numTimesToRunQueries, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout); singleThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _reportIntervalMs, - _numIntervalsToReportAndClearStatistics, _timeout); + _numIntervalsToReportAndClearStatistics, _timeout, + AuthProviderUtils.makeAuthHeadersMap(AuthProviderUtils.makeAuthProvider(null, + _authTokenUrl, _authToken, _user, _password))); break; case "multiThreads": if (_numThreads <= 0) { @@ -192,7 +207,9 @@ public boolean execute() _queryFile, _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _queueDepth, _timeout); multiThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _reportIntervalMs, - _numIntervalsToReportAndClearStatistics, _timeout); + _numIntervalsToReportAndClearStatistics, _timeout, + AuthProviderUtils.makeAuthHeadersMap(AuthProviderUtils.makeAuthProvider(null, + _authTokenUrl, _authToken, _user, _password))); break; case "targetQPS": if (_numThreads <= 0) { @@ -211,7 +228,9 @@ public boolean execute() _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _queueDepth, _timeout); targetQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, - _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout); + _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout, + AuthProviderUtils.makeAuthHeadersMap( + AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password))); break; case "increasingQPS": if (_numThreads <= 0) { @@ -241,7 +260,9 @@ public boolean execute() _numThreads, _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _queueDepth, _timeout); increasingQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, _deltaQPS, - _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _timeout); + _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _timeout, + AuthProviderUtils.makeAuthHeadersMap( + AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password))); break; default: LOGGER.error("Invalid mode: {}", _mode); @@ -251,13 +272,6 @@ public boolean execute() return true; } - public static QuerySummary singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, List queries, - int numTimesToRunQueries, int reportIntervalMs, int numIntervalsToReportAndClearStatistics, long timeout) - throws Exception { - return singleThreadedQueryRunner(conf, queries, numTimesToRunQueries, reportIntervalMs, - numIntervalsToReportAndClearStatistics, timeout, Collections.emptyMap()); - } - /** * Use single thread to run queries as fast as possible. *

Use a single thread to send queries back to back and log statistic information periodically. @@ -355,14 +369,6 @@ public static QuerySummary singleThreadedQueryRunner(PerfBenchmarkDriverConf con return querySummary; } - public static QuerySummary multiThreadedQueryRunner(PerfBenchmarkDriverConf conf, List queries, - int numTimesToRunQueries, int numThreads, int queueDepth, int reportIntervalMs, - int numIntervalsToReportAndClearStatistics, long timeout) - throws Exception { - return multiThreadedQueryRunner(conf, queries, numTimesToRunQueries, numThreads, queueDepth, reportIntervalMs, - numIntervalsToReportAndClearStatistics, timeout, Collections.emptyMap()); - } - /** * Use multiple threads to run queries as fast as possible. *

Use a concurrent linked queue to buffer the queries to be sent. Use the main thread to insert queries into the @@ -475,14 +481,6 @@ public static QuerySummary multiThreadedQueryRunner(PerfBenchmarkDriverConf conf return querySummary; } - public static QuerySummary targetQPSQueryRunner(PerfBenchmarkDriverConf conf, List queries, - int numTimesToRunQueries, int numThreads, int queueDepth, double startQPS, int reportIntervalMs, - int numIntervalsToReportAndClearStatistics, long timeout) - throws Exception { - return targetQPSQueryRunner(conf, queries, numTimesToRunQueries, numThreads, queueDepth, startQPS, reportIntervalMs, - numIntervalsToReportAndClearStatistics, timeout, Collections.emptyMap()); - } - /** * Use multiple threads to run query at a target QPS. *

Use a concurrent linked queue to buffer the queries to be sent. Use the main thread to insert queries into the @@ -607,15 +605,6 @@ public static QuerySummary targetQPSQueryRunner(PerfBenchmarkDriverConf conf, Li return querySummary; } - public static QuerySummary increasingQPSQueryRunner(PerfBenchmarkDriverConf conf, List queries, - int numTimesToRunQueries, int numThreads, int queueDepth, double startQPS, double deltaQPS, int reportIntervalMs, - int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS, long timeout) - throws Exception { - return increasingQPSQueryRunner(conf, queries, numTimesToRunQueries, numThreads, queueDepth, startQPS, deltaQPS, - reportIntervalMs, numIntervalsToReportAndClearStatistics, numIntervalsToIncreaseQPS, timeout, - Collections.emptyMap()); - } - /** * Use multiple threads to run query at an increasing target QPS. *

Use a concurrent linked queue to buffer the queries to be sent. Use the main thread to insert queries into the @@ -957,7 +946,6 @@ public double getPercentile(double p) { return _statisticsList.get(_statisticsList.size() - 1).getPercentile(p); } - public List getStatisticsList() { return _statisticsList; }