Skip to content

Commit

Permalink
use majority for observer's view change.
Browse files Browse the repository at this point in the history
utility for listing network interfaces
  • Loading branch information
Hellblazer committed Jun 13, 2024
1 parent aa6d0b2 commit 8f6a81c
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 17 deletions.
30 changes: 15 additions & 15 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,15 @@ void finalizeViewChange() {
return;
}
viewChange(() -> {
var rings = context.getRingCount();
final var superMajority = context.size() == 1 ? 1 : rings - ((rings - 1) / 4);
if (observations.size() < superMajority) {
log.trace("Do not have superMajority: {} required: {} observers: {} for: {} on: {}",
observations.size(), superMajority, viewManagement.observersList(), currentView(),
node.getId());
final var majority = context.size() == 1 ? 1 : context.majority();
if (observations.size() < majority) {
log.trace("Do not have majority: {} required: {} observers: {} for: {} on: {}", observations.size(),
majority, viewManagement.observersList(), currentView(), node.getId());
scheduleFinalizeViewChange(2);
return;
}
log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(),
superMajority, viewManagement.observersList(), currentView(), node.getId());
log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), majority,
viewManagement.observersList(), currentView(), node.getId());
HashMultiset<Ballot> ballots = HashMultiset.create();
final var current = currentView();
observations.values()
Expand All @@ -374,15 +372,15 @@ void finalizeViewChange() {
.stream()
.max(Ordering.natural().onResultOf(Multiset.Entry::getCount))
.orElse(null);
if (max != null && max.getCount() >= superMajority) {
log.info("View consensus successful: {} required: {} cardinality: {} for: {} on: {}", max,
superMajority, viewManagement.cardinality(), currentView(), node.getId());
if (max != null && max.getCount() >= majority) {
log.info("View consensus successful: {} required: {} cardinality: {} for: {} on: {}", max, majority,
viewManagement.cardinality(), currentView(), node.getId());
viewManagement.install(max.getElement());
} else {
@SuppressWarnings("unchecked")
final var reversed = Comparator.comparing(e -> ((Entry<Ballot>) e).getCount()).reversed();
log.info("View consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}",
observations.size(), superMajority, viewManagement.cardinality(),
observations.size(), majority, viewManagement.cardinality(),
ballots.entrySet().stream().sorted(reversed).toList(), currentView(), node.getId());
}

Expand Down Expand Up @@ -609,8 +607,6 @@ void viewChange(Runnable r) {
protected Gossip gossip(Fireflies link, int ring) {
tick();
if (shunned.contains(link.getMember().getId())) {
log.trace("Shunning gossip view: {} with: {} on: {}", currentView(), link.getMember().getId(),
node.getId());
if (metrics != null) {
metrics.shunnedGossip().mark();
}
Expand Down Expand Up @@ -646,6 +642,8 @@ protected Gossip gossip(Fireflies link, int ring) {
node.getId());
break;
case UNAVAILABLE:
log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(),
node.getId(), sre);
accuse(p, ring, sre);
break;
default:
Expand Down Expand Up @@ -678,7 +676,7 @@ private void accuse(Participant member, int ring, Throwable e) {
member.addAccusation(node.accuse(member, ring));
pendingRebuttals.computeIfAbsent(member.getId(),
d -> roundTimers.schedule(() -> gc(member), params.rebuttalTimeout()));
log.debug("Accuse {} on ring {} view: {} (timer started): {} on: {}", member.getId(), ring, currentView(),
log.debug("Accuse: {} on ring: {} view: {} (timer started): {} on: {}", member.getId(), ring, currentView(),
e.getMessage(), node.getId());
}

Expand Down Expand Up @@ -1102,6 +1100,8 @@ private void gossip(Optional<Gossip> result, RingCommunications.Destination<Part
if (e.getCause() instanceof StatusRuntimeException sre) {
handleSRE("gossip", destination, member, sre);
} else {
log.debug("Exception gossiping with: {} view: {} on: {}", member.getId(), currentView(),
node.getId(), e);
accuse(member, destination.ring(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ private void resetObservers() {
if (observers.size() > 1 && observers.size() < context.getRingCount()) {
log.debug("Incomplete observers: {} cardinality: {} view: {} context: {} on: {}", observers.size(),
context.cardinality(), currentView(), context.getId(), node.getId());
assert observers.size() > 1 && observers.size() < context.getRingCount();
}
log.trace("Reset observers: {} cardinality: {} view: {} context: {} on: {}", observers.size(),
context.cardinality(), currentView(), context.getId(), node.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ public class StandardEpProvider implements EndpointProvider {

public StandardEpProvider(String bindAddress, ClientAuth clientAuth, CertificateValidator validator,
Function<Member, String> resolver) {
this.bindAddress = EndpointProvider.reify(bindAddress);
this(EndpointProvider.reify(bindAddress), clientAuth, validator, resolver);
}

public StandardEpProvider(SocketAddress bindAddress, ClientAuth clientAuth, CertificateValidator validator,
Function<Member, String> resolver) {
this.bindAddress = bindAddress;
this.clientAuth = clientAuth;
this.validator = validator;
this.resolver = resolver;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.salesforce.apollo.protocols;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;

import static java.lang.System.out;

/**
* @author hal.hildebrand
**/
public class ListNIFs {
public static void main(String[] args) throws SocketException {
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();

for (NetworkInterface netIf : Collections.list(nets)) {
out.printf("Display name: %s\n", netIf.getDisplayName());
out.printf("Name: %s\n", netIf.getName());
var addresses = netIf.getInterfaceAddresses();
for (var add : addresses) {
out.printf("Address: %s\n", add);
}
displaySubInterfaces(netIf);
out.print("\n");
}
}

static void displaySubInterfaces(NetworkInterface netIf) throws SocketException {
Enumeration<NetworkInterface> subIfs = netIf.getSubInterfaces();
for (NetworkInterface subIf : Collections.list(subIfs)) {
out.printf("\tSub Interface Display name: %s\n", subIf.getDisplayName());
out.printf("\tSub Interface Name: %s\n", subIf.getName());
Enumeration<InetAddress> inetAddresses = subIf.getInetAddresses();
for (InetAddress add : Collections.list(inetAddresses)) {
out.printf("\t Sub Interface Address: %s\n", add.getHostAddress());
}
}
}
}

0 comments on commit 8f6a81c

Please sign in to comment.