Skip to content

Commit

Permalink
better ViewChange
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 13, 2024
1 parent 8f6a81c commit 4c99318
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 140 deletions.
10 changes: 5 additions & 5 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DelegatedContext;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.context.ViewChange;
import com.salesforce.apollo.cryptography.*;
import com.salesforce.apollo.cryptography.Signer.SignerImpl;
import com.salesforce.apollo.cryptography.proto.PubKey;
Expand Down Expand Up @@ -322,15 +323,14 @@ public String logState() {

/**
* A view change has occurred
*
* @param context - the new membership context
* @param diadem - the compact HexBloom of the context view
*/
public void rotateViewKeys(Context<Member> context, Digest diadem) {
public void rotateViewKeys(ViewChange viewChange) {
var context = viewChange.context();
var diadem = viewChange.diadem();
((DelegatedContext<Member>) combine.getContext()).setContext(context);
var c = current.get();
if (c != null) {
c.nextView(diadem, context);
c.nextView(viewChange.diadem(), context);
} else {
log.info("Acquiring new view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
params.member().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void gather(List<ByteString> preblock, boolean last) {
try {
return Join.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
log.warn("error parsing join: {} on: {}", bs, params().member().getId(), e);
log.trace("error parsing join: {} on: {}", bs, params().member().getId(), e);
return null;
}
})
Expand Down
36 changes: 19 additions & 17 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import com.salesforce.apollo.archipelago.Router.ServiceRouting;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.context.DynamicContextImpl;
import com.salesforce.apollo.context.ViewChange;
import com.salesforce.apollo.cryptography.*;
import com.salesforce.apollo.cryptography.proto.Biff;
import com.salesforce.apollo.fireflies.Binding.Bound;
Expand Down Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -87,25 +88,23 @@ public class View {
private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change";

final CommonCommunications<Fireflies, Service> comm;
final AtomicBoolean started = new AtomicBoolean();
final AtomicBoolean started = new AtomicBoolean();
private final CommonCommunications<Entrance, Service> approaches;
private final DynamicContext<Participant> context;
private final DigestAlgorithm digestAlgo;
private final RingCommunications<Participant, Fireflies> gossiper;
private final AtomicBoolean introduced = new AtomicBoolean();
private final List<BiConsumer<Context, Digest>> viewChangeListeners = new CopyOnWriteArrayList<>();
private final Executor viewNotificationQueue = Executors.newSingleThreadExecutor(
Thread.ofVirtual().factory());
private final AtomicBoolean introduced = new AtomicBoolean();
private final List<Consumer<ViewChange>> viewChangeListeners = new CopyOnWriteArrayList<>();
private final Executor viewNotificationQueue;
private final FireflyMetrics metrics;
private final Node node;
private final Map<Digest, SignedViewChange> observations = new ConcurrentSkipListMap<>();
private final Map<Digest, SignedViewChange> observations = new ConcurrentSkipListMap<>();
private final Parameters params;
private final ConcurrentMap<Digest, RoundScheduler.Timer> pendingRebuttals = new ConcurrentSkipListMap<>();
private final ConcurrentMap<Digest, RoundScheduler.Timer> pendingRebuttals = new ConcurrentSkipListMap<>();
private final RoundScheduler roundTimers;
private final Set<Digest> shunned = new ConcurrentSkipListSet<>();
private final Map<String, RoundScheduler.Timer> timers = new HashMap<>();
private final ReadWriteLock viewChange = new ReentrantReadWriteLock(
true);
private final Set<Digest> shunned = new ConcurrentSkipListSet<>();
private final Map<String, RoundScheduler.Timer> timers = new HashMap<>();
private final ReadWriteLock viewChange;
private final ViewManagement viewManagement;
private final EventValidation validation;
private final Verifiers verifiers;
Expand Down Expand Up @@ -141,6 +140,8 @@ public View(DynamicContext<Participant> context, ControlledIdentifierMember memb
gossiper.ignoreSelf();
this.validation = validation;
this.verifiers = verifiers;
viewNotificationQueue = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory());
viewChange = new ReentrantReadWriteLock(true);
}

/**
Expand Down Expand Up @@ -170,7 +171,7 @@ public static boolean isValidMask(BitSet mask, DynamicContext<?> context) {
/**
* Deregister the listener
*/
public void deregister(BiConsumer<Context, Digest> listener) {
public void deregister(Consumer<ViewChange> listener) {
viewChangeListeners.remove(listener);
}

Expand All @@ -191,7 +192,7 @@ public Digest getNodeId() {
/**
* Register the listener to receive view changes
*/
public void register(BiConsumer<Context, Digest> listener) {
public void register(Consumer<ViewChange> listener) {
viewChangeListeners.add(listener);
}

Expand Down Expand Up @@ -416,14 +417,15 @@ void introduced() {
}

void notifyListeners(List<SelfAddressingIdentifier> joining, List<Digest> leaving) {
final var current = currentView();
var sc = context.asStatic();
final var viewChange = new ViewChange(context.asStatic(), currentView(),
joining.stream().map(SelfAddressingIdentifier::getDigest).toList(),
Collections.unmodifiableList(leaving));
viewNotificationQueue.execute(Utils.wrapped(() -> {
viewChangeListeners.forEach(listener -> {
try {
log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", listener,
currentView(), context.size(), joining.size(), leaving.size(), node.getId());
listener.accept(sc, current);
listener.accept(viewChange);
} catch (Throwable e) {
log.error("error in view change listener: {} on: {} ", listener, node.getId(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.salesforce.apollo.context;

import com.salesforce.apollo.cryptography.Digest;

import java.util.Collection;

public record ViewChange(Context context, Digest diadem, Collection<Digest> joining, Collection<Digest> leaving) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.context.ViewChange;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.SignatureAlgorithm;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* The logical domain of the current "Process" - OS and Simulation defined, 'natch.
Expand All @@ -51,7 +53,7 @@ public class ProcessDomain extends Domain {
private final Verifiers.DelegatedVerifiers verifiers;
private final ProcessDomainParameters parameters;
private final List<BiConsumer<Context, Digest>> lifecycleListeners = new CopyOnWriteArrayList<>();
private final BiConsumer<Context, Digest> listener = listener();
private final Consumer<ViewChange> listener = listener();

public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams,
Builder builder, Parameters.RuntimeParameters.Builder runtime, String endpoint,
Expand Down Expand Up @@ -118,13 +120,13 @@ public void stop() {
}
}

protected BiConsumer<Context, Digest> listener() {
return (context, diadem) -> {
dht.nextView(context, diadem);
choam.rotateViewKeys(context, diadem);
protected Consumer<ViewChange> listener() {
return (viewChange) -> {
dht.nextView(viewChange);
choam.rotateViewKeys(viewChange);

log.info("View change: {} for: {} cardinality: {} on: {}", diadem, params.context().getId(), context.size(),
params.member().getId());
log.info("View change: {} for: {} cardinality: {} on: {}", viewChange.diadem(), params.context().getId(),
viewChange.context().size(), params.member().getId());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
import com.salesforce.apollo.choam.proto.FoundationSeal;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContextImpl;
import com.salesforce.apollo.context.ViewChange;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.delphinius.Oracle;
Expand All @@ -36,7 +36,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -115,14 +115,14 @@ public void smokin() throws Exception {
final var seeds = Collections.singletonList(
new Seed(domains.getFirst().getMember().getIdentifier().getIdentifier(), EndpointProvider.allocatePort()));
domains.forEach(d -> {
BiConsumer<Context, Digest> c = (context, viewId) -> {
if (context.cardinality() == CARDINALITY) {
System.out.printf("Full view: %s members: %s on: %s%n", viewId, context.cardinality(),
d.getMember().getId());
Consumer<ViewChange> c = viewChange -> {
if (viewChange.context().cardinality() == CARDINALITY) {
System.out.printf("Full view: %s members: %s on: %s%n", viewChange.diadem(),
viewChange.context().cardinality(), d.getMember().getId());
countdown.countDown();
} else {
System.out.printf("Members joining: %s members: %s on: %s%n", viewId, context.cardinality(),
d.getMember().getId());
System.out.printf("Members joining: %s members: %s on: %s%n", viewChange.diadem(),
viewChange.context().cardinality(), d.getMember().getId());
}
};
d.foundation.register(c);
Expand Down
Loading

0 comments on commit 4c99318

Please sign in to comment.