From 2761ccb35208ccd46107f2461929ed53ebe44e15 Mon Sep 17 00:00:00 2001 From: Santanu Sinha Date: Fri, 4 Oct 2024 12:49:12 +0530 Subject: [PATCH] Stability fixes and optimizations - Introduced communication execption - Introduced communicator for http - Introduced circuit breakers for drove and http - Staleness check is done centrally at ServiceRegistryUpdater - Bugfix: If upstream is inactive, the refresher will override timestamps with current time so that all nodes don't go stale in one minute - Performance optimization: Introduced ForkJoinPool for hub refresh. Default pool of 20 or processor count whichever is higher. Significant improvement in startup time. --- .../TestSimpleUnshardedServiceFinder.java | 3 +- .../exceptions/CommunicationException.java | 26 +++ .../ServiceRegistryUpdater.java | 58 +++-- .../core/finderhub/ServiceFinderHub.java | 58 +++-- .../ranger/core/model/NodeDataSource.java | 6 +- .../ranger/core/signals/ScheduledSignal.java | 2 +- .../core/finderhub/ServiceFinderHubTest.java | 7 +- ...e.id.IdGeneratorPerfTest.testGenerate.json | 2 +- ...dGeneratorPerfTest.testGenerateBase36.json | 2 +- .../drove/common/DroveApiCommunicator.java | 32 ++- .../common/DroveCachingCommunicator.java | 5 + .../common/DroveCommunicationException.java | 6 +- .../drove/common/DroveCommunicator.java | 2 + .../common/DroveNodeDataStoreConnector.java | 2 +- .../drove/common/DroveOkHttpTransport.java | 2 +- .../servicefinder/DroveNodeDataSource.java | 10 +- .../http/AbstractRangerHttpHubClient.java | 48 +++-- .../client/http/SimpleRangerHttpClient.java | 4 +- .../http/ShardedRangerHttpClientTest.java | 2 +- .../http/SimpleRangerHttpClientTest.java | 2 +- .../http/UnshardedRangerHttpClientTest.java | 2 +- .../common/HttpNodeDataStoreConnector.java | 12 +- .../servicefinder/HttpApiCommunicator.java | 204 ++++++++++++++++++ .../HttpCommunicationException.java | 29 +++ .../http/servicefinder/HttpCommunicator.java | 39 ++++ .../servicefinder/HttpNodeDataSource.java | 71 ++---- .../HttpShardedServiceFinderBuilder.java | 13 +- .../HttpUnshardedServiceFinderBuilider.java | 9 +- .../HttpServiceDataSource.java | 65 +----- .../HttpShardedServiceFinderFactory.java | 9 +- .../HttpUnshardedServiceFinderFactory.java | 10 +- .../serviceprovider/HttpNodeDataSink.java | 10 +- .../HttpShardedServiceProviderBuilder.java | 8 +- .../ranger/http/utils/RangerHttpUtils.java | 30 ++- .../HttpNodeDataStoreConnectorTest.java | 4 +- .../HttpServiceDataSourceTest.java | 2 +- .../server/bundle/RangerHubServerBundle.java | 2 +- .../servicefinder/ZkNodeDataSource.java | 10 +- 38 files changed, 549 insertions(+), 259 deletions(-) create mode 100644 ranger-core/src/main/java/io/appform/ranger/core/exceptions/CommunicationException.java create mode 100644 ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpApiCommunicator.java create mode 100644 ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicationException.java create mode 100644 ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicator.java diff --git a/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java b/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java index f167d08b..3cbb0258 100644 --- a/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java +++ b/ranger-client/src/test/java/io/appform/ranger/client/stubs/TestSimpleUnshardedServiceFinder.java @@ -22,11 +22,11 @@ import io.appform.ranger.core.model.Service; import io.appform.ranger.core.model.ServiceNode; import io.appform.ranger.core.units.TestNodeData; -import java.util.Optional; import lombok.Builder; import java.util.Collections; import java.util.List; +import java.util.Optional; @Builder public class TestSimpleUnshardedServiceFinder @@ -51,6 +51,7 @@ public Optional>> refresh(Deserializer> { private final ServiceRegistry serviceRegistry; - private final NodeDataSource nodeDataSource; + private final NodeDataSource nodeDataSource; private final D deserializer; private final Lock checkLock = new ReentrantLock(); @@ -51,7 +54,7 @@ public class ServiceRegistryUpdater> { public ServiceRegistryUpdater( ServiceRegistry serviceRegistry, - NodeDataSource nodeDataSource, + NodeDataSource nodeDataSource, List> signalGenerators, D deserializer) { this.serviceRegistry = serviceRegistry; @@ -70,6 +73,8 @@ public void start() { try { RetryerBuilder.newBuilder() .retryIfResult(r -> null == r || !r) + .retryIfException() + .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) .build() .call(serviceRegistry::isRefreshed); } @@ -81,7 +86,7 @@ public void start() { } public void stop() { - if(null != queryThreadFuture) { + if (null != queryThreadFuture) { executorService.shutdownNow(); } } @@ -125,21 +130,42 @@ private Void queryExecutor() { private void updateRegistry() throws InterruptedException { log.debug("Checking for updates on data source for service: {}", - serviceRegistry.getService().getServiceName()); - if(!nodeDataSource.isActive()) { - log.warn("Node data source seems to be down. Keeping old list for {}", - serviceRegistry.getService().getServiceName()); - return; - } - val nodeList = nodeDataSource.refresh(deserializer).orElse(null); - if (null != nodeList) { - log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(), - serviceRegistry.getService().getServiceName()); - serviceRegistry.updateNodes(nodeList); + serviceRegistry.getService().getServiceName()); + var callFailed = false; + if (nodeDataSource.isActive()) { //Source should implement circuit breaker to fail fast and reopen after some + // time + try { + val nodeList = nodeDataSource.refresh(deserializer).orElse(null); + if (null != nodeList) { + log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(), + serviceRegistry.getService().getServiceName()); + val livenessCheckMaxAge = nodeDataSource.healthcheckZombieCheckThresholdTime(serviceRegistry.getService()); + //Remove all stale nodes before updating. This is done centrally to ensure some data sources + //don't skip this check. Some control is still provided so that they can overload. + serviceRegistry.updateNodes(FinderUtils.filterValidNodes(serviceRegistry.getService(), nodeList, livenessCheckMaxAge)); + } + else { + log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}", + serviceRegistry.getService().getServiceName()); + } + } + catch (Exception e) { + log.error("Error updating data from registry. Error: [{}] {}", + e.getClass().getSimpleName(), + e.getMessage()); + callFailed = true; + } } - else { - log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}", + if (!nodeDataSource.isActive() || callFailed) { + val currTime = System.currentTimeMillis(); + log.warn("Node data source seems to be down. Keeping old list for {}." + + " Will update timestamp to keep stale date relevant.", serviceRegistry.getService().getServiceName()); + serviceRegistry.updateNodes(serviceRegistry.nodeList() + .stream() + .filter(node -> HealthcheckStatus.healthy == node.getHealthcheckStatus()) + .map(node -> node.setLastUpdatedTimeStamp(currTime)) + .toList()); } } diff --git a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java index 9cc54a25..2611b4f0 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/finderhub/ServiceFinderHub.java @@ -18,7 +18,6 @@ import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.StopStrategies; import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; import io.appform.ranger.core.finder.ServiceFinder; import io.appform.ranger.core.model.HubConstants; import io.appform.ranger.core.model.Service; @@ -34,11 +33,13 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -68,11 +69,14 @@ public class ServiceFinderHub> { private final ServiceFinderFactory finderFactory; private final AtomicBoolean alreadyUpdating = new AtomicBoolean(false); + private final AtomicInteger poolThreadIndex = new AtomicInteger(0); private Future monitorFuture = null; private final long serviceRefreshDurationMs; private final long hubRefreshDurationMs; + private final ForkJoinPool refresherPool; + public ServiceFinderHub( ServiceDataSource serviceDataSource, ServiceFinderFactory finderFactory @@ -88,12 +92,13 @@ public ServiceFinderHub( long hubRefreshDurationMs) { this.serviceDataSource = serviceDataSource; this.finderFactory = finderFactory; - this.serviceRefreshDurationMs = serviceRefreshDurationMs; - this.hubRefreshDurationMs = hubRefreshDurationMs; + this.serviceRefreshDurationMs = serviceRefreshDurationMs == 0 ? HubConstants.SERVICE_REFRESH_DURATION_MS : serviceRefreshDurationMs; + this.hubRefreshDurationMs = hubRefreshDurationMs == 0 ? HubConstants.HUB_REFRESH_DURATION_MS : hubRefreshDurationMs; this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater", () -> null, Collections.emptyList(), 10_000)); + this.refresherPool = createRefresherPool(); } public Optional> finder(final Service service) { @@ -158,6 +163,18 @@ public void updateAvailable() { } } + private ForkJoinPool createRefresherPool() { + return new ForkJoinPool( + Math.max(20, Runtime.getRuntime().availableProcessors()), + pool -> { + val thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + thread.setName("hub-refresher-" + poolThreadIndex.getAndIncrement()); + return thread; + }, + null, + false); + } + private void monitor() { while (true) { try { @@ -185,7 +202,7 @@ private void updateRegistry() { return; } alreadyUpdating.set(true); - final Map> updatedFinders = new HashMap<>(); + val updatedFinders = new ConcurrentHashMap>(); try { val services = serviceDataSource.services(); if (services.isEmpty()) { @@ -193,9 +210,10 @@ private void updateRegistry() { return; } val knownServiceFinders = finders.get(); - val newFinders = services.stream() - .filter(service -> !knownServiceFinders.containsKey(service)) - .collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder)); + val newFinders = refresherPool.submit(() -> services.parallelStream() + .filter(service -> !knownServiceFinders.containsKey(service)) + .collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder))) + .get(); val matchingServices = knownServiceFinders.entrySet() .stream() .filter(entry -> services.contains(entry.getKey())) @@ -208,6 +226,10 @@ private void updateRegistry() { updatedFinders.putAll(matchingServices); finders.set(updatedFinders); } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Refresh interrupted."); + } catch (Exception e) { log.error("Error updating service list. Will maintain older list", e); } @@ -217,29 +239,33 @@ private void updateRegistry() { } private void waitTillHubIsReady() { + val services = serviceDataSource.services(); + val timeToRefresh = Math.max(hubRefreshDurationMs, + (serviceRefreshDurationMs * services.size()) / refresherPool.getParallelism()); + if (timeToRefresh != hubRefreshDurationMs) { + log.warn("Max hub refresh time has been dynamically adjusted to {} ms from the provided {} ms as the " + + "provided time would have been insufficient to refresh {} services.", + timeToRefresh, hubRefreshDurationMs, services.size()); + } val hubRefresher = CompletableFuture.allOf( - serviceDataSource.services() - .stream() + services.stream() .map(service -> CompletableFuture.supplyAsync((Supplier) () -> { waitTillServiceIsReady(service); return null; - })).toArray(CompletableFuture[]::new) - ); + })).toArray(CompletableFuture[]::new)); try { - hubRefresher.get(hubRefreshDurationMs, TimeUnit.MILLISECONDS); + hubRefresher.get(timeToRefresh, TimeUnit.MILLISECONDS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); Exceptions.illegalState("Refresh interrupted"); } catch (TimeoutException e) { - Exceptions - .illegalState("Couldn't perform service hub refresh at this time. " + + Exceptions.illegalState("Couldn't perform service hub refresh at this time. " + "Refresh exceeded the start up time specified"); } catch (Exception e) { - Exceptions - .illegalState("Couldn't perform hub refresh at this time", e); + Exceptions.illegalState("Couldn't perform hub refresh at this time", e); } } diff --git a/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java b/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java index bc4e3da1..86276b2e 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/model/NodeDataSource.java @@ -15,6 +15,8 @@ */ package io.appform.ranger.core.model; +import io.appform.ranger.core.exceptions.CommunicationException; + import java.util.List; import java.util.Optional; @@ -24,9 +26,9 @@ @SuppressWarnings("unused") public interface NodeDataSource> extends NodeDataStoreConnector { - Optional>> refresh(D deserializer); + Optional>> refresh(D deserializer) throws CommunicationException; default long healthcheckZombieCheckThresholdTime(Service service) { - return System.currentTimeMillis() - 60000; //1 Minute + return isActive() ? (System.currentTimeMillis() - 60000) : 0; //1 Minute } } diff --git a/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java b/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java index 150514d4..a35342fb 100644 --- a/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java +++ b/ranger-core/src/main/java/io/appform/ranger/core/signals/ScheduledSignal.java @@ -36,7 +36,7 @@ public class ScheduledSignal extends Signal { private final String name; private final long refreshIntervalMillis; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private ScheduledFuture scheduledFuture = null; diff --git a/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java b/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java index e5e70c74..3778ee5c 100644 --- a/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java +++ b/ranger-core/src/test/java/io/appform/ranger/core/finderhub/ServiceFinderHubTest.java @@ -26,8 +26,6 @@ import io.appform.ranger.core.healthcheck.HealthcheckStatus; import io.appform.ranger.core.model.*; import io.appform.ranger.core.units.TestNodeData; -import java.util.Optional; - import io.appform.ranger.core.utils.RangerTestUtils; import lombok.val; import org.junit.jupiter.api.Assertions; @@ -36,6 +34,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Optional; class ServiceFinderHubTest { @@ -83,7 +82,7 @@ void testDelayedServiceAddition() { .withServiceName(service.getServiceName()) .withDeserializer(new Deserializer() {}) .withSleepDuration(1) - .build(), 2_000, 5_000 + .build(), 5_000, 5_000 ); serviceFinderHub.start(); Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent()); @@ -141,7 +140,7 @@ private static class TestNodeDataSource implements NodeDataSource>> refresh(Deserializer deserializer) { val list = new ArrayList>(); - list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, 10L, "HTTP")); + list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, Long.MAX_VALUE, "HTTP")); return Optional.of(list); } diff --git a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json index 005b0dd9..f0855806 100644 --- a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json +++ b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerate.json @@ -4,5 +4,5 @@ "iterations" : 4, "threads" : 1, "forks" : 3, - "mean_ops" : 787544.107063881 + "mean_ops" : 812476.3197574528 } \ No newline at end of file diff --git a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json index a9486eef..179cec48 100644 --- a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json +++ b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.IdGeneratorPerfTest.testGenerateBase36.json @@ -4,5 +4,5 @@ "iterations" : 4, "threads" : 1, "forks" : 3, - "mean_ops" : 594383.3501367184 + "mean_ops" : 592802.8071907263 } \ No newline at end of file diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java index ee40d192..0bfdeee7 100644 --- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java +++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveApiCommunicator.java @@ -37,6 +37,11 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -49,6 +54,8 @@ public class DroveApiCommunicator implements DroveCommunicator { private final DroveUpstreamConfig config; private final DroveClient droveClient; private final ObjectMapper mapper; + private final AtomicBoolean upstreamAvailable = new AtomicBoolean(true); + private final ScheduledExecutorService resetter = Executors.newSingleThreadScheduledExecutor(); public DroveApiCommunicator( String namespace, DroveUpstreamConfig config, @@ -58,6 +65,7 @@ public DroveApiCommunicator( this.config = config; this.droveClient = droveClient; this.mapper = mapper; + resetter.scheduleWithFixedDelay(() -> upstreamAvailable.set(true), 0, 60, TimeUnit.SECONDS); } @Override @@ -70,6 +78,11 @@ public Optional leader() { return droveClient.leader(); } + @Override + public boolean healthy() { + return upstreamAvailable.get(); + } + @Override public List services() { log.debug("Loading services list"); @@ -77,7 +90,7 @@ public List services() { config.getSkipTagName(), DroveUpstreamConfig.DEFAULT_SKIP_TAG_NAME); val url = "/apis/v1/applications"; - return droveClient.execute( + return executeRemoteCall(() -> droveClient.execute( new DroveClient.Request(DroveClient.Method.GET, url), new DroveClient.ResponseHandler<>() { @Override @@ -111,7 +124,7 @@ public List handle(DroveClient.Response response) throws Exception { .distinct() .toList(); } - }); + })); } @Override @@ -124,7 +137,7 @@ public Map> listNodes(Iterable .toList())); logUrl(url); - return droveClient.execute(new DroveClient.Request(DroveClient.Method.GET, url), + return executeRemoteCall(() -> droveClient.execute(new DroveClient.Request(DroveClient.Method.GET, url), new DroveClient.ResponseHandler<>() { @Override public Map> defaultValue() { @@ -150,7 +163,18 @@ public Map> handle(DroveClient.Response response) appInfo -> new Service(namespace, appInfo.getAppName()), Collectors.toList())); } - }); + })); + } + + private T executeRemoteCall(Supplier executor) { + upstreamAvailable.set(true); + try { + return executor.get(); + } + catch (DroveCommunicationException e) { + upstreamAvailable.set(false); + throw e; + } } private static void throwDroveCommError(DroveClient.Response response) { diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java index 94e08180..892a39ec 100644 --- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java +++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCachingCommunicator.java @@ -110,6 +110,11 @@ public Optional leader() { return root.leader(); } + @Override + public boolean healthy() { + return root.healthy(); + } + @Override public List services() { return root.services(); diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java index 858e309c..4c25d372 100644 --- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java +++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicationException.java @@ -16,10 +16,12 @@ package io.appform.ranger.drove.common; +import io.appform.ranger.core.exceptions.CommunicationException; + /** - * + * Thrown in case there is an issue communicating with the drove upstream. */ -public class DroveCommunicationException extends RuntimeException { +public class DroveCommunicationException extends CommunicationException { public DroveCommunicationException(final String message) { super(message); } diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java index bb830663..bd664798 100644 --- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java +++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveCommunicator.java @@ -30,6 +30,8 @@ public interface DroveCommunicator extends AutoCloseable { Optional leader(); + boolean healthy(); + List services(); default List listNodes(final Service service) { diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java index 49abbf99..5b8d6c3e 100644 --- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java +++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveNodeDataStoreConnector.java @@ -62,7 +62,7 @@ public void stop() { @Override public boolean isActive() { - return droveClient.leader().isPresent(); + return droveClient.healthy(); } } diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java index a84a8045..928a91a1 100644 --- a/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java +++ b/ranger-drove/src/main/java/io/appform/ranger/drove/common/DroveOkHttpTransport.java @@ -68,7 +68,7 @@ public T get( return responseHandler.handle(droveResponse); } catch (Exception e) { - log.error("Error calling drove: " + e.getMessage(), e); + log.error("Error calling drove: {}. Error: {}", e.getMessage(), e.getClass().getSimpleName()); throw new DroveCommunicationException(e.getMessage()); } } diff --git a/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java b/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java index a202565d..dc266d2a 100644 --- a/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java +++ b/ranger-drove/src/main/java/io/appform/ranger/drove/servicefinder/DroveNodeDataSource.java @@ -20,7 +20,6 @@ import io.appform.ranger.core.model.NodeDataSource; import io.appform.ranger.core.model.Service; import io.appform.ranger.core.model.ServiceNode; -import io.appform.ranger.core.util.FinderUtils; import io.appform.ranger.drove.common.DroveCommunicationException; import io.appform.ranger.drove.common.DroveCommunicator; import io.appform.ranger.drove.common.DroveNodeDataStoreConnector; @@ -58,20 +57,17 @@ public Optional>> refresh(D deserializer) { val exposedAppInfos = droveClient.listNodes(service); val nodes = deserializer.deserialize( Objects.requireNonNull(exposedAppInfos, "Unexpected empty response from server")); - return Optional.of(FinderUtils.filterValidNodes( - service, - nodes, - healthcheckZombieCheckThresholdTime(service))); + return Optional.of(nodes); } catch (DroveCommunicationException e) { log.error("Drove communication error", e); return Optional.empty(); //In case of refresh failure, maintain old list } - } @Override public boolean isActive() { - return droveClient.leader().isPresent(); + return droveClient.healthy(); } + } diff --git a/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java b/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java index c2dfc2b4..139363c3 100644 --- a/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java +++ b/ranger-http-client/src/main/java/io/appform/ranger/client/http/AbstractRangerHttpHubClient.java @@ -23,6 +23,7 @@ import io.appform.ranger.core.model.ServiceRegistry; import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import io.appform.ranger.http.servicefinderhub.HttpServiceDataSource; import io.appform.ranger.http.servicefinderhub.HttpServiceFinderHubBuilder; import io.appform.ranger.http.utils.RangerHttpUtils; @@ -30,37 +31,40 @@ import lombok.Getter; import lombok.experimental.SuperBuilder; import lombok.extern.slf4j.Slf4j; -import okhttp3.OkHttpClient; import java.util.Objects; @Slf4j @Getter @SuperBuilder -public abstract class AbstractRangerHttpHubClient, D extends HTTPResponseDataDeserializer> - extends AbstractRangerHubClient { +public abstract class AbstractRangerHttpHubClient, + D extends HTTPResponseDataDeserializer> + extends AbstractRangerHubClient { - private final HttpClientConfig clientConfig; + private final HttpClientConfig clientConfig; - private final OkHttpClient httpClient; + private final HttpCommunicator httpClient; - @Builder.Default - private final ServiceNodeSelector nodeSelector = new RandomServiceNodeSelector<>(); + @Builder.Default + private final ServiceNodeSelector nodeSelector = new RandomServiceNodeSelector<>(); - @Override - protected ServiceDataSource getDefaultDataSource() { - return new HttpServiceDataSource<>(clientConfig, getMapper(), Objects.requireNonNullElseGet(getHttpClient(), - () -> RangerHttpUtils.httpClient(clientConfig))); - } + @Override + protected ServiceDataSource getDefaultDataSource() { + return new HttpServiceDataSource<>(clientConfig, + Objects.requireNonNullElseGet(getHttpClient(), + () -> RangerHttpUtils.httpClient( + clientConfig, + getMapper()))); + } - @Override - protected ServiceFinderHub buildHub() { - return new HttpServiceFinderHubBuilder() - .withServiceDataSource(getServiceDataSource()) - .withServiceFinderFactory(getFinderFactory()) - .withRefreshFrequencyMs(getNodeRefreshTimeMs()) - .withHubRefreshDuration(getHubRefreshDurationMs()) - .withServiceRefreshDuration(getServiceRefreshDurationMs()) - .build(); - } + @Override + protected ServiceFinderHub buildHub() { + return new HttpServiceFinderHubBuilder() + .withServiceDataSource(getServiceDataSource()) + .withServiceFinderFactory(getFinderFactory()) + .withRefreshFrequencyMs(getNodeRefreshTimeMs()) + .withHubRefreshDuration(getHubRefreshDurationMs()) + .withServiceRefreshDuration(getServiceRefreshDurationMs()) + .build(); + } } diff --git a/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java b/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java index f71e3d96..a8f307ec 100644 --- a/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java +++ b/ranger-http-client/src/main/java/io/appform/ranger/client/http/SimpleRangerHttpClient.java @@ -25,11 +25,11 @@ import io.appform.ranger.http.HttpServiceFinderBuilders; import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import lombok.Builder; import lombok.Getter; import lombok.experimental.SuperBuilder; import lombok.extern.slf4j.Slf4j; -import okhttp3.OkHttpClient; @Slf4j @SuperBuilder @@ -40,7 +40,7 @@ public class SimpleRangerHttpClient extends AbstractRangerClient httpClient; private final HTTPResponseDataDeserializer deserializer; @Builder.Default private final ShardSelector> shardSelector = new ListShardSelector<>(); diff --git a/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java b/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java index 90bea638..956c5ff6 100644 --- a/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java +++ b/ranger-http-client/src/test/java/io/appform/ranger/client/http/ShardedRangerHttpClientTest.java @@ -29,7 +29,7 @@ void testShardedHttpHubClient(){ val httpClientConfig = getHttpClientConfig(); val client = ShardedRangerHttpHubClient.builder() .clientConfig(httpClientConfig) - .httpClient(RangerHttpUtils.httpClient(httpClientConfig)) + .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getObjectMapper())) .namespace("test-n") .deserializer(this::read) .mapper(getObjectMapper()) diff --git a/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java b/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java index c09b3193..64f6b3cc 100644 --- a/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java +++ b/ranger-http-client/src/test/java/io/appform/ranger/client/http/SimpleRangerHttpClientTest.java @@ -30,7 +30,7 @@ void testSimpleHttpRangerClient(){ val client = SimpleRangerHttpClient.builder() .clientConfig(httpClientConfig) .mapper(getObjectMapper()) - .httpClient(RangerHttpUtils.httpClient(httpClientConfig)) + .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getObjectMapper())) .deserializer(this::read) .namespace("test-n") .serviceName("test-s") diff --git a/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java b/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java index c9cfd533..b4b64325 100644 --- a/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java +++ b/ranger-http-client/src/test/java/io/appform/ranger/client/http/UnshardedRangerHttpClientTest.java @@ -29,7 +29,7 @@ void testUnshardedRangerHubClient(){ val httpClientConfig = getHttpClientConfig(); val client = UnshardedRangerHttpHubClient.builder() .clientConfig(httpClientConfig) - .httpClient(RangerHttpUtils.httpClient(httpClientConfig)) + .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getObjectMapper())) .namespace("test-n") .deserializer(this::read) .mapper(getObjectMapper()) diff --git a/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java b/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java index aaece10c..210a6e02 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/common/HttpNodeDataStoreConnector.java @@ -15,11 +15,10 @@ */ package io.appform.ranger.http.common; -import com.fasterxml.jackson.databind.ObjectMapper; import io.appform.ranger.core.model.NodeDataStoreConnector; import io.appform.ranger.http.config.HttpClientConfig; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import lombok.extern.slf4j.Slf4j; -import okhttp3.OkHttpClient; /** * @@ -28,16 +27,13 @@ public class HttpNodeDataStoreConnector implements NodeDataStoreConnector { protected final HttpClientConfig config; - protected final ObjectMapper mapper; - protected final OkHttpClient httpClient; + protected final HttpCommunicator httpCommunicator; public HttpNodeDataStoreConnector( final HttpClientConfig config, - final ObjectMapper mapper, - final OkHttpClient httpClient) { - this.httpClient = httpClient; + final HttpCommunicator httpCommunicator) { + this.httpCommunicator = httpCommunicator; this.config = config; - this.mapper = mapper; } diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpApiCommunicator.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpApiCommunicator.java new file mode 100644 index 00000000..4b6247e9 --- /dev/null +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpApiCommunicator.java @@ -0,0 +1,204 @@ +/* + * Copyright 2024 Authors, Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appform.ranger.http.servicefinder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.appform.ranger.core.model.Service; +import io.appform.ranger.core.model.ServiceNode; +import io.appform.ranger.http.config.HttpClientConfig; +import io.appform.ranger.http.model.ServiceDataSourceResponse; +import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; + +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * Direct api based communication + */ +@Slf4j +public class HttpApiCommunicator implements HttpCommunicator { + private final AtomicBoolean upstreamAvailable = new AtomicBoolean(true); + private final ScheduledExecutorService resetter = Executors.newSingleThreadScheduledExecutor(); + + @Getter + private final OkHttpClient httpClient; + private final HttpClientConfig config; + private final ObjectMapper mapper; + + public HttpApiCommunicator(OkHttpClient httpClient, HttpClientConfig config, ObjectMapper mapper) { + Objects.requireNonNull(mapper, "mapper has not been set for node data"); + this.httpClient = httpClient; + this.config = config; + this.mapper = mapper; + resetter.scheduleWithFixedDelay(() -> upstreamAvailable.set(true), 0, 60, TimeUnit.SECONDS); + } + + @Override + public boolean healthy() { + return upstreamAvailable.get(); + } + + @Override + public Set services() { + return executeRemoteCall(() -> { + val httpUrl = new HttpUrl.Builder() + .scheme(config.isSecure() ? "https" : "http") + .host(config.getHost()) + .port(config.getPort() == 0 ? defaultPort() : config.getPort()) + .encodedPath("/ranger/services/v1") + .build(); + val request = new Request.Builder() + .url(httpUrl) + .get() + .build(); + + try (val response = httpClient.newCall(request).execute()) { + if (response.isSuccessful()) { + return parseServices(response, httpUrl); + } + else { + throw new HttpCommunicationException( + "Http call to returned a failure response. Url:" + httpUrl + " status: " + response.code()); + } + } + catch (Exception e) { + throw new HttpCommunicationException( + "Error parsing the response from server for url: " + httpUrl + + " with exception " + e.getClass().getSimpleName() + ": " + e.getMessage()); + } + }); + } + + @Override + public List> listNodes( + Service service, + HTTPResponseDataDeserializer deserializer) { + return executeRemoteCall(() -> { + val url = String.format("/ranger/nodes/v1/%s/%s", service.getNamespace(), service.getServiceName()); + + log.debug("Refreshing the node list from url {}", url); + val httpUrl = new HttpUrl.Builder() + .scheme(config.isSecure() ? "https" : "http") + .host(config.getHost()) + .port(config.getPort() == 0 ? defaultPort() : config.getPort()) + .encodedPath(url) + .build(); + val request = new Request.Builder() + .url(httpUrl) + .get() + .build(); + + try (val response = httpClient.newCall(request).execute()) { + if (response.isSuccessful()) { + return parseNodeList(deserializer, response, httpUrl); + } + else { + throw new HttpCommunicationException("HTTP call failed. url: " + httpUrl + " status: " + response.code()); + } + } + catch (Exception e) { + throw new HttpCommunicationException("Error getting node data from the http endpoint: " + httpUrl + + ". Error: " + e.getMessage()); + } + }); + } + + @Override + public void close() throws Exception { + + } + + private U executeRemoteCall(Supplier executor) { + upstreamAvailable.set(true); + try { + return executor.get(); + } + catch (HttpCommunicationException e) { + upstreamAvailable.set(false); + throw e; + } + } + + private int defaultPort() { + return config.isSecure() + ? 443 + : 80; + } + + private Set parseServices(Response response, HttpUrl httpUrl) { + try (val body = response.body()) { + if (null == body) { + throw new HttpCommunicationException("Empty response body from: " + httpUrl); + } + else { + val bytes = body.bytes(); + val serviceDataSourceResponse = mapper.readValue(bytes, ServiceDataSourceResponse.class); + if (serviceDataSourceResponse.valid()) { + return serviceDataSourceResponse.getData(); + } + else { + throw new HttpCommunicationException( + "Http call to returned a failure response. Url:" + httpUrl + " data: " + serviceDataSourceResponse); + } + } + } + catch (Exception e) { + throw new HttpCommunicationException( + "Error reading data from server. Url: " + httpUrl + "Error: " + e.getMessage()); + } + } + + private static List> parseNodeList( + HTTPResponseDataDeserializer deserializer, + Response response, + HttpUrl httpUrl) { + try (val body = response.body()) { + if (null == body) { + log.warn("HTTP call to {} returned empty body", httpUrl); + throw new HttpCommunicationException("Empty response received for call to " + httpUrl); + } + else { + val bytes = body.bytes(); + val serviceNodesResponse = deserializer.deserialize(bytes); + if (serviceNodesResponse.valid()) { + return serviceNodesResponse.getData(); + } + else { + throw new HttpCommunicationException( + "Http call returned null nodes for url: " + httpUrl + " response: " + serviceNodesResponse); + } + } + } + catch (Exception e) { + throw new HttpCommunicationException( + "Error parsing node data from server. Url: " + httpUrl + "Error: " + e.getMessage()); + } + } +} diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicationException.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicationException.java new file mode 100644 index 00000000..f8a2586e --- /dev/null +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicationException.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Authors, Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appform.ranger.http.servicefinder; + +import io.appform.ranger.core.exceptions.CommunicationException; + +/** + * Thrown in case there is an issue communicating with the HTTP upstream. + + */ +public class HttpCommunicationException extends CommunicationException { + public HttpCommunicationException(final String message) { + super(message); + } +} diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicator.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicator.java new file mode 100644 index 00000000..22803e68 --- /dev/null +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpCommunicator.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Authors, Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appform.ranger.http.servicefinder; + +import io.appform.ranger.core.model.Service; +import io.appform.ranger.core.model.ServiceNode; +import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; +import okhttp3.OkHttpClient; + +import java.util.List; +import java.util.Set; + +/** + * Interface for communicator to upstream + */ +public interface HttpCommunicator extends AutoCloseable { + boolean healthy(); + + Set services(); + + List> listNodes(final Service service, + HTTPResponseDataDeserializer deserializer); + + OkHttpClient getHttpClient(); +} diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java index 9ee8b155..d88f94a6 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpNodeDataSource.java @@ -15,24 +15,21 @@ */ package io.appform.ranger.http.servicefinder; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import io.appform.ranger.core.model.NodeDataSource; import io.appform.ranger.core.model.Service; import io.appform.ranger.core.model.ServiceNode; -import io.appform.ranger.core.util.FinderUtils; import io.appform.ranger.http.common.HttpNodeDataStoreConnector; import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; import lombok.extern.slf4j.Slf4j; -import lombok.val; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -41,67 +38,27 @@ public class HttpNodeDataSource> extends HttpNodeDataStoreConnector implements NodeDataSource { private final Service service; + private final AtomicBoolean upstreamAvailable = new AtomicBoolean(true); + private final ScheduledExecutorService resetter = Executors.newSingleThreadScheduledExecutor(); public HttpNodeDataSource( final Service service, final HttpClientConfig config, - final ObjectMapper mapper, - final OkHttpClient httpClient) { - super(config, mapper, httpClient); + final HttpCommunicator httpCommunicator) { + super(config, httpCommunicator); + Objects.requireNonNull(config, "client config has not been set for node data"); + Objects.requireNonNull(httpCommunicator, "http communicator has not been set for node data"); this.service = service; + resetter.scheduleWithFixedDelay(() -> upstreamAvailable.set(true), 0, 60, TimeUnit.SECONDS); } @Override public Optional>> refresh(D deserializer) { - Preconditions.checkNotNull(config, "client config has not been set for node data"); - Preconditions.checkNotNull(mapper, "mapper has not been set for node data"); - val url = String.format("/ranger/nodes/v1/%s/%s", service.getNamespace(), service.getServiceName()); - - log.debug("Refreshing the node list from url {}", url); - val httpUrl = new HttpUrl.Builder() - .scheme(config.isSecure() - ? "https" - : "http") - .host(config.getHost()) - .port(config.getPort() == 0 - ? defaultPort() - : config.getPort()) - .encodedPath(url) - .build(); - val request = new Request.Builder() - .url(httpUrl) - .get() - .build(); - - try (val response = httpClient.newCall(request).execute()) { - if (response.isSuccessful()) { - try (val body = response.body()) { - if (null == body) { - log.warn("HTTP call to {} returned empty body", httpUrl); - } else { - val bytes = body.bytes(); - val serviceNodesResponse = deserializer.deserialize(bytes); - if(serviceNodesResponse.valid()){ - return Optional.of(FinderUtils.filterValidNodes( - service, - serviceNodesResponse.getData(), - healthcheckZombieCheckThresholdTime(service))); - } else{ - log.warn("Http call to {} returned a failure response with response {}", httpUrl, serviceNodesResponse); - } - } - } - } else { - log.warn("HTTP call to {} returned: {}", httpUrl, response.code()); - } - } catch (IOException e) { - log.error("Error getting service data from the http endPoint: ", e); - } - return Optional.empty(); + return Optional.of(httpCommunicator.listNodes(service, deserializer)); } @Override public boolean isActive() { - return true; + return upstreamAvailable.get(); } } diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java index 2b6fe2f3..44d3cf6e 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpShardedServiceFinderBuilder.java @@ -23,7 +23,6 @@ import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; import io.appform.ranger.http.utils.RangerHttpUtils; -import okhttp3.OkHttpClient; import java.util.Objects; @@ -34,7 +33,7 @@ public class HttpShardedServiceFinderBuilder extends SimpleShardedServiceFind private HttpClientConfig clientConfig; private ObjectMapper mapper; - private OkHttpClient httpClient; + private HttpCommunicator httpCommunicator; public HttpShardedServiceFinderBuilder withClientConfig(final HttpClientConfig clientConfig) { this.clientConfig = clientConfig; @@ -46,8 +45,8 @@ public HttpShardedServiceFinderBuilder withObjectMapper(final ObjectMapper ma return this; } - public HttpShardedServiceFinderBuilder withHttpClient(final OkHttpClient httpClient){ - this.httpClient = httpClient; + public HttpShardedServiceFinderBuilder withHttpCommunicator(final HttpCommunicator httpCommunicator){ + this.httpCommunicator = httpCommunicator; return this; } @@ -58,9 +57,9 @@ public SimpleShardedServiceFinder build() { @Override protected NodeDataSource> dataSource(Service service) { - return new HttpNodeDataSource<>(service, clientConfig, mapper, - Objects.requireNonNullElseGet(httpClient, - () -> RangerHttpUtils.httpClient(clientConfig))); + return new HttpNodeDataSource<>(service, clientConfig, + Objects.requireNonNullElseGet(httpCommunicator, + () -> RangerHttpUtils.httpClient(clientConfig, mapper))); } } diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java index 5ae1b73e..fae76ea2 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinder/HttpUnshardedServiceFinderBuilider.java @@ -23,7 +23,6 @@ import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; import io.appform.ranger.http.utils.RangerHttpUtils; -import okhttp3.OkHttpClient; import java.util.Objects; @@ -32,7 +31,7 @@ public class HttpUnshardedServiceFinderBuilider private HttpClientConfig clientConfig; private ObjectMapper mapper; - private OkHttpClient httpClient; + private HttpCommunicator httpClient; public HttpUnshardedServiceFinderBuilider withClientConfig(final HttpClientConfig clientConfig) { this.clientConfig = clientConfig; @@ -44,7 +43,7 @@ public HttpUnshardedServiceFinderBuilider withObjectMapper(final ObjectMapper return this; } - public HttpUnshardedServiceFinderBuilider withHttpClient(final OkHttpClient httpClient) { + public HttpUnshardedServiceFinderBuilider withHttpClient(final HttpCommunicator httpClient) { this.httpClient = httpClient; return this; } @@ -56,9 +55,9 @@ public SimpleUnshardedServiceFinder build() { @Override protected NodeDataSource> dataSource(Service service) { - return new HttpNodeDataSource<>(service, clientConfig, mapper, + return new HttpNodeDataSource<>(service, clientConfig, Objects.requireNonNullElseGet(httpClient, - () -> RangerHttpUtils.httpClient(clientConfig))); + () -> RangerHttpUtils.httpClient(clientConfig, mapper))); } } diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java index a48e0c10..a52cc214 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSource.java @@ -15,79 +15,26 @@ */ package io.appform.ranger.http.servicefinderhub; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import io.appform.ranger.core.finderhub.ServiceDataSource; import io.appform.ranger.core.model.Service; import io.appform.ranger.http.common.HttpNodeDataStoreConnector; import io.appform.ranger.http.config.HttpClientConfig; -import io.appform.ranger.http.model.ServiceDataSourceResponse; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import lombok.extern.slf4j.Slf4j; -import lombok.val; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import java.io.IOException; import java.util.Collection; -import java.util.Collections; +import java.util.Objects; @Slf4j public class HttpServiceDataSource extends HttpNodeDataStoreConnector implements ServiceDataSource { - public HttpServiceDataSource(HttpClientConfig config, ObjectMapper mapper, OkHttpClient httpClient) { - super(config, mapper, httpClient); + public HttpServiceDataSource(HttpClientConfig config, HttpCommunicator httpClient) { + super(config, httpClient); } @Override public Collection services() { - Preconditions.checkNotNull(config, "client config has not been set for node data"); - Preconditions.checkNotNull(mapper, "mapper has not been set for node data"); - - val httpUrl = new HttpUrl.Builder() - .scheme(config.isSecure() - ? "https" - : "http") - .host(config.getHost()) - .port(config.getPort() == 0 - ? defaultPort() - : config.getPort()) - .encodedPath("/ranger/services/v1") - .build(); - val request = new Request.Builder() - .url(httpUrl) - .get() - .build(); - - try (val response = httpClient.newCall(request).execute()) { - if (response.isSuccessful()) { - try (val body = response.body()) { - if (null == body) { - log.warn("HTTP call to {} returned empty body", httpUrl); - } - else { - val bytes = body.bytes(); - val serviceDataSourceResponse = mapper.readValue(bytes, ServiceDataSourceResponse.class); - if (serviceDataSourceResponse.valid()) { - return serviceDataSourceResponse.getData(); - } - else { - log.warn("Http call to {} returned a failure response with data {}", - httpUrl, - serviceDataSourceResponse); - } - } - } - } - else { - log.warn("HTTP call to {} returned code: {}", httpUrl, response.code()); - } - } - catch (IOException e) { - log.info("Error parsing the response from server for : {} with exception {}", httpUrl, e); - } - - log.error("No data returned from server: " + httpUrl); - return Collections.emptySet(); + Objects.requireNonNull(config, "client config has not been set for node data"); + return httpCommunicator.services(); } } diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java index 557c43c7..b1b1f508 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpShardedServiceFinderFactory.java @@ -24,15 +24,15 @@ import io.appform.ranger.core.model.ShardSelector; import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import io.appform.ranger.http.servicefinder.HttpShardedServiceFinderBuilder; import lombok.Builder; import lombok.val; -import okhttp3.OkHttpClient; public class HttpShardedServiceFinderFactory implements ServiceFinderFactory> { private final HttpClientConfig clientConfig; - private final OkHttpClient httpClient; + private final HttpCommunicator httpClient; private final ObjectMapper mapper; private final HTTPResponseDataDeserializer deserializer; private final ShardSelector> shardSelector; @@ -41,7 +41,8 @@ public class HttpShardedServiceFinderFactory implements ServiceFinderFactory @Builder public HttpShardedServiceFinderFactory( - HttpClientConfig httpClientConfig, OkHttpClient httpClient, + HttpClientConfig httpClientConfig, + HttpCommunicator httpClient, ObjectMapper mapper, HTTPResponseDataDeserializer deserializer, ShardSelector> shardSelector, @@ -61,8 +62,8 @@ public HttpShardedServiceFinderFactory( public ServiceFinder> buildFinder(Service service) { val serviceFinder = new HttpShardedServiceFinderBuilder() .withClientConfig(clientConfig) - .withHttpClient(httpClient) .withObjectMapper(mapper) + .withHttpCommunicator(httpClient) .withDeserializer(deserializer) .withNamespace(service.getNamespace()) .withServiceName(service.getServiceName()) diff --git a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java index 321e0757..51c8cb02 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/servicefinderhub/HttpUnshardedServiceFinderFactory.java @@ -24,16 +24,16 @@ import io.appform.ranger.core.model.ShardSelector; import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HTTPResponseDataDeserializer; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import io.appform.ranger.http.servicefinder.HttpUnshardedServiceFinderBuilider; import lombok.Builder; import lombok.val; -import okhttp3.OkHttpClient; public class HttpUnshardedServiceFinderFactory implements ServiceFinderFactory> { private final HttpClientConfig clientConfig; private final ObjectMapper mapper; - private final OkHttpClient httpClient; + private final HttpCommunicator httpClient; private final HTTPResponseDataDeserializer deserializer; private final ShardSelector> shardSelector; private final ServiceNodeSelector nodeSelector; @@ -42,12 +42,12 @@ public class HttpUnshardedServiceFinderFactory implements ServiceFinderFactor @Builder public HttpUnshardedServiceFinderFactory( HttpClientConfig httpClientConfig, - ObjectMapper mapper, OkHttpClient httpClient, + ObjectMapper mapper, + HttpCommunicator httpClient, HTTPResponseDataDeserializer deserializer, ShardSelector> shardSelector, ServiceNodeSelector nodeSelector, - int nodeRefreshIntervalMs) - { + int nodeRefreshIntervalMs) { this.clientConfig = httpClientConfig; this.mapper = mapper; this.httpClient = httpClient; diff --git a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java index 176fbb26..627d8676 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpNodeDataSink.java @@ -26,10 +26,10 @@ import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.model.ServiceRegistrationResponse; import io.appform.ranger.http.serde.HttpRequestDataSerializer; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import lombok.extern.slf4j.Slf4j; import lombok.val; import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; @@ -40,10 +40,12 @@ public class HttpNodeDataSink> extends HttpNodeDataStoreConnector implements NodeDataSink { private final Service service; + private final ObjectMapper mapper; - public HttpNodeDataSink(Service service, HttpClientConfig config, ObjectMapper mapper, OkHttpClient httpClient) { - super(config, mapper, httpClient); + public HttpNodeDataSink(Service service, HttpClientConfig config, ObjectMapper mapper, HttpCommunicator httpClient) { + super(config, httpClient); this.service = service; + this.mapper = mapper; } @Override @@ -77,7 +79,7 @@ private Optional> registerService(HttpUrl httpUrl .url(httpUrl) .post(requestBody) .build(); - try (val response = httpClient.newCall(request).execute()) { + try (val response = httpCommunicator.getHttpClient().newCall(request).execute()) { if (response.isSuccessful()) { try (val body = response.body()) { if (null == body) { diff --git a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java index 393a2934..4caec43d 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/serviceprovider/HttpShardedServiceProviderBuilder.java @@ -22,9 +22,9 @@ import io.appform.ranger.core.serviceprovider.ServiceProvider; import io.appform.ranger.http.config.HttpClientConfig; import io.appform.ranger.http.serde.HttpRequestDataSerializer; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import io.appform.ranger.http.utils.RangerHttpUtils; import lombok.extern.slf4j.Slf4j; -import okhttp3.OkHttpClient; import java.util.Objects; @@ -33,7 +33,7 @@ public class HttpShardedServiceProviderBuilder extends BaseServiceProviderBui private HttpClientConfig clientConfig; private ObjectMapper mapper; - private OkHttpClient httpClient; + private HttpCommunicator httpClient; public HttpShardedServiceProviderBuilder withClientConfiguration(final HttpClientConfig clientConfig) { this.clientConfig = clientConfig; @@ -45,7 +45,7 @@ public HttpShardedServiceProviderBuilder withObjectMapper(final ObjectMapper return this; } - public HttpShardedServiceProviderBuilder withHttpClient(final OkHttpClient httpClient) { + public HttpShardedServiceProviderBuilder withHttpClient(final HttpCommunicator httpClient) { this.httpClient = httpClient; return this; } @@ -59,6 +59,6 @@ public ServiceProvider> build() { protected NodeDataSink> dataSink(Service service) { return new HttpNodeDataSink<>(service, clientConfig, mapper, Objects.requireNonNullElseGet(httpClient, - () -> RangerHttpUtils.httpClient(clientConfig))); + () -> RangerHttpUtils.httpClient(clientConfig, mapper))); } } diff --git a/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java b/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java index de7c85d2..c3aaf105 100644 --- a/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java +++ b/ranger-http/src/main/java/io/appform/ranger/http/utils/RangerHttpUtils.java @@ -16,7 +16,10 @@ package io.appform.ranger.http.utils; +import com.fasterxml.jackson.databind.ObjectMapper; import io.appform.ranger.http.config.HttpClientConfig; +import io.appform.ranger.http.servicefinder.HttpApiCommunicator; +import io.appform.ranger.http.servicefinder.HttpCommunicator; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import okhttp3.ConnectionPool; @@ -30,17 +33,22 @@ @UtilityClass @Slf4j public class RangerHttpUtils { - public static OkHttpClient httpClient(final HttpClientConfig config) { + public static HttpCommunicator httpClient( + final HttpClientConfig config, + final ObjectMapper mapper) { log.info("Creating http client for {}:{}", config.getHost(), config.getPort()); - return new OkHttpClient.Builder() - .callTimeout(config.getOperationTimeoutMs() == 0 - ? 3000 - : config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS) - .connectTimeout(config.getConnectionTimeoutMs() == 0 - ? 3000 - : config.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) - .followRedirects(true) - .connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS)) - .build(); + return new HttpApiCommunicator<>( + new OkHttpClient.Builder() + .callTimeout(config.getOperationTimeoutMs() == 0 + ? 3000 + : config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS) + .connectTimeout(config.getConnectionTimeoutMs() == 0 + ? 3000 + : config.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) + .followRedirects(true) + .connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS)) + .build(), + config, + mapper); } } diff --git a/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java b/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java index 58677990..41a67b0c 100644 --- a/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java +++ b/ranger-http/src/test/java/io/appform/ranger/http/common/HttpNodeDataStoreConnectorTest.java @@ -31,8 +31,8 @@ void testHttpNodeDataStoreConnector(){ .host("localhost-1") .port(80) .build(); - val httpNodeDataStoreConnector = new HttpNodeDataStoreConnector<>(httpClientConfig, objectMapper, - RangerHttpUtils.httpClient(httpClientConfig)); + val httpNodeDataStoreConnector = new HttpNodeDataStoreConnector<>(httpClientConfig, + RangerHttpUtils.httpClient(httpClientConfig, objectMapper)); Assertions.assertNotNull(httpNodeDataStoreConnector); Assertions.assertTrue(httpNodeDataStoreConnector.isActive()); } diff --git a/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java b/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java index 909ed69d..7662a167 100644 --- a/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java +++ b/ranger-http/src/test/java/io/appform/ranger/http/servicefinderhub/HttpServiceDataSourceTest.java @@ -55,7 +55,7 @@ void testServiceDataSource(WireMockRuntimeInfo wireMockRuntimeInfo) throws IOExc .connectionTimeoutMs(30_000) .operationTimeoutMs(30_000) .build(); - val httpServiceDataSource = new HttpServiceDataSource<>(clientConfig, MAPPER, RangerHttpUtils.httpClient(clientConfig)); + val httpServiceDataSource = new HttpServiceDataSource<>(clientConfig, RangerHttpUtils.httpClient(clientConfig, MAPPER)); val services = httpServiceDataSource.services(); Assertions.assertNotNull(services); Assertions.assertFalse(services.isEmpty()); diff --git a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java index 976b28df..b6013c1d 100644 --- a/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java +++ b/ranger-hub-server-bundle/src/main/java/io/appform/ranger/hub/server/bundle/RangerHubServerBundle.java @@ -128,7 +128,7 @@ private RangerHubClient> getHttpH .namespace(namespace) .mapper(getMapper()) .clientConfig(httpClientConfig) - .httpClient(RangerHttpUtils.httpClient(httpClientConfig)) + .httpClient(RangerHttpUtils.httpClient(httpClientConfig, getMapper())) .serviceRefreshDurationMs(httpConfiguration.getServiceRefreshDurationMs()) .hubRefreshDurationMs(httpConfiguration.getServiceRefreshDurationMs()) .nodeRefreshTimeMs(httpConfiguration.getNodeRefreshTimeMs()) diff --git a/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java index 7d4249c7..f2fea6fa 100644 --- a/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java +++ b/ranger-zookeeper/src/main/java/io/appform/ranger/zookeeper/servicefinder/ZkNodeDataSource.java @@ -20,7 +20,6 @@ import io.appform.ranger.core.model.NodeDataSource; import io.appform.ranger.core.model.Service; import io.appform.ranger.core.model.ServiceNode; -import io.appform.ranger.core.util.FinderUtils; import io.appform.ranger.zookeeper.common.ZkNodeDataStoreConnector; import io.appform.ranger.zookeeper.common.ZkStoreType; import io.appform.ranger.zookeeper.serde.ZkNodeDataDeserializer; @@ -29,11 +28,11 @@ import lombok.val; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; import java.util.Collections; import java.util.List; import java.util.Optional; -import org.apache.zookeeper.KeeperException.NoNodeException; /** * @@ -65,7 +64,6 @@ private Optional>> checkForUpdateOnZookeeper(D deserializer) } Preconditions.checkNotNull(deserializer, "Deserializer has not been set for node data"); try { - val healthcheckZombieCheckThresholdTime = healthcheckZombieCheckThresholdTime(service); //1 Minute val serviceName = service.getServiceName(); if (!isActive()) { log.warn("ZK connection is not active. Ignoring refresh request for service: {}", @@ -79,13 +77,11 @@ private Optional>> checkForUpdateOnZookeeper(D deserializer) log.debug("Found {} nodes for [{}]", children.size(), serviceName); for (val child : children) { byte[] data = readChild(parentPath, child).orElse(null); - if (data == null || data.length <= 0) { + if (data == null || data.length == 0) { continue; } val node = deserializer.deserialize(data); - if(FinderUtils.isValidNode(service, healthcheckZombieCheckThresholdTime, node)) { - nodes.add(node); - } + nodes.add(node); } return Optional.of(nodes); }