diff --git a/.github/workflows/dependency-review.yml b/.github/workflows/dependency-review.yml index fe461b424..8e8a9de6e 100644 --- a/.github/workflows/dependency-review.yml +++ b/.github/workflows/dependency-review.yml @@ -5,7 +5,7 @@ # Source repository: https://github.com/actions/dependency-review-action # Public documentation: https://docs.github.com/en/code-security/supply-chain-security/understanding-your-software-supply-chain/about-dependency-review#dependency-review-enforcement name: 'Dependency Review' -on: [pull_request] +on: [ pull_request ] permissions: contents: read @@ -15,6 +15,6 @@ jobs: runs-on: ubuntu-latest steps: - name: 'Checkout Repository' - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: 'Dependency Review' uses: actions/dependency-review-action@v2 diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index d392f68c5..542009684 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: graalvm/setup-graalvm@v1 with: java-version: '22' diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 242f46e96..d0bf95adf 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -109,9 +109,8 @@ public CHOAM(Parameters params) { rotateViewKeys(); var bContext = new DelegatedContext<>(params.context()); - var adapter = new MessageAdapter(_ -> true, this::signatureHash, - _ -> Collections.emptyList(), - (_, any) -> any, AgedMessageOrBuilder::getContent); + var adapter = new MessageAdapter(_ -> true, this::signatureHash, _ -> Collections.emptyList(), (_, any) -> any, + AgedMessageOrBuilder::getContent); combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(), params.metrics() == null ? null : params.metrics().getCombineMetrics(), @@ -235,6 +234,7 @@ public static Block reconfigure(Digest nextViewId, Map joins, Hash public static Map rosterMap(Context baseContext, Collection members) { return members.stream() .map(baseContext::getMember) + .filter(m -> m != null) .collect(Collectors.toMap(Member::getId, Function.identity())); } @@ -542,11 +542,17 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo } @Override - public void publish(Digest hash, CertifiedBlock cb) { - log.trace("Publishing: {} hash: {} height: {} certifications: {} on: {}", cb.getBlock().getBodyCase(), - hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()), cb.getCertificationsCount(), - params.member().getId()); - combine.publish(cb, true); + public void publish(Digest hash, CertifiedBlock cb, boolean beacon) { + if (beacon) { + log.trace("Publishing beacon: {} hash: {} height: {} certifications: {} on: {}", + cb.getBlock().getBodyCase(), hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()), + cb.getCertificationsCount(), params.member().getId()); + } else { + log.info("Publishing: {} hash: {} height: {} certifications: {} on: {}", + cb.getBlock().getBodyCase(), hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()), + cb.getCertificationsCount(), params.member().getId()); + } + combine.publish(cb, !beacon); } @Override @@ -1022,7 +1028,7 @@ public interface BlockProducer { Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint); - void publish(Digest hash, CertifiedBlock cb); + void publish(Digest hash, CertifiedBlock cb, boolean beacon); Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index c8d4e41a4..cebbb1d38 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -207,7 +207,7 @@ public void publish() { .sorted(Comparator.comparing(e -> e.getKey().getId())) .map(Map.Entry::getValue) .forEach(v -> b.addCertifications(v.getWitness())); - view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), b.build())); + view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), b.build()), false); controller.completeIt(); log.info("Genesis block: {} published with {} witnesses for: {} on: {}", reconfiguration.hash, witnesses.size(), view.context().getId(), params().member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index c29d20d8c..28519e234 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -57,7 +57,7 @@ public class Producer { private final Semaphore serialize = new Semaphore(1); private final ViewAssembly assembly; private final int maxEpoch; - private volatile int preblocks = 0; + private volatile int emptyPreBlocks = 0; private volatile boolean assembled = false; public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) { @@ -240,11 +240,12 @@ private void processAssemblies(List aggregate) { } private void processPendingValidations(HashedBlock block, PendingBlock p) { - var pending = pendingValidations.remove(block.hash); + var pending = pendingValidations.get(block.hash); if (pending != null) { pending.forEach(v -> validate(v, p, block.hash)); if (p.witnesses.size() >= params().majority()) { publish(p); + pendingValidations.remove(block.hash); } } } @@ -254,12 +255,14 @@ private void processTransactions(boolean last, List aggregate) { final var txns = aggregate.stream().flatMap(e -> e.getTransactionsList().stream()).toList(); if (txns.isEmpty()) { - if (preblocks % 5 == 0) { + var empty = emptyPreBlocks + 1; + emptyPreBlocks = empty; + if (empty % 5 == 0) { pending.values() .stream() .filter(pb -> pb.published.get()) .max(Comparator.comparing(pb -> pb.block.height())) - .ifPresent(this::publish); + .ifPresent(pb -> publish(pb, true)); } return; } @@ -306,16 +309,26 @@ private void produceAssemble(ViewAssembly.Vue v) { } private void publish(PendingBlock p) { + this.publish(p, false); + } + + private void publish(PendingBlock p, boolean beacon) { assert p.witnesses.size() >= params().majority() : "Publishing non majority block"; - log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(), - p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId()); - p.published.set(true); + var publish = p.published.compareAndSet(false, true); + if (!publish && !beacon) { + log.trace("Already published: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(), + p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId()); + return; + } + log.trace("Publishing {}pending: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "", + p.block.block.getBodyCase(), p.block.hash, p.block.height(), p.witnesses.values().size(), + params().member().getId()); final var cb = CertifiedBlock.newBuilder() .setBlock(p.block.block) .addAllCertifications( p.witnesses.values().stream().map(Validate::getWitness).toList()) .build(); - view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), cb)); + view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), cb), beacon); } private void reconfigure() { @@ -352,7 +365,6 @@ private void serial(List preblock, Boolean last) { return; } try { - preblocks++; transitions.create(preblock, last); } catch (Throwable t) { log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index 696933e4e..a9678206d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -158,8 +158,8 @@ public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock c return blockProducer.produce(height, prev, assemble, checkpoint); } - public void publish(HashedCertifiedBlock block) { - blockProducer.publish(block.hash, block.certifiedBlock); + public void publish(HashedCertifiedBlock block, boolean beacon) { + blockProducer.publish(block.hash, block.certifiedBlock, beacon); } public Block reconfigure(Map aggregate, Digest nextViewId, HashedBlock lastBlock, diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 8e8f8ac2d..079dbe402 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -149,7 +149,7 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo } @Override - public void publish(Digest hash, CertifiedBlock cb) { + public void publish(Digest hash, CertifiedBlock cb, boolean beacon) { complete.countDown(); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index e337a5dff..af5f58d1b 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -616,7 +616,7 @@ protected Gossip gossip(Fireflies link, int ring) { .setRing(ring) .setGossip(commonDigests()) .build()); - log.info("gossiping with: {} on: {}", link.getMember().getId(), node.getId()); + log.trace("gossiping with: {} on: {}", link.getMember().getId(), node.getId()); try { return link.gossip(gossip); } catch (Throwable e) { @@ -1930,7 +1930,7 @@ public Gossip rumors(SayWhat request, Digest from) { final var digests = request.getGossip(); if (!successor.equals(node)) { g = redirectTo(member, ring, successor, digests); - log.info("Redirected: {} on: {}", member.getId(), node.getId()); + log.debug("Redirected: {} on: {}", member.getId(), node.getId()); } else { g = Gossip.newBuilder() .setNotes(processNotes(from, BloomFilter.from(digests.getNoteBff()), params.fpr())) diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java index 74403835d..1f28baf49 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java @@ -79,6 +79,9 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • serverBuilder = InProcessServerBuilder.forName(name) .executor(Executors.newVirtualThreadPerTaskExecutor()) + .scheduledExecutorService( + Executors.newScheduledThreadPool(100, Thread.ofVirtual() + .factory())) .intercept(ConcurrencyLimitServerInterceptor.newBuilder( limitsBuilder.build()) .statusSupplier( @@ -102,6 +105,7 @@ private ManagedChannel connectTo(Member to) { final var name = String.format(NAME_TEMPLATE, prefix, qb64(to.getId())); final InProcessChannelBuilder builder = InProcessChannelBuilder.forName(name) .executor(executor) + .offloadExecutor(executor) .usePlaintext() .intercept(clientInterceptor); disableTrash(builder); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index 58db9ec8e..2ee89f441 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -34,6 +34,7 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl Limiter limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build(); channel = NettyChannelBuilder.forAddress(address) .executor(executor) + .offloadExecutor(executor) .withOption(ChannelOption.TCP_NODELAY, true) .sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3)) .intercept(new ConcurrencyLimitClientInterceptor(limiter, diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java index 1fa04a7e9..338564441 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java @@ -37,6 +37,7 @@ public class RingIterator extends RingCommu private final ScheduledExecutorService scheduler; private volatile boolean majorityFailed = false; private volatile boolean majoritySucceed = false; + private volatile int iteration = 0; public RingIterator(Duration frequency, Context context, SigningMember member, CommonCommunications comm, boolean ignoreSelf, ScheduledExecutorService scheduler) { @@ -88,7 +89,7 @@ public void iterate(Digest digest, BiFunction round, Resul } public int iteration() { - return currentIndex + 1; + return iteration; } @Override @@ -103,7 +104,10 @@ private void internalIterate(Digest digest, Runnable onMajority, BiFunction< Runnable proceed = () -> internalIterate(digest, onMajority, round, failedMajority, handler, onComplete, tally, traversed); - boolean completed = currentIndex == context.getRingCount() - 1; + iteration++; + final var cur = iteration; + var ringCount = context.getRingCount(); + boolean completed = cur > ringCount; Consumer allowed = allow -> proceed(digest, allow, onMajority, failedMajority, tally, completed, onComplete); diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 06ec38cb5..9b915ed9a 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -37,12 +37,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import static com.salesforce.apollo.choam.Session.retryNesting; -import static java.util.concurrent.CompletableFuture.allOf; import static org.junit.jupiter.api.Assertions.*; /** @@ -88,22 +89,47 @@ public static void smoke(Oracle oracle) throws Exception { // Map direct edges. Transitive edges added as a side effect - allOf(retryNesting(() -> oracle.map(helpDeskMembers, adminMembers), 3), - retryNesting(() -> oracle.map(ali, adminMembers), 3), retryNesting(() -> oracle.map(ali, userMembers), 3), - retryNesting(() -> oracle.map(burcu, userMembers), 3), - retryNesting(() -> oracle.map(can, userMembers), 3), - retryNesting(() -> oracle.map(managerMembers, userMembers), 3), - retryNesting(() -> oracle.map(technicianMembers, userMembers), 3), - retryNesting(() -> oracle.map(demet, helpDeskMembers), 3), - retryNesting(() -> oracle.map(egin, helpDeskMembers), 3), - retryNesting(() -> oracle.map(egin, userMembers), 3), - retryNesting(() -> oracle.map(fuat, managerMembers), 3), - retryNesting(() -> oracle.map(gl, managerMembers), 3), - retryNesting(() -> oracle.map(hakan, technicianMembers), 3), - retryNesting(() -> oracle.map(irmak, technicianMembers), 3), - retryNesting(() -> oracle.map(abcTechMembers, technicianMembers), 3), - retryNesting(() -> oracle.map(flaggedTechnicianMembers, technicianMembers), 3), - retryNesting(() -> oracle.map(jale, abcTechMembers), 3)).get(60, TimeUnit.SECONDS); + var countDown = new CountDownLatch(17); + + try (var exec = Executors.newVirtualThreadPerTaskExecutor()) { + + retryNesting(() -> oracle.map(helpDeskMembers, adminMembers), 3).whenCompleteAsync( + (_, _) -> countDown.countDown(), exec); + retryNesting(() -> oracle.map(ali, adminMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(ali, userMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(burcu, userMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(can, userMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(managerMembers, userMembers), 3).whenCompleteAsync( + (_, _) -> countDown.countDown(), exec); + retryNesting(() -> oracle.map(technicianMembers, userMembers), 3).whenCompleteAsync( + (_, _) -> countDown.countDown(), exec); + retryNesting(() -> oracle.map(demet, helpDeskMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(egin, helpDeskMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(egin, userMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(fuat, managerMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(gl, managerMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + retryNesting(() -> oracle.map(hakan, technicianMembers), 3).whenCompleteAsync( + (_, _) -> countDown.countDown(), exec); + retryNesting(() -> oracle.map(irmak, technicianMembers), 3).whenCompleteAsync( + (_, _) -> countDown.countDown(), exec); + retryNesting(() -> oracle.map(abcTechMembers, technicianMembers), 3).whenCompleteAsync( + (_, _) -> countDown.countDown(), exec); + retryNesting(() -> oracle.map(flaggedTechnicianMembers, technicianMembers), 3).whenCompleteAsync( + (_, _) -> countDown.countDown(), exec); + retryNesting(() -> oracle.map(jale, abcTechMembers), 3).whenCompleteAsync((_, _) -> countDown.countDown(), + exec); + + countDown.await(120, TimeUnit.SECONDS); + } // Protected resource namespace var docNs = Oracle.namespace("Document"); @@ -114,7 +140,7 @@ public static void smoke(Oracle oracle) throws Exception { // Users can View Document 123 Assertion tuple = userMembers.assertion(object123View); - retryNesting(() -> oracle.add(tuple), 3).get(); + retryNesting(() -> oracle.add(tuple), 3).get(120, TimeUnit.SECONDS); // Direct subjects that can View the document var viewers = oracle.read(object123View); @@ -128,7 +154,7 @@ public static void smoke(Oracle oracle) throws Exception { // Assert flagged technicians can directly view the document Assertion grantTechs = flaggedTechnicianMembers.assertion(object123View); - retryNesting(() -> oracle.add(grantTechs), 3).get(); + retryNesting(() -> oracle.add(grantTechs), 3).get(120, TimeUnit.SECONDS); // Now have 2 direct subjects that can view the doc viewers = oracle.read(object123View); @@ -167,22 +193,22 @@ public static void smoke(Oracle oracle) throws Exception { assertFalse(oracle.check(object123View.assertion(helpDeskMembers))); // Remove them - retryNesting(() -> oracle.remove(abcTechMembers, technicianMembers), 3).get(); + retryNesting(() -> oracle.remove(abcTechMembers, technicianMembers), 3).get(120, TimeUnit.SECONDS); assertFalse(oracle.check(object123View.assertion(jale))); assertTrue(oracle.check(object123View.assertion(egin))); assertFalse(oracle.check(object123View.assertion(helpDeskMembers))); // Remove our assertion - retryNesting(() -> oracle.delete(tuple), 3).get(); + retryNesting(() -> oracle.delete(tuple), 3).get(120, TimeUnit.SECONDS); assertFalse(oracle.check(object123View.assertion(jale))); assertFalse(oracle.check(object123View.assertion(egin))); assertFalse(oracle.check(object123View.assertion(helpDeskMembers))); // Some deletes - retryNesting(() -> oracle.delete(abcTechMembers), 3).get(); - retryNesting(() -> oracle.delete(flaggedTechnicianMembers), 3).get(); + retryNesting(() -> oracle.delete(abcTechMembers), 3).get(120, TimeUnit.SECONDS); + retryNesting(() -> oracle.delete(flaggedTechnicianMembers), 3).get(120, TimeUnit.SECONDS); } @AfterEach diff --git a/pom.xml b/pom.xml index a79c47fc4..4aadc61cb 100644 --- a/pom.xml +++ b/pom.xml @@ -41,8 +41,8 @@ 3.18.15 1.78 1.4.12 - 1.62.2 - 3.25.3 + 1.64.0 + 4.27.0 4.8.0 4.1.100.Final 0.9.27 @@ -608,29 +608,33 @@ - Maven Central - https://mvnrepository.com/artifact/ + central + https://repo.maven.org/maven2/ + + false + + + + central-1 + https://repo1.maven.org/maven2/ false - daily - - true - daily - - Maven Central - https://mvnrepository.com/artifact/ - - true - daily - + central + https://repo.maven.org/maven2/ + + false + + + + central-1 + https://repo1.maven.org/maven2/ false - daily @@ -780,6 +784,7 @@ ${forks} true + -Djdk.tracePinnedThreads=full diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index 45343e6de..3c8802818 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -338,7 +338,9 @@ private CHOAM createChoam(Random entropy, Builder params, SigningMember m, boole SqlStateMachine up = new SqlStateMachine(m.getId(), url, new Properties(), new File(checkpointDirBase, m.getId().toString())); updaters.put(m, up); - + if (testSubject) { + params.setGenerateGenesis(false); + } params.getProducer().ethereal().setSigner(m); return new CHOAM(params.setSynchronizationCycles(testSubject ? 100 : 10) .build(RuntimeParameters.newBuilder()