Skip to content

Commit

Permalink
Support auth for queryRunner (apache#11897)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Nov 1, 2023
1 parent 03a9ec7 commit 403790b
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,59 @@ static String getOrDefault(AuthConfig config, String key, String defaultValue) {
}
throw new IllegalArgumentException("Expected String but got " + config.getProperties().get(key).getClass());
}

/**
* Generate an (optional) HTTP Authorization header given an auth config
*
* @param authProvider auth provider
* @return list of headers
*/
public static List<Header> makeAuthHeaders(AuthProvider authProvider) {
return toRequestHeaders(authProvider);
}

/**
* Generate an (optional) HTTP Authorization header given an auth config
*
* @param authProvider auth provider
* @return Map of headers
*/
public static Map<String, String> makeAuthHeadersMap(AuthProvider authProvider) {
if (authProvider == null) {
return Collections.emptyMap();
}
return authProvider.getRequestHeaders().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
}

/**
* Generate auth token from pass-thru token or generate basic auth from user/password pair
*
* @param provider optional provider
* @param tokenUrl optional token url
* @param authToken optional pass-thru token
* @param user optional username
* @param password optional password
* @return auth provider, or NullauthProvider if neither pass-thru token nor user info available
*/
public static AuthProvider makeAuthProvider(@Nullable AuthProvider provider, String tokenUrl, String authToken,
String user, String password) {
if (provider != null) {
return provider;
}

if (StringUtils.isNotBlank(tokenUrl)) {
return new UrlAuthProvider(tokenUrl);
}

if (StringUtils.isNotBlank(authToken)) {
return new StaticTokenAuthProvider(authToken);
}

if (StringUtils.isNotBlank(user)) {
return new StaticTokenAuthProvider(BasicAuthUtils.toBasicAuthToken(user, password));
}

return new NullAuthProvider();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.auth.BasicAuthUtils;
import org.apache.pinot.common.auth.NullAuthProvider;
import org.apache.pinot.common.auth.StaticTokenAuthProvider;
import org.apache.pinot.common.auth.UrlAuthProvider;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.tools.AbstractBaseCommand;
import org.apache.pinot.tools.utils.PinotConfigUtils;

Expand Down Expand Up @@ -130,46 +122,4 @@ Map<String, Object> readConfigFromFile(String configFileName)
throws ConfigurationException {
return PinotConfigUtils.readConfigFromFile(configFileName);
}

/**
* Generate an (optional) HTTP Authorization header given an auth config
*
* @param authProvider auth provider
* @return list of headers
*/
static List<Header> makeAuthHeaders(AuthProvider authProvider) {
return AuthProviderUtils.toRequestHeaders(authProvider);
}

/**
* Generate auth token from pass-thru token or generate basic auth from user/password pair
*
* @param provider optional provider
* @param tokenUrl optional token url
* @param authToken optional pass-thru token
* @param user optional username
* @param password optional password
* @return auth provider, or NullauthProvider if neither pass-thru token nor user info available
*/
@Nullable
static AuthProvider makeAuthProvider(AuthProvider provider, String tokenUrl, String authToken, String user,
String password) {
if (provider != null) {
return provider;
}

if (StringUtils.isNotBlank(tokenUrl)) {
return new UrlAuthProvider(tokenUrl);
}

if (StringUtils.isNotBlank(authToken)) {
return new StaticTokenAuthProvider(authToken);
}

if (StringUtils.isNotBlank(user)) {
return new StaticTokenAuthProvider(BasicAuthUtils.toBasicAuthToken(user, password));
}

return new NullAuthProvider();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.Collections;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -178,7 +179,8 @@ public boolean execute()
.getUploadSchemaURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort));
schemaURI = new URI(schemaURI + "?override=" + _override + "?force=" + _force);
fileUploadDownloadClient.addSchema(schemaURI,
schema.getSchemaName(), schemaFile, makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken,
schema.getSchemaName(), schemaFile, AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken,
_user, _password)), Collections.emptyList());
} catch (Exception e) {
LOGGER.error("Got Exception to upload Pinot Schema: " + schema.getSchemaName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.TableConfigs;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -188,7 +189,8 @@ public boolean sendTableCreationRequest(JsonNode node)
throws IOException {
String res = AbstractBaseAdminCommand.sendRequest("POST",
ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableConfigsCreate(), node.toString(),
makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)));
AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)));
LOGGER.info(res);
return res.contains("successfully added");
}
Expand All @@ -197,7 +199,8 @@ public boolean sendTableUpdateRequest(JsonNode node, String tableName)
throws IOException {
String res = AbstractBaseAdminCommand.sendRequest("PUT",
ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableConfigsUpdate(tableName), node.toString(),
makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)));
AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)));
LOGGER.info(res);
return res.contains("TableConfigs updated");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.admin.command;

import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
Expand Down Expand Up @@ -153,7 +154,8 @@ public boolean execute()
Tenant tenant = new Tenant(_role, _name, _instanceCount, _offlineInstanceCount, _realtimeInstanceCount);
String res = AbstractBaseAdminCommand
.sendRequest("POST", ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTenantCreate(),
tenant.toJsonString(), makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user,
tenant.toJsonString(), AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user,
_password)));

