diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 3bbd25ecd..afd60e771 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -100,10 +100,10 @@ public CHOAM(Parameters params) { combine = new ReliableBroadcaster(params.context(), params.member(), params.combine(), params.communications(), params.metrics() == null ? null : params.metrics().getCombineMetrics(), new MessageAdapter(any -> true, - (Function) any -> signatureHash(any), - (Function>) any -> Collections.emptyList(), + (Function) any -> signatureHash(any), + (Function>) any -> Collections.emptyList(), (m, any) -> any, - (Function) am -> am.getContent())); + (Function) am -> am.getContent())); linear = Executors.newSingleThreadExecutor( Thread.ofVirtual().name("Linear " + params.member().getId()).factory()); combine.registerHandler((ctx, messages) -> { @@ -449,7 +449,7 @@ private void combine(List messages) { private void combine(Msg m) { CertifiedBlock block; try { - block = m.content().unpack(CertifiedBlock.class); + block = CertifiedBlock.parseFrom(m.content()); } catch (InvalidProtocolBufferException e) { log.debug("unable to parse block content from {} on: {}", m.source(), params.member().getId()); return; @@ -768,10 +768,10 @@ private Function service() { }; } - private Digest signatureHash(Any any) { + private Digest signatureHash(ByteString any) { CertifiedBlock cb; try { - cb = any.unpack(CertifiedBlock.class); + cb = CertifiedBlock.parseFrom(any); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException(e); } diff --git a/grpc/src/main/proto/messaging.proto b/grpc/src/main/proto/messaging.proto index ea8ebc725..c8898cf2c 100644 --- a/grpc/src/main/proto/messaging.proto +++ b/grpc/src/main/proto/messaging.proto @@ -36,13 +36,13 @@ message ReconcileContext { message AgedMessage { int32 age = 1; - google.protobuf.Any content = 3; + bytes content = 3; } message DefaultMessage { utils.Digeste source = 1; int32 nonce = 2; - google.protobuf.Any content = 3; + bytes content = 3; } message SignedDefaultMessage { diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 71c04ee8b..18c518686 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -14,6 +14,8 @@ import com.salesfoce.apollo.messaging.proto.*; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; +import com.salesforce.apollo.bloomFilters.BloomFilter; +import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.crypto.JohnHancock; @@ -24,8 +26,6 @@ import com.salesforce.apollo.membership.messaging.rbc.comms.ReliableBroadcast; import com.salesforce.apollo.ring.RingCommunications; import com.salesforce.apollo.utils.Entropy; -import com.salesforce.apollo.bloomFilters.BloomFilter; -import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,10 +77,10 @@ public ReliableBroadcaster(Context context, SigningMember member, Parame } public static MessageAdapter defaultMessageAdapter(Context context, DigestAlgorithm algo) { - final Predicate verifier = any -> { + final Predicate verifier = any -> { SignedDefaultMessage sdm; try { - sdm = any.unpack(SignedDefaultMessage.class); + sdm = SignedDefaultMessage.parseFrom(any); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Cannot unwrap", e); } @@ -91,34 +91,37 @@ public static MessageAdapter defaultMessageAdapter(Context context, Dige } return member.verify(JohnHancock.from(sdm.getSignature()), dm.toByteString()); }; - final Function hasher = any -> { + final Function hasher = any -> { try { - return JohnHancock.from(any.unpack(SignedDefaultMessage.class).getSignature()).toDigest(algo); + return JohnHancock.from(SignedDefaultMessage.parseFrom(any).getSignature()).toDigest(algo); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Cannot unwrap", e); } }; - Function> source = any -> { + Function> source = any -> { try { return Collections.singletonList( - Digest.from(any.unpack(SignedDefaultMessage.class).getContent().getSource())); + Digest.from(SignedDefaultMessage.parseFrom(any).getContent().getSource())); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Cannot unwrap", e); } }; var sn = new AtomicInteger(); - BiFunction wrapper = (m, any) -> { + BiFunction wrapper = (m, any) -> { final var dm = DefaultMessage.newBuilder() .setNonce(sn.incrementAndGet()) .setSource(m.getId().toDigeste()) .setContent(any) .build(); - return Any.pack( - SignedDefaultMessage.newBuilder().setContent(dm).setSignature(m.sign(dm.toByteString()).toSig()).build()); + return SignedDefaultMessage.newBuilder() + .setContent(dm) + .setSignature(m.sign(dm.toByteString()).toSig()) + .build() + .toByteString(); }; - Function extractor = am -> { + Function extractor = am -> { try { - return am.getContent().unpack(SignedDefaultMessage.class).getContent().getContent(); + return SignedDefaultMessage.parseFrom(am.getContent()).getContent().getContent(); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Cannot unwrap", e); } @@ -147,7 +150,7 @@ public void publish(Message message, boolean notifyLocal) { if (!started.get()) { return; } - AgedMessage m = buffer.send(Any.pack(message), member); + AgedMessage m = buffer.send(message.toByteString(), member); if (notifyLocal) { deliver(Collections.singletonList( new Msg(Collections.singletonList(member.getId()), adapter.extractor.apply(m), @@ -214,8 +217,8 @@ private Reconcile gossipRound(ReliableBroadcast link, int ring) { if (!started.get()) { return null; } - log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(), link.getMember().getId(), - ring, member.getId()); + log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(), + link.getMember().getId(), ring, member.getId()); try { return link.gossip( MessageBff.newBuilder().setRing(ring).setDigests(buffer.forReconcilliation().toBff()).build()); @@ -282,12 +285,13 @@ public interface MessageHandler { public record HashedContent(Digest hash, ByteString content) { } - public record MessageAdapter(Predicate verifier, Function hasher, - Function> source, BiFunction wrapper, - Function extractor) { + public record MessageAdapter(Predicate verifier, Function hasher, + Function> source, + BiFunction wrapper, + Function extractor) { } - public record Msg(List source, Any content, Digest hash) { + public record Msg(List source, ByteString content, Digest hash) { } public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm, double falsePositiveRate, @@ -456,7 +460,7 @@ public int round() { return round.get(); } - public AgedMessage send(Any msg, SigningMember member) { + public AgedMessage send(ByteString msg, SigningMember member) { AgedMessage.Builder message = AgedMessage.newBuilder().setContent(adapter.wrapper.apply(member, msg)); var hash = adapter.hasher.apply(message.getContent()); state s = new state(hash, message); diff --git a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java index 6c643fdd7..da07127f2 100644 --- a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java @@ -40,7 +40,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -54,13 +53,14 @@ */ public class RbcTest { - private static final Parameters.Builder parameters = Parameters.newBuilder() - .setMaxMessages(100) - .setFalsePositiveRate(0.0125) - .setBufferSize(500); - private final List communications = new ArrayList<>(); - private final AtomicInteger totalReceived = new AtomicInteger(0); - private List messengers; + private static final Parameters.Builder parameters = Parameters.newBuilder() + .setMaxMessages(100) + .setFalsePositiveRate(0.0125) + .setBufferSize(500); + final AtomicReference round = new AtomicReference<>(); + private final List communications = new ArrayList<>(); + private final AtomicInteger totalReceived = new AtomicInteger(0); + private List messengers; @AfterEach public void after() { @@ -75,10 +75,14 @@ public void broadcast() throws Exception { MetricRegistry registry = new MetricRegistry(); var entropy = SecureRandom.getInstance("SHA1PRNG"); - entropy.setSeed(new byte[]{6, 6, 6}); + entropy.setSeed(new byte[] { 6, 6, 6 }); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); - List members = IntStream.range(0, 50).mapToObj(i -> stereotomy.newIdentifier()).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (SigningMember) e).toList(); + List members = IntStream.range(0, 50) + .mapToObj(i -> stereotomy.newIdentifier()) + .map(cpk -> new ControlledIdentifierMember(cpk)) + .map(e -> (SigningMember) e) + .toList(); Context context = Context.newBuilder().setCardinality(members.size()).build(); RbcMetrics metrics = new RbcMetricsImpl(context.getId(), "test", registry); @@ -87,10 +91,11 @@ public void broadcast() throws Exception { final var prefix = UUID.randomUUID().toString(); final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT); messengers = members.stream().map(node -> { - var comms = new LocalServer(prefix, node).router( - ServerConnectionCache.newBuilder() - .setTarget(30) - .setMetrics(new ServerConnectionCacheMetricsImpl(registry))); + var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder() + .setTarget(30) + .setMetrics( + new ServerConnectionCacheMetricsImpl( + registry))); communications.add(comms); comms.start(); return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication); @@ -134,17 +139,16 @@ public void broadcast() throws Exception { System.out.println(); ConsoleReporter.forRegistry(registry) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build() - .report(); + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build() + .report(); } - final AtomicReference round = new AtomicReference<>(); class Receiver implements MessageHandler { - final Set counted = Collections.newSetFromMap(new ConcurrentHashMap<>()); + final Set counted = Collections.newSetFromMap(new ConcurrentHashMap<>()); final AtomicInteger current; - final Digest memberId; + final Digest memberId; Receiver(Digest memberId, int cardinality, AtomicInteger current) { this.current = current; @@ -157,7 +161,7 @@ public void message(Digest context, List messages) { assert m.source() != null : "null member"; ByteBuffer buf; try { - buf = m.content().unpack(ByteMessage.class).getContents().asReadOnlyByteBuffer(); + buf = ByteMessage.parseFrom(m.content()).getContents().asReadOnlyByteBuffer(); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException(e); }