Skip to content

Commit

Permalink
Added ability to skip replicated data from http source
Browse files Browse the repository at this point in the history
  • Loading branch information
santanusinha committed Dec 17, 2024
1 parent ed08f8f commit 31d6176
Show file tree
Hide file tree
Showing 21 changed files with 81 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,15 @@
import io.appform.ranger.core.finderhub.ServiceFinderFactory;
import io.appform.ranger.core.finderhub.ServiceFinderHub;
import io.appform.ranger.core.model.*;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import io.appform.ranger.core.util.FinderUtils;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

@Slf4j
@Getter
@SuperBuilder
Expand All @@ -63,6 +56,7 @@ public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D
*/
private long hubStartTimeoutMs;
private Set<String> excludedServices;
private boolean replicationSource;

@Override
public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ List<ServiceNode<T>> getAllNodes(
final Service service,
final Predicate<T> criteria,
final ShardSelector<T, R> shardSelector);

boolean isReplicationSource();
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,30 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final long hubStartTimeoutMs;

private final Set<String> excludedServices;
@Getter
private final boolean replicationSource;

private final ForkJoinPool refresherPool;

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory
) {
ServiceFinderFactory<T, R> finderFactory) {
this(serviceDataSource, finderFactory,
HubConstants.SERVICE_REFRESH_TIMEOUT_MS, HubConstants.HUB_START_TIMEOUT_MS, Set.of());
HubConstants.SERVICE_REFRESH_TIMEOUT_MS, HubConstants.HUB_START_TIMEOUT_MS, Set.of(), false);
}

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory,
long serviceRefreshTimeoutMs,
long hubStartTimeoutMs,
final Set<String> excludedServices) {
final Set<String> excludedServices,
boolean replicationSource) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
this.serviceRefreshTimeoutMs = serviceRefreshTimeoutMs == 0 ? HubConstants.SERVICE_REFRESH_TIMEOUT_MS : serviceRefreshTimeoutMs;
this.hubStartTimeoutMs = hubStartTimeoutMs == 0 ? HubConstants.HUB_START_TIMEOUT_MS : hubStartTimeoutMs;
this.replicationSource = replicationSource;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.signals.ScheduledSignal;
import io.appform.ranger.core.signals.Signal;

import java.util.*;

import lombok.val;

import java.util.*;
import java.util.function.Consumer;