LOGGER.info(res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.admin.command;

import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -139,6 +140,6 @@ public boolean execute()
_controllerHost = NetUtils.getHostAddress();
}
return new BootstrapTableTool(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort), _dir,
makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)).execute();
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
Expand Down Expand Up @@ -93,7 +94,8 @@ public boolean execute()
URI_TABLES_PATH + _tableName, "state=" + stateValue, null);

HttpGet httpGet = new HttpGet(uri);
makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))
AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))
.forEach(header -> httpGet.addHeader(header.getName(), header.getValue()));

HttpResponse response = httpClient.execute(httpGet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.tools.admin.command;

import java.util.Collections;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -150,7 +151,7 @@ public boolean execute()
fileUploadDownloadClient.getHttpClient().sendDeleteRequest(
FileUploadDownloadClient.getDeleteSchemaURI(_controllerProtocol, _controllerHost,
Integer.parseInt(_controllerPort), _schemaName), Collections.emptyMap(),
makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password));
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password));
} catch (Exception e) {
LOGGER.error("Got Exception while deleting Pinot Schema: " + _schemaName, e);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.tools.admin.command;

import java.util.Collections;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -172,7 +173,7 @@ public boolean execute()
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
fileUploadDownloadClient.getHttpClient().sendDeleteRequest(FileUploadDownloadClient
.getDeleteTableURI(_controllerProtocol, _controllerHost, Integer.parseInt(_controllerPort),
_tableName, _type, _retention), Collections.emptyMap(), makeAuthProvider(_authProvider,
_tableName, _type, _retention), Collections.emptyMap(), AuthProviderUtils.makeAuthProvider(_authProvider,
_authTokenUrl, _authToken, _user, _password));
} catch (Exception e) {
LOGGER.error("Got Exception while deleting Pinot Table: " + _tableName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
Expand Down Expand Up @@ -259,7 +260,8 @@ private SegmentGenerationJobSpec generateSegmentGenerationJobSpec() {
spec.setCleanUpOutputDir(true);
spec.setOverwriteOutput(true);
spec.setJobType("SegmentCreationAndTarPush");
spec.setAuthToken(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password).getTaskToken());
spec.setAuthToken(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password).getTaskToken());

// set ExecutionFrameworkSpec
ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
Expand Down Expand Up @@ -123,7 +124,8 @@ public boolean execute()
}

if (StringUtils.isBlank(spec.getAuthToken())) {
spec.setAuthToken(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password).getTaskToken());
spec.setAuthToken(AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)
.getTaskToken());
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
Expand Down Expand Up @@ -154,7 +155,8 @@ public String run()
}
String clusterConfigUrl =
_controllerProtocol + "://" + _controllerHost + ":" + _controllerPort + "/cluster/configs";
List<Header> headers = makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user,
List<Header> headers = AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user,
_password));
switch (_operation.toUpperCase()) {
case "ADD":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
Expand Down Expand Up @@ -146,8 +147,8 @@ public String run()
payload.putAll(_additionalOptions);
}
String request = JsonUtils.objectToString(payload);
return sendRequest("POST", url, request, makeAuthHeaders(makeAuthProvider(_authProvider,
_authTokenUrl, _authToken, _user, _password)));
return sendRequest("POST", url, request, AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.http.Header;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.spi.auth.AuthProvider;
Expand Down Expand Up @@ -187,7 +188,8 @@ public boolean execute()

LOGGER.info("Uploading segment tar file: {}", segmentTarFile);
List<Header> headerList =
makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password));
AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password));

FileInputStream fileInputStream = new FileInputStream(segmentTarFile);
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -45,6 +44,7 @@
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
Expand Down Expand Up @@ -102,6 +102,7 @@ public class PerfBenchmarkDriver {
// updates ZKSegmentMetadata only, is not exposed from controller API so we need to update segments directly via
// PinotHelixResourceManager.
private PinotHelixResourceManager _helixResourceManager;
private Map<String, String> _headers;

public PerfBenchmarkDriver(PerfBenchmarkDriverConf conf) {
this(conf, "/tmp/", "HEAP", null, conf.isVerbose());
Expand All @@ -120,6 +121,9 @@ public PerfBenchmarkDriver(PerfBenchmarkDriverConf conf, String tempDir, String
_loadMode = loadMode;
_segmentFormatVersion = segmentFormatVersion;
_verbose = verbose;
_headers = AuthProviderUtils.makeAuthHeadersMap(
AuthProviderUtils.makeAuthProvider(null, _conf.getAuthTokenUrl(), _conf.getAuthToken(), _conf.getUser(),
_conf.getPassword()));
init();
}

Expand Down Expand Up @@ -407,7 +411,7 @@ private void postQueries()

public JsonNode postQuery(String query)
throws Exception {
return postQuery(query, Collections.emptyMap());
return postQuery(query, _headers);
}

public JsonNode postQuery(String query, Map<String, String> headers)
Expand Down
Loading

0 comments on commit 403790b

Please sign in to comment.