Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

~Any #160

Merged
merged 1 commit into from
Nov 15, 2023
Merged

~Any #160

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public CHOAM(Parameters params) {
combine = new ReliableBroadcaster(params.context(), params.member(), params.combine(), params.communications(),
params.metrics() == null ? null : params.metrics().getCombineMetrics(),
new MessageAdapter(any -> true,
(Function<Any, Digest>) any -> signatureHash(any),
(Function<Any, List<Digest>>) any -> Collections.emptyList(),
(Function<ByteString, Digest>) any -> signatureHash(any),
(Function<ByteString, List<Digest>>) any -> Collections.emptyList(),
(m, any) -> any,
(Function<AgedMessageOrBuilder, Any>) am -> am.getContent()));
(Function<AgedMessageOrBuilder, ByteString>) am -> am.getContent()));
linear = Executors.newSingleThreadExecutor(
Thread.ofVirtual().name("Linear " + params.member().getId()).factory());
combine.registerHandler((ctx, messages) -> {
Expand Down Expand Up @@ -449,7 +449,7 @@ private void combine(List<Msg> messages) {
private void combine(Msg m) {
CertifiedBlock block;
try {
block = m.content().unpack(CertifiedBlock.class);
block = CertifiedBlock.parseFrom(m.content());
} catch (InvalidProtocolBufferException e) {
log.debug("unable to parse block content from {} on: {}", m.source(), params.member().getId());
return;
Expand Down Expand Up @@ -768,10 +768,10 @@ private Function<SubmittedTransaction, SubmitResult> service() {
};
}

private Digest signatureHash(Any any) {
private Digest signatureHash(ByteString any) {
CertifiedBlock cb;
try {
cb = any.unpack(CertifiedBlock.class);
cb = CertifiedBlock.parseFrom(any);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException(e);
}
Expand Down
4 changes: 2 additions & 2 deletions grpc/src/main/proto/messaging.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ message ReconcileContext {

message AgedMessage {
int32 age = 1;
google.protobuf.Any content = 3;
bytes content = 3;
}

message DefaultMessage {
utils.Digeste source = 1;
int32 nonce = 2;
google.protobuf.Any content = 3;
bytes content = 3;
}

message SignedDefaultMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.salesfoce.apollo.messaging.proto.*;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.crypto.JohnHancock;
Expand All @@ -24,8 +26,6 @@
import com.salesforce.apollo.membership.messaging.rbc.comms.ReliableBroadcast;
import com.salesforce.apollo.ring.RingCommunications;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,10 +77,10 @@ public ReliableBroadcaster(Context<Member> context, SigningMember member, Parame
}

public static MessageAdapter defaultMessageAdapter(Context<Member> context, DigestAlgorithm algo) {
final Predicate<Any> verifier = any -> {
final Predicate<ByteString> verifier = any -> {
SignedDefaultMessage sdm;
try {
sdm = any.unpack(SignedDefaultMessage.class);
sdm = SignedDefaultMessage.parseFrom(any);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
Expand All @@ -91,34 +91,37 @@ public static MessageAdapter defaultMessageAdapter(Context<Member> context, Dige
}
return member.verify(JohnHancock.from(sdm.getSignature()), dm.toByteString());
};
final Function<Any, Digest> hasher = any -> {
final Function<ByteString, Digest> hasher = any -> {
try {
return JohnHancock.from(any.unpack(SignedDefaultMessage.class).getSignature()).toDigest(algo);
return JohnHancock.from(SignedDefaultMessage.parseFrom(any).getSignature()).toDigest(algo);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
};
Function<Any, List<Digest>> source = any -> {
Function<ByteString, List<Digest>> source = any -> {
try {
return Collections.singletonList(
Digest.from(any.unpack(SignedDefaultMessage.class).getContent().getSource()));
Digest.from(SignedDefaultMessage.parseFrom(any).getContent().getSource()));
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
};
var sn = new AtomicInteger();
BiFunction<SigningMember, Any, Any> wrapper = (m, any) -> {
BiFunction<SigningMember, ByteString, ByteString> wrapper = (m, any) -> {
final var dm = DefaultMessage.newBuilder()
.setNonce(sn.incrementAndGet())
.setSource(m.getId().toDigeste())
.setContent(any)
.build();
return Any.pack(
SignedDefaultMessage.newBuilder().setContent(dm).setSignature(m.sign(dm.toByteString()).toSig()).build());
return SignedDefaultMessage.newBuilder()
.setContent(dm)
.setSignature(m.sign(dm.toByteString()).toSig())
.build()
.toByteString();
};
Function<AgedMessageOrBuilder, Any> extractor = am -> {
Function<AgedMessageOrBuilder, ByteString> extractor = am -> {
try {
return am.getContent().unpack(SignedDefaultMessage.class).getContent().getContent();
return SignedDefaultMessage.parseFrom(am.getContent()).getContent().getContent();
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Cannot unwrap", e);
}
Expand Down Expand Up @@ -147,7 +150,7 @@ public void publish(Message message, boolean notifyLocal) {
if (!started.get()) {
return;
}
AgedMessage m = buffer.send(Any.pack(message), member);
AgedMessage m = buffer.send(message.toByteString(), member);
if (notifyLocal) {
deliver(Collections.singletonList(
new Msg(Collections.singletonList(member.getId()), adapter.extractor.apply(m),
Expand Down Expand Up @@ -214,8 +217,8 @@ private Reconcile gossipRound(ReliableBroadcast link, int ring) {
if (!started.get()) {
return null;
}
log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(), link.getMember().getId(),
ring, member.getId());
log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(),
link.getMember().getId(), ring, member.getId());
try {
return link.gossip(
MessageBff.newBuilder().setRing(ring).setDigests(buffer.forReconcilliation().toBff()).build());
Expand Down Expand Up @@ -282,12 +285,13 @@ public interface MessageHandler {
public record HashedContent(Digest hash, ByteString content) {
}

public record MessageAdapter(Predicate<Any> verifier, Function<Any, Digest> hasher,
Function<Any, List<Digest>> source, BiFunction<SigningMember, Any, Any> wrapper,
Function<AgedMessageOrBuilder, Any> extractor) {
public record MessageAdapter(Predicate<ByteString> verifier, Function<ByteString, Digest> hasher,
Function<ByteString, List<Digest>> source,
BiFunction<SigningMember, ByteString, ByteString> wrapper,
Function<AgedMessageOrBuilder, ByteString> extractor) {
}

public record Msg(List<Digest> source, Any content, Digest hash) {
public record Msg(List<Digest> source, ByteString content, Digest hash) {
}

public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm, double falsePositiveRate,
Expand Down Expand Up @@ -456,7 +460,7 @@ public int round() {
return round.get();
}

public AgedMessage send(Any msg, SigningMember member) {
public AgedMessage send(ByteString msg, SigningMember member) {
AgedMessage.Builder message = AgedMessage.newBuilder().setContent(adapter.wrapper.apply(member, msg));
var hash = adapter.hasher.apply(message.getContent());
state s = new state(hash, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -54,13 +53,14 @@
*/
public class RbcTest {

private static final Parameters.Builder parameters = Parameters.newBuilder()
.setMaxMessages(100)
.setFalsePositiveRate(0.0125)
.setBufferSize(500);
private final List<Router> communications = new ArrayList<>();
private final AtomicInteger totalReceived = new AtomicInteger(0);
private List<ReliableBroadcaster> messengers;
private static final Parameters.Builder parameters = Parameters.newBuilder()
.setMaxMessages(100)
.setFalsePositiveRate(0.0125)
.setBufferSize(500);
final AtomicReference<CountDownLatch> round = new AtomicReference<>();
private final List<Router> communications = new ArrayList<>();
private final AtomicInteger totalReceived = new AtomicInteger(0);
private List<ReliableBroadcaster> messengers;

@AfterEach
public void after() {
Expand All @@ -75,10 +75,14 @@ public void broadcast() throws Exception {
MetricRegistry registry = new MetricRegistry();

var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[]{6, 6, 6});
entropy.setSeed(new byte[] { 6, 6, 6 });
var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy);

List<SigningMember> members = IntStream.range(0, 50).mapToObj(i -> stereotomy.newIdentifier()).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (SigningMember) e).toList();
List<SigningMember> members = IntStream.range(0, 50)
.mapToObj(i -> stereotomy.newIdentifier())
.map(cpk -> new ControlledIdentifierMember(cpk))
.map(e -> (SigningMember) e)
.toList();

Context<Member> context = Context.newBuilder().setCardinality(members.size()).build();
RbcMetrics metrics = new RbcMetricsImpl(context.getId(), "test", registry);
Expand All @@ -87,10 +91,11 @@ public void broadcast() throws Exception {
final var prefix = UUID.randomUUID().toString();
final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT);
messengers = members.stream().map(node -> {
var comms = new LocalServer(prefix, node).router(
ServerConnectionCache.newBuilder()
.setTarget(30)
.setMetrics(new ServerConnectionCacheMetricsImpl(registry)));
var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder()
.setTarget(30)
.setMetrics(
new ServerConnectionCacheMetricsImpl(
registry)));
communications.add(comms);
comms.start();
return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication);
Expand Down Expand Up @@ -134,17 +139,16 @@ public void broadcast() throws Exception {
System.out.println();

ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
.report();
}
final AtomicReference<CountDownLatch> round = new AtomicReference<>();

class Receiver implements MessageHandler {
final Set<Digest> counted = Collections.newSetFromMap(new ConcurrentHashMap<>());
final Set<Digest> counted = Collections.newSetFromMap(new ConcurrentHashMap<>());
final AtomicInteger current;
final Digest memberId;
final Digest memberId;

Receiver(Digest memberId, int cardinality, AtomicInteger current) {
this.current = current;
Expand All @@ -157,7 +161,7 @@ public void message(Digest context, List<Msg> messages) {
assert m.source() != null : "null member";
ByteBuffer buf;
try {
buf = m.content().unpack(ByteMessage.class).getContents().asReadOnlyByteBuffer();
buf = ByteMessage.parseFrom(m.content()).getContents().asReadOnlyByteBuffer();
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException(e);
}
Expand Down