Skip to content

Commit

Permalink
minor cleanup and fix ReliableBroadcaster - was not executing finally…
Browse files Browse the repository at this point in the history
… if empty result from partner
  • Loading branch information
Hellblazer committed Nov 5, 2023
1 parent 5034ee4 commit e105b6f
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 84 deletions.
2 changes: 2 additions & 0 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ public void awaitSynchronization() {
return;
}
roundScheduler.schedule(AWAIT_SYNC, () -> {
log.trace("Synchronization failed on: {}", params.member().getId());
synchronizationFailed();
}, params.synchronizationCycles());
}
Expand Down Expand Up @@ -1244,6 +1245,7 @@ private Formation() {
assembly = new GenesisAssembly(vc, comm, next.get().member, consumer);
nextViewId.set(params.genesisViewId());
} else {
log.trace("No formation on: {}", params.member().getId());
assembly = null;
}
}
Expand Down
55 changes: 13 additions & 42 deletions choam/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,34 @@
<configuration>

<!-- Stop output INFO at start -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>
%d{mm:ss.SSS} [%thread] %-5level %logger{0} -
%msg%n
</Pattern>
%d{mm:ss.SSS} [%thread] %-5level %logger{0}
- %msg%n
</Pattern>
</encoder>
</appender>
<logger name="com.salesforce.apollo.stereotomy" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.choam.CHOAM" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.choam.Session" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.choam.ViewAssembly" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.choam.Producer" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.choam.support.TxDataSource" level="info" additivity="false">
<appender-ref ref="STDOUT" />
<logger name="com.salesforce.apollo.membership" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.chiralbehaviors.tron" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.ethereal" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.ethereal.Adder" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.ethereal.memberships" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

<logger name="com.salesforce.apollo.membership.messaging.rbc" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
<logger name="com.salesforce.apollo.choam" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.comm" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
<logger name="com.chiralbehaviors.tron" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<root level="info">
<appender-ref ref="STDOUT" />
<appender-ref ref="STDOUT"/>
</root>

</configuration>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void run() {
private static final Logger log = LoggerFactory.getLogger(Ethereal.class);

public static ThreadPoolExecutor consumer(String label) {
return new ThreadPoolExecutor(1, 1, 1, TimeUnit.NANOSECONDS, new PriorityBlockingQueue<>(),
return new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
Thread.ofVirtual().name("Ethereal Consumer[" + label + "]").factory(),
(r, t) -> log.trace("Shutdown, cannot consume unit", t));
}
Expand Down
8 changes: 3 additions & 5 deletions fireflies/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>
- %msg%n
</Pattern>
<Pattern>- %msg%n</Pattern>
</encoder>
</appender>

Expand All @@ -28,7 +26,7 @@

<logger name="com.salesforce.apollo.fireflies" level="info" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
<!-- <appender-ref ref="FILE" />-->
</logger>

<logger name="com.salesforce.apollo.members" level="info" additivity="false">
Expand All @@ -51,4 +49,4 @@
<appender-ref ref="STDOUT" />
</root>

</configuration>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public Digest hash(T m) {
}

public T insert(T m) {
LoggerFactory.getLogger(getClass()).trace("Adding: {} to ring: {}", m, index);
LoggerFactory.getLogger(getClass()).trace("Adding: {} to ring: {}", m.getId(), index);
return ring.put(hash(m), m);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,15 @@ public void start(Duration duration) {
var initialDelay = Entropy.nextBitsStreamLong(duration.toMillis());
log.info("Starting Reliable Broadcaster[{}] for {}", context.getId(), member.getId());
comm.register(context.getId(), new Service());
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
var scheduler = Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory());
scheduler.schedule(() -> oneRound(duration, scheduler), initialDelay, TimeUnit.MILLISECONDS);
}

public void stop() {
if (!started.compareAndSet(true, false)) {
return;
}
log.info("Stopping Reliable Broadcaster[{}] for {}", context.getId(), member.getId());
log.info("Stopping Reliable Broadcaster[{}] on: {}", context.getId(), member.getId());
buffer.clear();
gossiper.reset();
comm.deregister(context.getId());
Expand All @@ -214,25 +214,25 @@ private Reconcile gossipRound(ReliableBroadcast link, int ring) {
if (!started.get()) {
return null;
}
log.trace("rbc gossiping[{}] from {} with {} on {}", buffer.round(), member.getId(), link.getMember().getId(),
ring);
log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(), link.getMember().getId(),
ring, member.getId());
try {
return link.gossip(
MessageBff.newBuilder().setRing(ring).setDigests(buffer.forReconcilliation().toBff()).build());
} catch (Throwable e) {
log.trace("rbc gossiping[{}] failed from {} with {} on {}", buffer.round(), member.getId(),
link.getMember().getId(), ring, e);
log.trace("rbc gossiping[{}] failed with: {} ring: {} on: {}", buffer.round(), link.getMember().getId(),
ring, member.getId(), e);
return null;
}
}

private void handle(Optional<Reconcile> result,
RingCommunications.Destination<Member, ReliableBroadcast> destination, Duration duration,
ScheduledExecutorService scheduler, Timer.Context timer) {
if (result.isEmpty()) {
return;
}
try {
if (result.isEmpty()) {
return;
}
Reconcile gossip = result.get();
buffer.receive(gossip.getUpdatesList());
destination.link()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private <Q> void execute(BiFunction<Comm, Integer, Q> round, SyncHandler<T, Q, C
Q result = null;
try {
result = round.apply(destination.link, destination.ring);
} catch (StatusRuntimeException e) {
} catch (Throwable e) {
log.trace("error applying round to: %s", destination.member.getId(), e);
}
handler.handle(Optional.ofNullable(result), destination);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private <Q> void internalIterate(Digest digest, Runnable onMajority, BiFunction<
Q result = null;
try {
result = round.apply(link, next.ring());
} catch (StatusRuntimeException e) {
} catch (Throwable e) {
log.trace("Exception in round for: {} on: {} iteration: {} from: {} on: {}", digest, context.getId(),
iteration(), link.getMember() == null ? null : link.getMember().getId(), member.getId());
}
Expand Down
42 changes: 18 additions & 24 deletions memberships/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<!-- Stop output INFO at start -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<!-- Stop output INFO at start -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>
%d{mm:ss.SSS} [%thread] %-5level %logger{0} -
%msg%n
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>
%d{mm:ss.SSS} [%thread] %-5level %logger{0}
- %msg%n
</Pattern>
</encoder>
</appender>
<root level="warn">
<appender-ref ref="STDOUT" />
</root>
<logger name="com.salesforce.apollo.stereotomy" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.membership.messaging.rbc" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.membership.messaging.rbc.comms" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.salesforce.apollo.comm" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
<logger name="com.salesforce.apollo.stereotomy" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="com.salesforce.apollo.comm" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

</configuration>

0 comments on commit e105b6f

Please sign in to comment.