Skip to content

Commit

Permalink
Merge pull request #42 from appform-io/drove_new
Browse files Browse the repository at this point in the history
Service discovery based on Drove container orchestrator
  • Loading branch information
koushikr authored Aug 23, 2024
2 parents 493f0b4 + 90ef4ae commit 4171b8b
Show file tree
Hide file tree
Showing 57 changed files with 3,273 additions and 127 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/build-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name: Java CI with Maven

on:
push:
branches: [ main ]
branches: [ main , drove_new ]
pull_request:
branches: [ main ]

Expand All @@ -23,4 +23,7 @@ jobs:
cache: maven
distribution: 'temurin'
- name: Build with Maven
run: mvn -B clean package --file pom.xml
env:
USER_NAME: ${{ secrets.PPE_USER_NAME }}
ACCESS_TOKEN: ${{ secrets.PPE_ACCESS_TOKEN }}
run: mvn -s $GITHUB_WORKSPACE/.github/workflows/maven-settings.xml -B clean package --file pom.xml
38 changes: 38 additions & 0 deletions .github/workflows/maven-settings.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<activeProfiles>
<activeProfile>github</activeProfile>
</activeProfiles>
<profiles>
<profile>
<id>github</id>
<repositories>
<repository>
<id>central</id>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>github</id>
<url>https://maven.pkg.github.com/phonepe/drove</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
</profile>
</profiles>

<servers>
<server>
<id>github</id>
<username>${env.USER_NAME}</username>
<password>${env.ACCESS_TOKEN}</password>
</server>
</servers>

</settings>
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ To use the http server bundle along with boostrap use.
}
return null;
})
.build()).collect(Collectors.toList());
.build()).toList();
}

@Override
Expand Down
31 changes: 29 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
<module>ranger-core</module>
<module>ranger-zookeeper</module>
<module>ranger-http</module>
<module>ranger-drove</module>
<module>ranger-client</module>
<module>ranger-zk-client</module>
<module>ranger-http-client</module>
<module>ranger-drove-client</module>
<module>ranger-server-common</module>
<module>ranger-server-bundle</module>
<module>ranger-http-model</module>
<module>ranger-discovery-bundle</module>
<module>ranger-hub-server-bundle</module>
<module>ranger-server</module>
</modules>
Expand Down Expand Up @@ -98,6 +101,7 @@
<mockito.version>4.2.0</mockito.version>

