Skip to content

Commit

Permalink
Moved to okhttp transport
Browse files Browse the repository at this point in the history
  • Loading branch information
santanusinha committed Aug 20, 2024
1 parent d0c5533 commit e5b82c6
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 755596.8796429877
"mean_ops" : 525944.6622679544
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 612871.57497758
"mean_ops" : 484353.78910261835
}
14 changes: 4 additions & 10 deletions ranger-drove/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</dependency>
<dependency>
<groupId>com.phonepe.drove</groupId>
<artifactId>drove-client-httpcomponent-transport</artifactId>
<artifactId>drove-events-client</artifactId>
<version>${drove.version}</version>
<exclusions>
<exclusion>
Expand All @@ -67,15 +67,9 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.phonepe.drove</groupId>
<artifactId>drove-events-client</artifactId>
<version>${drove.version}</version>
<exclusions>
<exclusion>
<groupId>com.phonepe.platform</groupId>
<artifactId>release-scripts</artifactId>
</exclusion>
</exclusions>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${http.client.version}</version>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,59 @@

import com.phonepe.drove.client.DroveClient;
import com.phonepe.drove.client.DroveHttpTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;

import java.net.URI;
import java.util.List;
import java.util.Map;

/**
*
* OkHttp client transport for drove
*/
@Slf4j
public class DroveOkHttpTransport implements DroveHttpTransport {
private final OkHttpClient httpClient;

public DroveOkHttpTransport(final OkHttpClient httpClient) {
this.httpClient = httpClient;
log.info("Okhttp based transport initialized");
}

@Override
public <T> T get(URI uri, Map<String, List<String>> headers, DroveClient.ResponseHandler<T> responseHandler) {
return null;
@SneakyThrows
public <T> T get(
URI uri,
Map<String, List<String>> headers,
DroveClient.ResponseHandler<T> responseHandler) {
val headersBuilder = new Headers.Builder();
headers.forEach((headerName, headerValues) -> headerValues.forEach(value -> headersBuilder.add(headerName, value)));
val request = new Request.Builder()
.url(uri.toURL())
.headers(headersBuilder.build())
.get()
.build();
log.debug("Calling url: {}", uri);
try (val response = httpClient.newCall(request).execute()) {
val body = response.body();
var strResponse = body != null ? body.string() : null;
if(!response.isSuccessful()) {
log.error("Error calling drove api {}: Status: {} Body: {}",
uri, response.code(), strResponse);
}
val droveResponse = new DroveClient.Response(response.code(),
response.headers().toMultimap(),
strResponse);
return responseHandler.handle(droveResponse);
}
catch (Exception e) {
log.error("Error calling drove: " + e.getMessage(), e);
}
return responseHandler.defaultValue();
}

@Override
Expand All @@ -22,7 +63,7 @@ public <T> T post(
Map<String, List<String>> headers,
String body,
DroveClient.ResponseHandler<T> responseHandler) {
return null;
throw new UnsupportedOperationException("POST is not supported on this transport");
}

@Override
Expand All @@ -31,16 +72,16 @@ public <T> T put(
Map<String, List<String>> headers,
String body,
DroveClient.ResponseHandler<T> responseHandler) {
return null;
throw new UnsupportedOperationException("PUT is not supported on this transport");
}

@Override
public <T> T delete(URI uri, Map<String, List<String>> headers, DroveClient.ResponseHandler<T> responseHandler) {
return null;
throw new UnsupportedOperationException("DELETE is not supported on this transport");
}

@Override
public void close() throws Exception {

//Nothing to do here
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,21 @@
import com.phonepe.drove.client.DroveClientConfig;
import com.phonepe.drove.client.decorators.AuthHeaderDecorator;
import com.phonepe.drove.client.decorators.BasicAuthDecorator;
import com.phonepe.drove.client.transport.httpcomponent.DroveHttpComponentsTransport;
import io.appform.ranger.drove.common.DroveOkHttpTransport;
import io.appform.ranger.drove.config.DroveUpstreamConfig;
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactoryBuilder;
import org.apache.hc.client5.http.ssl.TrustAllStrategy;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
*
Expand All @@ -33,42 +28,45 @@
@UtilityClass
public class RangerDroveUtils {



@SneakyThrows
public static CloseableHttpClient createHttpClient(final DroveUpstreamConfig config) {
public static OkHttpClient createOkHttpClient(final DroveUpstreamConfig config) {
val connectionTimeout
= Objects.requireNonNullElse(config.getConnectionTimeout(),
DroveUpstreamConfig.DEFAULT_CONNECTION_TIMEOUT)
.toJavaDuration();
val cmBuilder = PoolingHttpClientConnectionManagerBuilder.create();
if (config.isInsecure()) {
log.warn("Creating insecure http client for drove endpoint: {}", config.getEndpoints());
cmBuilder.setSSLSocketFactory(SSLConnectionSocketFactoryBuilder.create()
.setSslContext(
SSLContextBuilder.create()
.loadTrustMaterial(TrustAllStrategy.INSTANCE)
.build())
.setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.build());
val operationTimeout = Objects.requireNonNullElse(config.getOperationTimeout(),
DroveUpstreamConfig.DEFAULT_OPERATION_TIMEOUT).toJavaDuration();
val okHttpBuilder = new OkHttpClient.Builder();
if(config.isInsecure()) {
val trustAllCerts = new TrustManager[]{
new X509TrustManager() {
@Override
public void checkClientTrusted(java.security.cert.X509Certificate[] chain, String authType) {
}

@Override
public void checkServerTrusted(java.security.cert.X509Certificate[] chain, String authType) {
}

@Override
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return new java.security.cert.X509Certificate[]{};
}
}
};
val sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
okHttpBuilder.sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustAllCerts[0]);
okHttpBuilder.hostnameVerifier((hostname, session) -> true);
log.warn("SSL verification turned off for drove transport");
}
val connectionManager = cmBuilder.build();
connectionManager.setDefaultMaxPerRoute(Integer.MAX_VALUE);
connectionManager.setMaxTotal(Integer.MAX_VALUE);
connectionManager.setDefaultConnectionConfig(ConnectionConfig.custom()
.setConnectTimeout(Timeout.of(connectionTimeout))
.setSocketTimeout(Timeout.of(connectionTimeout))
.setValidateAfterInactivity(TimeValue.ofSeconds(10))
.setTimeToLive(TimeValue.ofHours(1))
.build());
val rc = RequestConfig.custom()
.setConnectionRequestTimeout(Timeout.of(connectionTimeout))
.setResponseTimeout(Timeout.of(Objects.requireNonNullElse(config.getOperationTimeout(),
DroveUpstreamConfig.DEFAULT_OPERATION_TIMEOUT)
.toJavaDuration()))
.build();
return HttpClients.custom()
.disableRedirectHandling()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(rc)
return okHttpBuilder
.callTimeout(operationTimeout)
.connectTimeout(connectionTimeout)
.followRedirects(false)
.connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS))
.build();
}

Expand All @@ -90,11 +88,12 @@ public static <T> DroveCommunicator<T> buildDroveClient(
List.of(new BasicAuthDecorator(config.getUsername(),
config.getPassword()),
new AuthHeaderDecorator(config.getAuthHeader())),
new DroveHttpComponentsTransport(droveConfig,
createHttpClient(config)));
new DroveOkHttpTransport(createOkHttpClient(config)));
// new DroveHttpComponentsTransport(droveConfig,
// createHttpClient(config)));
val apiCommunicator = new DroveApiCommunicator<T>(namespace, config, droveClient, mapper);
return config.isSkipCaching()
? apiCommunicator
: new DroveCachingCommunicator<>(apiCommunicator, namespace, config, droveClient, mapper);
? apiCommunicator
: new DroveCachingCommunicator<>(apiCommunicator, namespace, config, droveClient, mapper);
}
}

0 comments on commit e5b82c6

Please sign in to comment.