Skip to content

Commit

Permalink
Stability fixes and optimizations
Browse files Browse the repository at this point in the history
- Introduced communication execption
- Introduced communicator for http
- Introduced circuit breakers for drove and http
- Staleness check is done centrally at ServiceRegistryUpdater
- Bugfix: If upstream is inactive, the refresher will override timestamps with current time so that all nodes don't go stale in one minute
- Performance optimization: Introduced ForkJoinPool for hub refresh. Default pool of 20 or processor count whichever is higher. Significant improvement in startup time.
  • Loading branch information
santanusinha committed Oct 4, 2024
1 parent 00e280b commit 2761ccb
Show file tree
Hide file tree
Showing 38 changed files with 549 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
import io.appform.ranger.core.units.TestNodeData;
import java.util.Optional;
import lombok.Builder;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

@Builder
public class TestSimpleUnshardedServiceFinder <T>
Expand All @@ -51,6 +51,7 @@ public Optional<List<ServiceNode<TestNodeData>>> refresh(Deserializer<TestNodeDa
.host("localhost")
.port(9200)
.nodeData(TestNodeData.builder().shardId(1).build())
.lastUpdatedTimeStamp(Long.MAX_VALUE)
.build())
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 Authors, Flipkart Internet Pvt. Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.appform.ranger.core.exceptions;

