Skip to content

Commit

Permalink
Deciding on replication only based on http client config
Browse files Browse the repository at this point in the history
  • Loading branch information
santanusinha committed Dec 17, 2024
1 parent 14e41b7 commit 9f24172
Show file tree
Hide file tree
Showing 17 changed files with 28 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D
*/
private long hubStartTimeoutMs;
private Set<String> excludedServices;
private boolean replicationSource;

@Override
public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,7 @@ List<ServiceNode<T>> getAllNodes(
final Predicate<T> criteria,
final ShardSelector<T, R> shardSelector);

boolean isReplicationSource();
default boolean isReplicationSource() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,26 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
private final long hubStartTimeoutMs;

private final Set<String> excludedServices;
@Getter
private final boolean replicationSource;

private final ForkJoinPool refresherPool;

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory) {
this(serviceDataSource, finderFactory,
HubConstants.SERVICE_REFRESH_TIMEOUT_MS, HubConstants.HUB_START_TIMEOUT_MS, Set.of(), false);
HubConstants.SERVICE_REFRESH_TIMEOUT_MS, HubConstants.HUB_START_TIMEOUT_MS, Set.of());
}

public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory,
long serviceRefreshTimeoutMs,
long hubStartTimeoutMs,
final Set<String> excludedServices,
boolean replicationSource) {
final Set<String> excludedServices) {
this.serviceDataSource = serviceDataSource;
this.finderFactory = finderFactory;
this.serviceRefreshTimeoutMs = serviceRefreshTimeoutMs == 0 ? HubConstants.SERVICE_REFRESH_TIMEOUT_MS : serviceRefreshTimeoutMs;
this.hubStartTimeoutMs = hubStartTimeoutMs == 0 ? HubConstants.HUB_START_TIMEOUT_MS : hubStartTimeoutMs;
this.replicationSource = replicationSource;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public abstract class ServiceFinderHubBuilder<T, R extends ServiceRegistry<T>> {
private final List<Signal<Void>> extraRefreshSignals = new ArrayList<>();
private long serviceRefreshTimeoutMs = HubConstants.SERVICE_REFRESH_TIMEOUT_MS;
private long hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS;
private boolean replicationSource = false;

private Set<String> excludedServices = new HashSet<>();

Expand Down Expand Up @@ -87,18 +86,13 @@ public ServiceFinderHubBuilder<T, R> withExcludedServices(Set<String> excludedSe
return this;
}

public ServiceFinderHubBuilder<T, R> withReplicationSource(boolean replicationSource) {
this.replicationSource = replicationSource;
return this;
}

public ServiceFinderHub<T, R> build() {
preBuild();
Preconditions.checkNotNull(serviceDataSource, "Provide a non-null service data source");
Preconditions.checkNotNull(serviceFinderFactory, "Provide a non-null service finder factory");

val hub = new ServiceFinderHub<>(serviceDataSource, serviceFinderFactory, serviceRefreshTimeoutMs,
hubStartTimeoutMs, excludedServices, replicationSource);
hubStartTimeoutMs, excludedServices);
final ScheduledSignal<Void> refreshSignal = new ScheduledSignal<>("service-hub-refresh-timer",
() -> null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ void testDelayedServiceAddition() {
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(5)
.build(), 1_000, 5_000, Set.of(), false);
.build(), 1_000, 5_000, Set.of());
Assertions.assertThrows(IllegalStateException.class, delayedHub::start);
val serviceFinderHub = new ServiceFinderHub<>(new DynamicDataSource(Lists.newArrayList(new Service("NS", "SERVICE"))),
service -> new TestServiceFinderBuilder()
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<TestNodeData>() {})
.withSleepDuration(1)
.build(), 5_000, 5_000, Set.of(), false);
.build(), 5_000, 5_000, Set.of());
serviceFinderHub.start();
Assertions.assertTrue(serviceFinderHub.finder(new Service("NS", "SERVICE")).isPresent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 780203.0578756403
"mean_ops" : 659517.6790772061
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 610879.2316480416
"mean_ops" : 494777.4621186971
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ protected ServiceFinderHub<T, R> buildHub() {
.withHubStartTimeout(getHubStartTimeoutMs())
.withServiceRefreshTimeout(getServiceRefreshTimeoutMs())
.withExcludedServices(getExcludedServices())
.withReplicationSource(isReplicationSource())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ protected ServiceFinderHub<T, R> buildHub() {
.withHubStartTimeout(getHubStartTimeoutMs())
.withServiceRefreshTimeout(getServiceRefreshTimeoutMs())
.withExcludedServices(getExcludedServices())
.withReplicationSource(isReplicationSource())
.build();
}

@Override
public boolean isReplicationSource() {
return clientConfig.isReplicationSource();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ public class HttpClientConfig {
long connectionTimeoutMs;
long operationTimeoutMs;
long refreshIntervalMillis;
boolean skipReplicatedData;
boolean replicationSource;
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Set<Service> services() {
.host(config.getHost())
.port(config.getPort() == 0 ? defaultPort() : config.getPort())
.encodedPath("/ranger/services/v1")
.addQueryParameter("skipReplicationSources", Objects.toString(config.isSkipReplicatedData()))
.addQueryParameter("skipReplicationSources", Objects.toString(config.isReplicationSource()))
.build();
val request = new Request.Builder()
.url(httpUrl)
Expand Down Expand Up @@ -110,7 +110,7 @@ public List<ServiceNode<T>> listNodes(
.host(config.getHost())
.port(config.getPort() == 0 ? defaultPort() : config.getPort())
.encodedPath(url)
.addQueryParameter("skipReplicationSources", Objects.toString(config.isSkipReplicatedData()))
.addQueryParameter("skipReplicationSources", Objects.toString(config.isReplicationSource()))
.build();
val request = new Request.Builder()
.url(httpUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void testServiceDataSource(WireMockRuntimeInfo wireMockRuntimeInfo) throws IOExc
.port(wireMockRuntimeInfo.getHttpPort())
.connectionTimeoutMs(30_000)
.operationTimeoutMs(30_000)
.skipReplicatedData(true)
.replicationSource(true)
.build();
val httpServiceDataSource = new HttpServiceDataSource<>(clientConfig,
RangerHttpUtils.httpClient(clientConfig, MAPPER));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ private RangerHubClient<ShardInfo, ListBasedServiceRegistry<ShardInfo>> addCurat
.hubStartTimeoutMs(zkConfiguration.getHubStartTimeoutMs())
.nodeRefreshTimeMs(zkConfiguration.getNodeRefreshTimeMs())
.excludedServices(excludedServices)
.replicationSource(zkConfiguration.isReplicationSource())
.deserializer(data -> {
try {
return getMapper().readValue(data, new TypeReference<ServiceNode<ShardInfo>>() {
Expand All @@ -134,7 +133,6 @@ private RangerHubClient<ShardInfo, ListBasedServiceRegistry<ShardInfo>> getHttpH
.hubStartTimeoutMs(httpConfiguration.getHubStartTimeoutMs())
.nodeRefreshTimeMs(httpConfiguration.getNodeRefreshTimeMs())
.excludedServices(excludedServices)
.replicationSource(httpConfiguration.isReplicationSource())
.deserializer(data -> {
try {
return getMapper().readValue(data, new TypeReference<>() {});
Expand Down Expand Up @@ -163,7 +161,6 @@ private RangerHubClient<ShardInfo, ListBasedServiceRegistry<ShardInfo>> getDrove
.hubStartTimeoutMs(droveUpstreamConfiguration.getHubStartTimeoutMs())
.nodeRefreshTimeMs(droveUpstreamConfiguration.getNodeRefreshTimeMs())
.excludedServices(excludedServices)
.replicationSource(droveUpstreamConfiguration.isReplicationSource())
.deserializer(new DroveResponseDataDeserializer<>() {
@Override
protected ShardInfo translate(ExposedAppInfo appInfo, ExposedAppInfo.ExposedHost host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class RangerHttpUpstreamConfiguration extends RangerUpstreamConfiguration {

@NotEmpty
@Valid
private List<HttpClientConfig> httpClientConfigs;
@NotEmpty
@Valid
private List<HttpClientConfig> httpClientConfigs;

public RangerHttpUpstreamConfiguration() {
super(BackendType.HTTP);
}

@Override
public <T> T accept(RangerConfigurationVisitor<T> visitor) {
return visitor.visit(this);
}
@Override
public <T> T accept(RangerConfigurationVisitor<T> visitor) {
return visitor.visit(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public abstract class RangerUpstreamConfiguration {
@Min(HubConstants.MINIMUM_HUB_START_TIMEOUT_MS)
private int hubStartTimeoutMs = HubConstants.HUB_START_TIMEOUT_MS;

private boolean replicationSource;

protected RangerUpstreamConfiguration(BackendType type) {
this.type = type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public GenericResponse<Set<Service>> getServices(
@QueryParam("skipReplicationSources") @DefaultValue("false") boolean skipReplicationSources) {
return GenericResponse.<Set<Service>>builder()
.data(rangerHubs.stream()
.filter(hub -> !skipReplicationSources || !hub.isReplicationSource())
.map(RangerHubClient::getRegisteredServices)
.flatMap(Collection::stream)
.collect(Collectors.toSet()))
Expand All @@ -71,7 +72,7 @@ public GenericResponse<Collection<ServiceNode<T>>> getNodes(
val service = Service.builder().namespace(namespace).serviceName(serviceName).build();
return GenericResponse.<Collection<ServiceNode<T>>>builder()
.data(rangerHubs.stream()
.filter(hub -> skipReplicationSources && !hub.isReplicationSource())
.filter(hub -> !skipReplicationSources || !hub.isReplicationSource())
.map(hub -> hub.getAllNodes(service))
.flatMap(List::stream)
.collect(Collectors.toMap(node -> node.getHost() + ":" + node.getPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public abstract class AbstractRangerZKHubClient<T, R extends ServiceRegistry<T>,
private final boolean disablePushUpdaters;
private final String connectionString;
private final CuratorFramework curatorFramework;
private boolean replicationSource;

@Override
protected ServiceFinderHub<T, R> buildHub() {
Expand All @@ -50,7 +49,6 @@ protected ServiceFinderHub<T, R> buildHub() {
.withHubStartTimeout(getHubStartTimeoutMs())
.withServiceRefreshTimeout(getServiceRefreshTimeoutMs())
.withExcludedServices(getExcludedServices())
.withReplicationSource(isReplicationSource())
.build();
}

Expand Down

0 comments on commit 9f24172

Please sign in to comment.