From d0670d97f5c31d66f5237e9c9140de6582991d59 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sun, 3 Dec 2023 17:24:35 -0800 Subject: [PATCH] thoth grpc service. an unfortunate reformatting event, also, too --- .../com/salesforce/apollo/fireflies/View.java | 24 +- grpc/src/main/proto/thoth.proto | 7 + .../apollo/thoth/CombinedIntervals.java | 17 +- .../com/salesforce/apollo/thoth/KerlDHT.java | 31 +- .../salesforce/apollo/thoth/KerlSpace.java | 319 +++++++++--------- .../salesforce/apollo/thoth/KeyInterval.java | 15 +- .../apollo/thoth/LoggingOutputStream.java | 16 +- .../com/salesforce/apollo/thoth/Maat.java | 2 +- .../salesforce/apollo/thoth/Publisher.java | 14 +- .../com/salesforce/apollo/thoth/Thoth.java | 28 +- .../apollo/thoth/grpc/ThothServer.java | 102 ++++++ .../thoth/grpc/delegation/Delegation.java | 5 +- .../grpc/delegation/DelegationClient.java | 1 - .../grpc/delegation/DelegationServer.java | 1 - .../grpc/delegation/DelegationService.java | 1 - .../apollo/thoth/grpc/dht/DhtClient.java | 52 +-- .../apollo/thoth/grpc/dht/DhtServer.java | 2 +- .../apollo/thoth/grpc/dht/DhtService.java | 4 +- .../grpc/reconciliation/Reconciliation.java | 1 - .../reconciliation/ReconciliationClient.java | 43 ++- .../reconciliation/ReconciliationServer.java | 2 - .../reconciliation/ReconciliationService.java | 4 +- .../thoth/metrics/GorgoneionMetrics.java | 1 - .../apollo/thoth/metrics/KerlDhtMetrics.java | 1 - .../apollo/thoth/ThothServerTest.java | 127 +++++++ .../salesforce/apollo/thoth/ThothTest.java | 8 +- 26 files changed, 520 insertions(+), 308 deletions(-) create mode 100644 thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java create mode 100644 thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 388c939046..fcdbd5af5a 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -12,13 +12,14 @@ import com.google.common.collect.Multiset.Entry; import com.google.common.collect.Ordering; import com.google.protobuf.ByteString; +import com.salesfoce.apollo.cryptography.proto.Biff; import com.salesfoce.apollo.fireflies.proto.*; import com.salesfoce.apollo.stereotomy.event.proto.KERL_; import com.salesfoce.apollo.stereotomy.event.proto.KeyState_; -import com.salesfoce.apollo.cryptography.proto.Biff; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.Router.ServiceRouting; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; +import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.cryptography.*; import com.salesforce.apollo.fireflies.Binding.Bound; import com.salesforce.apollo.fireflies.ViewManagement.Ballot; @@ -37,9 +38,7 @@ import com.salesforce.apollo.stereotomy.EventValidation; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; import com.salesforce.apollo.utils.Entropy; -import com.salesforce.apollo.membership.RoundScheduler; import com.salesforce.apollo.utils.Utils; -import com.salesforce.apollo.bloomFilters.BloomFilter; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -86,15 +85,15 @@ public class View { private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change"; private final CommonCommunications approaches; private final CommonCommunications comm; - private final Context context; - private final DigestAlgorithm digestAlgo; - private final RingCommunications gossiper; + private final Context context; + private final DigestAlgorithm digestAlgo; + private final RingCommunications gossiper; private final AtomicBoolean introduced = new AtomicBoolean(); private final Map lifecycleListeners = new HashMap<>(); private final FireflyMetrics metrics; - private final Node node; - private final Map observations = new ConcurrentSkipListMap<>(); - private final Parameters params; + private final Node node; + private final Map observations = new ConcurrentSkipListMap<>(); + private final Parameters params; private final ConcurrentMap pendingRebuttals = new ConcurrentSkipListMap<>(); private final RoundScheduler roundTimers; private final Set shunned = new ConcurrentSkipListSet<>(); @@ -174,6 +173,13 @@ public Context getContext() { return context; } + /** + * @return the Digest ID of the Node of this View + */ + public Digest getNodeId() { + return node.getId(); + } + /** * Register a listener to receive view change events * diff --git a/grpc/src/main/proto/thoth.proto b/grpc/src/main/proto/thoth.proto index 98df516df3..219ac0a14e 100644 --- a/grpc/src/main/proto/thoth.proto +++ b/grpc/src/main/proto/thoth.proto @@ -41,6 +41,13 @@ service Reconciliation { rpc update (Updating) returns (google.protobuf.Empty) {} } +service Thoth_ { + rpc commit (stereotomy.EventCoords) returns (google.protobuf.Empty) {} + rpc identifier(google.protobuf.Empty) returns (stereotomy.Ident) {} + rpc inception(stereotomy.Ident) returns (stereotomy.InceptionEvent) {} + rpc rotate(google.protobuf.Empty) returns (stereotomy.RotationEvent) {} +} + message Update { repeated stereotomy.KeyEventWithAttachmentAndValidations_ events = 1; repeated Interval intervals = 2; diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/CombinedIntervals.java b/thoth/src/main/java/com/salesforce/apollo/thoth/CombinedIntervals.java index a0dfb448dc..da689c3ffe 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/CombinedIntervals.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/CombinedIntervals.java @@ -7,21 +7,16 @@ package com.salesforce.apollo.thoth; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; +import com.salesfoce.apollo.thoth.proto.Interval; +import com.salesforce.apollo.cryptography.Digest; + +import java.util.*; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import com.salesfoce.apollo.thoth.proto.Interval; -import com.salesforce.apollo.cryptography.Digest; - /** * @author hal.hildebrand - * */ public class CombinedIntervals implements Predicate { private final List intervals = new ArrayList<>(); @@ -40,8 +35,8 @@ public int compare(KeyInterval o1, KeyInterval o2) { int comparison = o1.getBegin().compareTo(o2.getBegin()); return comparison == 0 // if both intervals begin the same - ? o1.getEnd().compareTo(o2.getEnd()) // compare their ends - : comparison; + ? o1.getEnd().compareTo(o2.getEnd()) // compare their ends + : comparison; } }); KeyInterval current = allIntervals.get(0); diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 6091ff44f7..dea7518aba 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -186,13 +186,6 @@ static T completeIt(T result) { return result; } - /** - * Clear the caches of the receiver - */ - public void clearCache() { - cache.clear(); - } - public KeyState_ append(AttachmentEvent event) { if (event == null) { return null; @@ -379,6 +372,13 @@ public KERL asKERL() { return cache; } + /** + * Clear the caches of the receiver + */ + public void clearCache() { + cache.clear(); + } + public DigestAlgorithm digestAlgorithm() { return kerlPool.getDigestAlgorithm(); } @@ -964,10 +964,7 @@ private boolean valid(Digest from, int ring) { if (successor == null) { return false; } - if (!successor.equals(member)) { - return false; - } - return true; + return successor.equals(member); } private DelegatedKERL wrap(ClosableKERL k) { @@ -1043,12 +1040,6 @@ public void update(Updating update, Digest from) { private class Service implements ProtoKERLService { - @Override - public Validations getValidations(EventCoords coordinates) { - log.trace("get validations for coordinates on: {}", member.getId()); - return complete(k -> k.getValidations(coordinates)); - } - @Override public List append(KERL_ kerl_) { log.info("appending kerl on: {}", member.getId()); @@ -1142,5 +1133,11 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal .build(); }); } + + @Override + public Validations getValidations(EventCoords coordinates) { + log.trace("get validations for coordinates on: {}", member.getId()); + return complete(k -> k.getValidations(coordinates)); + } } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java index 0065c7ddc5..376fb9cef4 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java @@ -8,11 +8,13 @@ package com.salesforce.apollo.thoth; import com.google.protobuf.InvalidProtocolBufferException; +import com.salesfoce.apollo.cryptography.proto.Biff; +import com.salesfoce.apollo.cryptography.proto.Digeste; import com.salesfoce.apollo.stereotomy.event.proto.*; import com.salesfoce.apollo.thoth.proto.Intervals; import com.salesfoce.apollo.thoth.proto.Update; -import com.salesfoce.apollo.cryptography.proto.Biff; -import com.salesfoce.apollo.cryptography.proto.Digeste; +import com.salesforce.apollo.bloomFilters.BloomFilter; +import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.JohnHancock; @@ -23,8 +25,6 @@ import com.salesforce.apollo.stereotomy.event.protobuf.AttachmentEventImpl; import com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory; import com.salesforce.apollo.stereotomy.identifier.Identifier; -import com.salesforce.apollo.bloomFilters.BloomFilter; -import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; import org.h2.jdbcx.JdbcConnectionPool; import org.jooq.DSLContext; import org.jooq.Record1; @@ -56,8 +56,8 @@ * @author hal.hildebrand */ public class KerlSpace { - private static final Logger log = LoggerFactory.getLogger(KerlSpace.class); - private final JdbcConnectionPool connectionPool; + private static final Logger log = LoggerFactory.getLogger(KerlSpace.class); + private final JdbcConnectionPool connectionPool; public KerlSpace(JdbcConnectionPool connectionPool) { this.connectionPool = connectionPool; @@ -75,10 +75,10 @@ public static void upsert(DSLContext dsl, EventCoords coordinates, Attachment at id = dsl.insertInto(PENDING_COORDINATES) .set(PENDING_COORDINATES.DIGEST, coordinates.getDigest().toByteArray()) .set(PENDING_COORDINATES.IDENTIFIER, - dsl.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) + dsl.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) .set(PENDING_COORDINATES.ILK, coordinates.getIlk()) .set(PENDING_COORDINATES.SEQUENCE_NUMBER, - ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger()) + ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger()) .returningResult(PENDING_COORDINATES.ID) .fetchOne(); } catch (DataAccessException e) { @@ -89,8 +89,8 @@ public static void upsert(DSLContext dsl, EventCoords coordinates, Attachment at .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toByteArray())) .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) .and(PENDING_COORDINATES.DIGEST.eq(coordinates.getDigest().toByteArray())) - .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq(ULong.valueOf(coordinates.getSequenceNumber()) - .toBigInteger())) + .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq( + ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger())) .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) .fetchOne(); } @@ -106,46 +106,45 @@ public static void upsert(DSLContext context, KeyEvent event, DigestAlgorithm di final var identBytes = event.getIdentifier().toIdent().toByteArray(); context.mergeInto(IDENTIFIER) - .using(context.selectOne()) - .on(IDENTIFIER.PREFIX.eq(identBytes)) - .whenNotMatchedThenInsert(IDENTIFIER.PREFIX) - .values(identBytes) - .execute(); + .using(context.selectOne()) + .on(IDENTIFIER.PREFIX.eq(identBytes)) + .whenNotMatchedThenInsert(IDENTIFIER.PREFIX) + .values(identBytes) + .execute(); long id; try { id = context.insertInto(PENDING_COORDINATES) - .set(PENDING_COORDINATES.DIGEST, prevCoords.getDigest().toDigeste().toByteArray()) - .set(PENDING_COORDINATES.IDENTIFIER, - context.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) - .set(PENDING_COORDINATES.ILK, event.getIlk()) - .set(PENDING_COORDINATES.SEQUENCE_NUMBER, event.getSequenceNumber().toBigInteger()) - .returningResult(PENDING_COORDINATES.ID) - .fetchOne() - .value1(); + .set(PENDING_COORDINATES.DIGEST, prevCoords.getDigest().toDigeste().toByteArray()) + .set(PENDING_COORDINATES.IDENTIFIER, + context.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) + .set(PENDING_COORDINATES.ILK, event.getIlk()) + .set(PENDING_COORDINATES.SEQUENCE_NUMBER, event.getSequenceNumber().toBigInteger()) + .returningResult(PENDING_COORDINATES.ID) + .fetchOne() + .value1(); } catch (DataAccessException e) { // Already exists var coordinates = event.getCoordinates(); id = context.select(PENDING_COORDINATES.ID) - .from(PENDING_COORDINATES) - .join(IDENTIFIER) - .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) - .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .and(PENDING_COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) - .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) - .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) - .fetchOne() - .value1(); + .from(PENDING_COORDINATES) + .join(IDENTIFIER) + .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toIdent().toByteArray())) + .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) + .and(PENDING_COORDINATES.DIGEST.eq(coordinates.getDigest().toDigeste().toByteArray())) + .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq(coordinates.getSequenceNumber().toBigInteger())) + .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) + .fetchOne() + .value1(); } final var digest = event.hash(digestAlgorithm); try { context.insertInto(PENDING_EVENT) - .set(PENDING_EVENT.COORDINATES, id) - .set(PENDING_EVENT.DIGEST, digest.toDigeste().toByteArray()) - .set(PENDING_EVENT.EVENT, event.getBytes()) - .execute(); + .set(PENDING_EVENT.COORDINATES, id) + .set(PENDING_EVENT.DIGEST, digest.toDigeste().toByteArray()) + .set(PENDING_EVENT.EVENT, event.getBytes()) + .execute(); } catch (DataAccessException e) { - return; } } @@ -158,11 +157,11 @@ public static void upsert(DSLContext dsl, Validations validations) { try { dsl.mergeInto(IDENTIFIER) - .using(dsl.selectOne()) - .on(IDENTIFIER.PREFIX.eq(identBytes)) - .whenNotMatchedThenInsert(IDENTIFIER.PREFIX) - .values(identBytes) - .execute(); + .using(dsl.selectOne()) + .on(IDENTIFIER.PREFIX.eq(identBytes)) + .whenNotMatchedThenInsert(IDENTIFIER.PREFIX) + .values(identBytes) + .execute(); } catch (DataAccessException e) { log.trace("Duplicate inserting identifier: {}", logIdentifier); } @@ -172,10 +171,10 @@ public static void upsert(DSLContext dsl, Validations validations) { id = dsl.insertInto(PENDING_COORDINATES) .set(PENDING_COORDINATES.DIGEST, coordinates.getDigest().toByteArray()) .set(PENDING_COORDINATES.IDENTIFIER, - dsl.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) + dsl.select(IDENTIFIER.ID).from(IDENTIFIER).where(IDENTIFIER.PREFIX.eq(identBytes))) .set(PENDING_COORDINATES.ILK, coordinates.getIlk()) .set(PENDING_COORDINATES.SEQUENCE_NUMBER, - ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger()) + ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger()) .returningResult(PENDING_COORDINATES.ID) .fetchOne(); log.trace("Id: {} for: {}", id, logCoords); @@ -188,8 +187,8 @@ public static void upsert(DSLContext dsl, Validations validations) { .on(IDENTIFIER.PREFIX.eq(coordinates.getIdentifier().toByteArray())) .where(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) .and(PENDING_COORDINATES.DIGEST.eq(coordinates.getDigest().toByteArray())) - .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq(ULong.valueOf(coordinates.getSequenceNumber()) - .toBigInteger())) + .and(PENDING_COORDINATES.SEQUENCE_NUMBER.eq( + ULong.valueOf(coordinates.getSequenceNumber()).toBigInteger())) .and(PENDING_COORDINATES.ILK.eq(coordinates.getIlk())) .fetchOne(); } @@ -204,15 +203,12 @@ public static void upsert(DSLContext dsl, Validations validations) { } /** - * Answer the bloom filter encoding the key events contained within the combined - * intervals + * Answer the bloom filter encoding the key events contained within the combined intervals * * @param seed - the seed for the bloom filter's hash generator - * @param intervals - the combined intervals containing the identifier location - * hashes. + * @param intervals - the combined intervals containing the identifier location hashes. * @param fpr - the false positive rate for the bloom filter - * @return the bloom filter of Digests bounded by the identifier location hash - * intervals + * @return the bloom filter of Digests bounded by the identifier location hash intervals */ public Biff populate(long seed, CombinedIntervals intervals, double fpr) { DigestBloomFilter bff = new DigestBloomFilter(seed, cardinality(), fpr); @@ -230,11 +226,10 @@ public Biff populate(long seed, CombinedIntervals intervals, double fpr) { /** * Reconcile the intervals for our partner * - * @param intervals - the relevant intervals of identifiers and the event - * digests of these identifiers the partner already have + * @param intervals - the relevant intervals of identifiers and the event digests of these identifiers the partner + * already have * @param kerl - * @return the Update.Builder of missing key events, based on the supplied - * intervals + * @return the Update.Builder of missing key events, based on the supplied intervals */ public Update.Builder reconcile(Intervals intervals, DigestKERL kerl) { var biff = BloomFilter.from(intervals.getHave()); @@ -242,19 +237,19 @@ public Update.Builder reconcile(Intervals intervals, DigestKERL kerl) { try (var connection = connectionPool.getConnection()) { var dsl = DSL.using(connection); intervals.getIntervalsList() - .stream() - .map(i -> new KeyInterval(i)) - .flatMap(i -> eventDigestsIn(i, dsl)) - .filter(d -> !biff.contains(d)) - .map(d -> event(d, dsl, kerl)) - .filter(ke -> ke != null) - .forEach(ke -> { - update.addEvents(ke); - }); + .stream() + .map(i -> new KeyInterval(i)) + .flatMap(i -> eventDigestsIn(i, dsl)) + .filter(d -> !biff.contains(d)) + .map(d -> event(d, dsl, kerl)) + .filter(ke -> ke != null) + .forEach(ke -> { + update.addEvents(ke); + }); } catch (SQLException e) { log.error("Unable to provide estimated cardinality, cannot acquire JDBC connection", e); throw new IllegalStateException("Unable to provide estimated cardinality, cannot acquire JDBC connection", - e); + e); } return update; } @@ -302,58 +297,61 @@ private int cardinality() { } catch (SQLException e) { log.error("Unable to provide estimated cardinality, cannot acquire JDBC connection", e); throw new IllegalStateException("Unable to provide estimated cardinality, cannot acquire JDBC connection", - e); + e); } } private void commitPending(DSLContext context, KERL kerl) { context.select(PENDING_COORDINATES.ID, PENDING_EVENT.EVENT, PENDING_COORDINATES.ILK) - .from(PENDING_EVENT) - .join(PENDING_COORDINATES) - .on(PENDING_COORDINATES.ID.eq(PENDING_EVENT.COORDINATES)) - .join(EVENT) - .on(EVENT.DIGEST.eq(PENDING_COORDINATES.DIGEST)) - .orderBy(PENDING_COORDINATES.SEQUENCE_NUMBER) - .fetchStream() - .forEach(r -> { - KeyEvent event = ProtobufEventFactory.toKeyEvent(r.value2(), r.value3()); - EventCoordinates coordinates = event.getCoordinates(); - if (coordinates != null) { - context.select(PENDING_ATTACHMENT.ATTACHMENT) - .from(PENDING_ATTACHMENT) - .where(PENDING_ATTACHMENT.COORDINATES.eq(r.value1())) - .stream() - .forEach(bytes -> { - try { - Attachment attach = Attachment.parseFrom(bytes.value1()); - kerl.append(Collections.singletonList(new AttachmentEventImpl(AttachmentEvent.newBuilder() - .setCoordinates(coordinates.toEventCoords()) - .setAttachment(attach) - .build()))); - } catch (InvalidProtocolBufferException e) { - log.error("Cannot deserialize attachment", e); - } - }); - context.select(PENDING_VALIDATIONS.VALIDATIONS) - .from(PENDING_VALIDATIONS) - .where(PENDING_VALIDATIONS.COORDINATES.eq(r.value1())) - .stream() - .forEach(bytes -> { - try { - Validations attach = Validations.parseFrom(bytes.value1()); - kerl.appendValidations(coordinates, - attach.getValidationsList() - .stream() - .collect(Collectors.toMap(v -> EventCoordinates.from(v.getValidator()), - v -> JohnHancock.from(v.getSignature())))); - } catch (InvalidProtocolBufferException e) { - log.error("Cannot deserialize validation", e); - } - }); - kerl.append(event); - } - context.deleteFrom(PENDING_COORDINATES).where(PENDING_COORDINATES.ID.eq(r.value1())).execute(); - }); + .from(PENDING_EVENT) + .join(PENDING_COORDINATES) + .on(PENDING_COORDINATES.ID.eq(PENDING_EVENT.COORDINATES)) + .join(EVENT) + .on(EVENT.DIGEST.eq(PENDING_COORDINATES.DIGEST)) + .orderBy(PENDING_COORDINATES.SEQUENCE_NUMBER) + .fetchStream() + .forEach(r -> { + KeyEvent event = ProtobufEventFactory.toKeyEvent(r.value2(), r.value3()); + EventCoordinates coordinates = event.getCoordinates(); + if (coordinates != null) { + context.select(PENDING_ATTACHMENT.ATTACHMENT) + .from(PENDING_ATTACHMENT) + .where(PENDING_ATTACHMENT.COORDINATES.eq(r.value1())) + .stream() + .forEach(bytes -> { + try { + Attachment attach = Attachment.parseFrom(bytes.value1()); + kerl.append(Collections.singletonList(new AttachmentEventImpl( + AttachmentEvent.newBuilder() + .setCoordinates(coordinates.toEventCoords()) + .setAttachment(attach) + .build()))); + } catch (InvalidProtocolBufferException e) { + log.error("Cannot deserialize attachment", e); + } + }); + context.select(PENDING_VALIDATIONS.VALIDATIONS) + .from(PENDING_VALIDATIONS) + .where(PENDING_VALIDATIONS.COORDINATES.eq(r.value1())) + .stream() + .forEach(bytes -> { + try { + Validations attach = Validations.parseFrom(bytes.value1()); + kerl.appendValidations(coordinates, attach.getValidationsList() + .stream() + .collect(Collectors.toMap( + v -> EventCoordinates.from( + v.getValidator()), + v -> JohnHancock.from( + v.getSignature())))); + } catch (InvalidProtocolBufferException e) { + log.error("Cannot deserialize validation", e); + } + }); + kerl.append(event); + } + context.deleteFrom(PENDING_COORDINATES).where(PENDING_COORDINATES.ID.eq(r.value1())).execute(); + }); } private KeyEventWithAttachmentAndValidations_ event(Digest d, DSLContext dsl, DigestKERL kerl) { @@ -367,17 +365,15 @@ private KeyEventWithAttachmentAndValidations_ event(Digest d, DSLContext dsl, Di builder.setAttachment(a.toAttachemente()); Map vs = kerl.getValidations(coordinates); var v = Validations.newBuilder() - .setCoordinates(coordinates.toEventCoords()) - .addAllValidations(vs.entrySet() - .stream() - .map(e -> Validation_.newBuilder() - .setValidator(e.getKey() - .toEventCoords()) - .setSignature(e.getValue() - .toSig()) - .build()) - .toList()) - .build(); + .setCoordinates(coordinates.toEventCoords()) + .addAllValidations(vs.entrySet() + .stream() + .map(e -> Validation_.newBuilder() + .setValidator(e.getKey().toEventCoords()) + .setSignature(e.getValue().toSig()) + .build()) + .toList()) + .build(); builder.setValidations(v); builder.setEvent(event.toKeyEvent_()); return builder.build(); @@ -389,42 +385,43 @@ private Stream eventDigestsIn(CombinedIntervals intervals, DSLContext ds private Stream eventDigestsIn(KeyInterval interval, DSLContext dsl) { return Stream.concat(dsl.select(EVENT.DIGEST) - .from(EVENT) - .join(COORDINATES) - .on(EVENT.COORDINATES.eq(COORDINATES.ID)) - .join(IDENTIFIER) - .on(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .join(IDENTIFIER_LOCATION_HASH) - .on(IDENTIFIER.ID.eq(IDENTIFIER_LOCATION_HASH.IDENTIFIER)) - .where(IDENTIFIER_LOCATION_HASH.DIGEST.ge(interval.getBegin().getBytes())) - .and(IDENTIFIER_LOCATION_HASH.DIGEST.le(interval.getEnd().getBytes())) - .stream() - .map(r -> { - try { - return Digest.from(Digeste.parseFrom(r.value1())); - } catch (InvalidProtocolBufferException e) { - return null; - } - }) - .filter(d -> d != null), - dsl.select(PENDING_EVENT.DIGEST) - .from(PENDING_EVENT) - .join(PENDING_COORDINATES) - .on(PENDING_EVENT.COORDINATES.eq(PENDING_COORDINATES.ID)) - .join(IDENTIFIER) - .on(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) - .join(IDENTIFIER_LOCATION_HASH) - .on(IDENTIFIER.ID.eq(IDENTIFIER_LOCATION_HASH.IDENTIFIER)) - .where(IDENTIFIER_LOCATION_HASH.DIGEST.ge(interval.getBegin().getBytes())) - .and(IDENTIFIER_LOCATION_HASH.DIGEST.le(interval.getEnd().getBytes())) - .stream() - .map(r -> { - try { - return Digest.from(Digeste.parseFrom(r.value1())); - } catch (InvalidProtocolBufferException e) { - return null; - } - }) - .filter(d -> d != null)); + .from(EVENT) + .join(COORDINATES) + .on(EVENT.COORDINATES.eq(COORDINATES.ID)) + .join(IDENTIFIER) + .on(COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) + .join(IDENTIFIER_LOCATION_HASH) + .on(IDENTIFIER.ID.eq(IDENTIFIER_LOCATION_HASH.IDENTIFIER)) + .where(IDENTIFIER_LOCATION_HASH.DIGEST.ge(interval.getBegin().getBytes())) + .and(IDENTIFIER_LOCATION_HASH.DIGEST.le(interval.getEnd().getBytes())) + .stream() + .map(r -> { + try { + return Digest.from(Digeste.parseFrom(r.value1())); + } catch (InvalidProtocolBufferException e) { + return null; + } + }) + .filter(d -> d != null), dsl.select(PENDING_EVENT.DIGEST) + .from(PENDING_EVENT) + .join(PENDING_COORDINATES) + .on(PENDING_EVENT.COORDINATES.eq(PENDING_COORDINATES.ID)) + .join(IDENTIFIER) + .on(PENDING_COORDINATES.IDENTIFIER.eq(IDENTIFIER.ID)) + .join(IDENTIFIER_LOCATION_HASH) + .on(IDENTIFIER.ID.eq(IDENTIFIER_LOCATION_HASH.IDENTIFIER)) + .where(IDENTIFIER_LOCATION_HASH.DIGEST.ge( + interval.getBegin().getBytes())) + .and(IDENTIFIER_LOCATION_HASH.DIGEST.le( + interval.getEnd().getBytes())) + .stream() + .map(r -> { + try { + return Digest.from(Digeste.parseFrom(r.value1())); + } catch (InvalidProtocolBufferException e) { + return null; + } + }) + .filter(d -> d != null)); } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KeyInterval.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KeyInterval.java index 8d2147e27c..081475f7c8 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KeyInterval.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KeyInterval.java @@ -7,14 +7,13 @@ package com.salesforce.apollo.thoth; -import java.util.function.Predicate; - import com.salesfoce.apollo.thoth.proto.Interval; import com.salesforce.apollo.cryptography.Digest; +import java.util.function.Predicate; + /** * @author hal.hildebrand - * */ public class KeyInterval implements Predicate { private final Digest begin; @@ -30,11 +29,6 @@ public KeyInterval(Interval interval) { this(Digest.from(interval.getStart()), Digest.from(interval.getEnd())); } - @Override - public boolean test(Digest t) { - return begin.compareTo(t) > 0 && end.compareTo(t) > 0; - } - public Digest getBegin() { return begin; } @@ -43,6 +37,11 @@ public Digest getEnd() { return end; } + @Override + public boolean test(Digest t) { + return begin.compareTo(t) > 0 && end.compareTo(t) > 0; + } + public Interval toInterval() { return Interval.newBuilder().setStart(begin.toDigeste()).setEnd(end.toDigeste()).build(); } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/LoggingOutputStream.java b/thoth/src/main/java/com/salesforce/apollo/thoth/LoggingOutputStream.java index 4b7e3b8008..be365bdbec 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/LoggingOutputStream.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/LoggingOutputStream.java @@ -7,25 +7,19 @@ package com.salesforce.apollo.thoth; +import org.slf4j.Logger; + import java.io.ByteArrayOutputStream; import java.io.OutputStream; -import org.slf4j.Logger; - /** * @author hal.hildebrand - * */ public class LoggingOutputStream extends OutputStream { - public enum LogLevel { - DEBUG, ERROR, INFO, TRACE, WARN, - } - private final ByteArrayOutputStream baos = new ByteArrayOutputStream(1000); private final LogLevel level; - - private final Logger logger; + private final Logger logger; public LoggingOutputStream(Logger logger, LogLevel level) { this.logger = logger; @@ -60,4 +54,8 @@ public void write(int b) { } } + public enum LogLevel { + DEBUG, ERROR, INFO, TRACE, WARN, + } + } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java index 7ba584517b..2b0928c23a 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java @@ -36,7 +36,7 @@ * @author hal.hildebrand */ public class Maat extends DelegatedKERL { - private static Logger log = LoggerFactory.getLogger(Maat.class); + private static final Logger log = LoggerFactory.getLogger(Maat.class); private final Context context; diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Publisher.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Publisher.java index 90ad3266c6..cbeeea1527 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/Publisher.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Publisher.java @@ -30,16 +30,17 @@ public class Publisher implements ProtoEventObserver { private final CommonCommunications comms; - private final Digest context; - private final ProtoKERLAdapter kerl; - private final EventObserver service; + private final Digest context; + private final ProtoKERLAdapter kerl; + private final EventObserver service; + public Publisher(SigningMember member, ProtoKERLAdapter kerl, Router router, Digest context) { this.kerl = kerl; this.context = context; service = new Service(); comms = router.create(member, context, service, service.getClass().getSimpleName(), - r -> new EventObserverServer(r, router.getClientIdentityProvider(), null), null, - EventObserverClient.getLocalLoopback(this, member)); + r -> new EventObserverServer(r, router.getClientIdentityProvider(), null), null, + EventObserverClient.getLocalLoopback(this, member)); } @Override @@ -78,8 +79,7 @@ public void publishAttachments(List attachments, Digest from) { } @Override - public void publishEvents(List events, List validations, - Digest from) { + public void publishEvents(List events, List validations, Digest from) { Publisher.this.publishEvents(events, validations); } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java index f2dd3b7747..9f435a0ada 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java @@ -29,11 +29,11 @@ * @author hal.hildebrand */ public class Thoth { - private static final Logger log = LoggerFactory.getLogger(Thoth.class); - private final Stereotomy stereotomy; - private volatile SelfAddressingIdentifier controller; - private volatile ControlledIdentifier identifier; - private volatile Consumer pending; + private static final Logger log = LoggerFactory.getLogger(Thoth.class); + private final Stereotomy stereotomy; + private volatile SelfAddressingIdentifier controller; + private volatile ControlledIdentifier identifier; + private volatile Consumer pending; public Thoth(Stereotomy stereotomy) { this.stereotomy = stereotomy; @@ -85,11 +85,9 @@ public DelegatedRotationEvent rotate(RotationSpecification.Builder specification private Consumer inception(DelegatedInceptionEvent incp) { return coordinates -> { - var commitment = ProtobufEventFactory.INSTANCE.attachment(incp, - new AttachmentImpl(Seal.EventSeal.construct(coordinates.getIdentifier(), - coordinates.getDigest(), - coordinates.getSequenceNumber() - .longValue()))); + var commitment = ProtobufEventFactory.INSTANCE.attachment(incp, new AttachmentImpl( + Seal.EventSeal.construct(coordinates.getIdentifier(), coordinates.getDigest(), + coordinates.getSequenceNumber().longValue()))); ControlledIdentifier cid = stereotomy.commit(incp, commitment); identifier = cid; controller = (SelfAddressingIdentifier) identifier.getDelegatingIdentifier().get(); @@ -100,15 +98,13 @@ private Consumer inception(DelegatedInceptionEvent incp) { private Consumer rotation(DelegatedRotationEvent rot) { return coordinates -> { - var commitment = ProtobufEventFactory.INSTANCE.attachment(rot, - new AttachmentImpl(Seal.EventSeal.construct(coordinates.getIdentifier(), - coordinates.getDigest(), - coordinates.getSequenceNumber() - .longValue()))); + var commitment = ProtobufEventFactory.INSTANCE.attachment(rot, new AttachmentImpl( + Seal.EventSeal.construct(coordinates.getIdentifier(), coordinates.getDigest(), + coordinates.getSequenceNumber().longValue()))); Void cid = identifier.commit(rot, commitment); pending = null; log.info("Rotated delegated identifier: {} controller: {}", identifier.getCoordinates(), controller, - identifier.getCoordinates()); + identifier.getCoordinates()); }; } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java new file mode 100644 index 0000000000..bf5edafbd3 --- /dev/null +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java @@ -0,0 +1,102 @@ +package com.salesforce.apollo.thoth.grpc; + +import com.google.protobuf.Empty; +import com.salesfoce.apollo.stereotomy.event.proto.EventCoords; +import com.salesfoce.apollo.stereotomy.event.proto.Ident; +import com.salesfoce.apollo.stereotomy.event.proto.InceptionEvent; +import com.salesfoce.apollo.stereotomy.event.proto.RotationEvent; +import com.salesfoce.apollo.thoth.proto.Thoth_Grpc; +import com.salesforce.apollo.stereotomy.EventCoordinates; +import com.salesforce.apollo.stereotomy.event.protobuf.InceptionEventImpl; +import com.salesforce.apollo.stereotomy.event.protobuf.RotationEventImpl; +import com.salesforce.apollo.stereotomy.identifier.Identifier; +import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; +import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification; +import com.salesforce.apollo.stereotomy.identifier.spec.RotationSpecification; +import com.salesforce.apollo.thoth.Thoth; +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author hal.hildebrand + **/ +public class ThothServer extends Thoth_Grpc.Thoth_ImplBase { + private static final Logger log = LoggerFactory.getLogger( + ThothServer.class); + private final IdentifierSpecification.Builder inception; + private final RotationSpecification.Builder rotation; + private final Thoth thoth; + + public ThothServer(IdentifierSpecification.Builder inception, + RotationSpecification.Builder rotation, Thoth thoth) { + this.inception = inception; + this.rotation = rotation; + this.thoth = thoth; + } + + @Override + public void commit(EventCoords request, StreamObserver responseObserver) { + var from = EventCoordinates.from(request); + try { + thoth.commit(from); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } catch (Throwable t) { + log.info("Error committing delegation event: " + from, t); + responseObserver.onError(t); + } + } + + @Override + public void identifier(Empty request, StreamObserver responseObserver) { + try { + var ident = thoth.identifier().toIdent(); + responseObserver.onNext(ident); + responseObserver.onCompleted(); + } catch (Throwable t) { + log.info("Error getting identifier", t); + responseObserver.onError(t); + } + } + + @Override + public void inception(Ident request, StreamObserver responseObserver) { + try { + var i = Identifier.from(request); + if (i instanceof SelfAddressingIdentifier sai) { + var incep = thoth.inception(sai, inception); + if (incep instanceof InceptionEventImpl incp) { + responseObserver.onNext(incp.toInceptionEvent_()); + responseObserver.onCompleted(); + } else { + log.info("Not an inception event impl: {}", incep); + responseObserver.onError(new IllegalArgumentException("Not an inception event: " + incep)); + } + } else { + log.info("Not a SelfAddressingIdentifier: {}", i); + responseObserver.onError(new IllegalArgumentException("Not a SelfAddressingIdentifier: " + i)); + } + } catch (Throwable t) { + log.info("Error creating inception event", t); + responseObserver.onError(t); + } + } + + @Override + public void rotate(Empty request, StreamObserver responseObserver) { + try { + var rot = thoth.rotate(rotation); + if (rot instanceof RotationEventImpl incp) { + responseObserver.onNext(incp.toRotationEvent_()); + responseObserver.onCompleted(); + } else { + log.info("Not a rotation event impl: {}", rot); + responseObserver.onError(new IllegalArgumentException("Not a rotation event: " + rot)); + } + } catch (Throwable t) { + log.info("Error rotating identifier", t); + responseObserver.onError(t); + } + } +} diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/Delegation.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/Delegation.java index 32305fbf16..d072d419f3 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/Delegation.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/Delegation.java @@ -6,17 +6,16 @@ */ package com.salesforce.apollo.thoth.grpc.delegation; -import java.util.concurrent.CompletableFuture; - import com.salesforce.apollo.cryptography.SigningThreshold; import com.salesforce.apollo.stereotomy.event.DelegatedInceptionEvent; import com.salesforce.apollo.stereotomy.event.DelegatedRotationEvent; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; import com.salesforce.apollo.stereotomy.identifier.spec.RotationSpecification; +import java.util.concurrent.CompletableFuture; + /** * @author hal.hildebrand - * */ public interface Delegation { DelegatedInceptionEvent inception(SelfAddressingIdentifier controller, SigningThreshold signingThreshold, diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationClient.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationClient.java index b28919aebd..c99cfe1944 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationClient.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationClient.java @@ -8,7 +8,6 @@ /** * @author hal.hildebrand - * */ public class DelegationClient { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationServer.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationServer.java index df661c60d3..97b4521be4 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationServer.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationServer.java @@ -10,7 +10,6 @@ /** * @author hal.hildebrand - * */ public class DelegationServer extends DelegatedImplBase { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationService.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationService.java index 2563f94579..d90cfe69a6 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationService.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/delegation/DelegationService.java @@ -10,7 +10,6 @@ /** * @author hal.hildebrand - * */ public interface DelegationService extends Link { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtClient.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtClient.java index f6b1d4c76c..7825d2a708 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtClient.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtClient.java @@ -98,6 +98,11 @@ public KeyState_ getKeyState(Ident identifier) { return service.getKeyState(identifier); } + @Override + public KeyState_ getKeyState(IdentAndSeq identAndSeq) { + return null; + } + @Override public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinates) { return service.getKeyStateWithAttachments(coordinates); @@ -118,11 +123,6 @@ public Member getMember() { public Validations getValidations(EventCoords coordinates) { return service.getValidations(coordinates); } - - @Override - public KeyState_ getKeyState(IdentAndSeq identAndSeq) { - return null; - } }; } @@ -312,6 +312,27 @@ public KeyState_ getKeyState(Ident identifier) { return result; } + @Override + public KeyState_ getKeyState(IdentAndSeq identAndSeq) { + Context timer = metrics == null ? null : metrics.getKeyStateClient().time(); + if (metrics != null) { + final var bs = identAndSeq.getSerializedSize(); + metrics.outboundBandwidth().mark(bs); + metrics.outboundGetKeyStateRequest().mark(bs); + } + var result = client.getKeyStateSeqNum(identAndSeq); + if (timer != null) { + timer.stop(); + } + if (timer != null) { + final var serializedSize = result.getSerializedSize(); + timer.stop(); + metrics.inboundBandwidth().mark(serializedSize); + metrics.inboundGetKeyStateCoordsResponse().mark(serializedSize); + } + return result; + } + @Override public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinates) { Context timer = metrics == null ? null : metrics.getAttachmentClient().time(); @@ -377,25 +398,4 @@ public Validations getValidations(EventCoords coordinates) { } return complete; } - - @Override - public KeyState_ getKeyState(IdentAndSeq identAndSeq) { - Context timer = metrics == null ? null : metrics.getKeyStateClient().time(); - if (metrics != null) { - final var bs = identAndSeq.getSerializedSize(); - metrics.outboundBandwidth().mark(bs); - metrics.outboundGetKeyStateRequest().mark(bs); - } - var result = client.getKeyStateSeqNum(identAndSeq); - if (timer != null) { - timer.stop(); - } - if (timer != null) { - final var serializedSize = result.getSerializedSize(); - timer.stop(); - metrics.inboundBandwidth().mark(serializedSize); - metrics.inboundGetKeyStateCoordsResponse().mark(serializedSize); - } - return result; - } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java index c429d66cfe..897ddf349d 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java @@ -23,7 +23,7 @@ */ public class DhtServer extends KerlDhtImplBase { - private final StereotomyMetrics metrics; + private final StereotomyMetrics metrics; private final RoutableService routing; public DhtServer(RoutableService router, StereotomyMetrics metrics) { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtService.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtService.java index 6aef9d6bc4..05be7b395e 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtService.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtService.java @@ -41,11 +41,11 @@ public interface DhtService extends Link { KeyState_ getKeyState(Ident identifier); + KeyState_ getKeyState(IdentAndSeq identAndSeq); + KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinates); KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndValidations(EventCoords coordinates); Validations getValidations(EventCoords coordinates); - - KeyState_ getKeyState(IdentAndSeq identAndSeq); } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/Reconciliation.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/Reconciliation.java index 728e0f19a3..a8e3507127 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/Reconciliation.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/Reconciliation.java @@ -14,7 +14,6 @@ /** * @author hal.hildebrand - * */ public interface Reconciliation { Update reconcile(Intervals intervals, Digest member); diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationClient.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationClient.java index 2d84b4c315..9c24a318cb 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationClient.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationClient.java @@ -7,14 +7,12 @@ package com.salesforce.apollo.thoth.grpc.reconciliation; -import java.io.IOException; - import com.google.protobuf.Empty; +import com.salesfoce.apollo.cryptography.proto.Digeste; import com.salesfoce.apollo.thoth.proto.Intervals; import com.salesfoce.apollo.thoth.proto.ReconciliationGrpc; import com.salesfoce.apollo.thoth.proto.Update; import com.salesfoce.apollo.thoth.proto.Updating; -import com.salesfoce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; import com.salesforce.apollo.cryptography.Digest; @@ -22,11 +20,26 @@ import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.stereotomy.services.grpc.StereotomyMetrics; +import java.io.IOException; + /** * @author hal.hildebrand - * */ public class ReconciliationClient implements ReconciliationService { + private final ManagedServerChannel channel; + private final ReconciliationGrpc.ReconciliationBlockingStub client; + @SuppressWarnings("unused") + private final Digeste context; + @SuppressWarnings("unused") + private final StereotomyMetrics metrics; + + public ReconciliationClient(Digest context, ManagedServerChannel channel, StereotomyMetrics metrics) { + this.context = context.toDigeste(); + this.channel = channel; + this.client = ReconciliationGrpc.newBlockingStub(channel).withCompression("gzip"); + this.metrics = metrics; + } + public static CreateClientCommunications getCreate(Digest context, StereotomyMetrics metrics) { return (c) -> { @@ -47,31 +60,17 @@ public Member getMember() { } @Override - public Update reconcile(Intervals intervals) { + public Update reconcile(Intervals intervals) { return Update.getDefaultInstance(); } @Override - public Empty update(Updating update) { + public Empty update(Updating update) { return Empty.getDefaultInstance(); } }; } - private final ManagedServerChannel channel; - private final ReconciliationGrpc.ReconciliationBlockingStub client; - @SuppressWarnings("unused") - private final Digeste context; - @SuppressWarnings("unused") - private final StereotomyMetrics metrics; - - public ReconciliationClient(Digest context, ManagedServerChannel channel, StereotomyMetrics metrics) { - this.context = context.toDigeste(); - this.channel = channel; - this.client = ReconciliationGrpc.newBlockingStub(channel).withCompression("gzip"); - this.metrics = metrics; - } - @Override public void close() throws IOException { channel.release(); @@ -83,12 +82,12 @@ public Member getMember() { } @Override - public Update reconcile(Intervals intervals) { + public Update reconcile(Intervals intervals) { return client.reconcile(intervals); } @Override - public Empty update(Updating update) { + public Empty update(Updating update) { return client.update(update); } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationServer.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationServer.java index 47f2337bac..e83d77bf7d 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationServer.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationServer.java @@ -16,12 +16,10 @@ import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.protocols.ClientIdentity; import com.salesforce.apollo.stereotomy.services.grpc.StereotomyMetrics; - import io.grpc.stub.StreamObserver; /** * @author hal.hildebrand - * */ public class ReconciliationServer extends ReconciliationImplBase { private final ClientIdentity identity; diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationService.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationService.java index 20f6953ed0..b15c6abcbf 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationService.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/reconciliation/ReconciliationService.java @@ -7,7 +7,6 @@ package com.salesforce.apollo.thoth.grpc.reconciliation; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Empty; import com.salesfoce.apollo.thoth.proto.Intervals; import com.salesfoce.apollo.thoth.proto.Update; @@ -16,11 +15,10 @@ /** * @author hal.hildebrand - * */ public interface ReconciliationService extends Link { - Update reconcile(Intervals intervals); + Update reconcile(Intervals intervals); Empty update(Updating update); diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/GorgoneionMetrics.java b/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/GorgoneionMetrics.java index 019a3ffa8c..ae8fb1d4b5 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/GorgoneionMetrics.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/GorgoneionMetrics.java @@ -8,7 +8,6 @@ /** * @author hal.hildebrand - * */ public interface GorgoneionMetrics { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/KerlDhtMetrics.java b/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/KerlDhtMetrics.java index 24305f183b..9362a19978 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/KerlDhtMetrics.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/metrics/KerlDhtMetrics.java @@ -8,7 +8,6 @@ /** * @author hal.hildebrand - * */ public interface KerlDhtMetrics { diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java new file mode 100644 index 0000000000..7dba13264d --- /dev/null +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java @@ -0,0 +1,127 @@ +package com.salesforce.apollo.thoth; + +import com.google.protobuf.Empty; +import com.salesfoce.apollo.thoth.proto.Thoth_Grpc; +import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; +import com.salesforce.apollo.stereotomy.ControlledIdentifier; +import com.salesforce.apollo.stereotomy.EventCoordinates; +import com.salesforce.apollo.stereotomy.Stereotomy; +import com.salesforce.apollo.stereotomy.StereotomyImpl; +import com.salesforce.apollo.stereotomy.event.InceptionEvent; +import com.salesforce.apollo.stereotomy.event.RotationEvent; +import com.salesforce.apollo.stereotomy.event.Seal; +import com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory; +import com.salesforce.apollo.stereotomy.identifier.Identifier; +import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; +import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification; +import com.salesforce.apollo.stereotomy.identifier.spec.InteractionSpecification; +import com.salesforce.apollo.stereotomy.identifier.spec.RotationSpecification; +import com.salesforce.apollo.stereotomy.mem.MemKERL; +import com.salesforce.apollo.stereotomy.mem.MemKeyStore; +import com.salesforce.apollo.thoth.grpc.ThothServer; +import io.grpc.Channel; +import io.grpc.ServerBuilder; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.security.SecureRandom; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * @author hal.hildebrand + **/ +public class ThothServerTest { + private SecureRandom secureRandom; + + @BeforeEach + public void before() throws Exception { + secureRandom = SecureRandom.getInstance("SHA1PRNG"); + secureRandom.setSeed(new byte[] { 0 }); + } + + @Test + public void smokin() throws Exception { + var ks = new MemKeyStore(); + var kerl = new MemKERL(DigestAlgorithm.DEFAULT); + Stereotomy stereotomy = new StereotomyImpl(ks, kerl, secureRandom); + var member = new ControlledIdentifierMember(stereotomy.newIdentifier()); + + var localId = UUID.randomUUID().toString(); + ServerBuilder serverBuilder = InProcessServerBuilder.forName(localId) + .addService( + new ThothServer(IdentifierSpecification.newBuilder(), + RotationSpecification.newBuilder(), + new Thoth(stereotomy))); + var server = serverBuilder.build(); + server.start(); + try { + + var channel = InProcessChannelBuilder.forName(localId).usePlaintext().build(); + var thoth = new ThothClient(channel); + + ControlledIdentifier controller = stereotomy.newIdentifier(); + + // delegated inception + var incp = thoth.inception(controller.getIdentifier()); + assertNotNull(incp); + + var seal = Seal.EventSeal.construct(incp.getIdentifier(), incp.hash(stereotomy.digestAlgorithm()), + incp.getSequenceNumber().longValue()); + + var builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal)); + + // Commit + EventCoordinates coords = controller.seal(builder); + thoth.commit(coords); + assertNotNull(thoth.identifier()); + + // Delegated rotation + var rot = thoth.rotate(); + + assertNotNull(rot); + + seal = Seal.EventSeal.construct(rot.getIdentifier(), rot.hash(stereotomy.digestAlgorithm()), + rot.getSequenceNumber().longValue()); + + builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal)); + + // Commit + coords = controller.seal(builder); + thoth.commit(coords); + } finally { + server.shutdown(); + server.awaitTermination(3, TimeUnit.SECONDS); + } + } + + private static class ThothClient { + private Thoth_Grpc.Thoth_BlockingStub client; + + private ThothClient(Channel channel) { + this.client = Thoth_Grpc.newBlockingStub(channel); + } + + public void commit(EventCoordinates coordinates) { + client.commit(coordinates.toEventCoords()); + } + + public SelfAddressingIdentifier identifier() { + return (SelfAddressingIdentifier) Identifier.from(client.identifier(Empty.getDefaultInstance())); + } + + public InceptionEvent inception(SelfAddressingIdentifier identifier) { + return ProtobufEventFactory.toKeyEvent(client.inception(identifier.toIdent())); + } + + public RotationEvent rotate() { + return ProtobufEventFactory.toKeyEvent(client.rotate(Empty.getDefaultInstance())); + } + } +} diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/ThothTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothTest.java index a31f5a4cc1..887b9ba629 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/ThothTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothTest.java @@ -35,7 +35,7 @@ public class ThothTest { @BeforeEach public void before() throws Exception { secureRandom = SecureRandom.getInstance("SHA1PRNG"); - secureRandom.setSeed(new byte[]{0}); + secureRandom.setSeed(new byte[] { 0 }); } @Test @@ -50,11 +50,11 @@ public void smokin() throws Exception { // delegated inception var incp = thoth.inception(controller.getIdentifier(), - IdentifierSpecification.newBuilder()); + IdentifierSpecification.newBuilder()); assertNotNull(incp); var seal = Seal.EventSeal.construct(incp.getIdentifier(), incp.hash(stereotomy.digestAlgorithm()), - incp.getSequenceNumber().longValue()); + incp.getSequenceNumber().longValue()); var builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal)); @@ -69,7 +69,7 @@ public void smokin() throws Exception { assertNotNull(rot); seal = Seal.EventSeal.construct(rot.getIdentifier(), rot.hash(stereotomy.digestAlgorithm()), - rot.getSequenceNumber().longValue()); + rot.getSequenceNumber().longValue()); builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal));