Skip to content

Commit

Permalink
Do not use Any for RBC content (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer authored Nov 15, 2023
1 parent 3449cca commit 9189379
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 51 deletions.
12 changes: 6 additions & 6 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public CHOAM(Parameters params) {
combine = new ReliableBroadcaster(params.context(), params.member(), params.combine(), params.communications(),
params.metrics() == null ? null : params.metrics().getCombineMetrics(),
new MessageAdapter(any -> true,
(Function<Any, Digest>) any -> signatureHash(any),
(Function<Any, List<Digest>>) any -> Collections.emptyList(),
(Function<ByteString, Digest>) any -> signatureHash(any),
(Function<ByteString, List<Digest>>) any -> Collections.emptyList(),
(m, any) -> any,
(Function<AgedMessageOrBuilder, Any>) am -> am.getContent()));
(Function<AgedMessageOrBuilder, ByteString>) am -> am.getContent()));
linear = Executors.newSingleThreadExecutor(
Thread.ofVirtual().name("Linear " + params.member().getId()).factory());
combine.registerHandler((ctx, messages) -> {
Expand Down Expand Up @@ -449,7 +449,7 @@ private void combine(List<Msg> messages) {
private void combine(Msg m) {
CertifiedBlock block;
try {
block = m.content().unpack(CertifiedBlock.class);
block = CertifiedBlock.parseFrom(m.content());
} catch (InvalidProtocolBufferException e) {
log.debug("unable to parse block content from {} on: {}", m.source(), params.member().getId());
return;
Expand Down Expand Up @@ -768,10 +768,10 @@ private Function<SubmittedTransaction, SubmitResult> service() {
};
}

private Digest signatureHash(Any any) {
private Digest signatureHash(ByteString any) {
CertifiedBlock cb;
try {
cb = any.unpack(CertifiedBlock.class);
cb = CertifiedBlock.parseFrom(any);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException(e);
}
Expand Down
4 changes: 2 additions & 2 deletions grpc/src/main/proto/messaging.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ message ReconcileContext {

message AgedMessage {
int32 age = 1;
google.protobuf.Any content = 3;
bytes content = 3;
}

message DefaultMessage {
utils.Digeste source = 1;
int32 nonce = 2;
google.protobuf.Any content = 3;
bytes content = 3;
}

message SignedDefaultMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.salesfoce.apollo.messaging.proto.*;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.crypto.JohnHancock;
Expand All @@ -24,8 +26,6 @@
import com.salesforce.apollo.membership.messaging.rbc.comms.ReliableBroadcast;
import com.salesforce.apollo.ring.RingCommunications;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,10 +77,10 @@ public ReliableBroadcaster(Context<Member> context, SigningMember member, Parame
}

public static MessageAdapter defaultMessageAdapter(Context<Member> context, DigestAlgorithm algo) {
final Predicate<Any> verifier = any -> {
final Predicate<ByteString> verifier = any -> {
SignedDefaultMessage sdm;
try {
sdm = any.unpack(SignedDefaultMessage.class);
sdm = SignedDefaultMessage.parseFrom(any);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
Expand All @@ -91,34 +91,37 @@ public static MessageAdapter defaultMessageAdapter(Context<Member> context, Dige
}
return member.verify(JohnHancock.from(sdm.getSignature()), dm.toByteString());
};
final Function<Any, Digest> hasher = any -> {
final Function<ByteString, Digest> hasher = any -> {
try {
return JohnHancock.from(any.unpack(SignedDefaultMessage.class).getSignature()).toDigest(algo);
return JohnHancock.from(SignedDefaultMessage.parseFrom(any).getSignature()).toDigest(algo);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
};
Function<Any, List<Digest>> source = any -> {
Function<ByteString, List<Digest>> source = any -> {
try {
return Collections.singletonList(
Digest.from(any.unpack(SignedDefaultMessage.class).getContent().getSource()));
Digest.from(SignedDefaultMessage.parseFrom(any).getContent().getSource()));
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
};
var sn = new AtomicInteger();
BiFunction<SigningMember, Any, Any> wrapper = (m, any) -> {
BiFunction<SigningMember, ByteString, ByteString> wrapper = (m, any) -> {
final var dm = DefaultMessage.newBuilder()
.setNonce(sn.incrementAndGet())
.setSource(m.getId().toDigeste())
.setContent(any)
.build();
return Any.pack(
SignedDefaultMessage.newBuilder().setContent(dm).setSignature(m.sign(dm.toByteString()).toSig()).build());
return SignedDefaultMessage.newBuilder()
.setContent(dm)
.setSignature(m.sign(dm.toByteString()).toSig())
.build()
.toByteString();
};
Function<AgedMessageOrBuilder, Any> extractor = am -> {
Function<AgedMessageOrBuilder, ByteString> extractor = am -> {
try {
return am.getContent().unpack(SignedDefaultMessage.class).getContent().getContent();
return SignedDefaultMessage.parseFrom(am.getContent()).getContent().getContent();
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
Expand Down Expand Up @@ -147,7 +150,7 @@ public void publish(Message message, boolean notifyLocal) {
if (!started.get()) {
return;
}
AgedMessage m = buffer.send(Any.pack(message), member);
AgedMessage m = buffer.send(message.toByteString(), member);
if (notifyLocal) {
deliver(Collections.singletonList(
new Msg(Collections.singletonList(member.getId()), adapter.extractor.apply(m),
Expand Down Expand Up @@ -214,8 +217,8 @@ private Reconcile gossipRound(ReliableBroadcast link, int ring) {
if (!started.get()) {
return null;
}
log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(), link.getMember().getId(),
ring, member.getId());
log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(),
link.getMember().getId(), ring, member.getId());
try {
return link.gossip(
MessageBff.newBuilder().setRing(ring).setDigests(buffer.forReconcilliation().toBff()).build());
Expand Down Expand Up @@ -282,12 +285,13 @@ public interface MessageHandler {
public record HashedContent(Digest hash, ByteString content) {
}

public record MessageAdapter(Predicate<Any> verifier, Function<Any, Digest> hasher,
Function<Any, List<Digest>> source, BiFunction<SigningMember, Any, Any> wrapper,
Function<AgedMessageOrBuilder, Any> extractor) {
public record MessageAdapter(Predicate<ByteString> verifier, Function<ByteString, Digest> hasher,
Function<ByteString, List<Digest>> source,
BiFunction<SigningMember, ByteString, ByteString> wrapper,
Function<AgedMessageOrBuilder, ByteString> extractor) {
}

public record Msg(List<Digest> source, Any content, Digest hash) {
public record Msg(List<Digest> source, ByteString content, Digest hash) {
}

public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm, double falsePositiveRate,
Expand Down Expand Up @@ -456,7 +460,7 @@ public int round() {
return round.get();
}

public AgedMessage send(Any msg, SigningMember member) {
public AgedMessage send(ByteString msg, SigningMember member) {
AgedMessage.Builder message = AgedMessage.newBuilder().setContent(adapter.wrapper.apply(member, msg));
var hash = adapter.hasher.apply(message.getContent());
state s = new state(hash, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -54,13 +53,14 @@
*/
public class RbcTest {

private static final Parameters.Builder parameters = Parameters.newBuilder()
.setMaxMessages(100)
.setFalsePositiveRate(0.0125)
.setBufferSize(500);
private final List<Router> communications = new ArrayList<>();
private final AtomicInteger totalReceived = new AtomicInteger(0);
private List<ReliableBroadcaster> messengers;
private static final Parameters.Builder parameters = Parameters.newBuilder()
.setMaxMessages(100)
.setFalsePositiveRate(0.0125)
.setBufferSize(500);
final AtomicReference<CountDownLatch> round = new AtomicReference<>();
private final List<Router> communications = new ArrayList<>();
private final AtomicInteger totalReceived = new AtomicInteger(0);
private List<ReliableBroadcaster> messengers;

@AfterEach
public void after() {
Expand All @@ -75,10 +75,14 @@ public void broadcast() throws Exception {
MetricRegistry registry = new MetricRegistry();

var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[]{6, 6, 6});
entropy.setSeed(new byte[] { 6, 6, 6 });
var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy);

List<SigningMember> members = IntStream.range(0, 50).mapToObj(i -> stereotomy.newIdentifier()).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (SigningMember) e).toList();
List<SigningMember> members = IntStream.range(0, 50)
.mapToObj(i -> stereotomy.newIdentifier())
.map(cpk -> new ControlledIdentifierMember(cpk))
.map(e -> (SigningMember) e)
.toList();

Context<Member> context = Context.newBuilder().setCardinality(members.size()).build();
RbcMetrics metrics = new RbcMetricsImpl(context.getId(), "test", registry);
Expand All @@ -87,10 +91,11 @@ public void broadcast() throws Exception {
final var prefix = UUID.randomUUID().toString();
final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT);
messengers = members.stream().map(node -> {
var comms = new LocalServer(prefix, node).router(
ServerConnectionCache.newBuilder()
.setTarget(30)
.setMetrics(new ServerConnectionCacheMetricsImpl(registry)));
var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder()
.setTarget(30)
.setMetrics(
new ServerConnectionCacheMetricsImpl(
registry)));
communications.add(comms);
comms.start();
return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication);
Expand Down Expand Up @@ -134,17 +139,16 @@ public void broadcast() throws Exception {
System.out.println();

ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
final AtomicReference<CountDownLatch> round = new AtomicReference<>();

class Receiver implements MessageHandler {
final Set<Digest> counted = Collections.newSetFromMap(new ConcurrentHashMap<>());
final Set<Digest> counted = Collections.newSetFromMap(new ConcurrentHashMap<>());
final AtomicInteger current;
final Digest memberId;
final Digest memberId;

Receiver(Digest memberId, int cardinality, AtomicInteger current) {
this.current = current;
Expand All @@ -157,7 +161,7 @@ public void message(Digest context, List<Msg> messages) {
assert m.source() != null : "null member";
ByteBuffer buf;
try {
buf = m.content().unpack(ByteMessage.class).getContents().asReadOnlyByteBuffer();
buf = ByteMessage.parseFrom(m.content()).getContents().asReadOnlyByteBuffer();
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException(e);
}
Expand Down

0 comments on commit 9189379

Please sign in to comment.