Skip to content

Commit

Permalink
Added rate limit metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
patduin committed Apr 17, 2024
1 parent 9648fa2 commit d92cec7
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.github.bucket4j.distributed.ExpirationAfterWriteStrategy;
import io.github.bucket4j.distributed.serialization.Mapper;
import io.github.bucket4j.redis.redisson.cas.RedissonBasedProxyManager;
import io.micrometer.core.instrument.MeterRegistry;

@Configuration
@ConditionalOnProperty(name = "waggledance.extensions.ratelimit.enabled", havingValue = "true")
Expand All @@ -53,8 +54,8 @@ public class ExtensionBeans {
public ThriftClientFactory thriftClientFactory(
ThriftClientFactory defaultWaggleDanceClientFactory,
BucketService bucketService,
BucketKeyGenerator bucketKeyGenerator) {
return new RateLimitingClientFactory(defaultWaggleDanceClientFactory, bucketService, bucketKeyGenerator);
BucketKeyGenerator bucketKeyGenerator, MeterRegistry meterRegistry) {
return new RateLimitingClientFactory(defaultWaggleDanceClientFactory, bucketService, bucketKeyGenerator, meterRegistry);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.hotels.bdp.waggledance.extensions.client.ratelimit;

public enum Metrics {

CALLS("calls"),
EXCEEDED("exceeded"),
ERRORS("errors");

private final static String METRIC_BASE_NAME = "com.hotels.bdp.waggledance.extensions.client.ratelimit";
private String metricName;

private Metrics(String name) {
this.metricName = METRIC_BASE_NAME + "." + name;
}

public String getMetricName() {
return metricName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,33 @@
import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface;
import com.hotels.bdp.waggledance.client.ThriftClientFactory;

import io.micrometer.core.instrument.MeterRegistry;

public class RateLimitingClientFactory implements ThriftClientFactory {

private static final Class<?>[] INTERFACES = new Class<?>[] { CloseableThriftHiveMetastoreIface.class };

private final ThriftClientFactory thriftClientFactory;
private final BucketService bucketService;
private final BucketKeyGenerator bucketKeyGenerator;
private final MeterRegistry meterRegistry;

public RateLimitingClientFactory(
ThriftClientFactory thriftClientFactory,
BucketService bucketService,
BucketKeyGenerator bucketKeyGenerator) {
BucketKeyGenerator bucketKeyGenerator, MeterRegistry meterRegistry) {
this.thriftClientFactory = thriftClientFactory;
this.bucketService = bucketService;
this.bucketKeyGenerator = bucketKeyGenerator;
this.meterRegistry = meterRegistry;
}

@Override
public CloseableThriftHiveMetastoreIface newInstance(AbstractMetaStore metaStore) {
CloseableThriftHiveMetastoreIface client = thriftClientFactory.newInstance(metaStore);
return (CloseableThriftHiveMetastoreIface) Proxy
.newProxyInstance(getClass().getClassLoader(), INTERFACES,
new RateLimitingInvocationHandler(client, metaStore.getName(), bucketService, bucketKeyGenerator));
new RateLimitingInvocationHandler(client, metaStore.getName(), bucketService, bucketKeyGenerator, meterRegistry));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;

import com.google.common.collect.Sets;

import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface;
import com.hotels.bdp.waggledance.server.WaggleDanceServerException;

import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;
import io.micrometer.core.instrument.MeterRegistry;

class RateLimitingInvocationHandler implements InvocationHandler {
private static Logger log = LoggerFactory.getLogger(RateLimitingInvocationHandler.class);


static final String METRIC_BASE_NAME = "com.hotels.bdp.waggledance.extensions.client.ratelimit";
static final String UNKNOWN_USER = "_UNKNOWN_USER_";
private static final Set<String> IGNORABLE_METHODS = Sets.newHashSet("isOpen", "close", "set_ugi", "flushCache");
private String metastoreName;
Expand All @@ -43,15 +44,19 @@ class RateLimitingInvocationHandler implements InvocationHandler {

private final BucketService bucketService;
private final BucketKeyGenerator bucketKeyGenerator;
private final MeterRegistry meterRegistry;

public RateLimitingInvocationHandler(
CloseableThriftHiveMetastoreIface client,
String metastoreName,
BucketService bucketService, BucketKeyGenerator bucketKeyGenerator) {
BucketService bucketService,
BucketKeyGenerator bucketKeyGenerator,
MeterRegistry meterRegistry) {
this.client = client;
this.metastoreName = metastoreName;
this.bucketService = bucketService;
this.bucketKeyGenerator = bucketKeyGenerator;
this.meterRegistry = meterRegistry;
}

@Override
Expand All @@ -71,6 +76,7 @@ private Object doRateLimitCall(CloseableThriftHiveMetastoreIface client, Method
if (shouldProceedWithCall(method)) {
return doRealCall(client, method, args);
} else {
meterRegistry.counter(Metrics.EXCEEDED.getMetricName()).increment();
log.info("User '{}' made too many requests.", user);
// HTTP status would be 429, so using same for Thrift.
throw new WaggleDanceServerException("[STATUS=429] Too many requests.");
Expand All @@ -86,6 +92,7 @@ private boolean shouldProceedWithCall(Method method) {
method.getName(), HMSHandler.getThreadLocalIpAddress(), probe.getRemainingTokens(), metastoreName);
return probe.isConsumed();
} catch (Exception e) {
meterRegistry.counter(Metrics.ERRORS.getMetricName()).increment();
if (log.isDebugEnabled()) {
log.error("Error while processing rate limit for: User:{}, method:{}", user, method.getName(), e);
} else {
Expand All @@ -94,6 +101,8 @@ private boolean shouldProceedWithCall(Method method) {
e.getMessage());
}
return true;
} finally {
meterRegistry.counter(Metrics.CALLS.getMetricName()).increment();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
import com.hotels.bdp.waggledance.extensions.client.ratelimit.memory.InMemoryBucketService;
import com.hotels.bdp.waggledance.server.WaggleDanceServerException;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

@RunWith(MockitoJUnitRunner.class)
public class RateLimitingInvocationHandlerTest {

private static final String USER = "user";
private @Mock ThriftClientFactory thriftClientFactory;
private @Mock CloseableThriftHiveMetastoreIface client;
private @Mock BucketKeyGenerator bucketKeyGenerator;
private MeterRegistry meterRegistry = new SimpleMeterRegistry();
private BucketService bucketService = new InMemoryBucketService(new IntervallyBandwidthProvider(2, 1));
private AbstractMetaStore metastore = AbstractMetaStore.newPrimaryInstance("name", "uri");
private CloseableThriftHiveMetastoreIface handlerProxy;
Expand All @@ -55,7 +59,7 @@ public void setUp() {
when(thriftClientFactory.newInstance(metastore)).thenReturn(client);
when(bucketKeyGenerator.generateKey(USER)).thenReturn(USER);
when(bucketKeyGenerator.generateKey(UNKNOWN_USER)).thenReturn(UNKNOWN_USER);
handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService, bucketKeyGenerator)
handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService, bucketKeyGenerator, meterRegistry)
.newInstance(metastore);
}

Expand Down Expand Up @@ -84,6 +88,9 @@ public void testLimitDifferentUsers() throws Exception {

verify(client, times(3)).get_table("db", "table");
verify(client).set_ugi(USER, null);
assertThat(meterRegistry.counter(Metrics.CALLS.getMetricName()).count(), is(4.0));
assertThat(meterRegistry.counter(Metrics.ERRORS.getMetricName()).count(), is(0.0));
assertThat(meterRegistry.counter(Metrics.EXCEEDED.getMetricName()).count(), is(1.0));
}

@Test
Expand All @@ -92,11 +99,15 @@ public void testBucketExceptionStillDoCall() throws Exception {
when(client.get_table("db", "table")).thenReturn(table);
BucketService mockedBucketService = Mockito.mock(BucketService.class);
when(mockedBucketService.getBucket(anyString())).thenThrow(new RuntimeException("Bucket exception"));
CloseableThriftHiveMetastoreIface proxy = new RateLimitingClientFactory(thriftClientFactory, mockedBucketService, bucketKeyGenerator)
CloseableThriftHiveMetastoreIface proxy = new RateLimitingClientFactory(thriftClientFactory, mockedBucketService, bucketKeyGenerator, meterRegistry)
.newInstance(metastore);

Table result = proxy.get_table("db", "table");
assertThat(result, is(table));
assertThat(meterRegistry.counter(Metrics.CALLS.getMetricName()).count(), is(1.0));
assertThat(meterRegistry.counter(Metrics.ERRORS.getMetricName()).count(), is(1.0));
assertThat(meterRegistry.counter(Metrics.EXCEEDED.getMetricName()).count(), is(0.0));

}

@Test
Expand Down

0 comments on commit d92cec7

Please sign in to comment.