/**
* Base for communication exception
*/
public abstract class CommunicationException extends RuntimeException {
protected CommunicationException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
package io.appform.ranger.core.finder.serviceregistry;

import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import io.appform.ranger.core.healthcheck.HealthcheckStatus;
import io.appform.ranger.core.model.Deserializer;
import io.appform.ranger.core.model.NodeDataSource;
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.signals.Signal;
import io.appform.ranger.core.util.Exceptions;
import io.appform.ranger.core.util.FinderUtils;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

Expand All @@ -39,7 +42,7 @@
public class ServiceRegistryUpdater<T, D extends Deserializer<T>> {

private final ServiceRegistry<T> serviceRegistry;
private final NodeDataSource<T,D> nodeDataSource;
private final NodeDataSource<T, D> nodeDataSource;
private final D deserializer;

private final Lock checkLock = new ReentrantLock();
Expand All @@ -51,7 +54,7 @@ public class ServiceRegistryUpdater<T, D extends Deserializer<T>> {

public ServiceRegistryUpdater(
ServiceRegistry<T> serviceRegistry,
NodeDataSource<T,D> nodeDataSource,
NodeDataSource<T, D> nodeDataSource,
List<Signal<T>> signalGenerators,
D deserializer) {
this.serviceRegistry = serviceRegistry;
Expand All @@ -70,6 +73,8 @@ public void start() {
try {
RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(r -> null == r || !r)
.retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.build()
.call(serviceRegistry::isRefreshed);
}
Expand All @@ -81,7 +86,7 @@ public void start() {
}

public void stop() {
if(null != queryThreadFuture) {
if (null != queryThreadFuture) {
executorService.shutdownNow();
}
}
Expand Down Expand Up @@ -125,21 +130,42 @@ private Void queryExecutor() {

private void updateRegistry() throws InterruptedException {
log.debug("Checking for updates on data source for service: {}",
serviceRegistry.getService().getServiceName());
if(!nodeDataSource.isActive()) {
log.warn("Node data source seems to be down. Keeping old list for {}",
serviceRegistry.getService().getServiceName());
return;
}
val nodeList = nodeDataSource.refresh(deserializer).orElse(null);
if (null != nodeList) {
log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(),
serviceRegistry.getService().getServiceName());
serviceRegistry.updateNodes(nodeList);
serviceRegistry.getService().getServiceName());
var callFailed = false;
if (nodeDataSource.isActive()) { //Source should implement circuit breaker to fail fast and reopen after some
// time
try {
val nodeList = nodeDataSource.refresh(deserializer).orElse(null);
if (null != nodeList) {
log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(),
serviceRegistry.getService().getServiceName());
val livenessCheckMaxAge = nodeDataSource.healthcheckZombieCheckThresholdTime(serviceRegistry.getService());
//Remove all stale nodes before updating. This is done centrally to ensure some data sources
//don't skip this check. Some control is still provided so that they can overload.
serviceRegistry.updateNodes(FinderUtils.filterValidNodes(serviceRegistry.getService(), nodeList, livenessCheckMaxAge));
}
else {
log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
serviceRegistry.getService().getServiceName());
}
}
catch (Exception e) {
log.error("Error updating data from registry. Error: [{}] {}",
e.getClass().getSimpleName(),
e.getMessage());
callFailed = true;
}
}
else {
log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
if (!nodeDataSource.isActive() || callFailed) {
val currTime = System.currentTimeMillis();
log.warn("Node data source seems to be down. Keeping old list for {}." +
" Will update timestamp to keep stale date relevant.",
serviceRegistry.getService().getServiceName());
serviceRegistry.updateNodes(serviceRegistry.nodeList()
.stream()
.filter(node -> HealthcheckStatus.healthy == node.getHealthcheckStatus())
.map(node -> node.setLastUpdatedTimeStamp(currTime))
.toList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.appform.ranger.core.finder.ServiceFinder;
import io.appform.ranger.core.model.HubConstants;
import io.appform.ranger.core.model.Service;
Expand All @@ -34,11 +33,13 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -68,11 +69,14 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final ServiceFinderFactory<T, R> finderFactory;

private final AtomicBoolean alreadyUpdating = new AtomicBoolean(false);
private final AtomicInteger poolThreadIndex = new AtomicInteger(0);
private Future<?> monitorFuture = null;

private final long serviceRefreshDurationMs;
private final long hubRefreshDurationMs;

private final ForkJoinPool refresherPool;

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory
Expand All @@ -88,12 +92,13 @@ public ServiceFinderHub(
long hubRefreshDurationMs) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs;
this.serviceRefreshDurationMs = serviceRefreshDurationMs == 0 ? HubConstants.SERVICE_REFRESH_DURATION_MS : serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs == 0 ? HubConstants.HUB_REFRESH_DURATION_MS : hubRefreshDurationMs;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
10_000));
this.refresherPool = createRefresherPool();
}

public Optional<ServiceFinder<T, R>> finder(final Service service) {
Expand Down Expand Up @@ -158,6 +163,18 @@ public void updateAvailable() {
}
}

private ForkJoinPool createRefresherPool() {
return new ForkJoinPool(
Math.max(20, Runtime.getRuntime().availableProcessors()),
pool -> {
val thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName("hub-refresher-" + poolThreadIndex.getAndIncrement());
return thread;
},
null,
false);
}

private void monitor() {
while (true) {
try {
Expand Down Expand Up @@ -185,17 +202,18 @@ private void updateRegistry() {
return;
}
alreadyUpdating.set(true);
final Map<Service, ServiceFinder<T, R>> updatedFinders = new HashMap<>();
val updatedFinders = new ConcurrentHashMap<Service, ServiceFinder<T, R>>();
try {
val services = serviceDataSource.services();
if (services.isEmpty()) {
log.debug("No services found for the service data source. Skipping update on the registry");
return;
}
val knownServiceFinders = finders.get();
val newFinders = services.stream()
.filter(service -> !knownServiceFinders.containsKey(service))
.collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder));
val newFinders = refresherPool.submit(() -> services.parallelStream()
.filter(service -> !knownServiceFinders.containsKey(service))
.collect(Collectors.toMap(Function.identity(), finderFactory::buildFinder)))
.get();
val matchingServices = knownServiceFinders.entrySet()
.stream()
.filter(entry -> services.contains(entry.getKey()))
Expand All @@ -208,6 +226,10 @@ private void updateRegistry() {
updatedFinders.putAll(matchingServices);
finders.set(updatedFinders);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Refresh interrupted.");
}
catch (Exception e) {
log.error("Error updating service list. Will maintain older list", e);
}
Expand All @@ -217,29 +239,33 @@ private void updateRegistry() {
}

private void waitTillHubIsReady() {
val services = serviceDataSource.services();
val timeToRefresh = Math.max(hubRefreshDurationMs,
(serviceRefreshDurationMs * services.size()) / refresherPool.getParallelism());
if (timeToRefresh != hubRefreshDurationMs) {
log.warn("Max hub refresh time has been dynamically adjusted to {} ms from the provided {} ms as the " +
"provided time would have been insufficient to refresh {} services.",
timeToRefresh, hubRefreshDurationMs, services.size());
}
val hubRefresher = CompletableFuture.allOf(
serviceDataSource.services()
.stream()
services.stream()
.map(service -> CompletableFuture.supplyAsync((Supplier<Void>) () -> {
waitTillServiceIsReady(service);
return null;
})).toArray(CompletableFuture[]::new)
);
})).toArray(CompletableFuture[]::new));
try {
hubRefresher.get(hubRefreshDurationMs, TimeUnit.MILLISECONDS);
hubRefresher.get(timeToRefresh, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Exceptions.illegalState("Refresh interrupted");
}
catch (TimeoutException e) {
Exceptions
.illegalState("Couldn't perform service hub refresh at this time. " +
Exceptions.illegalState("Couldn't perform service hub refresh at this time. " +
"Refresh exceeded the start up time specified");
}
catch (Exception e) {
Exceptions
.illegalState("Couldn't perform hub refresh at this time", e);
Exceptions.illegalState("Couldn't perform hub refresh at this time", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.appform.ranger.core.model;

import io.appform.ranger.core.exceptions.CommunicationException;

import java.util.List;
import java.util.Optional;

Expand All @@ -24,9 +26,9 @@
@SuppressWarnings("unused")
public interface NodeDataSource<T, D extends Deserializer<T>> extends NodeDataStoreConnector<T> {

Optional<List<ServiceNode<T>>> refresh(D deserializer);
Optional<List<ServiceNode<T>>> refresh(D deserializer) throws CommunicationException;

default long healthcheckZombieCheckThresholdTime(Service service) {
return System.currentTimeMillis() - 60000; //1 Minute
return isActive() ? (System.currentTimeMillis() - 60000) : 0; //1 Minute
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ScheduledSignal<T> extends Signal<T> {
private final String name;
private final long refreshIntervalMillis;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

private ScheduledFuture<?> scheduledFuture = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import io.appform.ranger.core.healthcheck.HealthcheckStatus;
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
import java.util.Optional;

import io.appform.ranger.core.utils.RangerTestUtils;
import lombok.val;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,6 +34,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;

class ServiceFinderHubTest {

Expand Down Expand Up @@ -83,7 +82,7 @@ void testDelayedServiceAddition() {
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(1)
.build(), 2_000, 5_000
.build(), 5_000, 5_000
);
serviceFinderHub.start();
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
Expand Down Expand Up @@ -141,7 +140,7 @@ private static class TestNodeDataSource implements NodeDataSource<TestNodeData,
@Override
public Optional<List<ServiceNode<TestNodeData>>> refresh(Deserializer<TestNodeData> deserializer) {
val list = new ArrayList<ServiceNode<TestNodeData>>();
list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, 10L, "HTTP"));
list.add(new ServiceNode<>("HOST", 0, TestNodeData.builder().shardId(1).build(), HealthcheckStatus.healthy, Long.MAX_VALUE, "HTTP"));
return Optional.of(list);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 787544.107063881
"mean_ops" : 812476.3197574528
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 594383.3501367184
"mean_ops" : 592802.8071907263
}
Loading

0 comments on commit 2761ccb

Please sign in to comment.