<dropwizard.version>2.1.10</dropwizard.version>
<logback.version>1.2.12</logback.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -181,9 +185,32 @@
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<version>3.3.1</version>
<dependencies>
<dependency>
<groupId>me.fabriciorby</groupId>
<artifactId>maven-surefire-junit5-tree-reporter</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
<configuration>
<reportFormat>plain</reportFormat>
<consoleOutputReporter>
<disable>true</disable>
</consoleOutputReporter>
<statelessTestsetInfoReporter
implementation="org.apache.maven.plugin.surefire.extensions.junit5.JUnit5StatelessTestsetInfoTreeReporter">
<printStacktraceOnError>true</printStacktraceOnError>
<printStacktraceOnFailure>true</printStacktraceOnFailure>
<printStdoutOnError>true</printStdoutOnError>
<printStdoutOnFailure>true</printStdoutOnFailure>
<printStdoutOnSuccess>false</printStdoutOnSuccess>
<printStderrOnError>true</printStderrOnError>
<printStderrOnFailure>true</printStderrOnFailure>
<printStderrOnSuccess>false</printStderrOnSuccess>
</statelessTestsetInfoReporter>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ public List<ServiceNode<T>> nodes(Predicate<T> criteria, ListBasedServiceRegistr
return null == criteria ? serviceRegistry.nodeList() : serviceRegistry.nodeList()
.stream()
.filter(node -> criteria.test(node.getNodeData()))
.collect(Collectors.toList());
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ public List<ServiceNode<T>> nodes(Predicate<T> criteria, MapBasedServiceRegistry
.stream()
.filter(e -> criteria.test(e.getKey()))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
@Slf4j
public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
@Getter
private final AtomicReference<Map<Service, ServiceFinder<T, R>>> finders = new AtomicReference<>(new ConcurrentHashMap<>());
private final AtomicReference<Map<Service, ServiceFinder<T, R>>> finders =
new AtomicReference<>(new ConcurrentHashMap<>());
private final Lock updateLock = new ReentrantLock();
private final Condition updateCond = updateLock.newCondition();
private boolean updateAvailable = false;
Expand Down Expand Up @@ -75,9 +76,9 @@ public class ServiceFinderHub<T, R extends ServiceRegistry<T>> {
public ServiceFinderHub(
ServiceDataSource serviceDataSource,
ServiceFinderFactory<T, R> finderFactory
) {
) {
this(serviceDataSource, finderFactory,
HubConstants.SERVICE_REFRESH_DURATION_MS, HubConstants.HUB_REFRESH_DURATION_MS);
HubConstants.SERVICE_REFRESH_DURATION_MS, HubConstants.HUB_REFRESH_DURATION_MS);
}

public ServiceFinderHub(
Expand All @@ -90,9 +91,9 @@ public ServiceFinderHub(
this.serviceRefreshDurationMs = serviceRefreshDurationMs;
this.hubRefreshDurationMs = hubRefreshDurationMs;
this.refreshSignals.add(new ScheduledSignal<>("service-hub-updater",
() -> null,
Collections.emptyList(),
10_000));
() -> null,
Collections.emptyList(),
10_000));
}

public Optional<ServiceFinder<T, R>> finder(final Service service) {
Expand All @@ -110,7 +111,8 @@ public CompletableFuture<ServiceFinder<T, R>> buildFinder(final Service service)
updateAvailable();
waitTillServiceIsReady(service);
return finders.get().get(service);
} catch(Exception e) {
}
catch (Exception e) {
log.warn("Exception whiling building finder", e);
throw e;
}
Expand All @@ -133,12 +135,14 @@ public void stop() {
if (null != monitorFuture) {
try {
monitorFuture.cancel(true);
} catch (Exception e) {
}
catch (Exception e) {
log.warn("Error stopping service finder hub monitor: {}", e.getMessage());
}
}
log.info("Service finder hub stopped");
}

public void registerUpdateSignal(final Signal<Void> refreshSignal) {
refreshSignals.add(refreshSignal);
}
Expand All @@ -148,7 +152,8 @@ public void updateAvailable() {
updateLock.lock();
updateAvailable = true;
updateCond.signalAll();
} finally {
}
finally {
updateLock.unlock();
}
}
Expand All @@ -161,11 +166,13 @@ private void monitor() {
updateCond.await();
}
updateRegistry();
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
log.info("Updater thread interrupted");
Thread.currentThread().interrupt();
break;
} finally {
}
finally {
updateAvailable = false;
updateLock.unlock();
}
Expand All @@ -181,7 +188,7 @@ private void updateRegistry() {
final Map<Service, ServiceFinder<T, R>> updatedFinders = new HashMap<>();
try {
val services = serviceDataSource.services();
if(services.isEmpty()) {
if (services.isEmpty()) {
log.debug("No services found for the service data source. Skipping update on the registry");
return;
}
Expand All @@ -200,9 +207,11 @@ private void updateRegistry() {
updatedFinders.putAll(newFinders);
updatedFinders.putAll(matchingServices);
finders.set(updatedFinders);
} catch (Exception e) {
}
catch (Exception e) {
log.error("Error updating service list. Will maintain older list", e);
} finally {
}
finally {
alreadyUpdating.set(false);
}
}
Expand All @@ -215,14 +224,20 @@ private void waitTillHubIsReady() {
waitTillServiceIsReady(service);
return null;
})).toArray(CompletableFuture[]::new)
);
);
try {
hubRefresher.get(hubRefreshDurationMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Exceptions.illegalState("Refresh interrupted");
}
catch (TimeoutException e) {
Exceptions
.illegalState("Couldn't perform hub refresh at this time. Refresh exceeded the start up time specified");
} catch (Exception e) {
.illegalState("Couldn't perform service hub refresh at this time. " +
"Refresh exceeded the start up time specified");
}
catch (Exception e) {
Exceptions
.illegalState("Couldn't perform hub refresh at this time", e);
}
Expand All @@ -238,7 +253,8 @@ private void waitTillServiceIsReady(Service service) {
.map(ServiceFinder::getServiceRegistry)
.map(ServiceRegistry::isRefreshed)
.orElse(false));
} catch (Exception e) {
}
catch (Exception e) {
Exceptions
.illegalState("Could not perform initial state for service: " + service.getServiceName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

@UtilityClass
public class HubConstants {
public static long SERVICE_REFRESH_DURATION_MS = 10_000;
public static long HUB_REFRESH_DURATION_MS = 30_000;
public static long REFRESH_FREQUENCY_MS = 10_000;
public static final long SERVICE_REFRESH_DURATION_MS = 10_000;
public static final long HUB_REFRESH_DURATION_MS = 30_000;
public static final long REFRESH_FREQUENCY_MS = 10_000;
public static final int CONNECTION_RETRY_TIME_MS = 5_000;
public static final int MINIMUM_REFRESH_TIME_MS = 5_000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Service {
public class Service implements Comparable<Service> {
String namespace;
String serviceName;

Expand All @@ -37,4 +37,11 @@ public String name() {
public String toString() {
return name();
}

@Override
public int compareTo(Service service) {
return namespace.equals(service.getNamespace())
? serviceName.compareTo(service.getServiceName())
: namespace.compareTo(service.getNamespace());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static<T> List<ServiceNode<T>> filterValidNodes(
long healthcheckZombieCheckThresholdTime) {
return serviceNodes.stream()
.filter(serviceNode -> isValidNode(service, healthcheckZombieCheckThresholdTime, serviceNode))
.collect(Collectors.toList());
.toList();
}

public static <T> boolean isValidNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 763105.9286163104
"mean_ops" : 787544.107063881
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 608727.2211847006
"mean_ops" : 594383.3501367184
}
2 changes: 1 addition & 1 deletion ranger-discovery-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ranger</artifactId>
<groupId>io.appform.ranger</groupId>
<version>1.1-RC1</version>
<version>1.1-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public List<ServiceNode<ShardInfo>> nodes(
.stream()
.filter(e -> e.getKey().getEnvironment().equals(env.environment) && evalPredicate.test(e.getKey()))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
.toList();
if (!eligibleNodes.isEmpty()) {
log.debug("Effective environment for discovery of {} is {}", serviceName, env.environment);
return eligibleNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void cleanup() {
void testGenerate() {
IdGenerator.initialize(23);
val numRunners = 20;
val runners = IntStream.range(0, numRunners).mapToObj(i -> new Runner()).collect(Collectors.toList());
val runners = IntStream.range(0, numRunners).mapToObj(i -> new Runner()).toList();
val executorService = Executors.newFixedThreadPool(numRunners);
runners.forEach(executorService::submit);
Awaitility.await()
Expand Down Expand Up @@ -154,7 +154,7 @@ void testGenerateWithConstraintsNoConstraint() {
IdGenerator.initialize(23);
int numRunners = 20;

val runners = IntStream.range(0, numRunners).mapToObj(i -> new ConstraintRunner(new PartitionValidator(4, new JavaHashCodeBasedKeyPartitioner(16)))).collect(Collectors.toList());
val runners = IntStream.range(0, numRunners).mapToObj(i -> new ConstraintRunner(new PartitionValidator(4, new JavaHashCodeBasedKeyPartitioner(16)))).toList();
val executorService = Executors.newFixedThreadPool(numRunners);
runners.forEach(executorService::submit);
Awaitility.await()
Expand Down
Loading

0 comments on commit 4171b8b

Please sign in to comment.