From 25242567ae760bc4a669c91a4391461b83e6d6ab Mon Sep 17 00:00:00 2001 From: Patrick Duin Date: Fri, 19 Apr 2024 16:19:08 +0200 Subject: [PATCH] Added extensions and Rate limiting options (#314) * Added extensions and Rate limiting options --- .github/workflows/main.yml | 2 +- .github/workflows/release.yml | 2 +- CHANGELOG.md | 10 +- README.md | 4 +- pom.xml | 8 +- waggle-dance-api/pom.xml | 2 +- waggle-dance-boot/pom.xml | 7 +- waggle-dance-core/pom.xml | 18 +- .../hotels/bdp/waggledance/WaggleDance.java | 2 +- .../CloseableThriftHiveMetastoreIface.java | 2 +- ...ThriftHiveMetastoreIfaceClientFactory.java | 2 +- .../client/DefaultMetaStoreClientFactory.java | 9 +- .../client/ThriftClientFactory.java | 27 +++ .../client/ThriftMetastoreClientManager.java | 4 +- .../tunnelling/TunnelableFactorySupplier.java | 2 +- .../TunnelingMetaStoreClientFactory.java | 2 +- .../conf/WaggleDanceConfiguration.java | 3 +- .../bdp/waggledance/context/CommonBeans.java | 12 +- .../model/MetaStoreMappingFactoryImpl.java | 12 +- .../PrefixBasedDatabaseMappingService.java | 4 +- .../server/ExceptionWrappingHMSHandler.java | 16 +- .../server/FederatedHMSHandler.java | 4 +- .../server/WaggleDanceServerException.java | 2 +- .../DefaultMetaStoreClientFactoryTest.java | 2 +- ...leThriftHiveMetastoreIfaceFactoryTest.java | 2 +- .../TunnelableFactorySupplierTest.java | 2 +- .../TunnelingMetaStoreClientFactoryTest.java | 2 +- .../model/MetaStoreMappingImplTest.java | 2 +- .../ExceptionWrappingHMSHandlerTest.java | 2 +- waggle-dance-extensions/README.md | 73 ++++++++ waggle-dance-extensions/pom.xml | 63 +++++++ .../extensions/ExtensionBeans.java | 107 +++++++++++ .../ratelimit/BucketBandwidthProvider.java | 24 +++ .../client/ratelimit/BucketKeyGenerator.java | 33 ++++ .../client/ratelimit/BucketService.java | 24 +++ .../ratelimit/GreedyBandwidthProvider.java | 42 +++++ .../IntervallyBandwidthProvider.java | 42 +++++ .../client/ratelimit/RateLimitMetrics.java | 35 ++++ .../ratelimit/RateLimitingClientFactory.java | 54 ++++++ .../RateLimitingInvocationHandler.java | 131 ++++++++++++++ .../client/ratelimit/RefillType.java | 33 ++++ .../memory/InMemoryBucketService.java | 52 ++++++ .../ratelimit/redis/RedisBucketService.java | 47 +++++ .../ratelimit/BucketKeyGeneratorTest.java | 62 +++++++ .../RateLimitingInvocationHandlerTest.java | 166 +++++++++++++++++ .../src/test/resources/log4j2.xml | 30 ++++ waggle-dance-integration-tests/pom.xml | 13 +- .../com/hotels/bdp/waggledance/TestUtils.java | 15 +- .../bdp/waggledance/WaggleDanceRunner.java | 77 +++++--- .../waggledance/junit/ServerSocketRule.java | 15 +- .../WaggleDanceIntegrationTest.java | 95 ++++------ .../WaggleDanceRateLimitIntegrationTest.java | 170 ++++++++++++++++++ .../junit/ServerSocketRuleTest.java | 8 +- .../src/test/resources/log4j.xml | 4 +- .../src/test/resources/log4j2.xml | 4 +- waggle-dance-rest/pom.xml | 2 +- waggle-dance-rpm/pom.xml | 2 +- waggle-dance/pom.xml | 2 +- waggle-dance/src/main/resources/log4j2.xml | 4 +- 59 files changed, 1455 insertions(+), 142 deletions(-) create mode 100644 waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftClientFactory.java create mode 100644 waggle-dance-extensions/README.md create mode 100644 waggle-dance-extensions/pom.xml create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketBandwidthProvider.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGenerator.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketService.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/GreedyBandwidthProvider.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/IntervallyBandwidthProvider.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitMetrics.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RefillType.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/memory/InMemoryBucketService.java create mode 100644 waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/redis/RedisBucketService.java create mode 100644 waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGeneratorTest.java create mode 100644 waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java create mode 100644 waggle-dance-extensions/src/test/resources/log4j2.xml create mode 100644 waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/extensions/WaggleDanceRateLimitIntegrationTest.java diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b56c8882a..2cd05f503 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -9,7 +9,7 @@ on: jobs: test: name: Package and run all tests - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v2 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d5f72d679..883225ce5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,7 +9,7 @@ on: jobs: release: name: Release to Maven Central - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - name: Checkout source code diff --git a/CHANGELOG.md b/CHANGELOG.md index a49388ea2..a59ad4dbf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ -## [3.12.0] - 2023-02-08 +## [3.13.0] - 2024-04-19 +### Added +- Added `waggle-dance-extensions` module. See [extensions README](waggle-dance-extensions/README.md.) +- Added support to enable Rate Limiting in Waggle Dance. +### Changed +- Changed and added some log messages for better tracking of calls. +- Changed Integration Test WaggleDanceRunner to allow for reuse. + +## [3.12.0] - 2024-02-08 ### Added - Added optional `primary-meta-store.read-only-remote-meta-store-uris` config to allow traffic to be diverted based on calls made. See README.md. diff --git a/README.md b/README.md index 39059e863..03a636433 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,8 @@ The table below describes all the available configuration values for Waggle Danc | `configuration-properties` | No | Map of Hive properties that will be added to the HiveConf used when creating the Thrift clients (they will be shared among all the clients). | | `queryFunctionsAcrossAllMetastores` | No | Controls if the Thrift `getAllFunctions` should be fired to all configured metastores or only the primary metastore. The advice is to set this to false. Executing `getAllFunctions` can have an unwanted performance impact when a metastore is slow to respond. The function call is typically only called when a client is initialized and is largely irrelevant. Default is `true` (to be backward compatible) | +Extensions (for instance Rate Limiting) are described here: [waggle-dance-extensions/README.md](waggle-dance-extensions/README.md) + ### Federation Federation config is by default located in: `$WAGGLE_DANCE_HOME/conf/waggle-dance-federation.yml`. @@ -611,4 +613,4 @@ The Waggle Dance logo uses the [Beetype Filled font](http://www.1001fonts.com/be ## Legal This project is available under the [Apache 2.0 License](http://www.apache.org/licenses/LICENSE-2.0.html). -Copyright 2016-2019 Expedia, Inc. +Copyright 2016-2024 Expedia, Inc. diff --git a/pom.xml b/pom.xml index 5bf15c14b..53cfaa4be 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ com.hotels waggle-dance-parent - 3.12.1-SNAPSHOT + 3.13.0-SNAPSHOT pom Waggle Dance Parent Hive Metastore federation service. @@ -24,6 +24,7 @@ + waggle-dance-extensions waggle-dance-core waggle-dance-api waggle-dance-rest @@ -275,6 +276,11 @@ jackson-dataformat-cbor ${jackson.version} + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + ${jackson.version} + com.fasterxml.jackson.core jackson-core diff --git a/waggle-dance-api/pom.xml b/waggle-dance-api/pom.xml index 6357e9cb2..c9bb02448 100644 --- a/waggle-dance-api/pom.xml +++ b/waggle-dance-api/pom.xml @@ -4,7 +4,7 @@ com.hotels waggle-dance-parent - 3.12.1-SNAPSHOT + 3.13.0-SNAPSHOT waggle-dance-api diff --git a/waggle-dance-boot/pom.xml b/waggle-dance-boot/pom.xml index bf1d2b5f2..4aa32aee7 100644 --- a/waggle-dance-boot/pom.xml +++ b/waggle-dance-boot/pom.xml @@ -4,7 +4,7 @@ com.hotels waggle-dance-parent - 3.12.1-SNAPSHOT + 3.13.0-SNAPSHOT waggle-dance-boot @@ -25,6 +25,11 @@ waggle-dance-rest ${project.version} + + com.hotels + waggle-dance-extensions + ${project.version} + diff --git a/waggle-dance-core/pom.xml b/waggle-dance-core/pom.xml index 3e51ad06e..bc5a358bd 100644 --- a/waggle-dance-core/pom.xml +++ b/waggle-dance-core/pom.xml @@ -4,7 +4,7 @@ com.hotels waggle-dance-parent - 3.12.1-SNAPSHOT + 3.13.0-SNAPSHOT waggle-dance-core @@ -158,6 +158,8 @@ org.apache.hive hive-exec + ${hive.version} + core log4j @@ -167,6 +169,10 @@ org.pentaho pentaho-aggdesigner-algorithm + + org.apache.calcite.avatica + avatica + @@ -194,6 +200,10 @@ jdk.tools jdk.tools + + org.apache.hive + hive-exec + @@ -346,6 +356,12 @@ com.hotels beeju test + + + org.apache.hive + hive-exec + + fm.last.commons diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/WaggleDance.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/WaggleDance.java index cb149f2e6..19f4d5a7b 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/WaggleDance.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/WaggleDance.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIface.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIface.java index 8a09e32f3..f8da0cb07 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIface.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIface.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java index 5a3b2c3b2..73128f0bc 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java @@ -36,7 +36,7 @@ import com.hotels.hcommon.hive.metastore.conf.HiveConfFactory; import com.hotels.hcommon.hive.metastore.util.MetaStoreUriNormaliser; -public class CloseableThriftHiveMetastoreIfaceClientFactory { +public class CloseableThriftHiveMetastoreIfaceClientFactory implements ThriftClientFactory { private static final int DEFAULT_CLIENT_FACTORY_RECONNECTION_RETRY = 3; private final TunnelingMetaStoreClientFactory tunnelingMetaStoreClientFactory; diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java index 78dec5c63..5c2f6a5a2 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,11 @@ package com.hotels.bdp.waggledance.client; import java.io.IOException; -import java.lang.reflect.*; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; @@ -158,7 +162,6 @@ private SaslMetastoreClientHander( this.clientManager = clientManager; } - @SuppressWarnings("unchecked") @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftClientFactory.java new file mode 100644 index 000000000..7b91dc992 --- /dev/null +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftClientFactory.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.client; + +import com.hotels.bdp.waggledance.api.model.AbstractMetaStore; + +public interface ThriftClientFactory { + + /** + * @param metaStore (configuration object) + * @return client that will be used to query the metaStore. + */ + public CloseableThriftHiveMetastoreIface newInstance(AbstractMetaStore metaStore); +} diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java index c7a0c67af..aac972592 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -121,7 +121,7 @@ void open(HiveUgiArgs ugiArgs) { for (int attempt = 0; !isConnected && (attempt < retries); ++attempt) { for (URI store : metastoreUris) { - LOG.info("Trying to connect to metastore with URI " + store); + LOG.debug("Trying to connect to metastore with URI " + store); try { transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout, connectionTimeout); if (useSasl) { diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplier.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplier.java index e068014dd..d5937e7f9 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplier.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplier.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactory.java index d2db8ad73..e9ea17b23 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2021 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/conf/WaggleDanceConfiguration.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/conf/WaggleDanceConfiguration.java index 8a22f98ef..c98a0b973 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/conf/WaggleDanceConfiguration.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/conf/WaggleDanceConfiguration.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -145,5 +145,4 @@ public boolean isQueryFunctionsAcrossAllMetastores() { public void setQueryFunctionsAcrossAllMetastores(boolean queryFunctionsAcrossAllMetastores) { this.queryFunctionsAcrossAllMetastores = queryFunctionsAcrossAllMetastores; } - } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java index f92efd167..72eb96ff1 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java @@ -19,12 +19,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIfaceClientFactory; import com.hotels.bdp.waggledance.client.DefaultMetaStoreClientFactory; import com.hotels.bdp.waggledance.client.GlueClientFactory; import com.hotels.bdp.waggledance.client.SplitTrafficMetastoreClientFactory; +import com.hotels.bdp.waggledance.client.ThriftClientFactory; import com.hotels.bdp.waggledance.client.tunnelling.TunnelingMetaStoreClientFactory; import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration; import com.hotels.bdp.waggledance.mapping.model.ASTQueryMapping; @@ -59,13 +61,21 @@ public SplitTrafficMetastoreClientFactory splitTrafficMetaStoreClientFactory() { return new SplitTrafficMetastoreClientFactory(); } + @Bean - public CloseableThriftHiveMetastoreIfaceClientFactory metaStoreClientFactory( + public ThriftClientFactory defaultWaggleDanceClientFactory( WaggleDanceConfiguration waggleDanceConfiguration, SplitTrafficMetastoreClientFactory splitTrafficMetaStoreClientFactory) { return new CloseableThriftHiveMetastoreIfaceClientFactory(new TunnelingMetaStoreClientFactory(), new DefaultMetaStoreClientFactory(), new GlueClientFactory(), waggleDanceConfiguration, splitTrafficMetaStoreClientFactory); } + //Only load when no other beans with this name can be found. + @ConditionalOnMissingBean + @Bean + public ThriftClientFactory thriftClientFactory(ThriftClientFactory defaultWaggleDanceClientFactory) { + return defaultWaggleDanceClientFactory; + } + @Bean public QueryMapping queryMapping() { return ASTQueryMapping.INSTANCE; diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingFactoryImpl.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingFactoryImpl.java index 4492c95ac..a45ca5537 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingFactoryImpl.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingFactoryImpl.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2022 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ import com.hotels.bdp.waggledance.api.model.AbstractMetaStore; import com.hotels.bdp.waggledance.api.model.DatabaseResolution; import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface; -import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIfaceClientFactory; +import com.hotels.bdp.waggledance.client.ThriftClientFactory; import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration; import com.hotels.bdp.waggledance.mapping.service.MetaStoreMappingFactory; import com.hotels.bdp.waggledance.mapping.service.PrefixNamingStrategy; @@ -46,24 +46,24 @@ public class MetaStoreMappingFactoryImpl implements MetaStoreMappingFactory { private final WaggleDanceConfiguration waggleDanceConfiguration; private final PrefixNamingStrategy prefixNamingStrategy; - private final CloseableThriftHiveMetastoreIfaceClientFactory metaStoreClientFactory; + private final ThriftClientFactory thriftClientFactory; private final AccessControlHandlerFactory accessControlHandlerFactory; @Autowired public MetaStoreMappingFactoryImpl( WaggleDanceConfiguration waggleDanceConfiguration, PrefixNamingStrategy prefixNamingStrategy, - CloseableThriftHiveMetastoreIfaceClientFactory metaStoreClientFactory, + ThriftClientFactory thriftClientFactory, AccessControlHandlerFactory accessControlHandlerFactory) { this.waggleDanceConfiguration = waggleDanceConfiguration; this.prefixNamingStrategy = prefixNamingStrategy; - this.metaStoreClientFactory = metaStoreClientFactory; + this.thriftClientFactory = thriftClientFactory; this.accessControlHandlerFactory = accessControlHandlerFactory; } private CloseableThriftHiveMetastoreIface createClient(AbstractMetaStore metaStore) { try { - return metaStoreClientFactory.newInstance(metaStore); + return thriftClientFactory.newInstance(metaStore); } catch (Exception e) { LOG.error("Can't create a client for metastore '{}':", metaStore.getName(), e); return newUnreachableMetastoreClient(metaStore); diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/PrefixBasedDatabaseMappingService.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/PrefixBasedDatabaseMappingService.java index 5c5711f74..4f7c4f297 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/PrefixBasedDatabaseMappingService.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/PrefixBasedDatabaseMappingService.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -99,7 +99,7 @@ private void add(AbstractMetaStore metaStore) { primaryDatabaseMapping = databaseMapping; if (!metaStoreMapping.isAvailable()) { throw new WaggleDanceException( - String.format("Primary metastore is unavailable {}", metaStore.getRemoteMetaStoreUris()) + String.format("Primary metastore is unavailable %s", metaStore.getRemoteMetaStoreUris()) ); } } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandler.java index fd79f036a..f0f0496c7 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandler.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandler.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.reflect.UndeclaredThrowableException; import java.util.Arrays; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.api.MetaException; import org.slf4j.Logger; @@ -34,10 +36,12 @@ public class ExceptionWrappingHMSHandler implements InvocationHandler { private final static Logger LOG = LoggerFactory.getLogger(ExceptionWrappingHMSHandler.class); private final IHMSHandler baseHandler; + private String user = ""; public static IHMSHandler newProxyInstance(IHMSHandler baseHandler) { - return (IHMSHandler) Proxy.newProxyInstance(ExceptionWrappingHMSHandler.class.getClassLoader(), - new Class[] { IHMSHandler.class }, new ExceptionWrappingHMSHandler(baseHandler)); + return (IHMSHandler) Proxy + .newProxyInstance(ExceptionWrappingHMSHandler.class.getClassLoader(), new Class[] { IHMSHandler.class }, + new ExceptionWrappingHMSHandler(baseHandler)); } public ExceptionWrappingHMSHandler(IHMSHandler baseHandler) { @@ -46,7 +50,13 @@ public ExceptionWrappingHMSHandler(IHMSHandler baseHandler) { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (method.getName().equals("set_ugi")) { + user = (String) args[0]; + } try { + LOG + .info("WD Audit:[User:{}, method:{}, source_ip:{}, args:{}]", user, method.getName(), + HMSHandler.getThreadLocalIpAddress(), StringUtils.left(Arrays.toString(args), 256)); return method.invoke(baseHandler, args); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java index 46acabacd..de58a96cc 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java @@ -135,9 +135,9 @@ public void create_database(Database database) @Override @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public Database get_database(String name) throws NoSuchObjectException, MetaException, TException { - LOG.info("Fetching database {}", name); + LOG.debug("Fetching database {}", name); DatabaseMapping mapping = databaseMappingService.databaseMapping(name); - LOG.info("Mapping is '{}'", mapping.getDatabasePrefix()); + LOG.debug("Mapping is '{}'", mapping.getDatabasePrefix()); Database result = mapping.getClient().get_database(mapping.transformInboundDatabaseName(name)); return mapping.transformOutboundDatabase(mapping.getMetastoreFilter().filterDatabase(result)); } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/WaggleDanceServerException.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/WaggleDanceServerException.java index a2d8ed717..d4d73d027 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/WaggleDanceServerException.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/WaggleDanceServerException.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2021 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactoryTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactoryTest.java index 88398ad92..efbd888ad 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactoryTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/compatibility/HiveCompatibleThriftHiveMetastoreIfaceFactoryTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/compatibility/HiveCompatibleThriftHiveMetastoreIfaceFactoryTest.java index 9dcf97700..3cc5fff60 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/compatibility/HiveCompatibleThriftHiveMetastoreIfaceFactoryTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/compatibility/HiveCompatibleThriftHiveMetastoreIfaceFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplierTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplierTest.java index 40eee5231..3d0d2da3a 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplierTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelableFactorySupplierTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactoryTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactoryTest.java index dee61f1d4..5e9b9fbce 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactoryTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/client/tunnelling/TunnelingMetaStoreClientFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2021 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingImplTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingImplTest.java index 57b8692e6..16ad633a6 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingImplTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/model/MetaStoreMappingImplTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandlerTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandlerTest.java index 019f62636..1c56b59b5 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandlerTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/ExceptionWrappingHMSHandlerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-extensions/README.md b/waggle-dance-extensions/README.md new file mode 100644 index 000000000..cb5bd2238 --- /dev/null +++ b/waggle-dance-extensions/README.md @@ -0,0 +1,73 @@ +## Waggle Dance Extensions + +### Overview +This project consists of extensions to Waggle Dance. By design these extension should add functionality that users may optionally configure to use. +The main Waggle Dance project works without these extensions. + +### Rate Limiting + +Extension that allows for Rate Limiting calls to Waggle Dance. + +To enable and configure see the following table, you can add these properties to the waggle-dance-server.yml: + + | Property | Required | Description | + | --- | --- | --- | + | waggledance.extensions.ratelimit.enabled | no | Whether the rate limiting extension is enabled. Default is `false` | + | waggledance.extensions.ratelimit.keyPrefix | no | Optional prefix for the bucket keys. Default is (empty string) `` | + | waggledance.extensions.ratelimit.storage | yes (if `enabled: true`) | The storage backend for the rate limiter, possible values `MEMORY` or `REDIS` | + | waggledance.extensions.ratelimit.capacity | no | The capacity of the bucket. Default `2000` | + | waggledance.extensions.ratelimit.refillType | no | The refill type, possible values `GREEDY` or `INTERVALLY`. See [Bucket4j](https://bucket4j.com/8.9.0/toc.html#refill-types) for explanation. Default is `GREEDY` | + | waggledance.extensions.ratelimit.tokensPerMinute | no | The number of tokens to add to the bucket per minute. Default `1000` | + | waggledance.extensions.ratelimit.reddison.embedded.config | yes (if `storage: REDIS`) | The configuration for Redisson client, can be added in a similar way as described [here](https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter#2-add-settings-into-applicationsettings-file) | + +#### InMemory Rate limiting. + +Example config: + +``` +waggledance.extensions.ratelimit.enabled: true +waggledance.extensions.ratelimit.storage: memory +waggledance.extensions.ratelimit.capacity: 2000 +waggledance.extensions.ratelimit.tokensPerMinute: 1000 +``` + +#### Redis stored buckets Rate limiting. + +Using a Redis backend server is supported by this module, it's up to the user to configure and maintain that infrastructure. +The next example assumes a Redis Replicated Server is running using SSL and `auth_token` authentication. +Timeouts and retry are set lower than default to *not* impact the Waggle Dance service if the Rate Limiting storage is unavailable. +The maximum latency this solution will add to a request in the following scenarios is: +* Redis server down: + * Latency will be in low ms as `retryAttemps: 0`, the connection will immediately fail. +* Redis server slow: + * Latency will be max `timeout: 1000` ms + +Waggle Dance is configured to allow all requests in case of Rate Limiting Server failures. + +Example config using a Redis Replicated Server: + +``` +waggledance.extensions.ratelimit.enabled: true +waggledance.extensions.ratelimit.storage: memory +waggledance.extensions.ratelimit.capacity: 2000 +waggledance.extensions.ratelimit.tokensPerMinute: 1000 +waggledance.extensions.ratelimit.reddison.embedded.config: | + replicatedServersConfig: + idleConnectionTimeout: 10000 + connectTimeout: 3000 + timeout: 1000 + retryAttempts: 0 + retryInterval: 1500 + password: "" + nodeAddresses: + - "rediss://localhost1:62493" + - "rediss://localhost2:62493" +``` + +For more configuration options and details please consult: [https://redisson.org/](https://redisson.org/) and [https://bucket4j.com/](https://bucket4j.com/) + + +## Legal +This project is available under the [Apache 2.0 License](http://www.apache.org/licenses/LICENSE-2.0.html). + +Copyright 2016-2024 Expedia Inc. diff --git a/waggle-dance-extensions/pom.xml b/waggle-dance-extensions/pom.xml new file mode 100644 index 000000000..74413d885 --- /dev/null +++ b/waggle-dance-extensions/pom.xml @@ -0,0 +1,63 @@ + + 4.0.0 + + + com.hotels + waggle-dance-parent + 3.13.0-SNAPSHOT + + + waggle-dance-extensions + + + + com.hotels + waggle-dance-api + ${project.version} + + + com.hotels + waggle-dance-core + ${project.version} + + + org.slf4j + slf4j-reload4j + + + + + com.bucket4j + bucket4j_jdk8-core + 8.9.0 + + + com.bucket4j + bucket4j_jdk8-redis + 8.9.0 + + + org.redisson + redisson + 3.21.0 + + + + + org.hamcrest + hamcrest + test + + + junit + junit + test + + + org.mockito + mockito-core + test + + + + diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java new file mode 100644 index 000000000..1a7f583ab --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions; + +import java.io.IOException; +import java.time.Duration; + +import org.redisson.Redisson; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.config.Config; +import org.redisson.connection.ConnectionManager; +import org.redisson.liveobject.core.RedissonObjectBuilder; +import org.redisson.reactive.CommandReactiveService; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import io.github.bucket4j.distributed.ExpirationAfterWriteStrategy; +import io.github.bucket4j.distributed.serialization.Mapper; +import io.github.bucket4j.redis.redisson.cas.RedissonBasedProxyManager; +import io.micrometer.core.instrument.MeterRegistry; + +import com.hotels.bdp.waggledance.client.ThriftClientFactory; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketBandwidthProvider; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketKeyGenerator; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketService; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.RateLimitingClientFactory; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.RefillType; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.memory.InMemoryBucketService; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.redis.RedisBucketService; + +@Configuration +@ConditionalOnProperty(name = "waggledance.extensions.ratelimit.enabled", havingValue = "true") +public class ExtensionBeans { + + private static final String STORAGE_MEMORY = "MEMORY"; + private static final String STORAGE_REDIS = "REDIS"; + + @Bean + public ThriftClientFactory thriftClientFactory( + ThriftClientFactory defaultWaggleDanceClientFactory, + BucketService bucketService, + BucketKeyGenerator bucketKeyGenerator, MeterRegistry meterRegistry) { + return new RateLimitingClientFactory(defaultWaggleDanceClientFactory, bucketService, bucketKeyGenerator, meterRegistry); + } + + @Bean + public BucketKeyGenerator bucketKeyGenerator( + @Value("${waggledance.extensions.ratelimit.keyPrefix:\"\"}") String keyPrefix) { + return new BucketKeyGenerator(keyPrefix); + } + + @ConditionalOnProperty(name = "waggledance.extensions.ratelimit.storage", havingValue = STORAGE_MEMORY) + @Bean + public BucketService inMemoryBucketService(BucketBandwidthProvider bucketBandwidthProvider) { + return new InMemoryBucketService(bucketBandwidthProvider); + } + + @ConditionalOnProperty(name = "waggledance.extensions.ratelimit.storage", havingValue = STORAGE_REDIS) + @Bean + public BucketService redisBucketService( + BucketBandwidthProvider bucketBandwidthProvider, + RedissonBasedProxyManager redissonBasedProxyManager) { + return new RedisBucketService(bucketBandwidthProvider, redissonBasedProxyManager); + } + + @ConditionalOnProperty(name = "waggledance.extensions.ratelimit.storage", havingValue = STORAGE_REDIS) + @Bean + public RedissonBasedProxyManager redissonBasedProxyManager( + @Value("${waggledance.extensions.ratelimit.reddison.embedded.config}") String embeddedConfigString) + throws IOException { + Config config = Config.fromYAML(embeddedConfigString); + Redisson redisson = (Redisson) Redisson.create(config); + ConnectionManager connectionManager = redisson.getConnectionManager(); + RedissonObjectBuilder objectBuilder = new RedissonObjectBuilder(redisson.reactive()); + CommandAsyncExecutor commandExecutor = new CommandReactiveService(connectionManager, objectBuilder); + RedissonBasedProxyManager proxyManager = RedissonBasedProxyManager + .builderFor(commandExecutor) + .withExpirationStrategy(ExpirationAfterWriteStrategy.basedOnTimeForRefillingBucketUpToMax(Duration.ofHours(24))) + .withKeyMapper(Mapper.STRING) + .build(); + return proxyManager; + } + + @Bean + public BucketBandwidthProvider bucketBandwidthProvider( + @Value("${waggledance.extensions.ratelimit.capacity:2000}") long capacity, + @Value("${waggledance.extensions.ratelimit.tokensPerMinute:1000}") long tokensPerMinute, + @Value("${waggledance.extensions.ratelimit.refillType:GREEDY}") RefillType refillType) { + return refillType.createBandwidthProvider(capacity, tokensPerMinute); + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketBandwidthProvider.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketBandwidthProvider.java new file mode 100644 index 000000000..add1404e6 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketBandwidthProvider.java @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import io.github.bucket4j.Bandwidth; + +public interface BucketBandwidthProvider { + + Bandwidth getBandwidth(); + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGenerator.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGenerator.java new file mode 100644 index 000000000..ea8d65749 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGenerator.java @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +public class BucketKeyGenerator { + + private final String prefix; + + public BucketKeyGenerator(String prefix) { + this.prefix = prefix; + } + + public String generateKey(String key) { + if (prefix != null && !prefix.isEmpty()) { + return prefix + "_" + key; + } + return "" + key; + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketService.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketService.java new file mode 100644 index 000000000..76169bfd6 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketService.java @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import io.github.bucket4j.Bucket; + +public interface BucketService { + + public Bucket getBucket(String key); + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/GreedyBandwidthProvider.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/GreedyBandwidthProvider.java new file mode 100644 index 000000000..7fe6b2bcf --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/GreedyBandwidthProvider.java @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import java.time.Duration; + +import io.github.bucket4j.Bandwidth; + +public class GreedyBandwidthProvider implements BucketBandwidthProvider { + + private final long capacity; + private final long tokensPerMinute; + + public GreedyBandwidthProvider(long capacity, long tokensPerMinute) { + this.capacity = capacity; + this.tokensPerMinute = tokensPerMinute; + } + + @Override + public Bandwidth getBandwidth() { + Bandwidth limit = Bandwidth + .builder() + .capacity(capacity) + .refillGreedy(tokensPerMinute, Duration.ofMinutes(1)) + .build(); + return limit; + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/IntervallyBandwidthProvider.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/IntervallyBandwidthProvider.java new file mode 100644 index 000000000..c090360f1 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/IntervallyBandwidthProvider.java @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import java.time.Duration; + +import io.github.bucket4j.Bandwidth; + +public class IntervallyBandwidthProvider implements BucketBandwidthProvider { + + private long capacity; + private long tokensPerMinute; + + public IntervallyBandwidthProvider(long capacity, long tokensPerMinute) { + this.capacity = capacity; + this.tokensPerMinute = tokensPerMinute; + } + + @Override + public Bandwidth getBandwidth() { + Bandwidth limit = Bandwidth + .builder() + .capacity(capacity) + .refillIntervally(tokensPerMinute, Duration.ofMinutes(1)) + .build(); + return limit; + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitMetrics.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitMetrics.java new file mode 100644 index 000000000..42b585224 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitMetrics.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +public enum RateLimitMetrics { + + EXCEEDED("exceeded"), + ERRORS("errors"), + WITHIN_LIMIT("within_limit"); + + private final static String METRIC_BASE_NAME = "com.hotels.bdp.waggledance.extensions.client.ratelimit"; + private String metricName; + + private RateLimitMetrics(String name) { + this.metricName = METRIC_BASE_NAME + "." + name; + } + + public String getMetricName() { + return metricName; + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java new file mode 100644 index 000000000..314d11868 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import java.lang.reflect.Proxy; + +import io.micrometer.core.instrument.MeterRegistry; + +import com.hotels.bdp.waggledance.api.model.AbstractMetaStore; +import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface; +import com.hotels.bdp.waggledance.client.ThriftClientFactory; + +public class RateLimitingClientFactory implements ThriftClientFactory { + + private static final Class[] INTERFACES = new Class[] { CloseableThriftHiveMetastoreIface.class }; + + private final ThriftClientFactory thriftClientFactory; + private final BucketService bucketService; + private final BucketKeyGenerator bucketKeyGenerator; + private final MeterRegistry meterRegistry; + + public RateLimitingClientFactory( + ThriftClientFactory thriftClientFactory, + BucketService bucketService, + BucketKeyGenerator bucketKeyGenerator, MeterRegistry meterRegistry) { + this.thriftClientFactory = thriftClientFactory; + this.bucketService = bucketService; + this.bucketKeyGenerator = bucketKeyGenerator; + this.meterRegistry = meterRegistry; + } + + @Override + public CloseableThriftHiveMetastoreIface newInstance(AbstractMetaStore metaStore) { + CloseableThriftHiveMetastoreIface client = thriftClientFactory.newInstance(metaStore); + return (CloseableThriftHiveMetastoreIface) Proxy + .newProxyInstance(getClass().getClassLoader(), INTERFACES, + new RateLimitingInvocationHandler(client, metaStore.getName(), bucketService, bucketKeyGenerator, meterRegistry)); + + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java new file mode 100644 index 000000000..5259d8716 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Set; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.github.bucket4j.Bucket; +import io.github.bucket4j.ConsumptionProbe; +import io.micrometer.core.instrument.MeterRegistry; + +import com.google.common.collect.Sets; + +import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface; +import com.hotels.bdp.waggledance.server.WaggleDanceServerException; + +class RateLimitingInvocationHandler implements InvocationHandler { + private static Logger log = LoggerFactory.getLogger(RateLimitingInvocationHandler.class); + + static final String UNKNOWN_USER = "_UNKNOWN_USER_"; + private static final Set IGNORABLE_METHODS = Sets.newHashSet("isOpen", "close", "set_ugi", "flushCache"); + private String metastoreName; + private CloseableThriftHiveMetastoreIface client; + private String user = UNKNOWN_USER; + + private final BucketService bucketService; + private final BucketKeyGenerator bucketKeyGenerator; + private final MeterRegistry meterRegistry; + + public RateLimitingInvocationHandler( + CloseableThriftHiveMetastoreIface client, + String metastoreName, + BucketService bucketService, + BucketKeyGenerator bucketKeyGenerator, + MeterRegistry meterRegistry) { + this.client = client; + this.metastoreName = metastoreName; + this.bucketService = bucketService; + this.bucketKeyGenerator = bucketKeyGenerator; + this.meterRegistry = meterRegistry; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (method.getName().equals("set_ugi")) { + user = (String) args[0]; + } + if (isIgnoredMethod(method.getName())) { + return doRealCall(client, method, args); + } else { + return doRateLimitCall(client, method, args); + } + } + + private Object doRateLimitCall(CloseableThriftHiveMetastoreIface client, Method method, Object[] args) + throws IllegalAccessException, Throwable { + if (shouldProceedWithCall(method)) { + return doRealCall(client, method, args); + } else { + log.info("User '{}' made too many requests.", user); + // HTTP status would be 429, so using same for Thrift. + throw new WaggleDanceServerException("[STATUS=429] Too many requests."); + } + } + + private boolean shouldProceedWithCall(Method method) { + try { + Bucket bucket = bucketService.getBucket(bucketKeyGenerator.generateKey(user)); + ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); + log + .info("RateLimitCall:[User:{}, method:{}, source_ip:{}, tokens_remaining:{}, metastoreName:{}]", user, + method.getName(), HMSHandler.getThreadLocalIpAddress(), probe.getRemainingTokens(), metastoreName); + boolean isConsumed = probe.isConsumed(); + if (isConsumed) { + meterRegistry.counter(RateLimitMetrics.WITHIN_LIMIT.getMetricName()).increment(); + } else { + meterRegistry.counter(RateLimitMetrics.EXCEEDED.getMetricName()).increment(); + } + return isConsumed; + } catch (Exception e) { + meterRegistry.counter(RateLimitMetrics.ERRORS.getMetricName()).increment(); + if (log.isDebugEnabled()) { + log.error("Error while processing rate limit for: User:{}, method:{}", user, method.getName(), e); + } else { + log + .error("Error while processing rate limit for: User:{}, method:{}, message:{}", user, method.getName(), + e.getMessage()); + } + return true; + } + } + + private Object doRealCall(CloseableThriftHiveMetastoreIface client, Method method, Object[] args) + throws IllegalAccessException, Throwable { + try { + return method.invoke(client, args); + } catch (InvocationTargetException e) { + Throwable realException = e.getTargetException(); + throw realException; + } + } + + /** + * Ignore some methods that are not "real" metastore calls or should not count towards a rate limit. + * + * @param method + * @return true if the method should be ignored for rate limiting purposes. + */ + private boolean isIgnoredMethod(String methodName) { + return IGNORABLE_METHODS.contains(methodName); + } +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RefillType.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RefillType.java new file mode 100644 index 000000000..0afd138ba --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RefillType.java @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +public enum RefillType { + GREEDY { + @Override + public BucketBandwidthProvider createBandwidthProvider(long capacity, long tokensPerMinute) { + return new GreedyBandwidthProvider(capacity, tokensPerMinute); + } + }, + INTERVALLY { + @Override + public BucketBandwidthProvider createBandwidthProvider(long capacity, long tokensPerMinute) { + return new IntervallyBandwidthProvider(capacity, tokensPerMinute); + } + }; + + public abstract BucketBandwidthProvider createBandwidthProvider(long capacity, long tokensPerMinute); +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/memory/InMemoryBucketService.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/memory/InMemoryBucketService.java new file mode 100644 index 000000000..d29f467a9 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/memory/InMemoryBucketService.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit.memory; + +import java.util.HashMap; +import java.util.Map; + +import io.github.bucket4j.Bucket; + +import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketBandwidthProvider; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketService; + +/** + * This class is mostly intended for testing or if you want to have a simple in-memory rate limiter and there is just + * one Waggle Dance instance deployed. + */ +public class InMemoryBucketService implements BucketService { + + private final BucketBandwidthProvider bucketBandwidthProvider; + private Map bucketsPerUser = new HashMap<>(); + + public InMemoryBucketService(BucketBandwidthProvider bucketBandwidthProvider) { + this.bucketBandwidthProvider = bucketBandwidthProvider; + } + + private Bucket createNewBucket() { + return Bucket.builder().addLimit(bucketBandwidthProvider.getBandwidth()).build(); + } + + public Bucket getBucket(String key) { + Bucket bucket = bucketsPerUser.get(key); + if (bucket == null) { + bucket = createNewBucket(); + bucketsPerUser.put(key, bucket); + } + return bucket; + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/redis/RedisBucketService.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/redis/RedisBucketService.java new file mode 100644 index 000000000..1af0ae165 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/redis/RedisBucketService.java @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit.redis; + +import io.github.bucket4j.Bucket; +import io.github.bucket4j.BucketConfiguration; +import io.github.bucket4j.redis.redisson.cas.RedissonBasedProxyManager; + +import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketBandwidthProvider; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketService; + +/** + * This class uses a Redis server as a the storage back-end for the rate limiter. This is useful if you have multiple + * Waggle Dance instances and you want to rate limit across all of them. + */ +public class RedisBucketService implements BucketService { + + private final RedissonBasedProxyManager proxyManager; + private final BucketConfiguration configuration; + + public RedisBucketService( + BucketBandwidthProvider bucketBandwidthProvider, + RedissonBasedProxyManager proxyManager) { + this.proxyManager = proxyManager; + configuration = BucketConfiguration.builder().addLimit(bucketBandwidthProvider.getBandwidth()).build(); + } + + @Override + public Bucket getBucket(String key) { + Bucket bucket = proxyManager.builder().build(key, () -> configuration); + return bucket; + } + +} diff --git a/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGeneratorTest.java b/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGeneratorTest.java new file mode 100644 index 000000000..4777c887a --- /dev/null +++ b/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/BucketKeyGeneratorTest.java @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.Test; + +public class BucketKeyGeneratorTest { + + @Test + public void testGenerateKey() { + BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator("prefix"); + String key = bucketKeyGenerator.generateKey("key"); + assertThat(key, is("prefix_key")); + } + + @Test + public void testGenerateNullKey() { + BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator("prefix"); + String key = bucketKeyGenerator.generateKey(null); + assertThat(key, is("prefix_null")); + } + + + @Test + public void testGenerateNullKeyNullPrefix() { + BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator(null); + String key = bucketKeyGenerator.generateKey(null); + assertThat(key, is("null")); + } + + + @Test + public void testGenerateKeyNullPrefix() { + BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator(null); + String key = bucketKeyGenerator.generateKey("key"); + assertThat(key, is("key")); + } + + @Test + public void testGenerateKeyEmptyPrefix() { + BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator(""); + String key = bucketKeyGenerator.generateKey("key"); + assertThat(key, is("key")); + } + +} diff --git a/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java b/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java new file mode 100644 index 000000000..ca5f56f26 --- /dev/null +++ b/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import static com.hotels.bdp.waggledance.extensions.client.ratelimit.RateLimitingInvocationHandler.UNKNOWN_USER; + +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +import com.hotels.bdp.waggledance.api.model.AbstractMetaStore; +import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface; +import com.hotels.bdp.waggledance.client.ThriftClientFactory; +import com.hotels.bdp.waggledance.extensions.client.ratelimit.memory.InMemoryBucketService; +import com.hotels.bdp.waggledance.server.WaggleDanceServerException; + +@RunWith(MockitoJUnitRunner.class) +public class RateLimitingInvocationHandlerTest { + + private static final String USER = "user"; + private @Mock ThriftClientFactory thriftClientFactory; + private @Mock CloseableThriftHiveMetastoreIface client; + private @Mock BucketKeyGenerator bucketKeyGenerator; + private MeterRegistry meterRegistry = new SimpleMeterRegistry(); + private BucketService bucketService = new InMemoryBucketService(new IntervallyBandwidthProvider(2, 1)); + private AbstractMetaStore metastore = AbstractMetaStore.newPrimaryInstance("name", "uri"); + private CloseableThriftHiveMetastoreIface handlerProxy; + + @Before + public void setUp() { + when(thriftClientFactory.newInstance(metastore)).thenReturn(client); + when(bucketKeyGenerator.generateKey(USER)).thenReturn(USER); + when(bucketKeyGenerator.generateKey(UNKNOWN_USER)).thenReturn(UNKNOWN_USER); + handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService, bucketKeyGenerator, meterRegistry) + .newInstance(metastore); + } + + @Test + public void testLimitDifferentUsers() throws Exception { + + assertTokens(2, 2); + handlerProxy.get_table("db", "table"); + assertTokens(2, 1); + + handlerProxy.set_ugi(USER, null); + assertTokens(2, 1); + + handlerProxy.get_table("db", "table"); + assertTokens(1, 1); + + handlerProxy.get_table("db", "table"); + assertTokens(0, 1); + + try { + handlerProxy.get_table("db", "table"); + fail("Should have thrown exception."); + } catch (WaggleDanceServerException e) { + assertThat(e.getMessage(), is("[STATUS=429] Too many requests.")); + } + + verify(client, times(3)).get_table("db", "table"); + verify(client).set_ugi(USER, null); + assertThat(meterRegistry.counter(RateLimitMetrics.WITHIN_LIMIT.getMetricName()).count(), is(3.0)); + assertThat(meterRegistry.counter(RateLimitMetrics.ERRORS.getMetricName()).count(), is(0.0)); + assertThat(meterRegistry.counter(RateLimitMetrics.EXCEEDED.getMetricName()).count(), is(1.0)); + } + + @Test + public void testBucketExceptionStillDoCall() throws Exception { + Table table = new Table(); + when(client.get_table("db", "table")).thenReturn(table); + BucketService mockedBucketService = Mockito.mock(BucketService.class); + when(mockedBucketService.getBucket(anyString())).thenThrow(new RuntimeException("Bucket exception")); + CloseableThriftHiveMetastoreIface proxy = new RateLimitingClientFactory(thriftClientFactory, mockedBucketService, bucketKeyGenerator, meterRegistry) + .newInstance(metastore); + + Table result = proxy.get_table("db", "table"); + assertThat(result, is(table)); + assertThat(meterRegistry.counter(RateLimitMetrics.WITHIN_LIMIT.getMetricName()).count(), is(0.0)); + assertThat(meterRegistry.counter(RateLimitMetrics.ERRORS.getMetricName()).count(), is(1.0)); + assertThat(meterRegistry.counter(RateLimitMetrics.EXCEEDED.getMetricName()).count(), is(0.0)); + + } + + @Test + public void testInvocationHandlerThrowsCause() throws Exception { + when(client.get_table("db", "table")).thenThrow(new NoSuchObjectException("No such table")); + try { + handlerProxy.get_table("db", "table"); + fail("Should have thrown exception."); + } catch (NoSuchObjectException e) { + assertThat(e.getMessage(), is("No such table")); + } + } + + @Test + public void testIgnoreSetUgi() throws Exception { + assertTokens(2, 2); + handlerProxy.set_ugi(USER, null); + assertTokens(2, 2); + + verify(client).set_ugi(USER, null); + } + + @Test + public void testIgnoreFlushCache() throws Exception { + assertTokens(2, 2); + handlerProxy.flushCache(); + assertTokens(2, 2); + + verify(client).flushCache(); + } + + @Test + public void testIgnoreIsOpen() throws Exception { + assertTokens(2, 2); + + handlerProxy.isOpen(); + assertTokens(2, 2); + + verify(client).isOpen(); + } + + @Test + public void testIgnoreClose() throws Exception { + assertTokens(2, 2); + handlerProxy.close(); + assertTokens(2, 2); + + verify(client).close(); + } + + private void assertTokens(long expectedUserTokenCount, long expectedUnknownUserTokenCount) { + assertThat(bucketService.getBucket(USER).getAvailableTokens(), is(expectedUserTokenCount)); + assertThat(bucketService.getBucket(UNKNOWN_USER).getAvailableTokens(), is(expectedUnknownUserTokenCount)); + } +} diff --git a/waggle-dance-extensions/src/test/resources/log4j2.xml b/waggle-dance-extensions/src/test/resources/log4j2.xml new file mode 100644 index 000000000..1b5cd8bf4 --- /dev/null +++ b/waggle-dance-extensions/src/test/resources/log4j2.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + diff --git a/waggle-dance-integration-tests/pom.xml b/waggle-dance-integration-tests/pom.xml index 887d108a9..b8d9e2f21 100644 --- a/waggle-dance-integration-tests/pom.xml +++ b/waggle-dance-integration-tests/pom.xml @@ -4,7 +4,7 @@ com.hotels waggle-dance-parent - 3.12.1-SNAPSHOT + 3.13.0-SNAPSHOT waggle-dance-integration-tests @@ -26,6 +26,11 @@ + + com.hotels + waggle-dance-extensions + ${project.version} + com.hotels waggle-dance-rest @@ -123,6 +128,12 @@ + + com.github.codemonstur + embedded-redis + 1.4.3 + test + com.github.stefanbirkner system-rules diff --git a/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/TestUtils.java b/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/TestUtils.java index 335000e11..b0ec55fe2 100644 --- a/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/TestUtils.java +++ b/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/TestUtils.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package com.hotels.bdp.waggledance; import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -29,7 +31,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.thrift.TException; -final class TestUtils { +public final class TestUtils { private TestUtils() {} @@ -97,4 +99,13 @@ static Partition newPartition(Table hiveTable, List values, File locatio partition.getSd().setLocation(location.toURI().toString()); return partition; } + + public static int getFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } diff --git a/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/WaggleDanceRunner.java b/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/WaggleDanceRunner.java index 910059a64..e2e36174d 100644 --- a/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/WaggleDanceRunner.java +++ b/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/WaggleDanceRunner.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2021 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; -import java.net.ServerSocket; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -31,6 +30,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -38,10 +39,17 @@ import org.apache.commons.vfs2.FileSystemException; import org.apache.commons.vfs2.FileSystemManager; import org.apache.commons.vfs2.VFS; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.yaml.snakeyaml.Yaml; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; import com.hotels.bdp.waggledance.api.model.AccessControlType; import com.hotels.bdp.waggledance.api.model.DatabaseResolution; @@ -58,11 +66,14 @@ public class WaggleDanceRunner implements WaggleDance.ContextListener { + private static Logger log = LoggerFactory.getLogger(WaggleDanceRunner.class); + public static final String SERVER_CONFIG = "server-config"; public static final String FEDERATION_CONFIG = "federation-config"; private final File serverConfig; private final File federationConfig; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private ApplicationContext applicationContext; private final int restApiPort; @@ -74,6 +85,7 @@ public static class Builder { private final GraphiteConfiguration graphiteConfiguration = new GraphiteConfiguration(); private final List federatedMetaStores = new ArrayList<>(); private PrimaryMetaStore primaryMetaStore; + private Map extraServerConfig = new HashMap<>(); private Builder(File workingDirectory) { checkArgument(workingDirectory != null); @@ -145,7 +157,11 @@ public Builder withPrimaryDatabaseNameMappingMap(Map databaseNam return this; } - public Builder federate(String name, String remoteMetaStoreUris, List mappedTables, String... mappableDatabases) { + public Builder federate( + String name, + String remoteMetaStoreUris, + List mappedTables, + String... mappableDatabases) { checkArgument(isNotEmpty(name)); checkArgument(isNotEmpty(remoteMetaStoreUris)); FederatedMetaStore federatedMetaStore = new FederatedMetaStore(name, remoteMetaStoreUris); @@ -232,7 +248,12 @@ public Builder graphite(String graphiteHost, int graphitePort, String graphitePr return this; } - private File marshall(Yaml yaml, String fileName, Object... objects) { + public Builder extraServerConfig(Map extraServerConfig) { + this.extraServerConfig = extraServerConfig; + return this; + } + + private File marshall(Yaml yaml, String fileName, Object... objects) throws IOException { File config = new File(workingDirectory, fileName); FileSystemManager fsManager = null; @@ -250,35 +271,26 @@ private File marshall(Yaml yaml, String fileName, Object... objects) { } catch (IOException e) { throw new RuntimeException("Unable to write federations to '" + config.toURI() + "'", e); } - + log.info("Wrote config {} content: {}", fileName, Files.asCharSource(config, StandardCharsets.UTF_8).read()); return config; } - public WaggleDanceRunner build() { + public WaggleDanceRunner build() throws IOException { Yaml yaml = YamlFactory.newYaml(); HashMap extraConfig = new HashMap<>(); extraConfig.put("graphite", graphiteConfiguration); extraConfig.put("yaml-storage", yamlStorageConfiguration); - int restApiPort = getFreePort(); + int restApiPort = TestUtils.getFreePort(); extraConfig.put("server.port", restApiPort); + extraConfig.putAll(extraServerConfig); File serverConfig = marshall(yaml, SERVER_CONFIG + ".yml", waggleDanceConfiguration, extraConfig); - Federations federations = new Federations(primaryMetaStore, federatedMetaStores); File federationConfig = marshall(yaml, FEDERATION_CONFIG + ".yml", federations); - WaggleDanceRunner runner = new WaggleDanceRunner(serverConfig, federationConfig, restApiPort); return runner; } - private int getFreePort() { - try (ServerSocket socket = new ServerSocket(0)) { - return socket.getLocalPort(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } public static Builder builder(File workingDirectory) { @@ -327,14 +339,22 @@ private MetaStoreProxyServer getProxy() { return applicationContext.getBean(MetaStoreProxyServer.class); } - public Map run() throws Exception { - Map props = populateProperties(); - WaggleDance.register(this); - WaggleDance.main(getArgsArray(props)); - return props; + public void runAndWaitForStartup() throws Exception { + executor.submit(() -> { + try { + Map props = populateProperties(); + WaggleDance.register(this); + WaggleDance.main(getArgsArray(props)); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Error during execution", e); + } + }); + waitForService(); } - public void waitForService() throws Exception { + private void waitForService() throws Exception { long delay = 1; while (applicationContext == null) { if (delay >= 15) { @@ -356,6 +376,9 @@ public void stop() throws Exception { Thread.sleep(TimeUnit.SECONDS.toMillis(++delay)); } } + if (!executor.isShutdown()) { + executor.shutdownNow(); + } } @Override @@ -368,4 +391,12 @@ public void onStop(ApplicationContext context) { applicationContext = null; } + public HiveMetaStoreClient createWaggleDanceClient() throws MetaException { + String thriftUri = "thrift://localhost:" + MetaStoreProxyServer.DEFAULT_WAGGLEDANCE_PORT; + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.METASTOREURIS, thriftUri); + conf.setBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI, true); + return new HiveMetaStoreClient(conf); + } + } diff --git a/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/junit/ServerSocketRule.java b/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/junit/ServerSocketRule.java index 83748ad43..e34f8ea7f 100644 --- a/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/junit/ServerSocketRule.java +++ b/waggle-dance-integration-tests/src/main/java/com/hotels/bdp/waggledance/junit/ServerSocketRule.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,15 +38,18 @@ public class ServerSocketRule extends ExternalResource { private static final Logger LOG = LoggerFactory.getLogger(ServerSocketRule.class); - private final InetSocketAddress address; - private final ByteArrayOutputStream output = new ByteArrayOutputStream(); - - private final ServerSocketChannel serverSocketChannel; + private InetSocketAddress address; + private ByteArrayOutputStream output; + private ServerSocketChannel serverSocketChannel; private int requests = 0; - public ServerSocketRule() { + public ServerSocketRule() {} + + @Override + protected void before() throws Throwable { try { + output = new ByteArrayOutputStream(); serverSocketChannel = (ServerSocketChannel) ServerSocketChannel .open() .bind(new InetSocketAddress(0)) diff --git a/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/WaggleDanceIntegrationTest.java b/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/WaggleDanceIntegrationTest.java index 73afd10b7..b50eb50c8 100644 --- a/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/WaggleDanceIntegrationTest.java +++ b/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/WaggleDanceIntegrationTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,11 +40,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; @@ -64,6 +60,7 @@ import org.apache.thrift.TException; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -90,7 +87,6 @@ import com.hotels.bdp.waggledance.api.model.PrimaryMetaStore; import com.hotels.bdp.waggledance.junit.ServerSocketRule; import com.hotels.bdp.waggledance.mapping.model.PrefixingMetastoreFilter; -import com.hotels.bdp.waggledance.server.MetaStoreProxyServer; import com.hotels.bdp.waggledance.yaml.YamlFactory; import com.hotels.beeju.ThriftHiveMetaStoreJUnitRule; import com.hotels.hcommon.hive.metastore.client.tunnelling.MetastoreTunnel; @@ -113,7 +109,6 @@ public class WaggleDanceIntegrationTest { public @Rule ThriftHiveMetaStoreJUnitRule newRemoteServer = new ThriftHiveMetaStoreJUnitRule(); public @Rule DataFolder dataFolder = new ClassDataFolder(); - private ExecutorService executor; private WaggleDanceRunner runner; private File configLocation; @@ -131,8 +126,6 @@ public void init() throws Exception { createRemoteTable(new File(remoteWarehouseUri, REMOTE_DATABASE + "/" + REMOTE_TABLE), REMOTE_TABLE); LOG.info(">>>> Table {} ", remoteServer.client().getTable(REMOTE_DATABASE, REMOTE_TABLE)); - - executor = Executors.newSingleThreadExecutor(); } @After @@ -140,7 +133,6 @@ public void destroy() throws Exception { if (runner != null) { runner.stop(); } - executor.shutdownNow(); } private void createLocalTable(File tableUri, String table) throws Exception { @@ -166,28 +158,8 @@ private void createRemoteTable(File tableUri, String table) throws Exception { newPartition(hiveTable, Arrays.asList("Asia", "China"), partitionChina)))); } - private String getWaggleDanceThriftUri() { - return "thrift://localhost:" + MetaStoreProxyServer.DEFAULT_WAGGLEDANCE_PORT; - } - - private HiveMetaStoreClient getWaggleDanceClient() throws MetaException { - HiveConf conf = new HiveConf(); - conf.setVar(ConfVars.METASTOREURIS, getWaggleDanceThriftUri()); - conf.setBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI, true); - return new HiveMetaStoreClient(conf); - } - private void runWaggleDance(WaggleDanceRunner runner) throws Exception { - executor.submit(() -> { - try { - runner.run(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException("Error during execution", e); - } - }); - runner.waitForService(); + runner.runAndWaitForStartup(); } private Federations stopServerAndGetConfiguration() throws Exception, FileNotFoundException { @@ -208,7 +180,7 @@ public void typical() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); // Local table Table localTable = localServer.client().getTable(LOCAL_DATABASE, LOCAL_TABLE); @@ -231,7 +203,7 @@ public void typicalGetAllFunctions() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List resourceUris = Lists .newArrayList(new ResourceUri(ResourceType.JAR, "hdfs://path/to/my/jar/my.jar")); Function localFunction = new Function("fn1", LOCAL_DATABASE, "com.hotels.hive.FN1", "hadoop", PrincipalType.USER, 0, @@ -262,12 +234,15 @@ public void usePrefix() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); + HiveMetaStoreClient proxy2 = runner.createWaggleDanceClient(); // Local table Table localTable = localServer.client().getTable(LOCAL_DATABASE, LOCAL_TABLE); Table waggledLocalTable = proxy.getTable(LOCAL_DATABASE, LOCAL_TABLE); assertThat(waggledLocalTable, is(localTable)); + waggledLocalTable = proxy2.getTable(LOCAL_DATABASE, LOCAL_TABLE); + assertThat(waggledLocalTable, is(localTable)); // Remote table String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE; @@ -297,7 +272,7 @@ public void manyFederatedMetastores() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List dbs = proxy.getAllDatabases(); List expected = newArrayList("default", "local_database", "waggle_remote_default", @@ -322,7 +297,7 @@ public void usePrimaryPrefix() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); // Local table String prefixedLocalDbName = primaryPrefix + LOCAL_DATABASE; @@ -369,6 +344,8 @@ private void assertTypicalRemoteTable(HiveMetaStoreClient proxy, String waggledR } } + + @Ignore("Seems to fail for unknown reasons often in Github Actions") @Test public void typicalWithGraphite() throws Exception { runner = WaggleDanceRunner @@ -379,7 +356,7 @@ public void typicalWithGraphite() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); // Execute a couple of requests proxy.getAllDatabases(); @@ -429,7 +406,7 @@ public void readWriteCreateAllowed() throws Exception { runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); // create rights proxy.createDatabase(new Database("newDB", "", new File(localWarehouseUri, "newDB").toURI().toString(), null)); Database newDB = proxy.getDatabase("newDB"); @@ -464,7 +441,7 @@ public void readWriteCreateAllowedPrefixed() throws Exception { runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); // create rights proxy.createDatabase(new Database("newDB", "", new File(localWarehouseUri, "newDB").toURI().toString(), null)); Database newDB = proxy.getDatabase("newDB"); @@ -497,7 +474,7 @@ public void federatedWritesSucceedIfReadAndWriteOnDatabaseWhiteListIsConfigured( runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE; @@ -527,7 +504,7 @@ public void federatedWritesFailIfReadAndWriteOnDatabaseWhiteListIsNotConfigured( runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE; @@ -557,7 +534,7 @@ public void federatedWritesFailIfReadAndWriteOnDatabaseWhiteListDoesNotIncludeDb runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE; @@ -589,7 +566,7 @@ public void databaseWhitelisting() throws Exception { runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); Database readOnlyDB = proxy.getDatabase(LOCAL_DATABASE); assertNotNull(readOnlyDB); Database writableDB = proxy.getDatabase(writableDatabase); @@ -614,7 +591,7 @@ public void createDatabaseUsingManualAndWhitelistingUpdatesConfig() throws Excep runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); proxy.createDatabase(new Database("newDB", "", new File(localWarehouseUri, "newDB").toURI().toString(), null)); Database newDB = proxy.getDatabase("newDB"); assertNotNull(newDB); @@ -641,7 +618,7 @@ public void createDatabaseDatabaseUsingPrefixAndWhitelistingUpdates() throws Exc runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); proxy.createDatabase(new Database("newDB", "", new File(localWarehouseUri, "newDB").toURI().toString(), null)); Database newDB = proxy.getDatabase("newDB"); assertNotNull(newDB); @@ -672,7 +649,7 @@ public void alterTableOnFederatedIsNotAllowedUsingManual() throws Exception { runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); Table table = proxy.getTable(REMOTE_DATABASE, REMOTE_TABLE); Table newTable = new Table(table); newTable.setTableName("new_remote_table"); @@ -695,7 +672,7 @@ public void doesNotOverwriteConfigOnShutdownManualMode() throws Exception { runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); proxy.createDatabase(new Database("newDB", "", new File(localWarehouseUri, "newDB").toURI().toString(), null)); Database newDB = proxy.getDatabase("newDB"); assertNotNull(newDB); @@ -847,7 +824,7 @@ public void getDatabaseFromPatternManual() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List allDatabases = proxy.getAllDatabases(); List expected = Lists.newArrayList("default", LOCAL_DATABASE, REMOTE_DATABASE); @@ -867,7 +844,7 @@ public void getDatabaseFromPatternPrefixed() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List allDatabases = proxy.getAllDatabases(); List expected = Lists.newArrayList("default", LOCAL_DATABASE, PREFIXED_REMOTE_DATABASE); @@ -889,7 +866,7 @@ public void primaryMappedDatabasesManual() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List allDatabases = proxy.getAllDatabases(); assertThat(allDatabases.size(), is(2)); @@ -918,7 +895,7 @@ public void primaryMappedDatabasesPrefixed() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List allDatabases = proxy.getAllDatabases(); assertThat(allDatabases.size(), is(2)); @@ -955,7 +932,7 @@ public void primaryAndFederatedMappedTables() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List resultTables = proxy.getAllTables(LOCAL_DATABASE); assertThat(resultTables.size(), is(1)); assertThat(resultTables.get(0), is(localTable)); @@ -986,7 +963,7 @@ public void getTablesFromPatternMappedTables() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List resultTables = proxy.getAllTables(LOCAL_DATABASE); assertThat(resultTables.size(), is(1)); assertThat(resultTables.get(0), is(localTable)); @@ -1011,7 +988,7 @@ public void prefixedModeDatabaseNameMapping() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List allDatabases = proxy.getAllDatabases(); assertThat(allDatabases.size(), is(5)); @@ -1044,7 +1021,7 @@ public void manualModeDatabaseNameMapping() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); List allDatabases = proxy.getAllDatabases(); assertThat(allDatabases.size(), is(5)); @@ -1072,7 +1049,7 @@ public void hiveMetastoreFilterHookConfiguredForPrimary() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); Table waggledLocalTable = proxy.getTable(LOCAL_DATABASE, LOCAL_TABLE); assertThat(waggledLocalTable.getSd().getLocation(), startsWith("prefix")); @@ -1090,7 +1067,7 @@ public void get_privilege_set() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); HiveObjectType objectType = HiveObjectType.DATABASE; String dbName = LOCAL_DATABASE; @@ -1113,8 +1090,8 @@ public void getTableMeta() throws Exception { .build(); runWaggleDance(runner); - HiveMetaStoreClient proxy = getWaggleDanceClient(); - + HiveMetaStoreClient proxy = runner.createWaggleDanceClient(); + List tableMeta = proxy .getTableMeta("waggle_remote_remote_database", "*", Lists.newArrayList("EXTERNAL_TABLE")); assertThat(tableMeta.size(), is(1)); diff --git a/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/extensions/WaggleDanceRateLimitIntegrationTest.java b/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/extensions/WaggleDanceRateLimitIntegrationTest.java new file mode 100644 index 000000000..5c59b50bb --- /dev/null +++ b/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/extensions/WaggleDanceRateLimitIntegrationTest.java @@ -0,0 +1,170 @@ +/** + * Copyright (C) 2016-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.bdp.waggledance.extensions; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import redis.embedded.RedisServer; + +import com.hotels.bdp.waggledance.TestUtils; +import com.hotels.bdp.waggledance.WaggleDanceRunner; +import com.hotels.bdp.waggledance.api.model.AccessControlType; +import com.hotels.bdp.waggledance.api.model.DatabaseResolution; +import com.hotels.beeju.ThriftHiveMetaStoreJUnitRule; + +public class WaggleDanceRateLimitIntegrationTest { + + public @Rule TemporaryFolder temporaryFolder = new TemporaryFolder(); + public @Rule ThriftHiveMetaStoreJUnitRule metastore = new ThriftHiveMetaStoreJUnitRule(); + + private RedisServer redisServer; + private WaggleDanceRunner runner; + private Map extraServerConfig; + + @Before + public void setup() { + extraServerConfig = new HashMap<>(); + extraServerConfig.put("waggledance.extensions.ratelimit.enabled", "true"); + //Use INTERVALLY as it's more deterministic for the test. + extraServerConfig.put("waggledance.extensions.ratelimit.refillType", "INTERVALLY"); + } + + + @Test + public void rateLimitInMemory() throws Exception { + extraServerConfig.put("waggledance.extensions.ratelimit.storage", "MEMORY"); + extraServerConfig.put("waggledance.extensions.ratelimit.capacity", "2"); + extraServerConfig.put("waggledance.extensions.ratelimit.tokenPerMinute", "1"); + + runner = WaggleDanceRunner + .builder(temporaryFolder.newFolder("config")) + .databaseResolution(DatabaseResolution.PREFIXED) + .extraServerConfig(extraServerConfig) + .primary("primary", metastore.getThriftConnectionUri(), AccessControlType.READ_AND_WRITE_AND_CREATE) + .build(); + + runner.runAndWaitForStartup(); + + HiveMetaStoreClient client = runner.createWaggleDanceClient(); + assertTokensUsed(client); + } + + @Test + public void rateLimitRedis() throws Exception { + startRedis(); + String reddisonYaml = ""; + reddisonYaml += "singleServerConfig:\n"; + reddisonYaml += " address: \"redis://localhost:" + redisServer.ports().get(0) + "\"\n"; + + extraServerConfig.put("waggledance.extensions.ratelimit.storage", "REDIS"); + extraServerConfig.put("waggledance.extensions.ratelimit.capacity", "2"); + extraServerConfig.put("waggledance.extensions.ratelimit.tokenPerMinute", "1"); + extraServerConfig.put("waggledance.extensions.ratelimit.reddison.embedded.config", reddisonYaml); + + runner = WaggleDanceRunner + .builder(temporaryFolder.newFolder("config")) + .databaseResolution(DatabaseResolution.PREFIXED) + .extraServerConfig(extraServerConfig) + .primary("primary", metastore.getThriftConnectionUri(), AccessControlType.READ_AND_WRITE_AND_CREATE) + .build(); + + runner.runAndWaitForStartup(); + + HiveMetaStoreClient client = runner.createWaggleDanceClient(); + assertTokensUsed(client); + } + + private void assertTokensUsed(HiveMetaStoreClient client) throws MetaException, NoSuchObjectException, TException { + List allDatabases = client.getAllDatabases(); + assertThat(allDatabases.size(), is(2)); + Database database = client.getDatabase("default"); + assertThat(database.getName(), is("default")); + + // Tokens spent + allDatabases = client.getAllDatabases(); + // getAllDatabases is special as it's a request that is federated across all Metastores + // hence the underlying Rate Limit Exception is not returned. + assertThat(allDatabases.size(), is(0)); + + try { + client.getDatabase("default"); + } catch (MetaException e) { + assertThat(e.getMessage(), is("Waggle Dance: [STATUS=429] Too many requests.")); + } + } + + @Test + public void ignoreRedisServerDown() throws Exception { + startRedis(); + String reddisonYaml = ""; + reddisonYaml += "singleServerConfig:\n"; + reddisonYaml += " address: \"redis://localhost:" + redisServer.ports().get(0) + "\"\n"; + reddisonYaml += " retryAttempts: 0\n"; + + extraServerConfig.put("waggledance.extensions.ratelimit.storage", "REDIS"); + extraServerConfig.put("waggledance.extensions.ratelimit.capacity", "1"); + extraServerConfig.put("waggledance.extensions.ratelimit.tokenPerMinute", "1"); + extraServerConfig.put("waggledance.extensions.ratelimit.reddison.embedded.config", reddisonYaml); + + runner = WaggleDanceRunner + .builder(temporaryFolder.newFolder("config")) + .databaseResolution(DatabaseResolution.PREFIXED) + .extraServerConfig(extraServerConfig) + .primary("primary", metastore.getThriftConnectionUri(), AccessControlType.READ_AND_WRITE_AND_CREATE) + .build(); + + runner.runAndWaitForStartup(); + + // Simulate Redis server down + redisServer.stop(); + + HiveMetaStoreClient client = runner.createWaggleDanceClient(); + assertThat(client.getDatabase("default").getName(), is("default")); + assertThat(client.getDatabase("default").getName(), is("default")); + assertThat(client.getDatabase("default").getName(), is("default")); + } + + @After + public void teardown() throws Exception { + if (redisServer != null) { + redisServer.stop(); + } + if (runner != null) { + runner.stop(); + } + } + + private void startRedis() throws Exception { + redisServer = new RedisServer(TestUtils.getFreePort()); + redisServer.start(); + } +} diff --git a/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/junit/ServerSocketRuleTest.java b/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/junit/ServerSocketRuleTest.java index 4d17b5770..05090276d 100644 --- a/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/junit/ServerSocketRuleTest.java +++ b/waggle-dance-integration-tests/src/test/java/com/hotels/bdp/waggledance/junit/ServerSocketRuleTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.net.ConnectException; import java.net.Socket; +import org.junit.Before; import org.junit.Test; public class ServerSocketRuleTest { @@ -33,6 +34,11 @@ private void sendData(int port, byte[] bytes) throws Exception { } } + @Before + public void setUp() throws Throwable { + rule.before(); + } + @Test public void typical() throws Throwable { sendData(rule.port(), "my-data".getBytes()); diff --git a/waggle-dance-integration-tests/src/test/resources/log4j.xml b/waggle-dance-integration-tests/src/test/resources/log4j.xml index ea289a5ca..470f6f1ae 100644 --- a/waggle-dance-integration-tests/src/test/resources/log4j.xml +++ b/waggle-dance-integration-tests/src/test/resources/log4j.xml @@ -1,6 +1,6 @@ - + diff --git a/waggle-dance-integration-tests/src/test/resources/log4j2.xml b/waggle-dance-integration-tests/src/test/resources/log4j2.xml index e6605454a..9764318a2 100644 --- a/waggle-dance-integration-tests/src/test/resources/log4j2.xml +++ b/waggle-dance-integration-tests/src/test/resources/log4j2.xml @@ -1,6 +1,6 @@