Skip to content

Commit

Permalink
Moar cleanup. Expose dht in proc domain
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Dec 23, 2023
1 parent 3877f42 commit f7b3614
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,7 @@ private void validate(Digest from, State request) {
}
}

@FunctionalInterface
public interface ViewLifecycleListener {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
*/
package com.salesforce.apollo.demesnes;

import com.salesforce.apollo.choam.proto.Foundation;
import com.salesforce.apollo.choam.proto.FoundationSeal;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
import com.salesforce.apollo.choam.proto.Foundation;
import com.salesforce.apollo.choam.proto.FoundationSeal;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.delphinius.Oracle;
Expand Down Expand Up @@ -225,12 +225,6 @@ public void smokin() throws Exception {
domains.forEach(d -> {
var listener = new View.ViewLifecycleListener() {

@Override
public void update(EventCoordinates update) {
// TODO Auto-generated method stub

}

@Override
public void viewChange(Context<Participant> context, Digest viewId, List<EventCoordinates> joins,
List<Digest> leaves) {
Expand Down
9 changes: 9 additions & 0 deletions model/src/main/java/com/salesforce/apollo/model/Domain.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ public String toString() {
return getClass().getSimpleName() + "[" + getIdentifier() + "]";
}

protected Transaction migrations() {
return null;
}

// Provide the list of transactions establishing the unified KERL of the group
private List<Transaction> genesisOf(Map<Member, Join> members) {
log.info("Genesis joins: {} on: {}", members.keySet().stream().map(Member::getId).toList(), params.member());
Expand All @@ -203,6 +207,11 @@ private List<Transaction> genesisOf(Map<Member, Join> members) {
List<Transaction> transactions = new ArrayList<>();
// Schemas
transactions.add(transactionOf(boostrapMigration()));
var migrations = migrations();
if (migrations != null) {
// additional SQL migrations
transactions.add(migrations);
}
sorted.stream()
.map(e -> manifest(members.get(e)))
.filter(Objects::nonNull)
Expand Down
72 changes: 35 additions & 37 deletions model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.salesforce.apollo.model.demesnes.comm.DemesneKERLServer;
import com.salesforce.apollo.model.demesnes.comm.OuterContextServer;
import com.salesforce.apollo.model.demesnes.comm.OuterContextService;
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.EventValidation;
import com.salesforce.apollo.stereotomy.event.Seal;
import com.salesforce.apollo.stereotomy.event.proto.AttachmentEvent;
Expand Down Expand Up @@ -137,6 +136,10 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu
this.subDomainSpecification = subDomainSpecification;
}

public KerlDHT getDht() {
return dht;
}

public View getFoundation() {
return foundation;
}
Expand All @@ -161,7 +164,7 @@ public SelfAddressingIdentifier spawn(DemesneParameters.Builder prototype) {
});
if (added.get()) {
var newSpec = subDomainSpecification.clone();
// the receiver is a witness to the sub domain's delegated key
// the receiver is a witness to the subdomain's delegated key
var newWitnesses = new ArrayList<>(subDomainSpecification.getWitnesses());
newWitnesses.add(new BasicIdentifier(witness.getPublic()));
newSpec.setWitnesses(newWitnesses);
Expand Down Expand Up @@ -228,6 +231,23 @@ public void stop() {
}
}

protected void startServices() {
dht.start(params.gossipDuration());
try {
portal.start();
} catch (IOException e) {
throw new IllegalStateException(
"Unable to start portal, local address: " + bridge.path() + " on: " + params.member().getId());
}
try {
outerContextService.start();
} catch (IOException e) {
throw new IllegalStateException(
"Unable to start outer context service, local address: " + outerContextEndpoint.path() + " on: "
+ params.member().getId());
}
}

private ManagedChannel handler(DomainSocketAddress address) {
return NettyChannelBuilder.forAddress(address)
.executor(executor)
Expand All @@ -239,27 +259,22 @@ private ManagedChannel handler(DomainSocketAddress address) {
}

private ViewLifecycleListener listener() {
return new ViewLifecycleListener() {

@Override
public void viewChange(Context<Participant> context, Digest id, List<EventCoordinates> join,
List<Digest> leaving) {
for (var d : join) {
if (d.getIdentifier() instanceof SelfAddressingIdentifier sai) {
params.context().activate(context.getMember(sai.getDigest()));
}
}
for (var d : leaving) {
params.context().remove(d);
return (context, id, join, leaving) -> {
for (var d : join) {
if (d.getIdentifier() instanceof SelfAddressingIdentifier sai) {
params.context().activate(context.getMember(sai.getDigest()));
}
}
for (var d : leaving) {
params.context().remove(d);
}

hostedDomains.forEach((viewId, demesne) -> {
demesne.viewChange(viewId, join, leaving);
});
hostedDomains.forEach((viewId, demesne) -> {
demesne.viewChange(viewId, join, leaving);
});

log.info("View change: {} for: {} joining: {} leaving: {} on: {}", id, params.context().getId(),
join.size(), leaving.size(), params.member().getId());
}
log.info("View change: {} for: {} joining: {} leaving: {} on: {}", id, params.context().getId(),
join.size(), leaving.size(), params.member().getId());
};
}

Expand All @@ -278,23 +293,6 @@ public void register(SubContext context) {
}, null);
}

private void startServices() {
dht.start(Duration.ofMillis(10)); // TODO parameterize gossip frequency
try {
portal.start();
} catch (IOException e) {
throw new IllegalStateException(
"Unable to start portal, local address: " + bridge.path() + " on: " + params.member().getId());
}
try {
outerContextService.start();
} catch (IOException e) {
throw new IllegalStateException(
"Unable to start outer context service, local address: " + outerContextEndpoint.path() + " on: "
+ params.member().getId());
}
}

private void stopServices() {
portal.close(Duration.ofSeconds(30));
try {
Expand Down
39 changes: 22 additions & 17 deletions model/src/main/java/com/salesforce/apollo/model/SubDomain.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@
package com.salesforce.apollo.model;

import com.codahale.metrics.Timer;
import com.salesforce.apollo.demesne.proto.DelegationUpdate;
import com.salesforce.apollo.demesne.proto.SignedDelegate;
import com.salesforce.apollo.cryptography.proto.Biff;
import com.salesforce.apollo.cryptography.proto.Digeste;
import com.salesforce.apollo.archipelago.Enclave.RoutingClientIdentity;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.proto.Biff;
import com.salesforce.apollo.cryptography.proto.Digeste;
import com.salesforce.apollo.demesne.proto.DelegationUpdate;
import com.salesforce.apollo.demesne.proto.SignedDelegate;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember;
import com.salesforce.apollo.model.comms.Delegation;
import com.salesforce.apollo.model.comms.DelegationServer;
import com.salesforce.apollo.model.comms.DelegationService;
import com.salesforce.apollo.ring.RingCommunications;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.slf4j.Logger;
Expand All @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -57,6 +58,9 @@ public class SubDomain extends Domain {
private final RingCommunications<Member, Delegation> ring;
private final AtomicBoolean started = new AtomicBoolean();
private final MVStore store;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
Thread.ofVirtual()
.factory());

public SubDomain(ControlledIdentifierMember member, Builder params, Path checkpointBaseDir,
RuntimeParameters.Builder runtime, int maxTransfer, Duration gossipInterval, double fpr) {
Expand Down Expand Up @@ -102,8 +106,7 @@ public void start() {
super.start();
Duration initialDelay = gossipInterval.plusMillis(Entropy.nextBitsStreamLong(gossipInterval.toMillis()));
log.trace("Starting SubDomain[{}:{}]", params.context().getId(), member.getId());
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())
.schedule(() -> oneRound(), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> oneRound(), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -168,8 +171,7 @@ private void handle(Optional<DelegationUpdate> result,
timer.stop();
}
if (started.get()) {
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())
.schedule(() -> oneRound(), gossipInterval.toMillis(), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> oneRound(), gossipInterval.toMillis(), TimeUnit.MILLISECONDS);
}
}
}
Expand All @@ -181,13 +183,16 @@ private Biff have() {
}

private void oneRound() {
Timer.Context timer = null;
try {
ring.execute((link, ring) -> gossipRound(link, ring),
(result, destination) -> handle(result, destination, timer));
} catch (Throwable e) {
log.error("Error in delegation gossip in SubDomain[{}:{}]", params.context().getId(), member.getId(), e);
}
Thread.ofVirtual().start(() -> {
Timer.Context timer = null;
try {
ring.execute((link, ring) -> gossipRound(link, ring),
(result, destination) -> handle(result, destination, timer));
} catch (Throwable e) {
log.error("Error in delegation gossip in SubDomain[{}:{}]", params.context().getId(), member.getId(),
e);
}
});
}

private DelegationUpdate.Builder update(DelegationUpdate update, DelegationUpdate.Builder builder) {
Expand Down
6 changes: 4 additions & 2 deletions thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,10 @@ private void reconcile(ScheduledExecutorService scheduler, Duration duration) {
if (!started.get()) {
return;
}
reconcile.execute((link, ring) -> reconcile(link, ring),
(futureSailor, destination) -> reconcile(futureSailor, destination, scheduler, duration));
Thread.ofVirtual()
.start(() -> reconcile.execute((link, ring) -> reconcile(link, ring),
(futureSailor, destination) -> reconcile(futureSailor, destination,
scheduler, duration)));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@
* @author hal.hildebrand
*/
public class AbstractDhtTest {
protected static final ProtobufEventFactory factory = new ProtobufEventFactory();
protected static final boolean LARGE_TESTS = Boolean.getBoolean(
"large_tests");
protected static final double PBYZ = 0.25;
protected final Map<SigningMember, KerlDHT> dhts = new TreeMap<>();
protected final Map<SigningMember, Router> routers = new HashMap<>();
protected final AtomicBoolean gate = new AtomicBoolean(
protected static final ProtobufEventFactory factory = new ProtobufEventFactory();
protected static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests");
protected static final double PBYZ = 0.25;

protected final TreeMap<SigningMember, KerlDHT> dhts = new TreeMap<>();
protected final Map<SigningMember, Router> routers = new HashMap<>();
protected final AtomicBoolean gate = new AtomicBoolean(
false);
protected Context<Member> context;
protected Map<SigningMember, ControlledIdentifier<SelfAddressingIdentifier>> identities;
protected MemKERL kerl;
protected String prefix;
protected Stereotomy stereotomy;
protected Context<Member> context;
protected Map<SigningMember, ControlledIdentifier<SelfAddressingIdentifier>> identities;
protected MemKERL kerl;
protected String prefix;
protected Stereotomy stereotomy;

public AbstractDhtTest() {
super();
Expand Down
25 changes: 10 additions & 15 deletions thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,20 @@
*/
package com.salesforce.apollo.thoth;

import static org.junit.jupiter.api.Assertions.assertTrue;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification;
import org.junit.jupiter.api.Test;

import java.security.SecureRandom;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;

import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* @author hal.hildebrand
*
*/
public class AniTest extends AbstractDhtTest {

Expand All @@ -32,17 +29,15 @@ public void smokin() throws Exception {
entropy.setSeed(new byte[] { 7, 7, 7 });

routers.values().forEach(lr -> lr.start());
dhts.values()
.forEach(e -> e.start(
Duration.ofSeconds(1)));
dhts.values().forEach(e -> e.start(Duration.ofSeconds(1)));

var dht = dhts.values().stream().findFirst().get();
var dht = dhts.firstEntry().getValue();

Map<SigningMember, Ani> anis = dhts.entrySet()
.stream()
.collect(Collectors.toMap(e -> e.getKey(),
e -> new Ani(e.getKey().getId(),
dhts.get(e.getKey()).asKERL())));
.collect(Collectors.toMap(e -> e.getKey(), e -> new Ani(e.getKey().getId(),
dhts.get(e.getKey())
.asKERL())));
var ani = anis.values().stream().findFirst().get();

// inception
Expand All @@ -51,7 +46,7 @@ public void smokin() throws Exception {
var nextKeyPair = specification.getSignatureAlgorithm().generateKeyPair(entropy);
var inception = inception(specification, initialKeyPair, factory, nextKeyPair);

dht.append(Collections.singletonList(inception.toKeyEvent_())) ;
dht.append(Collections.singletonList(inception.toKeyEvent_()));
assertTrue(ani.eventValidation(Duration.ofSeconds(10)).validate(inception));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void smokin() throws Exception {
var nextKeyPair = specification.getSignatureAlgorithm().generateKeyPair(entropy);
var inception = inception(specification, initialKeyPair, factory, nextKeyPair);

var dht = dhts.values().stream().findFirst().get();
var dht = dhts.firstEntry().getValue();

dht.append(Collections.singletonList(inception.toKeyEvent_()));
var lookup = dht.getKeyEvent(inception.getCoordinates().toEventCoords());
Expand Down
Loading

0 comments on commit f7b3614

Please sign in to comment.