From 7a8d8279748d77170c7edee9946dc0c52ad73b53 Mon Sep 17 00:00:00 2001 From: Constantine Date: Sun, 4 Feb 2024 12:29:05 -0800 Subject: [PATCH] thoth-2 (#183) * fix parallelism in Binding/ViewManagement (join/seed). api for delegated verifiers/validation in ProcessDomain. * formatting * moar formatting * 2 threads for testing * 4 threads for testing, 1.5 per core parallel maven build. * Think I don't need the separate cache. use 2 threads per core mvn build * back to 1 mvn thread per core, 2 threads for test --- .github/workflows/maven.yml | 9 +-- .../salesforce/apollo/fireflies/Binding.java | 25 ++++--- .../com/salesforce/apollo/fireflies/View.java | 2 + .../apollo/fireflies/ViewManagement.java | 68 ++++++++++--------- .../apollo/model/ProcessDomain.java | 15 ++-- pom.xml | 4 ++ .../com/salesforce/apollo/thoth/KerlDHT.java | 67 +++++++++--------- 7 files changed, 99 insertions(+), 91 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 91f74ffa4..9c51fa116 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -8,13 +8,6 @@ jobs: runs-on: ubuntu-latest steps: - - name: Cache local Maven repository - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-maven- - uses: actions/checkout@v3 - uses: graalvm/setup-graalvm@v1 with: @@ -23,4 +16,4 @@ jobs: cache: 'maven' github-token: ${{ secrets.GITHUB_TOKEN }} - name: Build with Maven - run: ./mvnw -batch-mode clean install -Ppre --file pom.xml + run: ./mvnw -T 1C -batch-mode clean install -Ppre --file pom.xml diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 6c23dfbec..53296ee6b 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -220,18 +220,21 @@ private Join join(Digest v) { log.error("Empty seeding response on: {}", node.getId()); return; } - var view = Digest.from(r.getView()); - log.info("Rebalancing to cardinality: {} (validate) for: {} context: {} on: {}", r.getCardinality(), view, - context.getId(), node.getId()); - this.context.rebalance(r.getCardinality()); - node.nextNote(view); - log.debug("Completing redirect to view: {} context: {} sample: {} on: {}", view, this.context.getId(), - r.getSampleCount(), node.getId()); - if (timer != null) { - timer.close(); - } - join(r, view, duration); + Thread.ofVirtual().start(Utils.wrapped(() -> { + var view = Digest.from(r.getView()); + log.info("Rebalancing to cardinality: {} (validate) for: {} context: {} on: {}", r.getCardinality(), + view, context.getId(), node.getId()); + this.context.rebalance(r.getCardinality()); + node.nextNote(view); + + log.debug("Completing redirect to view: {} context: {} sample: {} on: {}", view, this.context.getId(), + r.getSampleCount(), node.getId()); + if (timer != null) { + timer.close(); + } + join(r, view, duration); + }, log)); }; } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 0b1ecf6ab..edafc6106 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -581,6 +581,8 @@ void viewChange(Runnable r) { try { r.run(); // log.error("Exit view change on: {}", node.getId()); + } catch (Throwable t) { + log.error("Error during view change on: {}", node.getId(), t); } finally { lock.unlock(); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 62584d8f0..61811b4a2 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -300,43 +300,47 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time } BiConsumer join(Duration duration, Timer.Context timer) { - return (bound, t) -> view.viewChange(() -> { - final var hex = bound.view(); + return (bound, t) -> { if (t != null) { log.error("Failed to join view on: {}", node.getId(), t); view.stop(); return; } - - log.info("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", hex.getCardinality(), - hex.compact(), context.getId(), node.getId()); - context.rebalance(hex.getCardinality()); - context.activate(node); - diadem.set(hex); - currentView.set(hex.compact()); - - bound.successors().forEach(view::addToView); - bound.initialSeedSet().forEach(view::addToView); - - view.reset(); - - context.allMembers().forEach(Participant::clearAccusations); - - view.schedule(duration); - - if (timer != null) { - timer.stop(); - } - - view.introduced(); - log.info("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", currentView.get(), - bound.successors().size(), cardinality(), context.totalCount(), node.getId()); - if (context.totalCount() == cardinality()) { - join(); - } else { - populate(new ArrayList<>(context.activeMembers())); - } - }); + Thread.ofVirtual().start(Utils.wrapped(() -> { + view.viewChange(() -> { + final var hex = bound.view(); + + log.info("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", hex.getCardinality(), + hex.compact(), context.getId(), node.getId()); + context.rebalance(hex.getCardinality()); + context.activate(node); + diadem.set(hex); + currentView.set(hex.compact()); + + bound.successors().forEach(view::addToView); + bound.initialSeedSet().forEach(view::addToView); + + view.reset(); + + context.allMembers().forEach(Participant::clearAccusations); + + view.schedule(duration); + + if (timer != null) { + timer.stop(); + } + + view.introduced(); + log.info("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", currentView.get(), + bound.successors().size(), cardinality(), context.totalCount(), node.getId()); + if (context.totalCount() == cardinality()) { + join(); + } else { + populate(new ArrayList<>(context.activeMembers())); + } + }); + }, log)); + }; } void joinUpdatesFor(BloomFilter joinBff, Builder builder) { diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index cf69556f6..ac3796e17 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -40,13 +40,14 @@ * @author hal.hildebrand */ public class ProcessDomain extends Domain { - private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class); - protected final KerlDHT dht; - protected final View foundation; - private final EventValidation.DelegatedValidation validations; - private final Verifiers.DelegatedVerifiers verifiers; - private final ProcessDomainParameters parameters; - private final ViewLifecycleListener listener = listener(); + private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class); + + protected final KerlDHT dht; + protected final View foundation; + private final EventValidation.DelegatedValidation validations; + private final Verifiers.DelegatedVerifiers verifiers; + private final ProcessDomainParameters parameters; + private final ViewLifecycleListener listener = listener(); public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams, Builder builder, Parameters.RuntimeParameters.Builder runtime, InetSocketAddress endpoint, diff --git a/pom.xml b/pom.xml index e4dbd549a..a1d2b5a28 100644 --- a/pom.xml +++ b/pom.xml @@ -776,6 +776,10 @@ org.apache.maven.plugins maven-surefire-plugin 3.1.2 + + 2 + true + org.apache.maven.plugins diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 65ce18665..b83df4e82 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -184,10 +184,6 @@ public static void updateLocationHash(Identifier identifier, DigestAlgorithm dig }); } - static T completeIt(T result) { - return result; - } - public KeyState_ append(AttachmentEvent event) { if (event == null) { return null; @@ -232,12 +228,12 @@ public KeyState_ append(AttachmentEvent event) { @Override public List append(KERL_ kerl) { if (kerl.getEventsList().isEmpty()) { - return completeIt(Collections.emptyList()); + return Collections.emptyList(); } final var event = kerl.getEventsList().getFirst(); Digest identifier = digestOf(event, digestAlgorithm()); if (identifier == null) { - return completeIt(Collections.emptyList()); + return Collections.emptyList(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -307,7 +303,7 @@ public KeyState_ append(KeyEvent_ event) { @Override public List append(List events) { if (events.isEmpty()) { - return completeIt(Collections.emptyList()); + return Collections.emptyList(); } List states = new ArrayList<>(); events.stream().map(this::append).forEach(states::add); @@ -317,7 +313,7 @@ public List append(List events) { @Override public List append(List events, List attachments) { if (events.isEmpty()) { - return completeIt(Collections.emptyList()); + return Collections.emptyList(); } List states = new ArrayList<>(); events.stream().map(this::append).forEach(states::add); @@ -329,12 +325,12 @@ public List append(List events, List atta @Override public Empty appendAttachments(List events) { if (events.isEmpty()) { - return completeIt(Empty.getDefaultInstance()); + return Empty.getDefaultInstance(); } final var event = events.getFirst(); Digest identifier = digestAlgorithm().digest(event.getCoordinates().getIdentifier().toByteString()); if (identifier == null) { - return completeIt(Empty.getDefaultInstance()); + return Empty.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -369,11 +365,11 @@ public Empty appendAttachments(List events) { @Override public Empty appendValidations(Validations validations) { if (validations.getValidationsCount() == 0) { - return completeIt(null); + return null; } Digest identifier = digestAlgorithm().digest(validations.getCoordinates().getIdentifier().toByteString()); if (identifier == null) { - return completeIt(null); + return null; } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -426,11 +422,11 @@ public Ani getAni() { @Override public Attachment getAttachment(EventCoords coordinates) { if (coordinates == null) { - return completeIt(Attachment.getDefaultInstance()); + return Attachment.getDefaultInstance(); } Digest identifier = digestAlgorithm().digest(coordinates.getIdentifier().toByteString()); if (identifier == null) { - return completeIt(Attachment.getDefaultInstance()); + return Attachment.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -462,11 +458,11 @@ public Attachment getAttachment(EventCoords coordinates) { @Override public KERL_ getKERL(Ident identifier) { if (identifier == null) { - return completeIt(KERL_.getDefaultInstance()); + return KERL_.getDefaultInstance(); } Digest digest = digestAlgorithm().digest(identifier.toByteString()); if (digest == null) { - return completeIt(KERL_.getDefaultInstance()); + return KERL_.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -497,14 +493,17 @@ public KERL_ getKERL(Ident identifier) { @Override public KeyEvent_ getKeyEvent(EventCoords coordinates) { + if (!coordinates.isInitialized()) { + return KeyEvent_.getDefaultInstance(); + } var operation = "getKeyEvent(%s)".formatted(EventCoordinates.from(coordinates)); log.trace("{} on: {}", operation, member.getId()); if (coordinates == null) { - return completeIt(KeyEvent_.getDefaultInstance()); + return KeyEvent_.getDefaultInstance(); } Digest digest = digestAlgorithm().digest(coordinates.getIdentifier().toByteString()); if (digest == null) { - return completeIt(KeyEvent_.getDefaultInstance()); + return KeyEvent_.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -537,11 +536,11 @@ public KeyState_ getKeyState(EventCoords coordinates) { var operation = "getKeyState(%s)".formatted(EventCoordinates.from(coordinates)); log.info("{} on: {}", operation, member.getId()); if (coordinates == null) { - return completeIt(KeyState_.getDefaultInstance()); + return KeyState_.getDefaultInstance(); } Digest digest = digestAlgorithm().digest(coordinates.getIdentifier().toByteString()); if (digest == null) { - return completeIt(KeyState_.getDefaultInstance()); + return KeyState_.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -574,11 +573,11 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) { var operation = "getKeyState(%s, %s)".formatted(Identifier.from(identifier), ULong.valueOf(sequenceNumber)); log.info("{} on: {}", operation, member.getId()); if (identifier == null) { - return completeIt(KeyState_.getDefaultInstance()); + return KeyState_.getDefaultInstance(); } Digest digest = digestAlgorithm().digest(identifier.toByteString()); if (digest == null) { - return completeIt(KeyState_.getDefaultInstance()); + return KeyState_.getDefaultInstance(); } var identAndSeq = IdentAndSeq.newBuilder().setIdentifier(identifier).setSequenceNumber(sequenceNumber).build(); Instant timedOut = Instant.now().plus(operationTimeout); @@ -610,13 +609,13 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) { @Override public KeyState_ getKeyState(Ident identifier) { if (identifier == null) { - return completeIt(KeyState_.getDefaultInstance()); + return KeyState_.getDefaultInstance(); } var operation = "getKeyState(%s)".formatted(Identifier.from(identifier)); log.info("{} on: {}", operation, member.getId()); Digest digest = digestAlgorithm().digest(identifier.toByteString()); if (digest == null) { - return completeIt(KeyState_.getDefaultInstance()); + return KeyState_.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -648,11 +647,11 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat var operation = "getKeyStateWithAttachments(%s)".formatted(EventCoordinates.from(coordinates)); log.info("{} on: {}", operation, member.getId()); if (coordinates == null) { - return completeIt(KeyStateWithAttachments_.getDefaultInstance()); + return KeyStateWithAttachments_.getDefaultInstance(); } Digest digest = digestAlgorithm().digest(coordinates.getIdentifier().toByteString()); if (digest == null) { - return completeIt(KeyStateWithAttachments_.getDefaultInstance()); + return KeyStateWithAttachments_.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -684,11 +683,11 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal var operation = "getKeyStateWithEndorsementsAndValidations(%s)".formatted(EventCoordinates.from(coordinates)); log.info("{} on: {}", operation, member.getId()); if (coordinates == null) { - return completeIt(KeyStateWithEndorsementsAndValidations_.getDefaultInstance()); + return KeyStateWithEndorsementsAndValidations_.getDefaultInstance(); } Digest digest = digestAlgorithm().digest(coordinates.getIdentifier().toByteString()); if (digest == null) { - return completeIt(KeyStateWithEndorsementsAndValidations_.getDefaultInstance()); + return KeyStateWithEndorsementsAndValidations_.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -720,11 +719,11 @@ public Validations getValidations(EventCoords coordinates) { var operation = "getValidations(%s)".formatted(EventCoordinates.from(coordinates)); log.info("{} on: {}", operation, member.getId()); if (coordinates == null) { - return completeIt(Validations.getDefaultInstance()); + return Validations.getDefaultInstance(); } Digest identifier = digestAlgorithm().digest(coordinates.getIdentifier().toByteString()); if (identifier == null) { - return completeIt(Validations.getDefaultInstance()); + return Validations.getDefaultInstance(); } Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -795,7 +794,7 @@ private T complete(Function func) { try { return func.apply(new ProtoKERLAdapter(kerl)); } catch (Throwable t) { - log.error("Error completing", t); + log.error("Error completing on: {}", member.getId(), t); return null; } } @@ -839,10 +838,12 @@ private void initializeSchema() { database)) { liquibase.update((String) null); } catch (LiquibaseException e) { + log.error("Unable to initialize schema on: {}", member.getId(), e); throw new IllegalStateException(e); } - } catch (SQLException e1) { - throw new IllegalStateException(e1); + } catch (SQLException e) { + log.error("Unable to initialize schema on: {}", member.getId(), e); + throw new IllegalStateException(e); } }