diff --git a/grpc/src/main/proto/leyden.proto b/grpc/src/main/proto/leyden.proto index b747176f1..a6e3c275a 100644 --- a/grpc/src/main/proto/leyden.proto +++ b/grpc/src/main/proto/leyden.proto @@ -6,6 +6,7 @@ option java_outer_classname = "LeydenProto"; option objc_class_prefix = "Ley"; import "google/protobuf/empty.proto"; +import "google/protobuf/any.proto"; import "stereotomy.proto"; import "stereotomy-services.proto"; @@ -13,11 +14,10 @@ import "crypto.proto"; package leyden; - service Binder { - rpc bind(Bound) returns(google.protobuf.Empty) {} - rpc unbind(Key_) returns(google.protobuf.Empty) {} - rpc get(Key_) returns(Bound) {} + rpc bind(Binding) returns(google.protobuf.Empty) {} + rpc unbind(KeyAndToken) returns(google.protobuf.Empty) {} + rpc get(KeyAndToken) returns(Bound) {} } service Reconciliation { @@ -25,6 +25,12 @@ service Reconciliation { rpc update (Updating) returns (google.protobuf.Empty) {} } +message KeyAndToken { + int32 ring = 1; + bytes key = 2; + bytes token = 3; +} + message Update { repeated Bound bindings = 1; repeated Interval intervals = 2; @@ -47,11 +53,13 @@ message Interval { crypto.Digeste end = 2; } -message Key_ { - bytes key = 1; +message Binding { + int32 ring = 1; + Bound bound = 2; + bytes token = 3; } message Bound { - Key_ key = 1; + bytes key = 1; bytes value = 2; } diff --git a/leyden/pom.xml b/leyden/pom.xml index 9fd3261ba..e6d96068a 100644 --- a/leyden/pom.xml +++ b/leyden/pom.xml @@ -19,6 +19,10 @@ com.h2database h2-mvstore + + com.macasaet.fernet + fernet-java8 + diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java index 2eac3b0cb..975f11d25 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java @@ -1,6 +1,10 @@ package com.salesforce.apollo.leyden; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; +import com.google.common.collect.Ordering; import com.google.protobuf.InvalidProtocolBufferException; +import com.macasaet.fernet.Token; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl; import com.salesforce.apollo.bloomFilters.BloomFilter; @@ -15,20 +19,27 @@ import com.salesforce.apollo.membership.Ring; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.ring.RingCommunications; +import com.salesforce.apollo.ring.RingIterator; +import com.salesforce.apollo.stereotomy.event.proto.Attachment; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Hex; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import org.h2.mvstore.MVMap; import org.h2.mvstore.MVStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.time.Instant; +import java.time.temporal.TemporalAmount; import java.util.*; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** * @author hal.hildebrand @@ -49,14 +60,23 @@ public class LeydenJar { private final NavigableMap> pending = new ConcurrentSkipListMap<>(); private final Borders borders; private final Reconciled recon; - - public LeydenJar(SigningMember member, Context context, Router communications, double fpr, + private final TemporalAmount operationTimeout; + private final Duration operationsFrequency; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( + 1, Thread.ofVirtual().factory()); + private final OpValidator validator; + + public LeydenJar(OpValidator validator, TemporalAmount operationTimeout, SigningMember member, + Context context, Duration operationsFrequency, Router communications, double fpr, DigestAlgorithm algorithm, MVStore store, ReconciliationMetrics metrics, BinderMetrics binderMetrics) { + this.validator = validator; this.context = context; this.member = member; this.algorithm = algorithm; recon = new Reconciled(); + this.operationTimeout = operationTimeout; + this.operationsFrequency = operationsFrequency; reconComms = communications.create(member, context.getId(), recon, ReconciliationService.class.getCanonicalName(), r -> new ReconciliationServer(r, communications.getClientIdentityProvider(), @@ -74,18 +94,47 @@ public LeydenJar(SigningMember member, Context context, Router communica try { return Bound.parseFrom(b); } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException(e); } }))); reconcile = new RingCommunications<>(this.context, member, reconComms); } + public void bindRequest(Bound bound) { + + } + + public Bound get(KeyAndToken key) { + log.info("get: {} on: {}", Hex.hex(key.toByteArray()), member.getId()); + var hash = algorithm.digest(key.toByteString()); + Instant timedOut = Instant.now().plus(operationTimeout); + Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); + var result = new CompletableFuture(); + var gathered = HashMultiset.create(); + var iterate = new RingIterator(operationsFrequency, context, member, scheduler, + binderComms); + iterate.noDuplicates() + .iterate(hash, null, (link, r) -> link.get(key), () -> failedMajority(result, maxCount(gathered)), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, hash, + isTimedOut, destination, "get attachment", + Attachment.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered))); + try { + return result.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException e) { + throw new IllegalStateException(e.getCause()); + } + } + public void start(Duration gossip) { if (!started.compareAndSet(false, true)) { return; } log.info("Starting: {}", member.getId()); - reconcile(Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), gossip); + reconcile(scheduler, gossip); binderComms.register(context.getId(), borders); reconComms.register(context.getId(), recon); } @@ -99,10 +148,17 @@ public void stop() { reconComms.deregister(context.getId()); } + public void unbind(KeyAndToken key) { + + } + private void add(Bound bound) { - var hash = algorithm.digest(bound.getKey().toByteString()); + var hash = algorithm.digest(bound.getKey()); bottled.put(hash.getBytes(), bound); - log.info("Replicated consensus on: {} on: {}", hash, member.getId()); + log.info("Add: {} on: {}", Hex.hex(bound.getKey().toByteArray()), member.getId()); + } + + private void bindRequest(Binding request) { } private Bound binding(Digest d) { @@ -110,6 +166,50 @@ private Bound binding(Digest d) { } private Stream bindingsIn(KeyInterval i) { + Iterator it = new Iterator() { + private final Iterator iterate = bottled.keyIterator(i.getBegin().getBytes()); + private Digest next; + + { + if (iterate.hasNext()) { + next = new Digest(algorithm, iterate.next()); + if (next.compareTo(i.getEnd()) > 0) { + next = null; // got nothing + } + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Digest next() { + var returned = next; + if (returned == null) { + throw new NoSuchElementException(); + } + if (iterate.hasNext()) { + next = new Digest(algorithm, iterate.next()); + if (next.compareTo(i.getEnd()) > 0) { + next = null; // got nothing + } + } + return returned; + } + }; + Iterable iterable = () -> it; + return StreamSupport.stream(iterable.spliterator(), false); + } + + private void failedMajority(CompletableFuture result, int maxAgree) { + result.completeExceptionally(new NoSuchElementException( + "Unable to achieve majority read, max: %s" + " required: %s on: %s".formatted(maxAgree, context.majority(), + member.getId()))); + } + + private Bound getRequest(KeyAndToken request) { return null; } @@ -151,6 +251,15 @@ private CombinedIntervals keyIntervals() { return new CombinedIntervals(intervals); } + private Multiset.Entry max(HashMultiset gathered) { + return gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null); + } + + private int maxCount(HashMultiset gathered) { + final var max = gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)); + return max.isEmpty() ? 0 : max.get().getCount(); + } + private Biff populate(long seed, CombinedIntervals keyIntervals) { BloomFilter.DigestBloomFilter bff = new BloomFilter.DigestBloomFilter(seed, Math.max(bottled.size(), 100), fpr); bottled.keyIterator(algorithm.getOrigin().getBytes()).forEachRemaining(b -> { @@ -162,6 +271,33 @@ private Biff populate(long seed, CombinedIntervals keyIntervals) { return bff.toBff(); } + private boolean read(CompletableFuture result, HashMultiset gathered, AtomicInteger tally, + Optional futureSailor, Digest hash, Supplier isTimedOut, + RingCommunications.Destination destination, String getAttachment, + Attachment defaultInstance) { + if (futureSailor.isEmpty()) { + return !isTimedOut.get(); + } + Bound content = futureSailor.get(); + if (content != null) { + log.trace("bound: {} from: {} on: {}", hash, destination.member().getId(), member.getId()); + gathered.add(content); + var max = max(gathered); + if (max != null) { + tally.set(max.getCount()); + if (max.getCount() > context.toleranceLevel()) { + result.complete(max.getElement()); + log.debug("Majority: {} achieved: {} on: {}", max.getCount(), hash, member.getId()); + return false; + } + } + return !isTimedOut.get(); + } else { + log.debug("Failed {} from: {} on: {}", hash, destination.member().getId(), member.getId()); + return !isTimedOut.get(); + } + } + private Update reconcile(ReconciliationClient link, Integer ring) { if (member.equals(link.getMember())) { return null; @@ -213,9 +349,8 @@ private void reconcile(ScheduledExecutorService scheduler, Duration duration) { /** * Reconcile the intervals for our partner * - * @param intervals - the relevant intervals of identifiers and the event digests of these identifiers the partner - * already have - * @return the Update.Builder of missing key events, based on the supplied intervals + * @param intervals - the relevant intervals of keys and the digests of these keys the partner already have + * @return the Update.Builder of missing keys, based on the supplied intervals */ private Update.Builder reconcile(Intervals intervals) { var biff = BloomFilter.from(intervals.getHave()); @@ -224,25 +359,28 @@ private Update.Builder reconcile(Intervals intervals) { .stream() .map(KeyInterval::new) .flatMap(this::bindingsIn) - .peek(d -> log.trace("reconcile digest: {}", d)) + .peek(d -> log.trace("reconcile digest: {} on: {}", d, member.getId())) .filter(d -> !biff.contains(d)) - .peek(d -> log.trace("filtered reconcile digest: {}", d)) + .peek(d -> log.trace("filtered reconcile digest: {} on: {}", d, member.getId())) .map(this::binding) .filter(Objects::nonNull) .forEach(update::addBindings); return update; } + private void unbindRequest(KeyAndToken request) { + } + private void update(List bindings, Digest from) { if (bindings.isEmpty()) { - log.trace("No bindings to update"); + log.trace("No bindings to update: {} on: {}", from, member.getId()); return; } - log.trace("Events to update: {}", bindings.size()); + log.trace("Events to update: {} on: {}", bindings.size(), member.getId()); for (var bound : bindings) { - var key = algorithm.digest(bound.getKey().toByteString()); - var states = pending.computeIfAbsent(key, k -> new ArrayList<>()); + var digest = algorithm.digest(bound.toByteString()); + var states = pending.computeIfAbsent(digest, k -> new ArrayList<>()); var found = false; for (var cs : states) { if (cs.test(bound, from)) { @@ -259,6 +397,18 @@ private void update(List bindings, Digest from) { } } + public enum Operation { + PUT, DELETE, GET; + } + + public interface OpValidator { + boolean validateBind(Bound bound, Token token); + + boolean validateGet(byte[] key, Token token); + + boolean validateUnbind(byte[] key, Token token); + } + private static class ConsensusState { private final Bound binding; private final List members = new ArrayList<>(); @@ -307,7 +457,8 @@ public Update reconcile(Intervals intervals, Digest from) { CombinedIntervals keyIntervals = keyIntervals(); builder.addAllIntervals(keyIntervals.toIntervals()) .setHave(populate(Entropy.nextBitsStreamLong(), keyIntervals)); - log.trace("Reconcile for: {} ring: {} count: {} on: {}", from, ring, builder.getBindingsCount(), member); + log.trace("Reconcile for: {} ring: {} count: {} on: {}", from, ring, builder.getBindingsCount(), + member.getId()); return builder.build(); } @@ -324,13 +475,50 @@ public void update(Updating update, Digest from) { private class Borders implements BinderService { @Override - public void bind(Bound request, Digest from) { - + public void bind(Binding request, Digest from) { + Member predecessor = context.ring(request.getRing()).predecessor(member); + if (predecessor == null || !from.equals(predecessor.getId())) { + log.debug("Invalid Bind ring on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), + member, from, request.getRing(), predecessor.getId()); + throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + } + if (!validator.validateBind(request.getBound(), Token.fromBytes(request.getToken().toByteArray()))) { + log.warn("Invalid Bind Token on {}:{}", context.getId(), member.getId()); + throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + } + LeydenJar.this.bindRequest(request); } @Override - public void unbind(Key_ request, Digest from) { + public Bound get(KeyAndToken request, Digest from) { + Member predecessor = context.ring(request.getRing()).predecessor(member); + if (predecessor == null || !from.equals(predecessor.getId())) { + log.debug("Invalid Get ring on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), + member.getId(), from, request.getRing(), predecessor.getId()); + throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + } + if (!validator.validateGet(request.getKey().toByteArray(), + Token.fromBytes(request.getToken().toByteArray()))) { + log.warn("Invalid Get Token on {}:{}", context.getId(), member.getId()); + throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + } + return LeydenJar.this.getRequest(request); + } + @Override + public void unbind(KeyAndToken request, Digest from) { + Member predecessor = context.ring(request.getRing()).predecessor(member); + if (predecessor == null || !from.equals(predecessor.getId())) { + log.debug("Invalid Unbind ring on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), + member.getId(), from, request.getRing(), predecessor.getId()); + throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + } + if (!validator.validateUnbind(request.getKey().toByteArray(), + Token.fromBytes(request.getToken().toByteArray()))) { + log.warn("Invalid Unbind Token on {}:{}", context.getId(), member.getId()); + throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + } + LeydenJar.this.unbindRequest(request); } } } diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java index 20ca235dc..f5ab11d4c 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java @@ -2,8 +2,9 @@ import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.leyden.proto.BinderGrpc; +import com.salesforce.apollo.leyden.proto.Binding; import com.salesforce.apollo.leyden.proto.Bound; -import com.salesforce.apollo.leyden.proto.Key_; +import com.salesforce.apollo.leyden.proto.KeyAndToken; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; @@ -30,7 +31,7 @@ public static BinderClient getCreate(ManagedServerChannel c, BinderMetrics binde public static BinderClient getLocalLoopback(BinderService service, SigningMember member) { return new BinderClient() { @Override - public void bind(Bound binding) { + public void bind(Binding binding) { service.bind(binding, member.getId()); } @@ -39,20 +40,25 @@ public void close() throws IOException { // no op } + @Override + public Bound get(KeyAndToken key) { + return null; + } + @Override public Member getMember() { return member; } @Override - public void unbind(Key_ key) { + public void unbind(KeyAndToken key) { service.unbind(key, member.getId()); } }; } @Override - public void bind(Bound binding) { + public void bind(Binding binding) { client.bind(binding); } @@ -61,13 +67,18 @@ public void close() throws IOException { channel.shutdown(); } + @Override + public Bound get(KeyAndToken key) { + return client.get(key); + } + @Override public Member getMember() { return channel.getMember(); } @Override - public void unbind(Key_ key) { + public void unbind(KeyAndToken key) { client.unbind(key); } } diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderClient.java b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderClient.java index 874e0bfd0..bfe6d404c 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderClient.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderClient.java @@ -1,15 +1,18 @@ package com.salesforce.apollo.leyden.comm.binding; import com.salesforce.apollo.archipelago.Link; +import com.salesforce.apollo.leyden.proto.Binding; import com.salesforce.apollo.leyden.proto.Bound; -import com.salesforce.apollo.leyden.proto.Key_; +import com.salesforce.apollo.leyden.proto.KeyAndToken; /** * @author hal.hildebrand **/ public interface BinderClient extends Link { - void bind(Bound binding); + void bind(Binding binding); - void unbind(Key_ key); + Bound get(KeyAndToken key); + + void unbind(KeyAndToken key); } diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderMetrics.java b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderMetrics.java index 56d30c2f1..cf400f4a1 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderMetrics.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderMetrics.java @@ -12,6 +12,10 @@ public interface BinderMetrics extends EndpointMetrics { Timer inboundBindTimer(); + Histogram inboundGet(); + + Timer inboundGetTimer(); + Histogram inboundUnbind(); Timer inboundUnbindTimer(); diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderServer.java b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderServer.java index 569dd5db2..b5134a82e 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderServer.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderServer.java @@ -5,8 +5,9 @@ import com.salesforce.apollo.archipelago.RoutableService; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.leyden.proto.BinderGrpc; +import com.salesforce.apollo.leyden.proto.Binding; import com.salesforce.apollo.leyden.proto.Bound; -import com.salesforce.apollo.leyden.proto.Key_; +import com.salesforce.apollo.leyden.proto.KeyAndToken; import com.salesforce.apollo.protocols.ClientIdentity; import io.grpc.stub.StreamObserver; @@ -27,7 +28,7 @@ public BinderServer(RoutableService r, ClientIdentity clientIdent } @Override - public void bind(Bound request, StreamObserver responseObserver) { + public void bind(Binding request, StreamObserver responseObserver) { Timer.Context timer = metrics == null ? null : metrics.inboundBindTimer().time(); if (metrics != null) { var serializedSize = request.getSerializedSize(); @@ -53,7 +54,33 @@ public void bind(Bound request, StreamObserver responseObserver) { } @Override - public void unbind(Key_ request, StreamObserver responseObserver) { + public void get(KeyAndToken request, StreamObserver responseObserver) { + Timer.Context timer = metrics == null ? null : metrics.inboundGetTimer().time(); + if (metrics != null) { + var serializedSize = request.getSerializedSize(); + metrics.inboundBandwidth().mark(serializedSize); + metrics.inboundGet().update(serializedSize); + } + Digest from = identity.getFrom(); + if (from == null) { + responseObserver.onError(new IllegalStateException("Member has been removed")); + return; + } + routing.evaluate(responseObserver, s -> { + try { + var bound = s.get(request, from); + responseObserver.onNext(bound); + responseObserver.onCompleted(); + } finally { + if (timer != null) { + timer.stop(); + } + } + }); + } + + @Override + public void unbind(KeyAndToken request, StreamObserver responseObserver) { Timer.Context timer = metrics == null ? null : metrics.inboundUnbindTimer().time(); if (metrics != null) { var serializedSize = request.getSerializedSize(); diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderService.java b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderService.java index f2fe855b5..e36e6429d 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderService.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/BinderService.java @@ -1,14 +1,17 @@ package com.salesforce.apollo.leyden.comm.binding; import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.leyden.proto.Binding; import com.salesforce.apollo.leyden.proto.Bound; -import com.salesforce.apollo.leyden.proto.Key_; +import com.salesforce.apollo.leyden.proto.KeyAndToken; /** * @author hal.hildebrand **/ public interface BinderService { - void bind(Bound request, Digest from); + void bind(Binding request, Digest from); - void unbind(Key_ request, Digest from); + Bound get(KeyAndToken request, Digest from); + + void unbind(KeyAndToken request, Digest from); } diff --git a/model/pom.xml b/model/pom.xml index 742499992..9ff8b9b27 100644 --- a/model/pom.xml +++ b/model/pom.xml @@ -8,7 +8,7 @@ model Model - Sytem Domain model for applications, deployments, tenants, etc. + System Domain model for applications, deployments, tenants, etc. com.salesforce.apollo diff --git a/pom.xml b/pom.xml index 05fd5a022..1a11c6fd1 100644 --- a/pom.xml +++ b/pom.xml @@ -527,6 +527,11 @@ ${os.detected.classifier} provided + + com.macasaet.fernet + fernet-java8 + 1.4.2 + org.graalvm.sdk graal-sdk diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 53f383787..209e765e2 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -12,11 +12,6 @@ import com.google.common.collect.Multiset.Entry; import com.google.common.collect.Ordering; import com.google.protobuf.Empty; -import com.salesforce.apollo.stereotomy.event.proto.*; -import com.salesforce.apollo.stereotomy.services.grpc.proto.KeyStates; -import com.salesforce.apollo.thoth.proto.Intervals; -import com.salesforce.apollo.thoth.proto.Update; -import com.salesforce.apollo.thoth.proto.Updating; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.cryptography.Digest; @@ -35,9 +30,11 @@ import com.salesforce.apollo.stereotomy.db.UniKERLDirectPooled; import com.salesforce.apollo.stereotomy.db.UniKERLDirectPooled.ClosableKERL; import com.salesforce.apollo.stereotomy.event.KeyEvent; +import com.salesforce.apollo.stereotomy.event.proto.*; import com.salesforce.apollo.stereotomy.identifier.Identifier; import com.salesforce.apollo.stereotomy.services.grpc.StereotomyMetrics; import com.salesforce.apollo.stereotomy.services.grpc.kerl.KERLAdapter; +import com.salesforce.apollo.stereotomy.services.grpc.proto.KeyStates; import com.salesforce.apollo.stereotomy.services.proto.ProtoKERLAdapter; import com.salesforce.apollo.stereotomy.services.proto.ProtoKERLService; import com.salesforce.apollo.thoth.LoggingOutputStream.LogLevel; @@ -48,6 +45,9 @@ import com.salesforce.apollo.thoth.grpc.reconciliation.ReconciliationClient; import com.salesforce.apollo.thoth.grpc.reconciliation.ReconciliationServer; import com.salesforce.apollo.thoth.grpc.reconciliation.ReconciliationService; +import com.salesforce.apollo.thoth.proto.Intervals; +import com.salesforce.apollo.thoth.proto.Update; +import com.salesforce.apollo.thoth.proto.Updating; import com.salesforce.apollo.utils.Entropy; import liquibase.Liquibase; import liquibase.Scope; @@ -940,7 +940,7 @@ private Update reconcile(ReconciliationService link, Integer ring) { return null; } CombinedIntervals keyIntervals = keyIntervals(); - log.trace("Interval reconciliation on ring: {} with: {} on: {} intervals: {}", ring, link.getMember(), + log.trace("Interval reconciliation on ring: {} with: {} on: {} intervals: {}", ring, link.getMember().getId(), member.getId(), keyIntervals); return link.reconcile(Intervals.newBuilder() .setRing(ring) @@ -1042,7 +1042,8 @@ public Update reconcile(Intervals intervals, Digest from) { CombinedIntervals keyIntervals = keyIntervals(); builder.addAllIntervals(keyIntervals.toIntervals()) .setHave(kerlSpace.populate(Entropy.nextBitsStreamLong(), keyIntervals, fpr)); - log.trace("Reconcile for: {} ring: {} count: {} on: {}", from, ring, builder.getEventsCount(), member); + log.trace("Reconcile for: {} ring: {} count: {} on: {}", from, ring, builder.getEventsCount(), + member.getId()); return builder.build(); } catch (IOException | SQLException e) { throw new IllegalStateException("Cannot acquire KERL", e);