diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index fae441228..196c1fa46 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -1400,6 +1400,7 @@ private void validate(Digest from, State request) { } } + @FunctionalInterface public interface ViewLifecycleListener { /** diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java index d435c7878..16aa545b8 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java @@ -6,8 +6,6 @@ */ package com.salesforce.apollo.demesnes; -import com.salesforce.apollo.choam.proto.Foundation; -import com.salesforce.apollo.choam.proto.FoundationSeal; import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; @@ -15,6 +13,8 @@ import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; +import com.salesforce.apollo.choam.proto.Foundation; +import com.salesforce.apollo.choam.proto.FoundationSeal; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.delphinius.Oracle; @@ -225,12 +225,6 @@ public void smokin() throws Exception { domains.forEach(d -> { var listener = new View.ViewLifecycleListener() { - @Override - public void update(EventCoordinates update) { - // TODO Auto-generated method stub - - } - @Override public void viewChange(Context context, Digest viewId, List joins, List leaves) { diff --git a/model/src/main/java/com/salesforce/apollo/model/Domain.java b/model/src/main/java/com/salesforce/apollo/model/Domain.java index cb3e0c780..3052b5073 100644 --- a/model/src/main/java/com/salesforce/apollo/model/Domain.java +++ b/model/src/main/java/com/salesforce/apollo/model/Domain.java @@ -195,6 +195,10 @@ public String toString() { return getClass().getSimpleName() + "[" + getIdentifier() + "]"; } + protected Transaction migrations() { + return null; + } + // Provide the list of transactions establishing the unified KERL of the group private List genesisOf(Map members) { log.info("Genesis joins: {} on: {}", members.keySet().stream().map(Member::getId).toList(), params.member()); @@ -203,6 +207,11 @@ private List genesisOf(Map members) { List transactions = new ArrayList<>(); // Schemas transactions.add(transactionOf(boostrapMigration())); + var migrations = migrations(); + if (migrations != null) { + // additional SQL migrations + transactions.add(migrations); + } sorted.stream() .map(e -> manifest(members.get(e))) .filter(Objects::nonNull) diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index 063e24108..887cd1c16 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -26,7 +26,6 @@ import com.salesforce.apollo.model.demesnes.comm.DemesneKERLServer; import com.salesforce.apollo.model.demesnes.comm.OuterContextServer; import com.salesforce.apollo.model.demesnes.comm.OuterContextService; -import com.salesforce.apollo.stereotomy.EventCoordinates; import com.salesforce.apollo.stereotomy.EventValidation; import com.salesforce.apollo.stereotomy.event.Seal; import com.salesforce.apollo.stereotomy.event.proto.AttachmentEvent; @@ -137,6 +136,10 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu this.subDomainSpecification = subDomainSpecification; } + public KerlDHT getDht() { + return dht; + } + public View getFoundation() { return foundation; } @@ -161,7 +164,7 @@ public SelfAddressingIdentifier spawn(DemesneParameters.Builder prototype) { }); if (added.get()) { var newSpec = subDomainSpecification.clone(); - // the receiver is a witness to the sub domain's delegated key + // the receiver is a witness to the subdomain's delegated key var newWitnesses = new ArrayList<>(subDomainSpecification.getWitnesses()); newWitnesses.add(new BasicIdentifier(witness.getPublic())); newSpec.setWitnesses(newWitnesses); @@ -228,6 +231,23 @@ public void stop() { } } + protected void startServices() { + dht.start(params.gossipDuration()); + try { + portal.start(); + } catch (IOException e) { + throw new IllegalStateException( + "Unable to start portal, local address: " + bridge.path() + " on: " + params.member().getId()); + } + try { + outerContextService.start(); + } catch (IOException e) { + throw new IllegalStateException( + "Unable to start outer context service, local address: " + outerContextEndpoint.path() + " on: " + + params.member().getId()); + } + } + private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) .executor(executor) @@ -239,27 +259,22 @@ private ManagedChannel handler(DomainSocketAddress address) { } private ViewLifecycleListener listener() { - return new ViewLifecycleListener() { - - @Override - public void viewChange(Context context, Digest id, List join, - List leaving) { - for (var d : join) { - if (d.getIdentifier() instanceof SelfAddressingIdentifier sai) { - params.context().activate(context.getMember(sai.getDigest())); - } - } - for (var d : leaving) { - params.context().remove(d); + return (context, id, join, leaving) -> { + for (var d : join) { + if (d.getIdentifier() instanceof SelfAddressingIdentifier sai) { + params.context().activate(context.getMember(sai.getDigest())); } + } + for (var d : leaving) { + params.context().remove(d); + } - hostedDomains.forEach((viewId, demesne) -> { - demesne.viewChange(viewId, join, leaving); - }); + hostedDomains.forEach((viewId, demesne) -> { + demesne.viewChange(viewId, join, leaving); + }); - log.info("View change: {} for: {} joining: {} leaving: {} on: {}", id, params.context().getId(), - join.size(), leaving.size(), params.member().getId()); - } + log.info("View change: {} for: {} joining: {} leaving: {} on: {}", id, params.context().getId(), + join.size(), leaving.size(), params.member().getId()); }; } @@ -278,23 +293,6 @@ public void register(SubContext context) { }, null); } - private void startServices() { - dht.start(Duration.ofMillis(10)); // TODO parameterize gossip frequency - try { - portal.start(); - } catch (IOException e) { - throw new IllegalStateException( - "Unable to start portal, local address: " + bridge.path() + " on: " + params.member().getId()); - } - try { - outerContextService.start(); - } catch (IOException e) { - throw new IllegalStateException( - "Unable to start outer context service, local address: " + outerContextEndpoint.path() + " on: " - + params.member().getId()); - } - } - private void stopServices() { portal.close(Duration.ofSeconds(30)); try { diff --git a/model/src/main/java/com/salesforce/apollo/model/SubDomain.java b/model/src/main/java/com/salesforce/apollo/model/SubDomain.java index 7086e6f1e..deee6f1f3 100644 --- a/model/src/main/java/com/salesforce/apollo/model/SubDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/SubDomain.java @@ -7,15 +7,17 @@ package com.salesforce.apollo.model; import com.codahale.metrics.Timer; -import com.salesforce.apollo.demesne.proto.DelegationUpdate; -import com.salesforce.apollo.demesne.proto.SignedDelegate; -import com.salesforce.apollo.cryptography.proto.Biff; -import com.salesforce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.archipelago.Enclave.RoutingClientIdentity; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; +import com.salesforce.apollo.bloomFilters.BloomFilter; +import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.cryptography.proto.Biff; +import com.salesforce.apollo.cryptography.proto.Digeste; +import com.salesforce.apollo.demesne.proto.DelegationUpdate; +import com.salesforce.apollo.demesne.proto.SignedDelegate; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.model.comms.Delegation; @@ -23,8 +25,6 @@ import com.salesforce.apollo.model.comms.DelegationService; import com.salesforce.apollo.ring.RingCommunications; import com.salesforce.apollo.utils.Entropy; -import com.salesforce.apollo.bloomFilters.BloomFilter; -import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; import org.h2.mvstore.MVMap; import org.h2.mvstore.MVStore; import org.slf4j.Logger; @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,6 +58,9 @@ public class SubDomain extends Domain { private final RingCommunications ring; private final AtomicBoolean started = new AtomicBoolean(); private final MVStore store; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, + Thread.ofVirtual() + .factory()); public SubDomain(ControlledIdentifierMember member, Builder params, Path checkpointBaseDir, RuntimeParameters.Builder runtime, int maxTransfer, Duration gossipInterval, double fpr) { @@ -102,8 +106,7 @@ public void start() { super.start(); Duration initialDelay = gossipInterval.plusMillis(Entropy.nextBitsStreamLong(gossipInterval.toMillis())); log.trace("Starting SubDomain[{}:{}]", params.context().getId(), member.getId()); - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) - .schedule(() -> oneRound(), initialDelay.toMillis(), TimeUnit.MILLISECONDS); + scheduler.schedule(() -> oneRound(), initialDelay.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -168,8 +171,7 @@ private void handle(Optional result, timer.stop(); } if (started.get()) { - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) - .schedule(() -> oneRound(), gossipInterval.toMillis(), TimeUnit.MILLISECONDS); + scheduler.schedule(() -> oneRound(), gossipInterval.toMillis(), TimeUnit.MILLISECONDS); } } } @@ -181,13 +183,16 @@ private Biff have() { } private void oneRound() { - Timer.Context timer = null; - try { - ring.execute((link, ring) -> gossipRound(link, ring), - (result, destination) -> handle(result, destination, timer)); - } catch (Throwable e) { - log.error("Error in delegation gossip in SubDomain[{}:{}]", params.context().getId(), member.getId(), e); - } + Thread.ofVirtual().start(() -> { + Timer.Context timer = null; + try { + ring.execute((link, ring) -> gossipRound(link, ring), + (result, destination) -> handle(result, destination, timer)); + } catch (Throwable e) { + log.error("Error in delegation gossip in SubDomain[{}:{}]", params.context().getId(), member.getId(), + e); + } + }); } private DelegationUpdate.Builder update(DelegationUpdate update, DelegationUpdate.Builder builder) { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 209e765e2..48a0eca67 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -953,8 +953,10 @@ private void reconcile(ScheduledExecutorService scheduler, Duration duration) { if (!started.get()) { return; } - reconcile.execute((link, ring) -> reconcile(link, ring), - (futureSailor, destination) -> reconcile(futureSailor, destination, scheduler, duration)); + Thread.ofVirtual() + .start(() -> reconcile.execute((link, ring) -> reconcile(link, ring), + (futureSailor, destination) -> reconcile(futureSailor, destination, + scheduler, duration))); } diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java index 618099da6..1b221b98f 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java @@ -53,19 +53,19 @@ * @author hal.hildebrand */ public class AbstractDhtTest { - protected static final ProtobufEventFactory factory = new ProtobufEventFactory(); - protected static final boolean LARGE_TESTS = Boolean.getBoolean( - "large_tests"); - protected static final double PBYZ = 0.25; - protected final Map dhts = new TreeMap<>(); - protected final Map routers = new HashMap<>(); - protected final AtomicBoolean gate = new AtomicBoolean( + protected static final ProtobufEventFactory factory = new ProtobufEventFactory(); + protected static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests"); + protected static final double PBYZ = 0.25; + + protected final TreeMap dhts = new TreeMap<>(); + protected final Map routers = new HashMap<>(); + protected final AtomicBoolean gate = new AtomicBoolean( false); - protected Context context; - protected Map> identities; - protected MemKERL kerl; - protected String prefix; - protected Stereotomy stereotomy; + protected Context context; + protected Map> identities; + protected MemKERL kerl; + protected String prefix; + protected Stereotomy stereotomy; public AbstractDhtTest() { super(); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java index 2834e31b6..363470958 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java @@ -6,23 +6,20 @@ */ package com.salesforce.apollo.thoth; -import static org.junit.jupiter.api.Assertions.assertTrue; +import com.salesforce.apollo.membership.SigningMember; +import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification; +import org.junit.jupiter.api.Test; import java.security.SecureRandom; import java.time.Duration; import java.util.Collections; import java.util.Map; -import java.util.concurrent.Executors; import java.util.stream.Collectors; -import org.junit.jupiter.api.Test; - -import com.salesforce.apollo.membership.SigningMember; -import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author hal.hildebrand - * */ public class AniTest extends AbstractDhtTest { @@ -32,17 +29,15 @@ public void smokin() throws Exception { entropy.setSeed(new byte[] { 7, 7, 7 }); routers.values().forEach(lr -> lr.start()); - dhts.values() - .forEach(e -> e.start( - Duration.ofSeconds(1))); + dhts.values().forEach(e -> e.start(Duration.ofSeconds(1))); - var dht = dhts.values().stream().findFirst().get(); + var dht = dhts.firstEntry().getValue(); Map anis = dhts.entrySet() .stream() - .collect(Collectors.toMap(e -> e.getKey(), - e -> new Ani(e.getKey().getId(), - dhts.get(e.getKey()).asKERL()))); + .collect(Collectors.toMap(e -> e.getKey(), e -> new Ani(e.getKey().getId(), + dhts.get(e.getKey()) + .asKERL()))); var ani = anis.values().stream().findFirst().get(); // inception @@ -51,7 +46,7 @@ public void smokin() throws Exception { var nextKeyPair = specification.getSignatureAlgorithm().generateKeyPair(entropy); var inception = inception(specification, initialKeyPair, factory, nextKeyPair); - dht.append(Collections.singletonList(inception.toKeyEvent_())) ; + dht.append(Collections.singletonList(inception.toKeyEvent_())); assertTrue(ani.eventValidation(Duration.ofSeconds(10)).validate(inception)); } diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java index 3cedc94f8..5c0e137ab 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java @@ -35,7 +35,7 @@ public void smokin() throws Exception { var nextKeyPair = specification.getSignatureAlgorithm().generateKeyPair(entropy); var inception = inception(specification, initialKeyPair, factory, nextKeyPair); - var dht = dhts.values().stream().findFirst().get(); + var dht = dhts.firstEntry().getValue(); dht.append(Collections.singletonList(inception.toKeyEvent_())); var lookup = dht.getKeyEvent(inception.getCoordinates().toEventCoords()); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java index b308d92da..78cac2af6 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java @@ -49,7 +49,7 @@ public void delegated() throws Exception { routers.values().forEach(r -> r.start()); dhts.values().forEach(dht -> dht.start(Duration.ofSeconds(1))); - KERL kerl = dhts.values().stream().findFirst().get().asKERL(); + KERL kerl = dhts.firstEntry().getValue().asKERL(); var ks = new MemKeyStore(); Stereotomy controller = new StereotomyImpl(ks, kerl, secureRandom); @@ -127,7 +127,7 @@ public void direct() throws Exception { routers.values().forEach(r -> r.start()); dhts.values().forEach(dht -> dht.start(Duration.ofSeconds(1))); - KERL kerl = dhts.values().stream().findFirst().get().asKERL(); + KERL kerl = dhts.firstEntry().getValue().asKERL(); Stereotomy controller = new StereotomyImpl(new MemKeyStore(), kerl, secureRandom);