From f0460f124912fd5678ec3263f5b222b5742d754c Mon Sep 17 00:00:00 2001 From: dtb <2011xuesong@gmail.com> Date: Tue, 7 May 2024 10:44:04 +0800 Subject: [PATCH] code refactoring completed --- HowToKerberize.md | 15 +- .../AbstractThriftMetastoreClientManager.java | 174 ++++++++++ .../client/DefaultMetaStoreClientFactory.java | 20 +- .../SaslThriftMetastoreClientManager.java | 233 +++++++++++++ .../client/ThriftMetastoreClientManager.java | 319 ++---------------- .../server/FederatedHMSHandler.java | 13 +- .../server/FederatedHMSHandlerFactory.java | 10 +- .../server/MetaStoreProxyServer.java | 56 +-- .../waggledance/server/SaslServerWrapper.java | 106 ++++++ .../bdp/waggledance/util/SaslHelper.java | 60 ---- .../FederatedHMSHandlerFactoryTest.java | 7 +- .../server/FederatedHMSHandlerTest.java | 16 +- 12 files changed, 595 insertions(+), 434 deletions(-) create mode 100644 waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/AbstractThriftMetastoreClientManager.java create mode 100644 waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/SaslThriftMetastoreClientManager.java create mode 100644 waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/SaslServerWrapper.java diff --git a/HowToKerberize.md b/HowToKerberize.md index b876cab19..30908f83c 100644 --- a/HowToKerberize.md +++ b/HowToKerberize.md @@ -70,11 +70,6 @@ federated-meta-stores: - .* ``` -In start shell , add jvm properties maybe useful. -``` --Djavax.security.auth.useSubjectCredsOnly=false -``` - Connect to Waggle Dance via beeline, change ` hive.metastore.uris` in Hive configuration file `hive-site.xml`: ``` @@ -87,10 +82,6 @@ Connect to Waggle Dance via beeline, change ` hive.metastore.uris` in Hive confi Waggle Dance should be started by a privileged user with a fresh keytab. -If Waggle Dance throws a GSS exception, you have problem with the keytab file. -Try to perform `kdestroy` and `kinit` operations and check the keytab file ownership flags. - -If the Metastore throws an exception with code -127, Waggle Dance is probably using the wrong authentication policy. -Check the values in `hive-conf.xml` and make sure that HIVE_HOME and HIVE_CONF_DIR are defined. - -Don't forget to restart hive services! +Just start the service directly, no kinit operation is required. +Because the ticket information is saved in jvm instead of being saved in a local file. +In this way, it can automatically renew without the need for additional operations to renew local tickets. \ No newline at end of file diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/AbstractThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/AbstractThriftMetastoreClientManager.java new file mode 100644 index 000000000..e49eb82a3 --- /dev/null +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/AbstractThriftMetastoreClientManager.java @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.client; + +import java.io.Closeable; +import java.net.URI; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.conf.HiveConfUtil; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; + +import lombok.extern.log4j.Log4j2; + +import com.hotels.bdp.waggledance.client.compatibility.HiveCompatibleThriftHiveMetastoreIfaceFactory; + +@Log4j2 +public abstract class AbstractThriftMetastoreClientManager implements Closeable { + + + static final AtomicInteger CONN_COUNT = new AtomicInteger(0); + final HiveConf conf; + final HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory; + final URI[] metastoreUris; + ThriftHiveMetastore.Iface client = null; + TTransport transport = null; + boolean isConnected = false; + // for thrift connects + int retries = 5; + long retryDelaySeconds = 0; + + final int connectionTimeout; + final String msUri; + + AbstractThriftMetastoreClientManager( + HiveConf conf, + HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory, + int connectionTimeout) { + this.conf = conf; + this.hiveCompatibleThriftHiveMetastoreIfaceFactory = hiveCompatibleThriftHiveMetastoreIfaceFactory; + this.connectionTimeout = connectionTimeout; + msUri = conf.getVar(ConfVars.METASTOREURIS); + + if (HiveConfUtil.isEmbeddedMetaStore(msUri)) { + throw new RuntimeException("You can't waggle an embedded metastore"); + } + + // get the number retries + retries = HiveConf.getIntVar(conf, ConfVars.METASTORETHRIFTCONNECTIONRETRIES); + retryDelaySeconds = conf.getTimeVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); + + // user wants file store based configuration + if (msUri != null) { + String[] metastoreUrisString = msUri.split(","); + metastoreUris = new URI[metastoreUrisString.length]; + try { + int i = 0; + for (String s : metastoreUrisString) { + URI tmpUri = new URI(s); + if (tmpUri.getScheme() == null) { + throw new IllegalArgumentException("URI: " + s + " does not have a scheme"); + } + metastoreUris[i++] = tmpUri; + } + } catch (IllegalArgumentException e) { + throw (e); + } catch (Exception e) { + String exInfo = "Got exception: " + e.getClass().getName() + " " + e.getMessage(); + log.error(exInfo, e); + throw new RuntimeException(exInfo, e); + } + } else { + log.error("NOT getting uris from conf"); + throw new RuntimeException("MetaStoreURIs not found in conf file"); + } + } + + void open() { + open(null); + } + + abstract void open(HiveUgiArgs ugiArgs); + + void reconnect(HiveUgiArgs ugiArgs) { + close(); + // Swap the first element of the metastoreUris[] with a random element from the rest + // of the array. Rationale being that this method will generally be called when the default + // connection has died and the default connection is likely to be the first array element. + promoteRandomMetaStoreURI(); + open(ugiArgs); + } + + public String getHiveConfValue(String key, String defaultValue) { + return conf.get(key, defaultValue); + } + + public void setHiveConfValue(String key, String value) { + conf.set(key, value); + } + + public String generateNewTokenSignature(String defaultTokenSignature) { + String tokenSignature = conf.get(ConfVars.METASTORE_TOKEN_SIGNATURE.varname, + defaultTokenSignature); + conf.set(ConfVars.METASTORE_TOKEN_SIGNATURE.varname, + tokenSignature); + return tokenSignature; + } + + public Boolean isSaslEnabled() { + return conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); + } + + @Override + public void close() { + if (!isConnected) { + return; + } + isConnected = false; + try { + if (client != null) { + client.shutdown(); + } + } catch (TException e) { + log.debug("Unable to shutdown metastore client. Will try closing transport directly.", e); + } + // Transport would have got closed via client.shutdown(), so we don't need this, but + // just in case, we make this call. + if ((transport != null) && transport.isOpen()) { + transport.close(); + transport = null; + } + log.info("Closed a connection to metastore, current connections: {}", CONN_COUNT.decrementAndGet()); + } + + boolean isOpen() { + return (transport != null) && transport.isOpen(); + } + + protected ThriftHiveMetastore.Iface getClient() { + return client; + } + + /** + * Swaps the first element of the metastoreUris array with a random element from the remainder of the array. + */ + private void promoteRandomMetaStoreURI() { + if (metastoreUris.length <= 1) { + return; + } + Random rng = new Random(); + int index = rng.nextInt(metastoreUris.length - 1) + 1; + URI tmp = metastoreUris[0]; + metastoreUris[0] = metastoreUris[index]; + metastoreUris[index] = tmp; + } +} diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java index a111677a1..fe9b5873c 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.thrift.transport.TTransportException; import lombok.extern.log4j.Log4j2; @@ -40,7 +41,7 @@ public class DefaultMetaStoreClientFactory implements MetaStoreClientFactory { @Log4j2 private static class ReconnectingMetastoreClientInvocationHandler implements InvocationHandler { - private final ThriftMetastoreClientManager base; + private final AbstractThriftMetastoreClientManager base; private final String name; private final int maxRetries; @@ -49,7 +50,7 @@ private static class ReconnectingMetastoreClientInvocationHandler implements Inv private ReconnectingMetastoreClientInvocationHandler( String name, int maxRetries, - ThriftMetastoreClientManager base) { + AbstractThriftMetastoreClientManager base) { this.name = name; this.maxRetries = maxRetries; this.base = base; @@ -94,7 +95,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } } - private Object doRealCall(Method method, Object[] args, int attempt) throws IllegalAccessException, Throwable { + private Object doRealCall(Method method, Object[] args, int attempt) throws Throwable { do { try { return method.invoke(base.getClient(), args); @@ -146,15 +147,22 @@ public CloseableThriftHiveMetastoreIface newInstance( String name, int reconnectionRetries, int connectionTimeout) { - return newInstance(name, reconnectionRetries, new ThriftMetastoreClientManager(hiveConf, - new HiveCompatibleThriftHiveMetastoreIfaceFactory(), connectionTimeout)); + boolean useSasl = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); + HiveCompatibleThriftHiveMetastoreIfaceFactory factory = new HiveCompatibleThriftHiveMetastoreIfaceFactory(); + AbstractThriftMetastoreClientManager base = null; + if (useSasl) { + base = new SaslThriftMetastoreClientManager(hiveConf, factory, connectionTimeout); + } else { + base = new ThriftMetastoreClientManager(hiveConf, factory, connectionTimeout); + } + return newInstance(name, reconnectionRetries, base); } @VisibleForTesting CloseableThriftHiveMetastoreIface newInstance( String name, int reconnectionRetries, - ThriftMetastoreClientManager base) { + AbstractThriftMetastoreClientManager base) { ReconnectingMetastoreClientInvocationHandler reconnectingHandler = new ReconnectingMetastoreClientInvocationHandler( name, reconnectionRetries, base); return (CloseableThriftHiveMetastoreIface) Proxy.newProxyInstance(getClass().getClassLoader(), diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/SaslThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/SaslThriftMetastoreClientManager.java new file mode 100644 index 000000000..57838c20e --- /dev/null +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/SaslThriftMetastoreClientManager.java @@ -0,0 +1,233 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.client; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.io.IOException; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import javax.security.sasl.SaslException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.service.auth.KerberosSaslHelper; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; + +import lombok.extern.log4j.Log4j2; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +import com.hotels.bdp.waggledance.client.compatibility.HiveCompatibleThriftHiveMetastoreIfaceFactory; +import com.hotels.bdp.waggledance.context.CommonBeans; + +@Log4j2 +public class SaslThriftMetastoreClientManager extends AbstractThriftMetastoreClientManager { + + private final boolean impersonationEnabled; + private static final Duration delegationTokenCacheTtl = Duration.ofHours( + 1); // The default lifetime in Hive is 7 days (metastore.cluster.delegation.token.max-lifetime) + private static final long delegationTokenCacheMaximumSize = 1000; + private static final LoadingCache delegationTokenCache = CacheBuilder.newBuilder() + .expireAfterWrite(delegationTokenCacheTtl.toMillis(), MILLISECONDS) + .maximumSize(delegationTokenCacheMaximumSize) + .build(CacheLoader.from(SaslThriftMetastoreClientManager::loadDelegationToken)); + + SaslThriftMetastoreClientManager(HiveConf conf, + HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory, + int connectionTimeout) { + super(conf, hiveCompatibleThriftHiveMetastoreIfaceFactory, connectionTimeout); + impersonationEnabled = conf.getBoolean(CommonBeans.IMPERSONATION_ENABLED_KEY, false); + } + + @Override + void open(HiveUgiArgs ugiArgs) { + if (isConnected) { + return; + } + createMetastoreClientAndOpen(null, ugiArgs); + if (impersonationEnabled) { + try { + String userName = UserGroupInformation.getCurrentUser().getShortUserName(); + DelegationTokenKey key = new DelegationTokenKey(msUri, userName, client); + String delegationToken = delegationTokenCache.get(key); + close(); + createMetastoreClientAndOpen(delegationToken, ugiArgs); + } catch (IOException | ExecutionException e) { + log.error("Couldn't create delegation token client"); + throw new RuntimeException(e); + } + } + } + + void createMetastoreClientAndOpen(String delegationToken, HiveUgiArgs ugiArgs) { + TException te = null; + boolean useSsl = conf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL); + boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL); + int clientSocketTimeout = (int) conf.getTimeVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, + TimeUnit.MILLISECONDS); + + for (int attempt = 0; !isConnected && (attempt < retries); ++attempt) { + for (URI store : metastoreUris) { + log.info("Trying to connect to metastore with URI {}", store); + try { + transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout, + connectionTimeout); + // Wrap thrift connection with SASL for secure connection. + try { + UserGroupInformation.setConfiguration(conf); + + // check if we should use delegation tokens to authenticate + // the call below gets hold of the tokens if they are set up by hadoop + // this should happen on the map/reduce tasks if the client added the + // tokens into hadoop's credential store in the front end during job + // submission. + if (impersonationEnabled && delegationToken != null) { + // authenticate using delegation tokens via the "DIGEST" mechanism + transport = KerberosSaslHelper + .getTokenTransport(delegationToken, + store.getHost(), transport, + MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl)); + } else { + String principalConfig = conf.getVar(ConfVars.METASTORE_KERBEROS_PRINCIPAL); + transport = UserGroupInformation.getLoginUser().doAs( + (PrivilegedExceptionAction) () -> KerberosSaslHelper.getKerberosTransport( + principalConfig, store.getHost(), transport, + MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl), false)); + } + } catch (IOException | InterruptedException exception) { + log.error("Couldn't create client transport, URI " + store, exception); + throw new MetaException(exception.toString()); + } + + TProtocol protocol; + if (useCompactProtocol) { + protocol = new TCompactProtocol(transport); + } else { + protocol = new TBinaryProtocol(transport); + } + client = hiveCompatibleThriftHiveMetastoreIfaceFactory.newInstance( + new ThriftHiveMetastore.Client(protocol)); + try { + transport.open(); + log + .info("Opened a connection to metastore '" + + store + + "', total current connections to all metastores: " + + CONN_COUNT.incrementAndGet()); + + isConnected = true; + if (ugiArgs != null) { + log.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store); + client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups()); + } else { + log.debug("Connection opened with out #set_ugi call', on URI {}", store); + } + } catch (TException e) { + te = e; + if (log.isDebugEnabled()) { + log.warn("Failed to connect to the MetaStore Server, URI " + store, e); + } else { + // Don't print full exception trace if DEBUG is not on. + log.warn("Failed to connect to the MetaStore Server, URI {}", store); + } + } + } catch (MetaException e) { + log.error("Unable to connect to metastore with URI " + store + " in attempt " + attempt, + e); + } + if (isConnected) { + break; + } + } + // Wait before launching the next round of connection retries. + if (!isConnected && (retryDelaySeconds > 0) && ((attempt + 1) < retries)) { + try { + log.info("Waiting {} seconds before next connection attempt.", retryDelaySeconds); + Thread.sleep(retryDelaySeconds * 1000); + } catch (InterruptedException ignore) { + } + } + } + + if (!isConnected) { + throw new RuntimeException("Could not connect to meta store using any of the URIs [" + + msUri + + "] provided. Most recent failure: " + + StringUtils.stringifyException(te)); + } + log.debug("Connected to metastore."); + } + + private static class DelegationTokenKey { + + String msUri; + String username; + ThriftHiveMetastore.Iface client; + + public DelegationTokenKey(String msUri, String username, Iface client) { + this.msUri = msUri; + this.username = username; + this.client = client; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DelegationTokenKey that = (DelegationTokenKey) o; + return Objects.equals(msUri, that.msUri) && Objects.equals(username, + that.username); + } + + @Override + public int hashCode() { + return Objects.hash(msUri, username); + } + + } + + private static String loadDelegationToken(DelegationTokenKey key) { + try { + return key.client.get_delegation_token(key.username, key.username); + } catch (TException e) { + log.error("could not get delegation token,username:{},uri: {}", key.username, key.msUri); + throw new RuntimeException(e); + } + } +} diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java index 3ab9e2447..7d123ddae 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java @@ -15,181 +15,39 @@ */ package com.hotels.bdp.waggledance.client; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import java.io.Closeable; -import java.io.IOException; import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.time.Duration; -import java.util.Objects; -import java.util.Random; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.security.sasl.SaslException; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.conf.HiveConfUtil; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; -import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hive.service.auth.KerberosSaslHelper; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; import lombok.extern.log4j.Log4j2; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - import com.hotels.bdp.waggledance.client.compatibility.HiveCompatibleThriftHiveMetastoreIfaceFactory; -import com.hotels.bdp.waggledance.context.CommonBeans; @Log4j2 -class ThriftMetastoreClientManager implements Closeable { - - - private static final AtomicInteger CONN_COUNT = new AtomicInteger(0); - private final HiveConf conf; - private final HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory; - private final URI[] metastoreUris; - private ThriftHiveMetastore.Iface client = null; - private TTransport transport = null; - private boolean isConnected = false; - // for thrift connects - private int retries = 5; - private long retryDelaySeconds = 0; - - private final int connectionTimeout; - private final String msUri; - private final boolean impersonationEnabled; - private static final Duration delegationTokenCacheTtl = Duration.ofHours(1); // The default lifetime in Hive is 7 days (metastore.cluster.delegation.token.max-lifetime) - private static final long delegationTokenCacheMaximumSize = 1000; - private static final LoadingCache delegationTokenCache = CacheBuilder.newBuilder() - .expireAfterWrite(delegationTokenCacheTtl.toMillis(), MILLISECONDS) - .maximumSize(delegationTokenCacheMaximumSize) - .build(CacheLoader.from(ThriftMetastoreClientManager::loadDelegationToken)); +class ThriftMetastoreClientManager extends AbstractThriftMetastoreClientManager { ThriftMetastoreClientManager( HiveConf conf, HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory, int connectionTimeout) { - this.conf = conf; - this.hiveCompatibleThriftHiveMetastoreIfaceFactory = hiveCompatibleThriftHiveMetastoreIfaceFactory; - this.connectionTimeout = connectionTimeout; - msUri = conf.getVar(ConfVars.METASTOREURIS); - impersonationEnabled = conf.getBoolean(CommonBeans.IMPERSONATION_ENABLED_KEY,false); - - if (HiveConfUtil.isEmbeddedMetaStore(msUri)) { - throw new RuntimeException("You can't waggle an embedded metastore"); - } - - // get the number retries - retries = HiveConf.getIntVar(conf, ConfVars.METASTORETHRIFTCONNECTIONRETRIES); - retryDelaySeconds = conf.getTimeVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); - - // user wants file store based configuration - if (msUri != null) { - String[] metastoreUrisString = msUri.split(","); - metastoreUris = new URI[metastoreUrisString.length]; - try { - int i = 0; - for (String s : metastoreUrisString) { - URI tmpUri = new URI(s); - if (tmpUri.getScheme() == null) { - throw new IllegalArgumentException("URI: " + s + " does not have a scheme"); - } - metastoreUris[i++] = tmpUri; - } - } catch (IllegalArgumentException e) { - throw (e); - } catch (Exception e) { - String exInfo = "Got exception: " + e.getClass().getName() + " " + e.getMessage(); - log.error(exInfo, e); - throw new RuntimeException(exInfo, e); - } - } else { - log.error("NOT getting uris from conf"); - throw new RuntimeException("MetaStoreURIs not found in conf file"); - } - } - - private static String loadDelegationToken(DelegationTokenKey key) { - try { - return key.client.get_delegation_token(key.username, key.username); - } catch (TException e) { - throw new RuntimeException(e); - } - } - - private static class DelegationTokenKey{ - String msUri; - String username; - ThriftHiveMetastore.Iface client; - - public DelegationTokenKey(String msUri, String username, Iface client) { - this.msUri = msUri; - this.username = username; - this.client = client; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DelegationTokenKey that = (DelegationTokenKey) o; - return Objects.equals(msUri, that.msUri) && Objects.equals(username, - that.username); - } - - @Override - public int hashCode() { - return Objects.hash(msUri, username); - } - } - - void open() { - open(null); + super(conf,hiveCompatibleThriftHiveMetastoreIfaceFactory,connectionTimeout); } void open(HiveUgiArgs ugiArgs) { if (isConnected) { return; } - createMetastoreClientAndOpen(null, ugiArgs); - if (impersonationEnabled) { - try { - String userName = UserGroupInformation.getCurrentUser().getShortUserName(); - DelegationTokenKey key = new DelegationTokenKey(msUri, userName, client); - String delegationToken = delegationTokenCache.get(key); - close(); - createMetastoreClientAndOpen(delegationToken, ugiArgs); - } catch (IOException | ExecutionException e) { - throw new RuntimeException(e); - } - } - } - - void createMetastoreClientAndOpen(String delegationToken, HiveUgiArgs ugiArgs) { TException te = null; - boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); - boolean useSsl = conf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL); boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT); boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL); int clientSocketTimeout = (int) conf.getTimeVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); @@ -197,82 +55,41 @@ void createMetastoreClientAndOpen(String delegationToken, HiveUgiArgs ugiArgs) { for (int attempt = 0; !isConnected && (attempt < retries); ++attempt) { for (URI store : metastoreUris) { log.info("Trying to connect to metastore with URI {}", store); + transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout, connectionTimeout); + if (useFramedTransport) { + transport = new TFramedTransport(transport); + } + TProtocol protocol; + if (useCompactProtocol) { + protocol = new TCompactProtocol(transport); + } else { + protocol = new TBinaryProtocol(transport); + } + client = hiveCompatibleThriftHiveMetastoreIfaceFactory.newInstance(new ThriftHiveMetastore.Client(protocol)); try { - transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout, connectionTimeout); - if (useSasl) { - // Wrap thrift connection with SASL for secure connection. - try { - UserGroupInformation.setConfiguration(conf); - - // check if we should use delegation tokens to authenticate - // the call below gets hold of the tokens if they are set up by hadoop - // this should happen on the map/reduce tasks if the client added the - // tokens into hadoop's credential store in the front end during job - // submission. -// String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE); - // tokenSig could be null - if (impersonationEnabled && delegationToken != null) { - // authenticate using delegation tokens via the "DIGEST" mechanism - transport = KerberosSaslHelper - .getTokenTransport(delegationToken, - store.getHost(), transport, - MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl)); - } else { - String principalConfig = conf.getVar(ConfVars.METASTORE_KERBEROS_PRINCIPAL); - transport = UserGroupInformation.getLoginUser().doAs( - (PrivilegedExceptionAction) () -> { - try { - return KerberosSaslHelper - .getKerberosTransport(principalConfig, store.getHost(), transport, - MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl), false); - } catch (SaslException e) { - throw new RuntimeException(e); - } - }); - } - } catch (IOException ioe) { - log.error("Couldn't create client transport, URI " + store, ioe); - throw new MetaException(ioe.toString()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } else if (useFramedTransport) { - transport = new TFramedTransport(transport); + transport.open(); + log + .info("Opened a connection to metastore '" + + store + + "', total current connections to all metastores: " + + CONN_COUNT.incrementAndGet()); + + isConnected = true; + if (ugiArgs != null) { + log.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store); + client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups()); + } else { + log.debug("Connection opened with out #set_ugi call', on URI {}", store); } - TProtocol protocol; - if (useCompactProtocol) { - protocol = new TCompactProtocol(transport); + } catch (TException e) { + te = e; + if (log.isDebugEnabled()) { + log.warn("Failed to connect to the MetaStore Server, URI " + store, e); } else { - protocol = new TBinaryProtocol(transport); + // Don't print full exception trace if DEBUG is not on. + log.warn("Failed to connect to the MetaStore Server, URI {}", store); } - client = hiveCompatibleThriftHiveMetastoreIfaceFactory.newInstance(new ThriftHiveMetastore.Client(protocol)); - try { - transport.open(); - log - .info("Opened a connection to metastore '" - + store - + "', total current connections to all metastores: " - + CONN_COUNT.incrementAndGet()); - - isConnected = true; - if (ugiArgs != null) { - log.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store); - client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups()); - } else { - log.debug("Connection opened with out #set_ugi call', on URI {}", store); - } - } catch (TException e) { - te = e; - if (log.isDebugEnabled()) { - log.warn("Failed to connect to the MetaStore Server, URI " + store, e); - } else { - // Don't print full exception trace if DEBUG is not on. - log.warn("Failed to connect to the MetaStore Server, URI {}", store); - } } - } catch (MetaException e) { - log.error("Unable to connect to metastore with URI " + store + " in attempt " + attempt, e); - } if (isConnected) { break; } @@ -295,76 +112,4 @@ void createMetastoreClientAndOpen(String delegationToken, HiveUgiArgs ugiArgs) { log.debug("Connected to metastore."); } - void reconnect(HiveUgiArgs ugiArgs) { - close(); - // Swap the first element of the metastoreUris[] with a random element from the rest - // of the array. Rationale being that this method will generally be called when the default - // connection has died and the default connection is likely to be the first array element. - promoteRandomMetaStoreURI(); - open(ugiArgs); - } - - public String getHiveConfValue(String key, String defaultValue) { - return conf.get(key, defaultValue); - } - - public void setHiveConfValue(String key, String value) { - conf.set(key, value); - } - - public String generateNewTokenSignature(String defaultTokenSignature) { - String tokenSignature = conf.get(ConfVars.METASTORE_TOKEN_SIGNATURE.varname, - defaultTokenSignature); - conf.set(ConfVars.METASTORE_TOKEN_SIGNATURE.varname, - tokenSignature); - return tokenSignature; - } - - public Boolean isSaslEnabled() { - return conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); - } - - @Override - public void close() { - if (!isConnected) { - return; - } - isConnected = false; - try { - if (client != null) { - client.shutdown(); - } - } catch (TException e) { - log.debug("Unable to shutdown metastore client. Will try closing transport directly.", e); - } - // Transport would have got closed via client.shutdown(), so we don't need this, but - // just in case, we make this call. - if ((transport != null) && transport.isOpen()) { - transport.close(); - transport = null; - } - log.info("Closed a connection to metastore, current connections: {}", CONN_COUNT.decrementAndGet()); - } - - boolean isOpen() { - return (transport != null) && transport.isOpen(); - } - - protected ThriftHiveMetastore.Iface getClient() { - return client; - } - - /** - * Swaps the first element of the metastoreUris array with a random element from the remainder of the array. - */ - private void promoteRandomMetaStoreURI() { - if (metastoreUris.length <= 1) { - return; - } - Random rng = new Random(); - int index = rng.nextInt(metastoreUris.length - 1) + 1; - URI tmp = metastoreUris[0]; - metastoreUris[0] = metastoreUris[index]; - metastoreUris[index] = tmp; - } } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java index 84213adc9..127717f91 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java @@ -245,16 +245,19 @@ class FederatedHMSHandler extends FacebookBase implements CloseableIHMSHandler { private final NotifyingFederationService notifyingFederationService; private final WaggleDanceConfiguration waggleDanceConfiguration; private Configuration conf; + private SaslServerWrapper saslServerWrapper; FederatedHMSHandler( MappingEventListener databaseMappingService, NotifyingFederationService notifyingFederationService, - WaggleDanceConfiguration waggleDanceConfiguration) { + WaggleDanceConfiguration waggleDanceConfiguration, + SaslServerWrapper saslServerWrapper) { super("waggle-dance-handler"); this.databaseMappingService = databaseMappingService; this.notifyingFederationService = notifyingFederationService; this.waggleDanceConfiguration = waggleDanceConfiguration; this.notifyingFederationService.subscribe(databaseMappingService); + this.saslServerWrapper= saslServerWrapper; } private ThriftHiveMetastore.Iface getPrimaryClient() throws TException { @@ -1370,9 +1373,9 @@ public List set_ugi(String user_name, List group_names) throws M public String get_delegation_token(String token_owner, String renewer_kerberos_principal_name) throws MetaException, TException { try { - return MetaStoreProxyServer.getSaslServerAndMDT().getDelegationTokenManager() + return saslServerWrapper.getDelegationTokenManager() .getDelegationToken(token_owner, renewer_kerberos_principal_name, - MetaStoreProxyServer.getIPAddress()); + saslServerWrapper.getIPAddress()); } catch (IOException | InterruptedException e) { throw new MetaException(e.getMessage()); } @@ -1382,7 +1385,7 @@ public String get_delegation_token(String token_owner, String renewer_kerberos_p @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public long renew_delegation_token(String token_str_form) throws MetaException, TException { try { - return MetaStoreProxyServer.getSaslServerAndMDT().getDelegationTokenManager() + return saslServerWrapper.getDelegationTokenManager() .renewDelegationToken(token_str_form); } catch (IOException e) { throw new MetaException(e.getMessage()); @@ -1395,7 +1398,7 @@ public long renew_delegation_token(String token_str_form) throws MetaException, @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public void cancel_delegation_token(String token_str_form) throws MetaException, TException { try { - MetaStoreProxyServer.getSaslServerAndMDT().getDelegationTokenManager() + saslServerWrapper.getDelegationTokenManager() .cancelDelegationToken(token_str_form); } catch (IOException e) { throw new MetaException(e.getMessage()); diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java index 36df03275..d46015eb7 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java @@ -38,6 +38,7 @@ public class FederatedHMSHandlerFactory { private final MetaStoreMappingFactory metaStoreMappingFactory; private final WaggleDanceConfiguration waggleDanceConfiguration; private final QueryMapping queryMapping; + private SaslServerWrapper saslServerWrapper; @Autowired public FederatedHMSHandlerFactory( @@ -45,20 +46,23 @@ public FederatedHMSHandlerFactory( NotifyingFederationService notifyingFederationService, MetaStoreMappingFactory metaStoreMappingFactory, WaggleDanceConfiguration waggleDanceConfiguration, - QueryMapping queryMapping) { + QueryMapping queryMapping, + SaslServerWrapper saslServerWrapper) { this.hiveConf = hiveConf; this.notifyingFederationService = notifyingFederationService; this.metaStoreMappingFactory = metaStoreMappingFactory; this.waggleDanceConfiguration = waggleDanceConfiguration; this.queryMapping = queryMapping; + this.saslServerWrapper = saslServerWrapper; } public CloseableIHMSHandler create() { MappingEventListener service = createDatabaseMappingService(); MonitoredDatabaseMappingService monitoredService = new MonitoredDatabaseMappingService(service); - CloseableIHMSHandler baseHandler = new FederatedHMSHandler(monitoredService, notifyingFederationService, - waggleDanceConfiguration); + CloseableIHMSHandler baseHandler = new FederatedHMSHandler(monitoredService, + notifyingFederationService, + waggleDanceConfiguration, saslServerWrapper); HiveConf conf = new HiveConf(hiveConf); baseHandler.setConf(conf); return baseHandler; diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java index 49408f629..76b3d29c0 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java @@ -25,8 +25,6 @@ package com.hotels.bdp.waggledance.server; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -42,11 +40,9 @@ import org.apache.hadoop.hive.common.auth.HiveAuthUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive; import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TProcessorFactory; @@ -66,11 +62,8 @@ import lombok.extern.log4j.Log4j2; -import com.google.common.annotations.VisibleForTesting; - import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration; import com.hotels.bdp.waggledance.util.SaslHelper; -import com.hotels.bdp.waggledance.util.SaslHelper.SaslServerAndMDT; @Component @Order(Ordered.HIGHEST_PRECEDENCE) @@ -90,20 +83,20 @@ public class MetaStoreProxyServer implements ApplicationRunner { private final Lock startLock; private final Condition startCondition; private TServer tServer; - private static HadoopThriftAuthBridge.Server saslServer; - private static SaslServerAndMDT saslServerAndMDT; - private static boolean useSasl; + private SaslServerWrapper saslServerWrapper; @Autowired public MetaStoreProxyServer( HiveConf hiveConf, WaggleDanceConfiguration waggleDanceConfiguration, - TProcessorFactory tProcessorFactory) { + TProcessorFactory tProcessorFactory, + SaslServerWrapper saslServerWrapper) { this.hiveConf = hiveConf; this.waggleDanceConfiguration = waggleDanceConfiguration; this.tProcessorFactory = tProcessorFactory; startLock = new ReentrantLock(); startCondition = startLock.newCondition(); + this.saslServerWrapper = saslServerWrapper; } private boolean isRunning() { @@ -168,7 +161,7 @@ private void startWaggleDance( boolean tcpKeepAlive = hiveConf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE); boolean useFramedTransport = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT); boolean useSSL = hiveConf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL); - useSasl = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); + boolean useSasl = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); //load 'hadoop.proxyuser' configs ProxyUsers.refreshSuperUserGroupsConfiguration(hiveConf); @@ -179,14 +172,9 @@ private void startWaggleDance( serverSocket = new TServerSocketKeepAlive(serverSocket); } - if(useSasl) { - UserGroupInformation.setConfiguration(hiveConf); - saslServerAndMDT = SaslHelper.createSaslServer(hiveConf); - saslServer = saslServerAndMDT.getSaslServer(); - } - - TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSasl, saslServer); - TProcessorFactory tProcessorFactory = getTProcessorFactory(useSasl, saslServer); + TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSasl, + saslServerWrapper.getSaslServer()); + TProcessorFactory tProcessorFactory = getTProcessorFactory(useSasl, saslServerWrapper.getSaslServer()); log.info("Starting WaggleDance Server"); TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket) @@ -317,32 +305,4 @@ public void waitUntilStarted(int retries, long waitDelay, TimeUnit waitDelayTime } } } - - static String getIPAddress() { - if (useSasl) { - if (saslServer != null && saslServer.getRemoteAddress() != null) { - return saslServer.getRemoteAddress().getHostAddress(); - } - } else { - // if kerberos is not enabled - try { - Method method = HMSHandler.class.getDeclaredMethod("getThreadLocalIpAddress", null); - method.setAccessible(true); - return (String) method.invoke(null, null); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - return null; - } - - @VisibleForTesting - public static void setSaslServerAndMDT( - SaslServerAndMDT saslServerAndMDT) { - MetaStoreProxyServer.saslServerAndMDT = saslServerAndMDT; - } - - public static SaslServerAndMDT getSaslServerAndMDT() { - return saslServerAndMDT; - } } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/SaslServerWrapper.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/SaslServerWrapper.java new file mode 100644 index 000000000..60332a755 --- /dev/null +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/SaslServerWrapper.java @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.server; + +import java.io.IOException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.security.DBTokenStore; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server; +import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.transport.TTransportException; +import org.springframework.stereotype.Component; + +import lombok.Getter; +import lombok.extern.log4j.Log4j2; + +import com.hotels.bdp.waggledance.util.SaslHelper; + +@Component +@Log4j2 +public class SaslServerWrapper { + + private MetastoreDelegationTokenManager delegationTokenManager; + @Getter + private static boolean useSasl; + + private HadoopThriftAuthBridge.Server saslServer = null; + + protected SaslServerWrapper(HiveConf conf) + throws TTransportException { + useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); + if (!useSasl) { + return; + } + + UserGroupInformation.setConfiguration(conf); + + if (SaslHelper.isSASLWithKerberizedHadoop(conf)) { + saslServer = + HadoopThriftAuthBridge.getBridge().createServer( + conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB), + conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), + conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_CLIENT_KERBEROS_PRINCIPAL)); + + // Start delegation token manager + delegationTokenManager = new MetastoreDelegationTokenManager(); + try { + Object baseHandler = null; + String tokenStoreClass = conf.getVar( + HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS); + + if (tokenStoreClass.equals(DBTokenStore.class.getName())) { + // IMetaStoreClient is needed to access token store if DBTokenStore is to be used. It + // will be got via Hive.get(conf).getMSC in a thread where the DelegationTokenStore + // is called. To avoid the cyclic reference, we pass the Hive class to DBTokenStore where + // it is used to get a threadLocal Hive object with a synchronized MetaStoreClient using + // Java reflection. + // Note: there will be two HS2 life-long opened MSCs, one is stored in HS2 thread local + // Hive object, the other is in a daemon thread spawned in DelegationTokenSecretManager + // to remove expired tokens. + baseHandler = Hive.class; + } + + delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, + HadoopThriftAuthBridge.Server.ServerMode.METASTORE); + saslServer.setSecretManager(delegationTokenManager.getSecretManager()); + } catch (IOException e) { + throw new TTransportException("Failed to start token manager", e); + } + + } + } + + public MetastoreDelegationTokenManager getDelegationTokenManager() { + return delegationTokenManager; + } + + public Server getSaslServer() { + return saslServer; + } + + + String getIPAddress() { + if (saslServer != null && saslServer.getRemoteAddress() != null) { + return saslServer.getRemoteAddress().getHostAddress(); + } + return null; + } +} diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java index 163db0a64..1063553c8 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java @@ -17,7 +17,6 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -25,11 +24,7 @@ import javax.security.sasl.Sasl; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.security.DBTokenStore; import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; -import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server; -import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hive.service.auth.HiveAuthConstants; import org.apache.hive.service.auth.PlainSaslHelper; import org.apache.hive.service.auth.SaslQOP; @@ -43,61 +38,6 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class SaslHelper { - public static class SaslServerAndMDT { - - HadoopThriftAuthBridge.Server saslServer; - MetastoreDelegationTokenManager delegationTokenManager; - - public Server getSaslServer() { - return saslServer; - } - - public MetastoreDelegationTokenManager getDelegationTokenManager() { - return delegationTokenManager; - } - } - - public static SaslServerAndMDT createSaslServer(HiveConf conf) throws TTransportException { - SaslServerAndMDT saslServerAndMDT = new SaslServerAndMDT(); - HadoopThriftAuthBridge.Server saslServer = null; - if (SaslHelper.isSASLWithKerberizedHadoop(conf)) { - saslServer = - HadoopThriftAuthBridge.getBridge().createServer( - conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB), - conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), - conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_CLIENT_KERBEROS_PRINCIPAL)); - - // Start delegation token manager - MetastoreDelegationTokenManager delegationTokenManager = new MetastoreDelegationTokenManager(); - try { - Object baseHandler = null; - String tokenStoreClass = conf.getVar(HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS); - - if (tokenStoreClass.equals(DBTokenStore.class.getName())) { - // IMetaStoreClient is needed to access token store if DBTokenStore is to be used. It - // will be got via Hive.get(conf).getMSC in a thread where the DelegationTokenStore - // is called. To avoid the cyclic reference, we pass the Hive class to DBTokenStore where - // it is used to get a threadLocal Hive object with a synchronized MetaStoreClient using - // Java reflection. - // Note: there will be two HS2 life-long opened MSCs, one is stored in HS2 thread local - // Hive object, the other is in a daemon thread spawned in DelegationTokenSecretManager - // to remove expired tokens. - baseHandler = Hive.class; - } - - delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, HadoopThriftAuthBridge.Server.ServerMode.METASTORE); - saslServer.setSecretManager(delegationTokenManager.getSecretManager()); - } - catch (IOException e) { - throw new TTransportException("Failed to start token manager", e); - } - - saslServerAndMDT.saslServer = saslServer; - saslServerAndMDT.delegationTokenManager = delegationTokenManager; - } - return saslServerAndMDT; - } - public static boolean isSASLWithKerberizedHadoop(HiveConf hiveconf) { return "kerberos".equalsIgnoreCase(hiveconf.get(HADOOP_SECURITY_AUTHENTICATION, "simple")) && !hiveconf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.getAuthName()); diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java index 23c5633f7..638779e9d 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java @@ -44,13 +44,14 @@ public class FederatedHMSHandlerFactoryTest { private @Mock NotifyingFederationService notifyingFederationService; private @Mock MetaStoreMappingFactory metaStoreMappingFactory; private @Mock QueryMapping queryMapping; + private @Mock SaslServerWrapper saslServerWrapper; private FederatedHMSHandlerFactory factory; @Before public void init() { when(notifyingFederationService.getAll()).thenReturn(new ArrayList<>()); factory = new FederatedHMSHandlerFactory(hiveConf, notifyingFederationService, metaStoreMappingFactory, - waggleDanceConfiguration, queryMapping); + waggleDanceConfiguration, queryMapping, saslServerWrapper); } @Test @@ -64,7 +65,7 @@ public void typical() throws Exception { public void prefixedDatabase() throws Exception { when(waggleDanceConfiguration.getDatabaseResolution()).thenReturn(DatabaseResolution.PREFIXED); factory = new FederatedHMSHandlerFactory(hiveConf, notifyingFederationService, metaStoreMappingFactory, - waggleDanceConfiguration, queryMapping); + waggleDanceConfiguration, queryMapping, saslServerWrapper); CloseableIHMSHandler handler = factory.create(); assertThat(handler, is(instanceOf(FederatedHMSHandler.class))); } @@ -72,7 +73,7 @@ public void prefixedDatabase() throws Exception { @Test(expected = WaggleDanceException.class) public void noMode() { factory = new FederatedHMSHandlerFactory(hiveConf, notifyingFederationService, metaStoreMappingFactory, - waggleDanceConfiguration, queryMapping); + waggleDanceConfiguration, queryMapping, saslServerWrapper); factory.create(); } diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java index 6a5ca70d4..f726039df 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java @@ -230,7 +230,6 @@ import com.hotels.bdp.waggledance.mapping.service.MappingEventListener; import com.hotels.bdp.waggledance.mapping.service.PanopticOperationHandler; import com.hotels.bdp.waggledance.mapping.service.impl.NotifyingFederationService; -import com.hotels.bdp.waggledance.util.SaslHelper.SaslServerAndMDT; @RunWith(MockitoJUnitRunner.class) public class FederatedHMSHandlerTest { @@ -248,15 +247,15 @@ public class FederatedHMSHandlerTest { private @Mock DatabaseMapping primaryMapping; private @Mock Iface primaryClient; private @Mock WaggleDanceConfiguration waggleDanceConfiguration; - private @Mock SaslServerAndMDT saslServerAndMDT; + private @Mock SaslServerWrapper saslServerWrapper; private @Mock MetastoreDelegationTokenManager metastoreDelegationTokenManager; - private @Mock MetaStoreProxyServer metaStoreProxyServer; private FederatedHMSHandler handler; @Before public void setUp() throws NoSuchObjectException { - handler = new FederatedHMSHandler(databaseMappingService, notifyingFederationService, waggleDanceConfiguration); + handler = new FederatedHMSHandler(databaseMappingService, notifyingFederationService, + waggleDanceConfiguration, saslServerWrapper); when(databaseMappingService.primaryDatabaseMapping()).thenReturn(primaryMapping); when(databaseMappingService.getAvailableDatabaseMappings()).thenReturn(Collections.singletonList(primaryMapping)); when(primaryMapping.getClient()).thenReturn(primaryClient); @@ -1527,8 +1526,7 @@ public void grant_revoke_privileges() throws TException { @Test public void get_delegation_token() throws TException, IOException, InterruptedException { String expected = "expected"; - MetaStoreProxyServer.setSaslServerAndMDT(saslServerAndMDT); - when(saslServerAndMDT.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager); + when(saslServerWrapper.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager); when(metastoreDelegationTokenManager.getDelegationToken("owner", "kerberos_principal", null)).thenReturn(expected); String result = handler.get_delegation_token("owner", "kerberos_principal"); @@ -1538,8 +1536,7 @@ public void get_delegation_token() throws TException, IOException, InterruptedEx @Test public void renew_delegation_token() throws TException, IOException { long expected = 10L; - MetaStoreProxyServer.setSaslServerAndMDT(saslServerAndMDT); - when(saslServerAndMDT.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager); + when(saslServerWrapper.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager); when(metastoreDelegationTokenManager.renewDelegationToken("token")).thenReturn(expected); long result = handler.renew_delegation_token("token"); assertThat(result, is(expected)); @@ -1547,8 +1544,7 @@ public void renew_delegation_token() throws TException, IOException { @Test public void cancel_delegation_token() throws TException, IOException { - MetaStoreProxyServer.setSaslServerAndMDT(saslServerAndMDT); - when(saslServerAndMDT.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager); + when(saslServerWrapper.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager); handler.cancel_delegation_token("token"); verify(metastoreDelegationTokenManager).cancelDelegationToken("token"); }