diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/DigestType.java b/choam/src/main/java/com/salesforce/apollo/choam/support/DigestType.java index 0d3296422..2dd8f3d55 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/DigestType.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/DigestType.java @@ -6,16 +6,14 @@ */ package com.salesforce.apollo.choam.support; -import java.nio.ByteBuffer; - +import com.salesforce.apollo.cryptography.Digest; import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.type.BasicDataType; -import com.salesforce.apollo.cryptography.Digest; +import java.nio.ByteBuffer; /** * @author hal.hildebrand - * */ public class DigestType extends BasicDataType { @@ -41,8 +39,7 @@ public Digest read(ByteBuffer buff) { } @Override - public void write(WriteBuffer buff, Digest obj) { - Digest digest = (Digest) obj; + public void write(WriteBuffer buff, Digest digest) { buff.put(digest.getAlgorithm().digestCode()); for (long l : digest.getLongs()) { buff.putLong(l); diff --git a/grpc/src/main/proto/leyden.proto b/grpc/src/main/proto/leyden.proto index a6e3c275a..03c6b2d9f 100644 --- a/grpc/src/main/proto/leyden.proto +++ b/grpc/src/main/proto/leyden.proto @@ -26,9 +26,8 @@ service Reconciliation { } message KeyAndToken { - int32 ring = 1; - bytes key = 2; - bytes token = 3; + bytes key = 1; + bytes token = 2; } message Update { @@ -54,9 +53,8 @@ message Interval { } message Binding { - int32 ring = 1; - Bound bound = 2; - bytes token = 3; + Bound bound = 1; + bytes token = 2; } message Bound { diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/ProtobufDatatype.java b/leyden/src/main/java/com/salesforce/apollo/leyden/BoundDatatype.java similarity index 54% rename from leyden/src/main/java/com/salesforce/apollo/leyden/ProtobufDatatype.java rename to leyden/src/main/java/com/salesforce/apollo/leyden/BoundDatatype.java index 77103330a..dd4b7e6db 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/ProtobufDatatype.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/BoundDatatype.java @@ -6,46 +6,48 @@ */ package com.salesforce.apollo.leyden; -import com.google.protobuf.Message; +import com.google.protobuf.InvalidProtocolBufferException; +import com.salesforce.apollo.leyden.proto.Bound; import org.h2.mvstore.DataUtils; import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.type.BasicDataType; import java.nio.ByteBuffer; -import java.util.function.Function; /** * @author hal.hildebrand */ -public final class ProtobufDatatype extends BasicDataType { - private Function factory; +public final class BoundDatatype extends BasicDataType { - public ProtobufDatatype(Function factory) { - this.factory = factory; + @Override + public int compare(Bound a, Bound b) { + return super.compare(a, b); } @Override - public Type[] createStorage(int size) { - @SuppressWarnings("unchecked") - final var storage = (Type[]) new Object[size]; - return storage; + public Bound[] createStorage(int size) { + return new Bound[size]; } @Override - public int getMemory(Type data) { + public int getMemory(Bound data) { return data.getSerializedSize(); } @Override - public Type read(ByteBuffer buff) { + public Bound read(ByteBuffer buff) { int size = DataUtils.readVarInt(buff); byte[] data = new byte[size]; buff.get(data); - return factory.apply(data); + try { + return Bound.parseFrom(buff); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException(e); + } } @Override - public void write(WriteBuffer buff, Type data) { + public void write(WriteBuffer buff, Bound data) { buff.putVarInt(data.getSerializedSize()); buff.put(data.toByteString().toByteArray()); } diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/DigestDatatype.java b/leyden/src/main/java/com/salesforce/apollo/leyden/DigestDatatype.java new file mode 100644 index 000000000..1e3bcede6 --- /dev/null +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/DigestDatatype.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2022, salesforce.com, inc. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ +package com.salesforce.apollo.leyden; + +import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.cryptography.DigestAlgorithm; +import org.h2.mvstore.WriteBuffer; +import org.h2.mvstore.type.BasicDataType; + +import java.nio.ByteBuffer; + +/** + * @author hal.hildebrand + */ +public final class DigestDatatype extends BasicDataType { + private final DigestAlgorithm algorithm; + + public DigestDatatype(DigestAlgorithm algorithm) { + this.algorithm = algorithm; + } + + @Override + public int compare(Digest a, Digest b) { + return a.compareTo(b); + } + + @Override + public Digest[] createStorage(int size) { + return new Digest[size]; + } + + @Override + public int getMemory(Digest data) { + return algorithm.longLength() * 8; + } + + @Override + public Digest read(ByteBuffer buff) { + byte[] data = new byte[algorithm.longLength() * 8]; + buff.get(data); + return new Digest(algorithm, data); + } + + @Override + public void write(WriteBuffer buff, Digest data) { + buff.put(data.getBytes()); + } +} diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java index a9f47461f..0a9a9f3aa 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java @@ -3,8 +3,6 @@ import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; import com.google.common.collect.Ordering; -import com.google.protobuf.InvalidProtocolBufferException; -import com.macasaet.fernet.Token; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl; import com.salesforce.apollo.bloomFilters.BloomFilter; @@ -20,7 +18,6 @@ import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.ring.RingCommunications; import com.salesforce.apollo.ring.RingIterator; -import com.salesforce.apollo.stereotomy.event.proto.Attachment; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Hex; import io.grpc.Status; @@ -54,7 +51,7 @@ public class LeydenJar { private final DigestAlgorithm algorithm; private final double fpr; private final SigningMember member; - private final MVMap bottled; + private final MVMap bottled; private final AtomicBoolean started = new AtomicBoolean(); private final RingCommunications reconcile; private final NavigableMap> pending = new ConcurrentSkipListMap<>(); @@ -89,20 +86,15 @@ public LeydenJar(OpValidator validator, TemporalAmount operationTimeout, Signing binderMetrics), c -> Bind.getCreate(c, binderMetrics), Bind.getLocalLoopback(borders, member)); this.fpr = fpr; - bottled = store.openMap(LEYDEN_JAR, - new MVMap.Builder().valueType(new ProtobufDatatype(b -> { - try { - return Bound.parseFrom(b); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException(e); - } - }))); + bottled = store.openMap(LEYDEN_JAR, new MVMap.Builder().keyType(new DigestDatatype(algorithm)) + .valueType(new BoundDatatype())); reconcile = new RingCommunications<>(this.context, member, reconComms); + reconcile.noDuplicates(); } public void bind(Binding bound) { var key = bound.getBound().getKey(); - log.info("bind: {} on: {}", Hex.hex(key.toByteArray()), member.getId()); + log.info("Bind: {} on: {}", Hex.hex(key.toByteArray()), member.getId()); var hash = algorithm.digest(key); Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); @@ -114,40 +106,49 @@ public void bind(Binding bound) { link.bind(bound); return ""; }, () -> failedMajority(result, maxCount(gathered)), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, - hash, isTimedOut, destination, "Bind", - Attachment.getDefaultInstance()), + (tally, futureSailor, destination) -> write(result, gathered, tally, + futureSailor, hash, isTimedOut, + destination), t -> failedMajority(result, maxCount(gathered))); try { result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException re) { + throw re; + } throw new IllegalStateException(e.getCause()); } } - public Bound get(KeyAndToken key) { - log.info("get: {} on: {}", Hex.hex(key.toByteArray()), member.getId()); - var hash = algorithm.digest(key.toByteString()); + public Bound get(KeyAndToken keyAndToken) { + var hash = algorithm.digest(keyAndToken.getKey()); + log.info("Get: {} on: {}", hash, member.getId()); Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); var gathered = HashMultiset.create(); var iterate = new RingIterator(operationsFrequency, context, member, scheduler, binderComms); - iterate.noDuplicates() - .iterate(hash, null, (link, r) -> link.get(key), () -> failedMajority(result, maxCount(gathered)), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, hash, - isTimedOut, destination, "Get", - Attachment.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered))); + iterate.noDuplicates().iterate(hash, null, (link, r) -> { + var bound = link.get(keyAndToken); + log.debug("Get {}: bound: <{}:{}> from: {} on: {}", hash, bound.getKey().toStringUtf8(), + bound.getValue().toStringUtf8(), link.getMember().getId(), member.getId()); + return bound; + }, () -> failedMajority(result, maxCount(gathered)), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, + hash, isTimedOut, destination, "Get"), + t -> failedMajority(result, maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException re) { + throw re; + } throw new IllegalStateException(e.getCause()); } } @@ -173,8 +174,8 @@ public void stop() { public void unbind(KeyAndToken keyAndToken) { var key = keyAndToken.toByteArray(); - log.info("bind: {} on: {}", Hex.hex(key), member.getId()); var hash = algorithm.digest(key); + log.info("Unbind: {} on: {}", hash, member.getId()); Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); @@ -187,36 +188,34 @@ public void unbind(KeyAndToken keyAndToken) { }, () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, hash, isTimedOut, destination, - "Unbind", - Attachment.getDefaultInstance()), + "Unbind"), t -> failedMajority(result, maxCount(gathered))); try { result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException re) { + throw re; + } throw new IllegalStateException(e.getCause()); } } private void add(Bound bound) { var hash = algorithm.digest(bound.getKey()); - bottled.put(hash.getBytes(), bound); + bottled.put(hash, bound); log.info("Add: {} on: {}", Hex.hex(bound.getKey().toByteArray()), member.getId()); } - private Bound binding(Digest d) { - return bottled.get(d.getBytes()); - } - private Stream bindingsIn(KeyInterval i) { Iterator it = new Iterator() { - private final Iterator iterate = bottled.keyIterator(i.getBegin().getBytes()); + private final Iterator iterate = bottled.keyIterator(i.getBegin()); private Digest next; { if (iterate.hasNext()) { - next = new Digest(algorithm, iterate.next()); + next = iterate.next(); if (next.compareTo(i.getEnd()) > 0) { next = null; // got nothing } @@ -231,11 +230,12 @@ public boolean hasNext() { @Override public Digest next() { var returned = next; + next = null; if (returned == null) { throw new NoSuchElementException(); } if (iterate.hasNext()) { - next = new Digest(algorithm, iterate.next()); + next = iterate.next(); if (next.compareTo(i.getEnd()) > 0) { next = null; // got nothing } @@ -249,24 +249,29 @@ public Digest next() { private void failedMajority(CompletableFuture result, int maxAgree) { result.completeExceptionally(new NoSuchElementException( - "Unable to achieve majority read, max: %s" + " required: %s on: %s".formatted(maxAgree, context.majority(), - member.getId()))); + "Unable to achieve majority read, max: %s required: %s on: %s".formatted(maxAgree, context.majority(), + member.getId()))); } - private boolean inValid(Digest from, int ring) { + private boolean invalid(Digest from, int ring) { if (ring >= context.getRingCount() || ring < 0) { - log.warn("invalid ring {} from {} on: {}", ring, from, member.getId()); + log.warn("invalid ring: {} from: {} on: {}", ring, from, member.getId()); return true; } Member fromMember = context.getMember(from); if (fromMember == null) { return true; } - Member successor = context.ring(ring).successor(fromMember, m -> context.isActive(m.getId())); - if (successor == null) { + Member predecessor = context.ring(ring).predecessor(member, m -> context.isActive(m.getId())); + if (predecessor == null) { return true; } - return !successor.equals(member); + var invalid = !predecessor.equals(fromMember); + if (invalid) { + log.warn("Invalid, not predecessor: {}, ring: {} expected: {} on: {}", from, ring, predecessor.getId(), + member.getId()); + } + return invalid; } private CombinedIntervals keyIntervals() { @@ -302,8 +307,7 @@ private int maxCount(HashMultiset gathered) { private Biff populate(long seed, CombinedIntervals keyIntervals) { BloomFilter.DigestBloomFilter bff = new BloomFilter.DigestBloomFilter(seed, Math.max(bottled.size(), 100), fpr); - bottled.keyIterator(algorithm.getOrigin().getBytes()).forEachRemaining(b -> { - var d = new Digest(algorithm, b); + bottled.keyIterator(algorithm.getOrigin()).forEachRemaining(d -> { if (keyIntervals.test(d)) { bff.add(d); } @@ -313,27 +317,27 @@ private Biff populate(long seed, CombinedIntervals keyIntervals) { private boolean read(CompletableFuture result, HashMultiset gathered, AtomicInteger tally, Optional futureSailor, Digest hash, Supplier isTimedOut, - RingCommunications.Destination destination, String getAttachment, - Attachment defaultInstance) { + RingCommunications.Destination destination, String op) { if (futureSailor.isEmpty()) { + log.debug("{}: {} empty from: {} on: {}", op, hash, destination.member().getId(), member.getId()); return !isTimedOut.get(); } var content = futureSailor.get(); if (content != null) { - log.trace("bound: {} from: {} on: {}", hash, destination.member().getId(), member.getId()); + log.debug("{}: {} from: {} on: {}", op, hash, destination.member().getId(), member.getId()); gathered.add(content); var max = max(gathered); if (max != null) { tally.set(max.getCount()); if (max.getCount() > context.toleranceLevel()) { result.complete(max.getElement()); - log.debug("Majority: {} achieved: {} on: {}", max.getCount(), hash, member.getId()); + log.debug("Majority {}: {} achieved: {} on: {}", op, max.getCount(), hash, member.getId()); return false; } } return !isTimedOut.get(); } else { - log.debug("Failed {} from: {} on: {}", hash, destination.member().getId(), member.getId()); + log.debug("Failed {}: {} from: {} on: {}", op, hash, destination.member().getId(), member.getId()); return !isTimedOut.get(); } } @@ -343,8 +347,8 @@ private Update reconcile(ReconciliationClient link, Integer ring) { return null; } CombinedIntervals keyIntervals = keyIntervals(); - log.trace("Interval reconciliation on ring: {} with: {} on: {} intervals: {}", ring, link.getMember(), - member.getId(), keyIntervals); + log.trace("Interval reconciliation on ring: {} with: {} intervals: {} on: {} ", ring, link.getMember(), + keyIntervals, member.getId()); return link.reconcile(Intervals.newBuilder() .setRing(ring) .addAllIntervals(keyIntervals.toIntervals()) @@ -381,9 +385,10 @@ private void reconcile(ScheduledExecutorService scheduler, Duration duration) { if (!started.get()) { return; } - reconcile.execute(this::reconcile, - (futureSailor, destination) -> reconcile(futureSailor, destination, scheduler, duration)); - + Thread.ofVirtual() + .start(() -> reconcile.execute(this::reconcile, + (futureSailor, destination) -> reconcile(futureSailor, destination, + scheduler, duration))); } /** @@ -399,10 +404,10 @@ private Update.Builder reconcile(Intervals intervals) { .stream() .map(KeyInterval::new) .flatMap(this::bindingsIn) - .peek(d -> log.trace("reconcile digest: {} on: {}", d, member.getId())) + .peek(d -> log.debug("reconcile digest: {} on: {}", d, member.getId())) .filter(d -> !biff.contains(d)) - .peek(d -> log.trace("filtered reconcile digest: {} on: {}", d, member.getId())) - .map(this::binding) + .peek(d -> log.debug("filtered reconcile digest: {} on: {}", d, member.getId())) + .map(d1 -> bottled.get(d1)) .filter(Objects::nonNull) .forEach(update::addBindings); return update; @@ -437,12 +442,38 @@ private void update(List bindings, Digest from) { } } + private boolean write(CompletableFuture result, HashMultiset gathered, AtomicInteger tally, + Optional futureSailor, Digest hash, Supplier isTimedOut, + RingCommunications.Destination destination) { + if (futureSailor.isEmpty()) { + return !isTimedOut.get(); + } + var content = futureSailor.get(); + if (content != null) { + log.debug("Bind: {} from: {} on: {}", hash, destination.member().getId(), member.getId()); + gathered.add(content); + var max = max(gathered); + if (max != null) { + tally.set(max.getCount()); + if (max.getCount() > context.toleranceLevel()) { + result.complete(max.getElement()); + log.debug("Majority Bind : {} achieved: {} on: {}", max.getCount(), hash, member.getId()); + return true; + } + } + return !isTimedOut.get(); + } else { + log.debug("Failed: Bind : {} from: {} on: {}", hash, destination.member().getId(), member.getId()); + return !isTimedOut.get(); + } + } + public interface OpValidator { - boolean validateBind(Bound bound, Token token); + boolean validateBind(Bound bound, byte[] token); - boolean validateGet(byte[] key, Token token); + boolean validateGet(byte[] key, byte[] token); - boolean validateUnbind(byte[] key, Token token); + boolean validateUnbind(byte[] key, byte[] token); } private static class ConsensusState { @@ -484,8 +515,8 @@ private class Reconciled implements ReconciliationService { @Override public Update reconcile(Intervals intervals, Digest from) { var ring = intervals.getRing(); - if (inValid(from, ring)) { - log.trace("Invalid reconcile from: {} ring: {} on: {}", from, ring, member.getId()); + if (invalid(from, ring)) { + log.warn("Invalid reconcile from: {} ring: {} on: {}", from, ring, member.getId()); return Update.getDefaultInstance(); } log.trace("Reconcile from: {} ring: {} on: {}", from, ring, member.getId()); @@ -501,7 +532,8 @@ public Update reconcile(Intervals intervals, Digest from) { @Override public void update(Updating update, Digest from) { var ring = update.getRing(); - if (inValid(from, ring)) { + if (invalid(from, ring)) { + log.warn("Invalid update from: {} ring: {} on: {}", from, ring, member.getId()); return; } LeydenJar.this.update(update.getBindingsList(), from); @@ -512,50 +544,37 @@ private class Borders implements BinderService { @Override public void bind(Binding request, Digest from) { - Member predecessor = context.ring(request.getRing()).predecessor(member); - if (predecessor == null || !from.equals(predecessor.getId())) { - log.debug("Invalid Bind ring on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), - member, from, request.getRing(), predecessor.getId()); - throw new StatusRuntimeException(Status.INVALID_ARGUMENT); - } var bound = request.getBound(); - if (!validator.validateBind(bound, Token.fromBytes(request.getToken().toByteArray()))) { - log.warn("Invalid Bind Token on {}:{}", context.getId(), member.getId()); + if (!validator.validateBind(bound, request.getToken().toByteArray())) { + log.warn("Invalid Bind Token on: {}", member.getId()); throw new StatusRuntimeException(Status.INVALID_ARGUMENT); } - bottled.put(bound.getKey().toByteArray(), bound); + var hash = algorithm.digest(bound.getKey()); + log.debug("Bind: {} on: {}", hash, member.getId()); + bottled.put(hash, bound); } @Override public Bound get(KeyAndToken request, Digest from) { - Member predecessor = context.ring(request.getRing()).predecessor(member); - if (predecessor == null || !from.equals(predecessor.getId())) { - log.debug("Invalid Get ring on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), - member.getId(), from, request.getRing(), predecessor.getId()); + if (!validator.validateGet(request.getKey().toByteArray(), request.getToken().toByteArray())) { + log.warn("Invalid Get Token on: {}", member.getId()); throw new StatusRuntimeException(Status.INVALID_ARGUMENT); } - if (!validator.validateGet(request.getKey().toByteArray(), - Token.fromBytes(request.getToken().toByteArray()))) { - log.warn("Invalid Get Token on {}:{}", context.getId(), member.getId()); - throw new StatusRuntimeException(Status.INVALID_ARGUMENT); - } - return bottled.getOrDefault(request.getKey().toByteArray(), Bound.getDefaultInstance()); + var hash = algorithm.digest(request.getKey()); + var bound = bottled.getOrDefault(hash, Bound.getDefaultInstance()); + log.debug("Get: {} bound: {} on: {}", hash, bound != null, member.getId()); + return bound; } @Override public void unbind(KeyAndToken request, Digest from) { - Member predecessor = context.ring(request.getRing()).predecessor(member); - if (predecessor == null || !from.equals(predecessor.getId())) { - log.debug("Invalid Unbind ring on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), - member.getId(), from, request.getRing(), predecessor.getId()); - throw new StatusRuntimeException(Status.INVALID_ARGUMENT); - } - if (!validator.validateUnbind(request.getKey().toByteArray(), - Token.fromBytes(request.getToken().toByteArray()))) { - log.warn("Invalid Unbind Token on {}:{}", context.getId(), member.getId()); + if (!validator.validateUnbind(request.getKey().toByteArray(), request.getToken().toByteArray())) { + log.warn("Invalid Unbind Token on: {}", member.getId()); throw new StatusRuntimeException(Status.INVALID_ARGUMENT); } - bottled.remove(request.getKey().toByteArray()); + var hash = algorithm.digest(request.getKey()); + log.debug("Remove: {} on: {}", hash, member.getId()); + bottled.remove(hash); } } } diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java index f5ab11d4c..a6a0d9c5a 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/binding/Bind.java @@ -42,7 +42,7 @@ public void close() throws IOException { @Override public Bound get(KeyAndToken key) { - return null; + return service.get(key, member.getId()); } @Override @@ -64,7 +64,7 @@ public void bind(Binding binding) { @Override public void close() throws IOException { - channel.shutdown(); + channel.release(); } @Override diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/reconcile/Reckoning.java b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/reconcile/Reckoning.java index ad06a7e87..ac993c63f 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/comm/reconcile/Reckoning.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/comm/reconcile/Reckoning.java @@ -54,7 +54,7 @@ public void update(Updating updating) { @Override public void close() throws IOException { - channel.shutdown(); + channel.release(); } @Override diff --git a/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java new file mode 100644 index 000000000..edb44bfbf --- /dev/null +++ b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java @@ -0,0 +1,141 @@ +package com.salesforce.apollo.leyden; + +import com.google.protobuf.ByteString; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.leyden.proto.Binding; +import com.salesforce.apollo.leyden.proto.Bound; +import com.salesforce.apollo.leyden.proto.KeyAndToken; +import com.salesforce.apollo.membership.Context; +import com.salesforce.apollo.membership.Member; +import com.salesforce.apollo.membership.SigningMember; +import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; +import com.salesforce.apollo.stereotomy.StereotomyImpl; +import com.salesforce.apollo.stereotomy.mem.MemKERL; +import com.salesforce.apollo.stereotomy.mem.MemKeyStore; +import com.salesforce.apollo.utils.Utils; +import org.h2.jdbcx.JdbcConnectionPool; +import org.h2.mvstore.MVStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.security.SecureRandom; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * @author hal.hildebrand + **/ +public class LeydenJarTest { + + private static final double PBYZ = 0.1; + protected final TreeMap dhts = new TreeMap<>(); + protected final Map routers = new HashMap<>(); + private String prefix; + private LeydenJar.OpValidator validator; + private Context context; + + @AfterEach + public void after() { + routers.values().forEach(r -> r.close(Duration.ofSeconds(2))); + routers.clear(); + dhts.values().forEach(t -> t.stop()); + dhts.clear(); + } + + @BeforeEach + public void before() throws Exception { + validator = new LeydenJar.OpValidator() { + @Override + public boolean validateBind(Bound bound, byte[] token) { + return true; + } + + @Override + public boolean validateGet(byte[] key, byte[] token) { + return true; + } + + @Override + public boolean validateUnbind(byte[] key, byte[] token) { + return true; + } + }; + prefix = UUID.randomUUID().toString(); + var entropy = SecureRandom.getInstance("SHA1PRNG"); + entropy.setSeed(new byte[] { 6, 6, 6 }); + var kerl = new MemKERL(DigestAlgorithm.DEFAULT); + var stereotomy = new StereotomyImpl(new MemKeyStore(), kerl, entropy); + var cardinality = 5; + var identities = IntStream.range(0, cardinality) + .mapToObj(i -> stereotomy.newIdentifier()) + .collect(Collectors.toMap(controlled -> new ControlledIdentifierMember(controlled), + controlled -> controlled)); + context = Context.newBuilder().setpByz(PBYZ).setCardinality(cardinality).build(); + identities.keySet().forEach(m -> context.activate(m)); + ConcurrentSkipListMap serverMembers = new ConcurrentSkipListMap<>(); + identities.keySet().forEach(member -> instantiate(member, context, serverMembers)); + + System.out.println(); + System.out.println(); + System.out.println(String.format("Cardinality: %s, Prob Byz: %s, Rings: %s Majority: %s", cardinality, PBYZ, + context.getRingCount(), context.majority())); + System.out.println(); + } + + @Test + public void smokin() { + routers.values().forEach(r -> r.start()); + dhts.values().forEach(lj -> lj.start(Duration.ofMillis(10))); + + var source = dhts.firstEntry().getValue(); + var sink = dhts.lastEntry().getValue(); + + var key = ByteString.copyFrom("hello".getBytes()); + var value = ByteString.copyFrom("world".getBytes()); + var binding = Binding.newBuilder().setBound(Bound.newBuilder().setKey(key).setValue(value).build()).build(); + source.bind(binding); + + for (var e : dhts.entrySet()) { + var success = Utils.waitForCondition(10_000, () -> { + Bound bound; + try { + bound = e.getValue().get(KeyAndToken.newBuilder().setKey(key).build()); + } catch (NoSuchElementException nse) { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + } + return false; + } + return bound != null; + }); + assertTrue(success, "Failed for " + e.getKey().getId()); + } + } + + protected void instantiate(SigningMember member, Context context, + ConcurrentSkipListMap serverMembers) { + context.activate(member); + final var url = String.format("jdbc:h2:mem:%s-%s;DB_CLOSE_ON_EXIT=FALSE", member.getId(), prefix); + context.activate(member); + JdbcConnectionPool connectionPool = JdbcConnectionPool.create(url, "", ""); + connectionPool.setMaxConnections(10); + var exec = Executors.newVirtualThreadPerTaskExecutor(); + var router = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(2)); + routers.put(member, router); + dhts.put(member, + new LeydenJar(validator, Duration.ofSeconds(5), member, context, Duration.ofMillis(10), router, 0.0125, + DigestAlgorithm.DEFAULT, new MVStore.Builder().open(), null, null)); + } +} diff --git a/leyden/src/test/resources/logback-test.xml b/leyden/src/test/resources/logback-test.xml new file mode 100644 index 000000000..e8e34bf67 --- /dev/null +++ b/leyden/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ + + + + + + + + + + %d{mm:ss.SSS} [%thread] %-5level %logger{0} - %msg%n + + + + + + + + + + + + + + + + + + + + diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java index 7635b2641..01cc45e07 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java @@ -6,24 +6,21 @@ */ package com.salesforce.apollo.archipelago; -import static com.salesforce.apollo.cryptography.QualifiedBase64.qb64; - -import java.util.concurrent.TimeUnit; - import com.google.common.base.MoreObjects; import com.salesforce.apollo.archipelago.ServerConnectionCache.ReleasableManagedChannel; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.membership.Member; - -import io.grpc.CallOptions; -import io.grpc.ClientCall; -import io.grpc.ConnectivityState; +import io.grpc.*; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; -import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +import static com.salesforce.apollo.cryptography.QualifiedBase64.qb64; public class ManagedServerChannel extends ManagedChannel { + private final static Logger log = LoggerFactory.getLogger(ManagedServerChannel.class); private final Digest context; private final ReleasableManagedChannel delegate; @@ -68,10 +65,10 @@ public boolean isTerminated() { } @Override - public ClientCall newCall(MethodDescriptor methodDescriptor, - CallOptions callOptions) { - return new SimpleForwardingClientCall(delegate.getChannel() - .newCall(methodDescriptor, callOptions)) { + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new SimpleForwardingClientCall( + delegate.getChannel().newCall(methodDescriptor, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { headers.put(Router.METADATA_CONTEXT_KEY, qb64(context)); @@ -97,12 +94,19 @@ public void resetConnectBackoff() { @Override public ManagedChannel shutdown() { - return delegate.getChannel().shutdown(); + if (log.isTraceEnabled()) { + log.trace("Shutting down connection to: {} on: {}", delegate.getMember().getId(), delegate.getFrom(), + new Exception("Shutdown stacktrace")); + } else if (log.isDebugEnabled()) { + log.debug("Shutting down connection to: {} on: {}", delegate.getMember().getId(), delegate.getFrom()); + } + return delegate.shutdown(); } @Override public ManagedChannel shutdownNow() { - return delegate.getChannel().shutdownNow(); + log.trace("Shutting down connection (now) to: {} on: {}", delegate.getMember().getId(), delegate.getFrom()); + return delegate.shutdownNow(); } @Override diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java index e619502ec..b72758a28 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java @@ -37,23 +37,26 @@ */ public class RouterImpl implements Router { - private final static Logger log = LoggerFactory.getLogger(RouterImpl.class); - private final ServerConnectionCache cache; - private final ClientIdentity clientIdentityProvider; - private final Consumer contextRegistration; - private final Member from; - private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); - private final Server server; - private final Map> services = new ConcurrentHashMap<>(); - private final AtomicBoolean started = new AtomicBoolean(); + private final static Logger log = LoggerFactory.getLogger(RouterImpl.class); + private final ServerConnectionCache cache; + private final ClientIdentity clientIdentityProvider; + private final Consumer contextRegistration; + private final Member from; + private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); + private final Server server; + private final Map> services = new ConcurrentHashMap<>(); + private final AtomicBoolean started = new AtomicBoolean(); + public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, ClientIdentity clientIdentityProvider) { - this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> {}); + this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> { + }); } + public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, ClientIdentity clientIdentityProvider, Consumer contextRegistration) { this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build(); - this.cache = cacheBuilder.build(); + this.cache = cacheBuilder.clone().setMember(from.getId()).build(); this.clientIdentityProvider = clientIdentityProvider; this.contextRegistration = contextRegistration; this.from = from; @@ -185,7 +188,7 @@ public class CommonCommunications implements Route private final CreateClientCommunications createFunction; private final Member from; private final Client localLoopback; - private final RoutableService routing; + private final RoutableService routing; public CommonCommunications(Digest context, Member from, RoutableService routing) { this(context, from, routing, m -> vanilla(from), vanilla(from)); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java index daebe5f4e..479d5a79d 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java @@ -6,262 +6,95 @@ */ package com.salesforce.apollo.archipelago; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.membership.Member; - import io.grpc.ManagedChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; /** - * - * Privides a safe mechanism for caching expensive connections to a server. We - * use MTLS, so we want to make good use of the ManagedChannels. Fireflies, by - * its nature, will keep some subset of connections open for gossip use, based - * on a ring. Avalanche samples a random subset of known servers. Ghost has - * access patterns based on hahes. And so on. + * Privides a safe mechanism for caching expensive connections to a server. We use MTLS, so we want to make good use of + * the ManagedChannels. Fireflies, by its nature, will keep some subset of connections open for gossip use, based on a + * ring. Avalanche samples a random subset of known servers. Ghost has access patterns based on hahes. And so on. *

- * This cache allows grpc clients to reuse the underlying ManagedChannel as - * "Bob" inteneded, enforcing some upper limit on the connections used. + * This cache allows grpc clients to reuse the underlying ManagedChannel as "Bob" inteneded, enforcing some upper limit + * on the connections used. *

- * ManagedChannels are never closed while they are open and used by a client - * stub. Connections can be opened up to some total limit, which does not have - * to be the target number of open + idle connections. ManagedChannels in the - * cache keep track of their overall usage count by client stubs - each borrow - * increments this usage count. + * ManagedChannels are never closed while they are open and used by a client stub. Connections can be opened up to some + * total limit, which does not have to be the target number of open + idle connections. ManagedChannels in the cache + * keep track of their overall usage count by client stubs - each borrow increments this usage count. *

- * When ManagedChannels are closed, they are closed in the order of least usage - * count. ManagedChannels may also have a minimum idle duration, to prevent - * cache thrashing. When this duration is > 0, the connection will not be - * closed, potentially overshooting target cache counts - * - * @author hal.hildebrand + * When ManagedChannels are closed, they are closed in the order of least usage count. ManagedChannels may also have a + * minimum idle duration, to prevent cache thrashing. When this duration is > 0, the connection will not be closed, + * potentially overshooting target cache counts * + * @author hal.hildebrand */ public class ServerConnectionCache { - public static class Builder { - private Clock clock = Clock.systemUTC(); - private ServerConnectionFactory factory = null; - private ServerConnectionCacheMetrics metrics; - private Duration minIdle = Duration.ofMillis(100); - private int target = 10; - - public ServerConnectionCache build() { - return new ServerConnectionCache(factory, target, minIdle, clock, metrics); - } - - public Clock getClock() { - return clock; - } - - public ServerConnectionFactory getFactory() { - return factory; - } - - public ServerConnectionCacheMetrics getMetrics() { - return metrics; - } - - public Duration getMinIdle() { - return minIdle; - } - - public int getTarget() { - return target; - } - - public Builder setClock(Clock clock) { - this.clock = clock; - return this; - } - - public Builder setFactory(ServerConnectionFactory factory) { - this.factory = factory; - return this; - } - - public Builder setMetrics(ServerConnectionCacheMetrics metrics) { - this.metrics = metrics; - return this; - } - - public Builder setMinIdle(Duration minIdle) { - this.minIdle = minIdle; - return this; - } - - public Builder setTarget(int target) { - this.target = target; - return this; - } - } - - @FunctionalInterface - public interface CreateClientCommunications { - Client create(ManagedServerChannel channel); - } - - public static interface ServerConnectionCacheMetrics { - - Meter borrowRate(); - - Timer channelOpenDuration(); - - Meter closeConnectionRate(); - - Counter createConnection(); - - Meter failedConnectionRate(); - - Counter failedOpenConnection(); - - Counter openConnections(); - - Meter releaseRate(); - - } - - public interface ServerConnectionFactory { - ManagedChannel connectTo(Member to); - } - - static class ReleasableManagedChannel implements Comparable { - private final AtomicInteger borrowed = new AtomicInteger(); - private final ManagedChannel channel; - private final Instant created; - private volatile Instant lastUsed; - private final Member member; - private final ServerConnectionCache scc; - private final AtomicInteger usageCount = new AtomicInteger(); - - public ReleasableManagedChannel(Member id, ManagedChannel channel, ServerConnectionCache scc) { - this.member = id; - this.channel = channel; - this.scc = scc; - created = Instant.now(scc.clock); - lastUsed = Instant.now(scc.clock); - } - - @Override - public int compareTo(ReleasableManagedChannel o) { - return Integer.compare(usageCount.get(), o.usageCount.get()); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if ((obj == null) || (getClass() != obj.getClass())) - return false; - return member.equals(((ReleasableManagedChannel) obj).member); - } - - public ManagedChannel getChannel() { - return channel; - } - - public Member getMember() { - return member; - } - - @Override - public int hashCode() { - return member.hashCode(); - } - - public boolean isCloseable() { - return lastUsed.plus(scc.minIdle).isBefore(Instant.now(scc.clock)); - } - - public void release() { - scc.release(this); - } - - private boolean decrementBorrow() { - if (borrowed.decrementAndGet() == 0) { - lastUsed = Instant.now(scc.clock); - return true; - } - return false; - } - - private boolean incrementBorrow() { - usageCount.incrementAndGet(); - return borrowed.incrementAndGet() == 1; - } - } - - private final static Logger log = LoggerFactory.getLogger(ServerConnectionCache.class); - - public static Builder newBuilder() { - return new Builder(); - } - - private final Map cache = new HashMap<>(); - private final Clock clock; - private final ServerConnectionFactory factory; - private final ReentrantLock lock = new ReentrantLock(true); - private final ServerConnectionCacheMetrics metrics; - private final Duration minIdle; - private final PriorityQueue queue = new PriorityQueue<>(); - private final int target; - - public ServerConnectionCache(ServerConnectionFactory factory, int target, Duration minIdle, Clock clock, - ServerConnectionCacheMetrics metrics) { + private final static Logger log = LoggerFactory.getLogger( + ServerConnectionCache.class); + private final Map cache = new HashMap<>(); + private final Clock clock; + private final ServerConnectionFactory factory; + private final ReentrantLock lock = new ReentrantLock(true); + private final ServerConnectionCacheMetrics metrics; + private final Duration minIdle; + private final PriorityQueue queue = new PriorityQueue<>(); + private final int target; + private final Digest member; + + public ServerConnectionCache(Digest member, ServerConnectionFactory factory, int target, Duration minIdle, + Clock clock, ServerConnectionCacheMetrics metrics) { + assert member != null; this.factory = factory; this.target = Math.max(target, 1); this.minIdle = minIdle; this.clock = clock; this.metrics = metrics; + this.member = member; + } + + public static Builder newBuilder() { + return new Builder(); } public ManagedServerChannel borrow(Digest context, Member to) { return lock(() -> { if (cache.size() >= target) { - log.debug("Cache target open connections exceeded: {}, opening to {}", target, to); + log.debug("Cache target open connections exceeded: {}, opening to: {} on: {}", target, to.getId(), + member); } - ReleasableManagedChannel connection = cache.computeIfAbsent(to, member -> { - ReleasableManagedChannel conn = new ReleasableManagedChannel(to, factory.connectTo(to), this); + ReleasableManagedChannel connection = cache.computeIfAbsent(to, m -> { + log.debug("Creating new channel to: {} on: {}", to.getId(), m.getId()); + ReleasableManagedChannel conn = new ReleasableManagedChannel(to, factory.connectTo(to), member); if (metrics != null) { metrics.createConnection().inc(); metrics.openConnections().inc(); } return conn; }); - if (connection == null) { - log.warn("Failed to open channel to {}", to); - if (metrics != null) { - metrics.failedOpenConnection().inc(); - metrics.failedConnectionRate().mark(); - } - return null; - } if (connection.incrementBorrow()) { - log.debug("Opened channel to {}, last used: {}", connection.member, connection.lastUsed); + log.debug("Increment borrow to: {} channel to: {} on: {}", connection.borrowed, + connection.member.getId(), member); if (metrics != null) { metrics.borrowRate().mark(); } queue.remove(connection); } - log.trace("Opened channel to {}, borrowed: {}, usage: {}", connection.member, connection.borrowed, - connection.usageCount); + log.trace("Borrowed channel to: {}, borrowed: {}, usage: {} on: {}", connection.member.getId(), + connection.borrowed, connection.usageCount, member); return new ManagedServerChannel(context, connection); }); } @@ -272,7 +105,7 @@ public T borrow(Digest context, Member to, CreateClientCommunications cre public void close() { lock(() -> { - log.info("Closing connection cache"); + log.info("Closing connection cache on: {}", member); for (ReleasableManagedChannel conn : new ArrayList<>(cache.values())) { try { conn.channel.shutdownNow(); @@ -281,7 +114,7 @@ public void close() { metrics.openConnections().dec(); } } catch (Throwable e) { - log.debug("Error closing {}", conn.member); + log.debug("Error closing connection to: {} on: {}", conn.member.getId(), member); } } cache.clear(); @@ -293,7 +126,7 @@ public void close() { public void release(ReleasableManagedChannel connection) { lock(() -> { if (connection.decrementBorrow()) { - log.debug("Releasing connection: {}", connection.member); + log.debug("Releasing connection to: {} on: {}", connection.member.getId(), member); queue.add(connection); if (metrics != null) { metrics.releaseRate().mark(); @@ -309,9 +142,9 @@ private boolean close(ReleasableManagedChannel connection) { try { connection.channel.shutdownNow(); } catch (Throwable t) { - log.debug("Error closing {}", connection.member); + log.debug("Error closing connection to: {} on: {}", connection.member.getId(), connection.member); } - log.debug("{} is closed", connection.member); + log.debug("connection to: {} is closed on: {}", connection.member.getId(), member); cache.remove(connection.member); if (metrics != null) { metrics.openConnections().dec(); @@ -333,7 +166,7 @@ private T lock(Supplier supplier) { } private void manageConnections() { -// log.info("Managing connections: " + cache.size() + " idle: " + queue.size()); + log.debug("Managing connections: {} idle: {} on: {}", cache.size(), queue.size(), member); Iterator connections = queue.iterator(); while (connections.hasNext() && cache.size() > target) { if (close(connections.next())) { @@ -341,4 +174,188 @@ private void manageConnections() { } } } + + @FunctionalInterface + public interface CreateClientCommunications { + Client create(ManagedServerChannel channel); + } + + public interface ServerConnectionCacheMetrics { + + Meter borrowRate(); + + Timer channelOpenDuration(); + + Meter closeConnectionRate(); + + Counter createConnection(); + + Meter failedConnectionRate(); + + Counter failedOpenConnection(); + + Counter openConnections(); + + Meter releaseRate(); + + } + + public interface ServerConnectionFactory { + ManagedChannel connectTo(Member to); + } + + public static class Builder implements Cloneable { + private Clock clock = Clock.systemUTC(); + private ServerConnectionFactory factory = null; + private ServerConnectionCacheMetrics metrics; + private Duration minIdle = Duration.ofMillis(100); + private int target = 10; + private Digest member; + + public ServerConnectionCache build() { + return new ServerConnectionCache(member, factory, target, minIdle, clock, metrics); + } + + @Override + public Builder clone() { + try { + return (Builder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } + + public Clock getClock() { + return clock; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public ServerConnectionFactory getFactory() { + return factory; + } + + public Builder setFactory(ServerConnectionFactory factory) { + this.factory = factory; + return this; + } + + public Digest getMember() { + return member; + } + + public Builder setMember(Digest member) { + this.member = member; + return this; + } + + public ServerConnectionCacheMetrics getMetrics() { + return metrics; + } + + public Builder setMetrics(ServerConnectionCacheMetrics metrics) { + this.metrics = metrics; + return this; + } + + public Duration getMinIdle() { + return minIdle; + } + + public Builder setMinIdle(Duration minIdle) { + this.minIdle = minIdle; + return this; + } + + public int getTarget() { + return target; + } + + public Builder setTarget(int target) { + this.target = target; + return this; + } + } + + class ReleasableManagedChannel implements Comparable { + private final AtomicInteger borrowed = new AtomicInteger(); + private final ManagedChannel channel; + private final Instant created; + private final Member member; + private final AtomicInteger usageCount = new AtomicInteger(); + private final Digest from; + private volatile Instant lastUsed; + + public ReleasableManagedChannel(Member member, ManagedChannel channel, Digest from) { + this.member = member; + this.channel = channel; + created = Instant.now(clock); + lastUsed = Instant.now(clock); + this.from = from; + } + + @Override + public int compareTo(ReleasableManagedChannel o) { + return Integer.compare(usageCount.get(), o.usageCount.get()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if ((obj == null) || (getClass() != obj.getClass())) + return false; + return member.equals(((ReleasableManagedChannel) obj).member); + } + + public ManagedChannel getChannel() { + return channel; + } + + public Digest getFrom() { + return from; + } + + public Member getMember() { + return member; + } + + @Override + public int hashCode() { + return member.hashCode(); + } + + public boolean isCloseable() { + return lastUsed.plus(minIdle).isBefore(Instant.now(clock)); + } + + public void release() { + log.trace("Release connection to: {} on: {}", getMember().getId(), getFrom()); + ServerConnectionCache.this.release(this); + } + + public ManagedChannel shutdown() { + throw new IllegalStateException("Should not be called"); + } + + public ManagedChannel shutdownNow() { + throw new IllegalStateException("Should not be called"); + } + + private boolean decrementBorrow() { + if (borrowed.decrementAndGet() == 0) { + lastUsed = Instant.now(clock); + return true; + } + return false; + } + + private boolean incrementBorrow() { + usageCount.incrementAndGet(); + return borrowed.incrementAndGet() == 1; + } + } } diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java index 4047d3126..ab3ffec5c 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java @@ -94,10 +94,6 @@ public String toString() { return "RingCommunications [" + context.getId() + ":" + member.getId() + ":" + currentIndex + "]"; } - protected Logger getLog() { - return log; - } - @SuppressWarnings("unchecked") List> calculateTraversal(Digest digest) { var traversal = new ArrayList>(); @@ -145,6 +141,10 @@ final RingCommunications.Destination next(Digest digest) { } } + protected Logger getLog() { + return log; + } + private void execute(BiFunction round, SyncHandler handler, Destination destination) { if (destination.link == null) { @@ -154,7 +154,7 @@ private void execute(BiFunction round, SyncHandler noDuplicates() { return this; } - @Override - protected Logger getLog() { - return log; - } - private void internalIterate(Digest digest, Runnable onMajority, BiFunction round, Runnable failedMajority, ResultConsumer handler, Consumer onComplete, AtomicInteger tally, Set traversed) { @@ -114,47 +109,48 @@ private void internalIterate(Digest digest, Runnable onMajority, BiFunction< } var next = next(digest); - log.trace("Iteration: {} tally: {} for: {} on: {} ring: {} complete: false on: {}", iteration(), tally.get(), - digest, context.getId(), next.ring(), member.getId()); + log.trace("Iteration: {} tally: {} for digest: {} on: {} ring: {} complete: false on: {}", iteration(), + tally.get(), digest, context.getId(), next.ring(), member.getId()); if (next.link() == null) { - log.trace("No successor found of: {} on: {} iteration: {} traversed: {} ring: {} on: {}", digest, + log.trace("No successor found for digest: {} on: {} iteration: {} traversed: {} ring: {} on: {}", digest, context.getId(), iteration(), traversed, context.ring(currentIndex).stream().toList(), member.getId()); final boolean allow = handler.handle(tally, Optional.empty(), next); allowed.accept(allow); if (allow) { - log.trace("Finished on iteration: {} proceeding on: {} for: {} tally: {} on: {}", iteration(), digest, - context.getId(), tally.get(), member.getId()); + log.trace("Finished on iteration: {} proceeding on: {} for digest: {} tally: {} on: {}", iteration(), + digest, context.getId(), tally.get(), member.getId()); schedule(proceed); } else { - log.trace("Completed on iteration: {} on: {} for: {} for: {} tally: {} on: {}", iteration(), digest, - context.getId(), tally.get(), member.getId()); + log.trace("Completed on iteration: {} on: {} for digest: {} for: {} tally: {} on: {}", iteration(), + digest, context.getId(), tally.get(), member.getId()); } return; } try (Comm link = next.link()) { - log.trace("Continuation on iteration: {} tally: {} for: {} on: {} ring: {} to: {} on: {}", iteration(), - tally.get(), digest, context.getId(), next.ring(), + log.trace("Continuation on iteration: {} tally: {} for digest: {} on: {} ring: {} to: {} on: {}", + iteration(), tally.get(), digest, context.getId(), next.ring(), link.getMember() == null ? null : link.getMember().getId(), member.getId()); Q result = null; try { result = round.apply(link, next.ring()); } catch (Throwable e) { - log.trace("Exception in round for: {} on: {} iteration: {} from: {} on: {}", digest, context.getId(), - iteration(), link.getMember() == null ? null : link.getMember().getId(), member.getId()); + log.trace("Exception in round for digest: {} context: {} iteration: {} from: {} on: {}", digest, + context.getId(), iteration(), link.getMember() == null ? null : link.getMember().getId(), + member.getId(), e); } if (result == null) { - log.trace("No asynchronous response for: {} on: {} iteration: {} from: {} on: {}", digest, + log.trace("No asynchronous response for digest: {} on: {} iteration: {} from: {} on: {}", digest, context.getId(), iteration(), link.getMember() == null ? null : link.getMember().getId(), member.getId()); final boolean allow = handler.handle(tally, Optional.empty(), next); allowed.accept(allow); if (allow) { - log.trace("Proceeding on iteration: {} on: {} for: {} tally: {} on: {}", iteration(), digest, + log.trace("Proceeding on iteration: {} on: {} for digest: {} tally: {} on: {}", iteration(), digest, context.getId(), tally.get(), member.getId()); schedule(proceed); } else { - log.trace("Completed on iteration: {} on: {} for: {} tally: {} on: {}", iteration(), digest, + log.trace("Completed on iteration: {} on: {} for digest: {} tally: {} on: {}", iteration(), digest, context.getId(), tally.get(), member.getId()); } return; @@ -162,11 +158,11 @@ private void internalIterate(Digest digest, Runnable onMajority, BiFunction< final var allow = handler.handle(tally, Optional.of(result), next); allowed.accept(allow); if (allow) { - log.trace("Scheduling next iteration: {} on: {} for: {} tally: {} on: {}", iteration(), digest, + log.trace("Scheduling next iteration: {} on: {} for digest: {} tally: {} on: {}", iteration(), digest, context.getId(), tally.get(), member.getId()); schedule(proceed); } else { - log.trace("Finished on iteration: {} on: {} for: {} tally: {} on: {}", iteration(), digest, + log.trace("Finished on iteration: {} on: {} for digest: {} tally: {} on: {}", iteration(), digest, context.getId(), tally.get(), member.getId()); } } catch (IOException e) { @@ -179,16 +175,16 @@ private void proceed(Digest key, final boolean allow, Runnable onMajority, Runna final var current = currentIndex; if (!finalIteration) { log.trace( - "Determining: {} continuation of: {} for: {} tally: {} majority: {} final itr: {} allow: {} on: {}", + "Determining: {} continuation of: {} for digest: {} tally: {} majority: {} final itr: {} allow: {} on: {}", current, key, context.getId(), tally.get(), context.majority(), finalIteration, allow, member.getId()); } if (finalIteration && allow) { - log.trace("Completing iteration: {} of: {} for: {} tally: {} on: {}", iteration(), key, context.getId(), - tally.get(), member.getId()); + log.trace("Completing iteration: {} of: {} for digest: {} tally: {} on: {}", iteration(), key, + context.getId(), tally.get(), member.getId()); if (failedMajority != null && !majorityFailed) { if (tally.get() < context.majority()) { majorityFailed = true; - log.debug("Failed to obtain majority of: {} for: {} tally: {} required: {} on: {}", key, + log.debug("Failed to obtain majority of: {} for digest: {} tally: {} required: {} on: {}", key, context.getId(), tally.get(), context.majority(), member.getId()); failedMajority.run(); } @@ -197,13 +193,14 @@ private void proceed(Digest key, final boolean allow, Runnable onMajority, Runna onComplete.accept(tally.get()); } } else if (!allow) { - log.trace("Termination of: {} for: {} tally: {} on: {}", key, context.getId(), tally.get(), member.getId()); + log.trace("Termination of: {} for digest: {} tally: {} on: {}", key, context.getId(), tally.get(), + member.getId()); } else { if (onMajority != null && !majoritySucceed) { if (tally.get() >= context.majority()) { majoritySucceed = true; - log.debug("Obtained: {} majority of: {} for: {} tally: {} on: {}", current, key, context.getId(), - tally.get(), member.getId()); + log.debug("Obtained: {} majority of: {} for digest: {} tally: {} on: {}", current, key, + context.getId(), tally.get(), member.getId()); onMajority.run(); } }