diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java index 4609d9ac..7fbeeff2 100644 --- a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/ExtensionBeans.java @@ -41,6 +41,7 @@ import io.github.bucket4j.distributed.ExpirationAfterWriteStrategy; import io.github.bucket4j.distributed.serialization.Mapper; import io.github.bucket4j.redis.redisson.cas.RedissonBasedProxyManager; +import io.micrometer.core.instrument.MeterRegistry; @Configuration @ConditionalOnProperty(name = "waggledance.extensions.ratelimit.enabled", havingValue = "true") @@ -53,8 +54,8 @@ public class ExtensionBeans { public ThriftClientFactory thriftClientFactory( ThriftClientFactory defaultWaggleDanceClientFactory, BucketService bucketService, - BucketKeyGenerator bucketKeyGenerator) { - return new RateLimitingClientFactory(defaultWaggleDanceClientFactory, bucketService, bucketKeyGenerator); + BucketKeyGenerator bucketKeyGenerator, MeterRegistry meterRegistry) { + return new RateLimitingClientFactory(defaultWaggleDanceClientFactory, bucketService, bucketKeyGenerator, meterRegistry); } @Bean diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/Metrics.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/Metrics.java new file mode 100644 index 00000000..2c3d7160 --- /dev/null +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/Metrics.java @@ -0,0 +1,20 @@ +package com.hotels.bdp.waggledance.extensions.client.ratelimit; + +public enum Metrics { + + CALLS("calls"), + EXCEEDED("exceeded"), + ERRORS("errors"); + + private final static String METRIC_BASE_NAME = "com.hotels.bdp.waggledance.extensions.client.ratelimit"; + private String metricName; + + private Metrics(String name) { + this.metricName = METRIC_BASE_NAME + "." + name; + } + + public String getMetricName() { + return metricName; + } + +} diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java index c8ce32e4..985874d5 100644 --- a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingClientFactory.java @@ -21,6 +21,8 @@ import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface; import com.hotels.bdp.waggledance.client.ThriftClientFactory; +import io.micrometer.core.instrument.MeterRegistry; + public class RateLimitingClientFactory implements ThriftClientFactory { private static final Class[] INTERFACES = new Class[] { CloseableThriftHiveMetastoreIface.class }; @@ -28,14 +30,16 @@ public class RateLimitingClientFactory implements ThriftClientFactory { private final ThriftClientFactory thriftClientFactory; private final BucketService bucketService; private final BucketKeyGenerator bucketKeyGenerator; + private final MeterRegistry meterRegistry; public RateLimitingClientFactory( ThriftClientFactory thriftClientFactory, BucketService bucketService, - BucketKeyGenerator bucketKeyGenerator) { + BucketKeyGenerator bucketKeyGenerator, MeterRegistry meterRegistry) { this.thriftClientFactory = thriftClientFactory; this.bucketService = bucketService; this.bucketKeyGenerator = bucketKeyGenerator; + this.meterRegistry = meterRegistry; } @Override @@ -43,7 +47,7 @@ public CloseableThriftHiveMetastoreIface newInstance(AbstractMetaStore metaStore CloseableThriftHiveMetastoreIface client = thriftClientFactory.newInstance(metaStore); return (CloseableThriftHiveMetastoreIface) Proxy .newProxyInstance(getClass().getClassLoader(), INTERFACES, - new RateLimitingInvocationHandler(client, metaStore.getName(), bucketService, bucketKeyGenerator)); + new RateLimitingInvocationHandler(client, metaStore.getName(), bucketService, bucketKeyGenerator, meterRegistry)); } diff --git a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java index 60e8ca84..0a1e8acd 100644 --- a/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java +++ b/waggle-dance-extensions/src/main/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandler.java @@ -24,17 +24,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.github.bucket4j.Bucket; -import io.github.bucket4j.ConsumptionProbe; - import com.google.common.collect.Sets; - import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface; import com.hotels.bdp.waggledance.server.WaggleDanceServerException; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.ConsumptionProbe; +import io.micrometer.core.instrument.MeterRegistry; + class RateLimitingInvocationHandler implements InvocationHandler { private static Logger log = LoggerFactory.getLogger(RateLimitingInvocationHandler.class); - + + static final String METRIC_BASE_NAME = "com.hotels.bdp.waggledance.extensions.client.ratelimit"; static final String UNKNOWN_USER = "_UNKNOWN_USER_"; private static final Set IGNORABLE_METHODS = Sets.newHashSet("isOpen", "close", "set_ugi", "flushCache"); private String metastoreName; @@ -43,15 +44,19 @@ class RateLimitingInvocationHandler implements InvocationHandler { private final BucketService bucketService; private final BucketKeyGenerator bucketKeyGenerator; + private final MeterRegistry meterRegistry; public RateLimitingInvocationHandler( CloseableThriftHiveMetastoreIface client, String metastoreName, - BucketService bucketService, BucketKeyGenerator bucketKeyGenerator) { + BucketService bucketService, + BucketKeyGenerator bucketKeyGenerator, + MeterRegistry meterRegistry) { this.client = client; this.metastoreName = metastoreName; this.bucketService = bucketService; this.bucketKeyGenerator = bucketKeyGenerator; + this.meterRegistry = meterRegistry; } @Override @@ -71,6 +76,7 @@ private Object doRateLimitCall(CloseableThriftHiveMetastoreIface client, Method if (shouldProceedWithCall(method)) { return doRealCall(client, method, args); } else { + meterRegistry.counter(Metrics.EXCEEDED.getMetricName()).increment(); log.info("User '{}' made too many requests.", user); // HTTP status would be 429, so using same for Thrift. throw new WaggleDanceServerException("[STATUS=429] Too many requests."); @@ -86,6 +92,7 @@ private boolean shouldProceedWithCall(Method method) { method.getName(), HMSHandler.getThreadLocalIpAddress(), probe.getRemainingTokens(), metastoreName); return probe.isConsumed(); } catch (Exception e) { + meterRegistry.counter(Metrics.ERRORS.getMetricName()).increment(); if (log.isDebugEnabled()) { log.error("Error while processing rate limit for: User:{}, method:{}", user, method.getName(), e); } else { @@ -94,6 +101,8 @@ private boolean shouldProceedWithCall(Method method) { e.getMessage()); } return true; + } finally { + meterRegistry.counter(Metrics.CALLS.getMetricName()).increment(); } } diff --git a/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java b/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java index 9fa22ca9..02a9ba0c 100644 --- a/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java +++ b/waggle-dance-extensions/src/test/java/com/hotels/bdp/waggledance/extensions/client/ratelimit/RateLimitingInvocationHandlerTest.java @@ -39,6 +39,9 @@ import com.hotels.bdp.waggledance.extensions.client.ratelimit.memory.InMemoryBucketService; import com.hotels.bdp.waggledance.server.WaggleDanceServerException; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + @RunWith(MockitoJUnitRunner.class) public class RateLimitingInvocationHandlerTest { @@ -46,6 +49,7 @@ public class RateLimitingInvocationHandlerTest { private @Mock ThriftClientFactory thriftClientFactory; private @Mock CloseableThriftHiveMetastoreIface client; private @Mock BucketKeyGenerator bucketKeyGenerator; + private MeterRegistry meterRegistry = new SimpleMeterRegistry(); private BucketService bucketService = new InMemoryBucketService(new IntervallyBandwidthProvider(2, 1)); private AbstractMetaStore metastore = AbstractMetaStore.newPrimaryInstance("name", "uri"); private CloseableThriftHiveMetastoreIface handlerProxy; @@ -55,7 +59,7 @@ public void setUp() { when(thriftClientFactory.newInstance(metastore)).thenReturn(client); when(bucketKeyGenerator.generateKey(USER)).thenReturn(USER); when(bucketKeyGenerator.generateKey(UNKNOWN_USER)).thenReturn(UNKNOWN_USER); - handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService, bucketKeyGenerator) + handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService, bucketKeyGenerator, meterRegistry) .newInstance(metastore); } @@ -84,6 +88,9 @@ public void testLimitDifferentUsers() throws Exception { verify(client, times(3)).get_table("db", "table"); verify(client).set_ugi(USER, null); + assertThat(meterRegistry.counter(Metrics.CALLS.getMetricName()).count(), is(4.0)); + assertThat(meterRegistry.counter(Metrics.ERRORS.getMetricName()).count(), is(0.0)); + assertThat(meterRegistry.counter(Metrics.EXCEEDED.getMetricName()).count(), is(1.0)); } @Test @@ -92,11 +99,15 @@ public void testBucketExceptionStillDoCall() throws Exception { when(client.get_table("db", "table")).thenReturn(table); BucketService mockedBucketService = Mockito.mock(BucketService.class); when(mockedBucketService.getBucket(anyString())).thenThrow(new RuntimeException("Bucket exception")); - CloseableThriftHiveMetastoreIface proxy = new RateLimitingClientFactory(thriftClientFactory, mockedBucketService, bucketKeyGenerator) + CloseableThriftHiveMetastoreIface proxy = new RateLimitingClientFactory(thriftClientFactory, mockedBucketService, bucketKeyGenerator, meterRegistry) .newInstance(metastore); Table result = proxy.get_table("db", "table"); assertThat(result, is(table)); + assertThat(meterRegistry.counter(Metrics.CALLS.getMetricName()).count(), is(1.0)); + assertThat(meterRegistry.counter(Metrics.ERRORS.getMetricName()).count(), is(1.0)); + assertThat(meterRegistry.counter(Metrics.EXCEEDED.getMetricName()).count(), is(0.0)); + } @Test