From 7d225978254c7829bc40cb5a142d5ed040243347 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 29 Jun 2024 21:21:46 -0700 Subject: [PATCH] better observation attempt lifecycle. use executor for linearizing --- .../com/salesforce/apollo/choam/CHOAM.java | 71 +++++++------------ .../com/salesforce/apollo/choam/Producer.java | 58 +++++---------- .../salesforce/apollo/choam/fsm/Driven.java | 12 +--- .../ethereal/memberships/ChRbcGossip.java | 2 + .../com/salesforce/apollo/fireflies/View.java | 19 +++-- .../apollo/fireflies/ViewManagement.java | 42 +++++++---- 6 files changed, 85 insertions(+), 119 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 42cf1dce1..68fe30726 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -99,13 +99,14 @@ public class CHOAM { private final AtomicReference view = new AtomicReference<>(); private final PendingViews pendingViews = new PendingViews(); private final ScheduledExecutorService scheduler; - private final Semaphore linear = new Semaphore(1); + private final ExecutorService linear; private final AtomicBoolean ongoingJoin = new AtomicBoolean(); public CHOAM(Parameters params) { scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); this.params = params; + linear = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); pendingViews.add(params.context().getId(), params.context().delegate()); rotateViewKeys(); @@ -116,18 +117,16 @@ public CHOAM(Parameters params) { combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(), params.metrics() == null ? null : params.metrics().getCombineMetrics(), adapter); - combine.registerHandler((_, messages) -> { - Thread.ofVirtual().start(() -> { - if (!started.get()) { - return; - } - try { - combine(messages); - } catch (Throwable t) { - log.error("Failed to combine messages on: {}", params.member().getId(), t); - } - }); - }); + combine.registerHandler((_, messages) -> Thread.ofVirtual().start(() -> { + if (!started.get()) { + return; + } + try { + combine(messages); + } catch (Throwable t) { + log.error("Failed to combine messages on: {}", params.member().getId(), t); + } + })); head.set(new NullBlock(params.digestAlgorithm())); view.set(new NullBlock(params.digestAlgorithm())); checkpoint.set(new NullBlock(params.digestAlgorithm())); @@ -354,10 +353,11 @@ public void stop() { if (!started.compareAndSet(true, false)) { return; } - linear.release(10000); + linear.shutdown(); try { scheduler.shutdownNow(); } catch (Throwable e) { + // ignore } session.cancelAll(); final var c = current.get(); @@ -365,11 +365,13 @@ public void stop() { try { c.complete(); } catch (Throwable e) { + // ignore } } try { combine.stop(); } catch (Throwable e) { + // ignore } } @@ -577,12 +579,11 @@ private void execute(List execs) { h.hash, h.height(), execs.size(), params.member().getId()); for (int i = 0; i < execs.size(); i++) { var exec = execs.get(i); - final var index = i; Digest hash = hashOf(exec, params.digestAlgorithm()); var stxn = session.complete(hash); try { params.processor() - .execute(index, CHOAM.hashOf(exec, params.digestAlgorithm()), exec, + .execute(i, CHOAM.hashOf(exec, params.digestAlgorithm()), exec, stxn == null ? null : stxn.onCompletion()); } catch (Throwable t) { log.error("Exception processing transaction: {} block: {} height: {} on: {}", hash, h.hash, h.height(), @@ -591,7 +592,7 @@ private void execute(List execs) { } } - private CheckpointSegments fetch(CheckpointReplication request, Digest from) { + private CheckpointSegments fetch(CheckpointReplication request) { CheckpointState state = cachedCheckpoints.get(ULong.valueOf(request.getCheckpoint())); if (state == null) { log.info("No cached checkpoint for {} on: {}", request.getCheckpoint(), params.member().getId()); @@ -604,14 +605,14 @@ private CheckpointSegments fetch(CheckpointReplication request, Digest from) { .build(); } - private Blocks fetchBlocks(BlockReplication rep, Digest from) { + private Blocks fetchBlocks(BlockReplication rep) { BloomFilter bff = BloomFilter.from(rep.getBlocksBff()); Blocks.Builder blocks = Blocks.newBuilder(); store.fetchBlocks(bff, blocks, 100, ULong.valueOf(rep.getFrom()), ULong.valueOf(rep.getTo())); return blocks.build(); } - private Blocks fetchViewChain(BlockReplication rep, Digest from) { + private Blocks fetchViewChain(BlockReplication rep) { BloomFilter bff = BloomFilter.from(rep.getBlocksBff()); Blocks.Builder blocks = Blocks.newBuilder(); store.fetchViewChain(bff, blocks, 100, ULong.valueOf(rep.getFrom()), ULong.valueOf(rep.getTo())); @@ -649,7 +650,7 @@ private boolean isNext(HashedBlock next) { return isNext; } - private Empty join(SignedViewMember nextView, Digest from) { + private void join(SignedViewMember nextView, Digest from) { var c = current.get(); if (c == null) { log.trace("No committee for: {} to join: {} diadem: {} on: {}", from, @@ -658,7 +659,6 @@ private Empty join(SignedViewMember nextView, Digest from) { throw new StatusRuntimeException(FAILED_PRECONDITION); } c.join(nextView, from); - return Empty.getDefaultInstance(); } private Supplier pendingViews() { @@ -1196,25 +1196,7 @@ public void cancelTimer(String timer) { @Override public void combine() { - Thread.ofVirtual().start(Utils.wrapped(() -> { - if (!started.get()) { - return; - } - try { - linear.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - try { - CHOAM.this.combine(); - } finally { - linear.release(); - } - }, log)); - if (linear.getQueueLength() > 0) { - log.info("Linear Q: {} on: {}", linear.getQueueLength(), params.member().getId()); - } + linear.execute(Utils.wrapped(() -> CHOAM.this.combine(), log)); } @Override @@ -1270,17 +1252,17 @@ public class Trampoline implements Concierge { @Override public CheckpointSegments fetch(CheckpointReplication request, Digest from) { - return CHOAM.this.fetch(request, from); + return CHOAM.this.fetch(request); } @Override public Blocks fetchBlocks(BlockReplication request, Digest from) { - return CHOAM.this.fetchBlocks(request, from); + return CHOAM.this.fetchBlocks(request); } @Override public Blocks fetchViewChain(BlockReplication request, Digest from) { - return CHOAM.this.fetchViewChain(request, from); + return CHOAM.this.fetchViewChain(request); } @Override @@ -1506,9 +1488,6 @@ private Attempt join(View view, Terminal t, SignedViewMember svm) { record Attempt(Member m, ListenableFuture fs) { } - - private record JoinState(AtomicBoolean halt, Thread joining) { - } } /** a member of the current committee */ diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index f4b0df709..def7fd907 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -53,7 +53,7 @@ public class Producer { private final Transitions transitions; private final ViewContext view; private final Digest nextViewId; - private final Semaphore serialize = new Semaphore(1); + private final ExecutorService serialize; private final ViewAssembly assembly; private final int maxEpoch; private final AtomicBoolean assembled = new AtomicBoolean(false); @@ -94,7 +94,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash log.trace("Pid: {} for: {} on: {}", pid, getViewId(), params().member().getId()); config.setPid(pid).setnProc((short) view.roster().size()); } - + serialize = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); config.setLabel("Producer" + getViewId() + " on: " + params().member().getId()); var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics(); controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial, @@ -155,7 +155,7 @@ public void stop() { return; } log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId()); - serialize.release(10000); + serialize.shutdown(); controller.stop(); coordinator.stop(); ds.close(); @@ -221,32 +221,18 @@ private Digest getViewId() { } private void newEpoch(Integer epoch) { - Thread.ofVirtual().start(Utils.wrapped(() -> { - try { - serialize.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - try { - log.trace("new epoch: {} on: {}", epoch, params().member().getId()); - assembly.newEpoch(); - var last = epoch >= maxEpoch && assembled.get(); - if (last) { - controller.completeIt(); - Producer.this.transitions.viewComplete(); - } else { - ds.reset(); - } - transitions.newEpoch(epoch, last); - } finally { - serialize.release(); + serialize.execute(Utils.wrapped(() -> { + log.trace("new epoch: {} on: {}", epoch, params().member().getId()); + assembly.newEpoch(); + var last = epoch >= maxEpoch && assembled.get(); + if (last) { + controller.completeIt(); + Producer.this.transitions.viewComplete(); + } else { + ds.reset(); } + transitions.newEpoch(epoch, last); }, log)); - var awaiting = serialize.getQueueLength(); - if (awaiting > 0) { - log.error("Serialize: {} on: {}", awaiting, params().member().getId()); - } } private Parameters params() { @@ -374,27 +360,14 @@ private void reconfigure() { } private void serial(List preblock, Boolean last) { - Thread.ofVirtual().start(() -> { - try { - serialize.acquire(); - create(preblock, last); - } catch (Throwable t) { - log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t); - } finally { - serialize.release(); - } - }); - var awaiting = serialize.getQueueLength(); - if (awaiting > 0) { - log.error("Serialize: {} on: {}", awaiting, params().member().getId()); - } + serialize.execute(Utils.wrapped(() -> create(preblock, last), log)); } private PendingBlock validate(Validate v) { Digest hash = Digest.from(v.getHash()); var p = pending.get(hash); if (p == null) { - pendingValidations.computeIfAbsent(hash, h -> new CopyOnWriteArrayList<>()).add(v); + pendingValidations.computeIfAbsent(hash, _ -> new CopyOnWriteArrayList<>()).add(v); return null; } return validate(v, p, hash); @@ -442,6 +415,7 @@ public void checkpoint() { final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean()); pending.put(next.hash, p); p.witnesses.put(params().member(), validation); + assert next.block != null; log.info("Produced: {} hash: {} height: {} for: {} on: {}", next.block.getBodyCase(), next.hash, next.height(), getViewId(), params().member().getId()); processPendingValidations(next, p); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java index 2cf86b242..890a841b9 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java @@ -41,11 +41,6 @@ public void assemble() { public Transitions assembled() { return SPICE; } - - @Override - public Transitions newEpoch(int epoch, boolean lastEpoch) { - return lastEpoch ? PROTOCOL_FAILURE : null; - } }, CHECKPOINTING { @Entry public void check() { @@ -116,11 +111,6 @@ public void terminate() { context().fail(); } }, SPICE { - @Override - public Transitions newEpoch(int epoch, boolean lastEpoch) { - return lastEpoch ? PROTOCOL_FAILURE : null; - } - @Override public Transitions viewComplete() { return END_EPOCHS; @@ -157,7 +147,7 @@ default Transitions lastBlock() { } default Transitions newEpoch(int epoch, boolean lastEpoch) { - throw fsm().invalidTransitionOn(); + return null; } default Transitions start() { diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java index 6cd5e48af..6e9f82705 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java @@ -98,6 +98,8 @@ public void start(Duration duration, Predicate= context.majority(); + } + boolean hasPendingRebuttals() { return !pendingRebuttals.isEmpty(); } @@ -835,7 +839,8 @@ private boolean add(NoteWrapper note) { */ private boolean add(SignedViewChange observation) { var svu = new SVU(observation, digestAlgo); - if (!viewManagement.isObserver(svu.observer)) { + var highWater = viewManagement.highWater(svu.observer); + if (highWater == null) { log.trace("Invalid observer: {} current: {} on: {}", svu.observer, currentView(), node.getId()); return false; } @@ -845,17 +850,17 @@ private boolean add(SignedViewChange observation) { node.getId()); return false; } + if (highWater >= svu.attempt) { + log.trace("Redundant view change: {} current: {} view: {} from {} on: {}", svu.attempt, highWater, + currentView(), svu.observer, node.getId()); + return false; + } final var member = context.getActiveMember(svu.observer); if (member == null) { log.trace("Cannot validate view change: {} current: {} from: {} on: {}", inView, currentView(), svu.observer, node.getId()); return false; } - if (!viewManagement.isObserver(member.id)) { - log.trace("Not an observer of: {} current: {} from: {} on: {}", inView, currentView(), svu.observer, - node.getId()); - return false; - } final var signature = JohnHancock.from(observation.getSignature()); if (!member.verify(signature, observation.getChange().toByteString())) { return false; @@ -866,6 +871,8 @@ private boolean add(SignedViewChange observation) { log.trace("Stale observation: {} current: {} view change: {} current: {} offline: {} on: {}", svu.attempt, cur.attempt, inView, currentView(), svu.observer, node.getId()); return cur; + } else { + viewManagement.updateHighWater(d, svu.attempt); } } log.trace("Observation: {} current: {} view change: {} from: {} on: {}", svu.attempt, inView, currentView(), diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index f850a852a..f4c77e354 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -48,7 +48,7 @@ public class ViewManagement { private static final Logger log = LoggerFactory.getLogger(ViewManagement.class); final AtomicReference diadem = new AtomicReference<>(); - final Set observers = new ConcurrentSkipListSet<>(); + final Map observers = new ConcurrentSkipListMap<>(); private final AtomicInteger attempt = new AtomicInteger(); private final Digest bootstrapView; private final DynamicContext context; @@ -127,7 +127,7 @@ Digest currentView() { } void enjoin(Join join, Digest observer) { - if (!observers.contains(node.getId()) || !observers.contains(observer)) { + if (!observers.containsKey(node.getId()) || !observers.containsKey(observer)) { log.trace("Not observer, ignored enjoin from: {} on: {}", observer, node.getId()); throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription("Not observer")); } @@ -152,7 +152,7 @@ void enjoin(Join join, Digest observer) { context.getId(), cardinality(), node.getId()); return; } - if (!observers.contains(node.getId())) { + if (!observers.containsKey(node.getId())) { log.trace("Not observer, ignoring Join from: {} observers: {} on: {}", from, observers, node.getId()); throw new StatusRuntimeException( Status.FAILED_PRECONDITION.withDescription("Not observer, ignored join of view")); @@ -170,7 +170,7 @@ void enjoin(Join join, Digest observer) { void gc(Participant member) { assert member != null; view.stable(() -> { - if (observers.remove(member.id)) { + if (observers.remove(member.id) != null) { log.trace("Removed observer: {} view: {} on: {}", member.id, currentView.get(), node.getId()); resetObservers(); } @@ -189,6 +189,10 @@ BloomFilter getJoinsBff(long seed, double p) { return bff; } + Integer highWater(Digest observer) { + return observers.get(observer); + } + /** * Initiate the view change */ @@ -372,7 +376,7 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time .toList(), from, responseObserver, timer); return; } - if (!observers.contains(node.getId())) { + if (!observers.containsKey(node.getId())) { log.trace("Not observer, ignoring Join from: {} observers: {} on: {}", from, observers, node.getId()); responseObserver.onError(new StatusRuntimeException( Status.FAILED_PRECONDITION.withDescription("Not observer, ignored join of view"))); @@ -401,8 +405,8 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time log.debug("Member pending join: {} view: {} context: {} on: {}", from, currentView(), context.getId(), node.getId()); var enjoining = new SliceIterator<>("Enjoining[%s:%s]".formatted(currentView(), from), node, - observers.stream().map(context::getActiveMember).toList(), view.comm, - scheduler); + observers.keySet().stream().map(context::getActiveMember).toList(), + view.comm, scheduler); enjoining.iterate(t -> t.enjoin(join), (_, _, _, _) -> true, () -> { }, Duration.ofMillis(1)); }); @@ -484,7 +488,9 @@ void maybeViewChange() { return; } } - if ((context.offlineCount() > 0 || !joins.isEmpty())) { + var change = context.offlineCount() > 0 || !joins.isEmpty(); + var shouldChange = isObserver() || view.hasMajorityObservervations(bootstrap); + if (change && shouldChange) { initiateViewChange(); } else { view.scheduleViewChange(); @@ -492,7 +498,7 @@ void maybeViewChange() { } Set observers() { - return observers; + return observers.keySet(); } List observersList() { @@ -535,6 +541,10 @@ HexBloom resetBootstrapView() { return hex; } + void resetHighWater() { + observers.entrySet().forEach(e -> e.setValue(-1)); + } + Redirect seed(Registration registration, Digest from) { final var requestView = Digest.from(registration.getView()); @@ -560,7 +570,7 @@ Redirect seed(Registration registration, Digest from) { return view.stable(() -> { var newMember = view.new Participant(note.getId()); - final var introductions = observers.stream().map(context::getMember).toList(); + final var introductions = observers.keySet().stream().map(context::getMember).toList(); log.info("Member seeding: {} view: {} context: {} introductions: {} on: {}", newMember.getId(), currentView(), context.getId(), introductions.stream().map(p -> p.getId()).toList(), node.getId()); @@ -582,11 +592,15 @@ void start(CompletableFuture onJoin, boolean bootstrap) { this.bootstrap = bootstrap; } + void updateHighWater(Digest d, int attempt) { + observers.compute(d, (k, v) -> attempt <= v ? v : attempt); + } + /** * @return true if the receiver is part of the BFT Observers of this group */ private boolean isObserver() { - return observers.contains(node.getId()); + return observers.containsKey(node.getId()); } private void joined(Collection seedSet, Digest from, StreamObserver responseObserver, @@ -632,9 +646,9 @@ private void resetObservers() { context.bftSubset(diadem.get().compact(), context::isActive) .stream() .map(Member::getId) - .forEach(observers::add); + .forEach(d -> observers.put(d, -1)); if (observers.isEmpty()) { - observers.add(node.getId()); // bootstrap case + observers.put(node.getId(), -1); // bootstrap case } if (observers.size() > 1 && observers.size() < context.getRingCount()) { log.debug("Incomplete observers: {} cardinality: {} view: {} context: {} on: {}", observers.size(), @@ -653,7 +667,7 @@ private void setDiadem(final HexBloom hex) { resetObservers(); log.trace("View: {} set diadem: {} cardinality: {} observers: {} view: {} context: {} size: {} on: {}", context.getId(), diadem.get().compactWrapped(), diadem.get().getCardinality(), - observers.stream().toList(), currentView(), context.getId(), context.size(), node.getId()); + observers.keySet().stream().toList(), currentView(), context.getId(), context.size(), node.getId()); } record Ballot(Digest view, List leaving, List joining, int hash) {