/**
Expand All @@ -40,6 +38,8 @@ public abstract class ServiceFinderHubBuilder<T, R extends ServiceRegistry<T>> {
private final List<Signal<Void>> extraRefreshSignals = new ArrayList<>();
private long serviceRefreshTimeoutMs = HubConstants.SERVICE_REFRESH_TIMEOUT_MS;
private long hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS;
private boolean replicationSource = false;

private Set<String> excludedServices = new HashSet<>();

public ServiceFinderHubBuilder<T, R> withServiceDataSource(ServiceDataSource serviceDataSource) {
Expand Down Expand Up @@ -87,13 +87,18 @@ public ServiceFinderHubBuilder<T, R> withExcludedServices(Set<String> excludedSe
return this;
}

public ServiceFinderHubBuilder<T, R> withReplicationSource(boolean replicationSource) {
this.replicationSource = replicationSource;
return this;
}

public ServiceFinderHub<T, R> build() {
preBuild();
Preconditions.checkNotNull(serviceDataSource, "Provide a non-null service data source");
Preconditions.checkNotNull(serviceFinderFactory, "Provide a non-null service finder factory");

val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory, serviceRefreshTimeoutMs,
hubStartTimeoutMs, excludedServices);
hubStartTimeoutMs, excludedServices, replicationSource);
final ScheduledSignal<Void> refreshSignal = new ScheduledSignal<>("service-hub-refresh-timer",
() -> null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,11 @@
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
import io.appform.ranger.core.utils.RangerTestUtils;
import java.util.Set;
import lombok.val;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.*;

class ServiceFinderHubTest {

Expand Down Expand Up @@ -93,17 +89,15 @@ void testDelayedServiceAddition() {
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(5)
.build(), 1_000, 5_000, Set.of()
);
.build(), 1_000, 5_000, Set.of(), false);
Assertions.assertThrows(IllegalStateException.class, delayedHub::start);
val serviceFinderHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(1)
.build(), 5_000, 5_000, Set.of()
);
.build(), 5_000, 5_000, Set.of(), false);
serviceFinderHub.start();
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 812476.3197574528
"mean_ops" : 780203.0578756403
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 592802.8071907263
"mean_ops" : 610879.2316480416
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import io.appform.ranger.core.finderhub.ServiceFinderHub;
import io.appform.ranger.core.model.ServiceNodeSelector;
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.drove.common.DroveCommunicator;
import io.appform.ranger.drove.config.DroveUpstreamConfig;
import io.appform.ranger.drove.serde.DroveResponseDataDeserializer;
import io.appform.ranger.drove.servicefinderhub.DroveServiceDataSource;
import io.appform.ranger.drove.servicefinderhub.DroveServiceFinderHubBuilder;
import io.appform.ranger.drove.common.DroveCommunicator;
import lombok.Builder;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
Expand All @@ -34,29 +34,31 @@
@Slf4j
@Getter
@SuperBuilder
public abstract class AbstractRangerDroveHubClient<T, R extends ServiceRegistry<T>, D extends DroveResponseDataDeserializer<T>>
public abstract class AbstractRangerDroveHubClient<T, R extends ServiceRegistry<T>,
D extends DroveResponseDataDeserializer<T>>
extends AbstractRangerHubClient<T, R, D> {

private final DroveUpstreamConfig clientConfig;
private final DroveCommunicator droveCommunicator;
private final DroveUpstreamConfig clientConfig;
private final DroveCommunicator droveCommunicator;

@Builder.Default
private final ServiceNodeSelector<T> nodeSelector = new RandomServiceNodeSelector<>();
@Builder.Default
private final ServiceNodeSelector<T> nodeSelector = new RandomServiceNodeSelector<>();

@Override
protected ServiceDataSource getDefaultDataSource() {
return new DroveServiceDataSource<>(clientConfig, getMapper(), getNamespace(), droveCommunicator);
}
@Override
protected ServiceDataSource getDefaultDataSource() {
return new DroveServiceDataSource<>(clientConfig, getMapper(), getNamespace(), droveCommunicator);
}

@Override
protected ServiceFinderHub<T, R> buildHub() {
return new DroveServiceFinderHubBuilder<T, R>()
.withServiceDataSource(getServiceDataSource())
.withServiceFinderFactory(getFinderFactory())
.withRefreshFrequencyMs(getNodeRefreshTimeMs())
.withHubStartTimeout(getHubStartTimeoutMs())
.withServiceRefreshTimeout(getServiceRefreshTimeoutMs())
.withExcludedServices(getExcludedServices())
.build();
}
@Override
protected ServiceFinderHub<T, R> buildHub() {
return new DroveServiceFinderHubBuilder<T, R>()
.withServiceDataSource(getServiceDataSource())
.withServiceFinderFactory(getFinderFactory())
.withRefreshFrequencyMs(getNodeRefreshTimeMs())
.withHubStartTimeout(getHubStartTimeoutMs())
.withServiceRefreshTimeout(getServiceRefreshTimeoutMs())
.withExcludedServices(getExcludedServices())
.withReplicationSource(isReplicationSource())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ protected ServiceFinderHub<T, R> buildHub() {
.withHubStartTimeout(getHubStartTimeoutMs())
.withServiceRefreshTimeout(getServiceRefreshTimeoutMs())
.withExcludedServices(getExcludedServices())
.withReplicationSource(isReplicationSource())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void prepareHttpMocks() throws Exception {
ServiceNodesResponse.<TestNodeData>builder()
.data(Lists.newArrayList(node))
.build());
wireMockExtension.stubFor(get(urlEqualTo("/ranger/nodes/v1/test-n/test-s"))
wireMockExtension.stubFor(get(urlPathEqualTo("/ranger/nodes/v1/test-n/test-s"))
.willReturn(aResponse()
.withBody(payload)
.withStatus(200)));
Expand All @@ -72,7 +72,7 @@ public void prepareHttpMocks() throws Exception {
))
.build();
val response = objectMapper.writeValueAsBytes(responseObj);
wireMockExtension.stubFor(get(urlEqualTo("/ranger/services/v1"))
wireMockExtension.stubFor(get(urlPathEqualTo("/ranger/services/v1"))
.willReturn(aResponse()
.withBody(response)
.withStatus(200)));
Expand Down
6 changes: 6 additions & 0 deletions ranger-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,11 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ public class HttpClientConfig {
long connectionTimeoutMs;
long operationTimeoutMs;
long refreshIntervalMillis;
boolean skipReplicatedData;
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public Set<Service> services() {
.host(config.getHost())
.port(config.getPort() == 0 ? defaultPort() : config.getPort())
.encodedPath("/ranger/services/v1")
.addQueryParameter("skipReplicationSources", Objects.toString(config.isSkipReplicatedData()))
.build();
val request = new Request.Builder()
.url(httpUrl)
Expand Down Expand Up @@ -109,6 +110,7 @@ public List<ServiceNode<T>> listNodes(
.host(config.getHost())
.port(config.getPort() == 0 ? defaultPort() : config.getPort())
.encodedPath(url)
.addQueryParameter("skipReplicationSources", Objects.toString(config.isSkipReplicatedData()))
.build();
val request = new Request.Builder()
.url(httpUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void testFinder(WireMockRuntimeInfo wireMockRuntimeInfo) throws Exception {
ServiceNodesResponse.<NodeData>builder()
.data(Collections.singletonList(node))
.build());
stubFor(get(urlEqualTo("/ranger/nodes/v1/testns/test"))
stubFor(get(urlPathEqualTo("/ranger/nodes/v1/testns/test"))
.willReturn(aResponse()
.withBody(payload)
.withStatus(200)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void testServiceDataSource(WireMockRuntimeInfo wireMockRuntimeInfo) throws IOExc
))
.build();
val response = MAPPER.writeValueAsBytes(responseObj);
stubFor(get(urlEqualTo("/ranger/services/v1"))
stubFor(get(urlPathEqualTo("/ranger/services/v1"))
.willReturn(aResponse()
.withBody(response)
.withStatus(200)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void testProvider(WireMockRuntimeInfo wireMockRuntimeInfo) throws Exception {
)
.build());
byte[] requestBytes = MAPPER.writeValueAsBytes(testNode);
stubFor(post(urlEqualTo("/ranger/nodes/v1/add/testns/test"))
stubFor(post(urlPathEqualTo("/ranger/nodes/v1/add/testns/test"))
.withRequestBody(binaryEqualTo(requestBytes))
.willReturn(aResponse()
.withBody(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ private RangerHubClient<ShardInfo, ListBasedServiceRegistry<ShardInfo>> addCurat
.hubStartTimeoutMs(zkConfiguration.getHubStartTimeoutMs())
.nodeRefreshTimeMs(zkConfiguration.getNodeRefreshTimeMs())
.excludedServices(excludedServices)
.replicationSource(zkConfiguration.isReplicationSource())
.deserializer(data -> {
try {
return getMapper().readValue(data, new TypeReference<ServiceNode<ShardInfo>>() {
Expand All @@ -133,6 +134,7 @@ private RangerHubClient<ShardInfo, ListBasedServiceRegistry<ShardInfo>> getHttpH
.hubStartTimeoutMs(httpConfiguration.getHubStartTimeoutMs())
.nodeRefreshTimeMs(httpConfiguration.getNodeRefreshTimeMs())
.excludedServices(excludedServices)
.replicationSource(httpConfiguration.isReplicationSource())
.deserializer(data -> {
try {
return getMapper().readValue(data, new TypeReference<>() {});
Expand Down Expand Up @@ -161,6 +163,7 @@ private RangerHubClient<ShardInfo, ListBasedServiceRegistry<ShardInfo>> getDrove
.hubStartTimeoutMs(droveUpstreamConfiguration.getHubStartTimeoutMs())
.nodeRefreshTimeMs(droveUpstreamConfiguration.getNodeRefreshTimeMs())
.excludedServices(excludedServices)
.replicationSource(droveUpstreamConfiguration.isReplicationSource())
.deserializer(new DroveResponseDataDeserializer<>() {
@Override
protected ShardInfo translate(ExposedAppInfo appInfo, ExposedAppInfo.ExposedHost host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public abstract class RangerUpstreamConfiguration {
@Min(HubConstants.MINIMUM_HUB_START_TIMEOUT_MS)
private int hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS;

private boolean replicationSource;

protected RangerUpstreamConfiguration(BackendType type) {
this.type = type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ protected RangerServerConfiguration getRangerConfiguration(TestConfig configurat
.serviceName("service-" + i)
.build())
.collect(Collectors.toUnmodifiableSet());
stubFor(get("/ranger/services/v1")
stubFor(get(urlPathEqualTo("/ranger/services/v1"))
.willReturn(okJson(environment.getObjectMapper()
.writeValueAsString(ServiceDataSourceResponse.builder()
.data(services)
.build()))));
stubFor(any(urlMatching("/ranger/nodes/v1/test/service-[0-9]+"))
stubFor(any(urlPathMatching("/ranger/nodes/v1/test/service-[0-9]+"))
.willReturn(okJson(mapper.writeValueAsString(
ServiceNodesResponse.builder()
.data(IntStream.rangeClosed(1, 5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public RangerResource(List<RangerHubClient<T, R>> rangerHubs) {
@GET
@Path("/services/v1")
@Timed
public GenericResponse<Set<Service>> getServices() {
public GenericResponse<Set<Service>> getServices(
@QueryParam("skipReplicationSources") @DefaultValue("false") boolean skipReplicationSources) {
return GenericResponse.<Set<Service>>builder()
.data(rangerHubs.stream()
.map(RangerHubClient::getRegisteredServices)
Expand All @@ -65,10 +66,12 @@ public GenericResponse<Set<Service>> getServices() {
@Timed
public GenericResponse<Collection<ServiceNode<T>>> getNodes(
@NotNull @NotEmpty @PathParam("namespace") final String namespace,
@NotNull @NotEmpty @PathParam("serviceName") final String serviceName) {
@NotNull @NotEmpty @PathParam("serviceName") final String serviceName,
@QueryParam("skipReplicationSources") @DefaultValue("false") boolean skipReplicationSources) {
val service = Service.builder().namespace(namespace).serviceName(serviceName).build();
return GenericResponse.<Collection<ServiceNode<T>>>builder()
.data(rangerHubs.stream()
.filter(hub -> skipReplicationSources && !hub.isReplicationSource())
.map(hub -> hub.getAllNodes(service))
.flatMap(List::stream)
.collect(Collectors.toMap(node -> node.getHost() + ":" + node.getPort(),
Expand Down
Loading

0 comments on commit 31d6176

Please sign in to comment.