Skip to content

Commit

Permalink
Further flesh on the bone
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Dec 10, 2023
1 parent 655f8f8 commit 9eb2ec4
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 34 deletions.
5 changes: 3 additions & 2 deletions grpc/src/main/proto/leyden.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package leyden;
service Binder {
rpc bind(Binding) returns(google.protobuf.Empty) {}
rpc unbind(Key_) returns(google.protobuf.Empty) {}
rpc get(Key_) returns(Bound) {}
}

service Reconciliation {
Expand All @@ -25,14 +26,14 @@ service Reconciliation {
}

message Update {
repeated bytes keyValues = 1;
repeated Binding bindings = 1;
repeated Interval intervals = 2;
crypto.Biff have = 3;
}

message Updating {
int32 ring = 1;
repeated bytes keyValues = 2;
repeated Binding bindings = 2;
}

message Intervals {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

package com.salesforce.apollo.leyden;

import com.salesforce.apollo.thoth.proto.Interval;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.leyden.proto.Interval;

import java.util.*;
import java.util.function.Predicate;
Expand Down
117 changes: 93 additions & 24 deletions leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,45 @@
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.proto.Biff;
import com.salesforce.apollo.leyden.comm.binding.*;
import com.salesforce.apollo.leyden.comm.reconcile.*;
import com.salesforce.apollo.leyden.proto.Binding;
import com.salesforce.apollo.leyden.proto.Key_;
import com.salesforce.apollo.leyden.proto.*;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.thoth.proto.Intervals;
import com.salesforce.apollo.thoth.proto.Update;
import com.salesforce.apollo.thoth.proto.Updating;
import com.salesforce.apollo.ring.RingCommunications;
import com.salesforce.apollo.utils.Entropy;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author hal.hildebrand
**/
public class LeydenJar {

public static final String LEYDEN_JAR = "Leyden-Jar";
private final Context<Member> context;
private final RouterImpl.CommonCommunications<ReconciliationClient, ReconciliationService> reconComms;
private final RouterImpl.CommonCommunications<BinderClient, BinderService> binderComms;
private final double fpr;
private final SigningMember member;
private final MVMap<Key_, Binding> bottled;
private final AtomicBoolean started = new AtomicBoolean();
public static final String LEYDEN_JAR = "Leyden-Jar";
private static final Logger log = LoggerFactory.getLogger(
LeydenJar.class);
private final Context<Member> context;
private final RouterImpl.CommonCommunications<ReconciliationClient, ReconciliationService> reconComms;
private final RouterImpl.CommonCommunications<BinderClient, BinderService> binderComms;
private final double fpr;
private final SigningMember member;
private final MVMap<byte[], Binding> bottled;
private final AtomicBoolean started = new AtomicBoolean();
private final RingCommunications<Member, ReconciliationClient> reconcile;

public LeydenJar(SigningMember member, Context<Member> context, Router communications, double fpr, MVStore store,
ReconciliationMetrics metrics, BinderMetrics binderMetrics) {
Expand All @@ -50,19 +61,77 @@ public LeydenJar(SigningMember member, Context<Member> context, Router communica
binderMetrics), c -> Bind.getCreate(c, binderMetrics),
Bind.getLocalLoopback(borders, member));
this.fpr = fpr;
bottled = store.openMap(LEYDEN_JAR, new MVMap.Builder<Key_, Binding>().keyType(new ProtobufDatatype<Key_>(b -> {
try {
return Key_.parseFrom(b);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
})).valueType(new ProtobufDatatype<Binding>(b -> {
bottled = store.openMap(LEYDEN_JAR,
new MVMap.Builder<byte[], Binding>().valueType(new ProtobufDatatype<Binding>(b -> {
try {
return Binding.parseFrom(b);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
})));
reconcile = new RingCommunications<>(this.context, member, reconComms);
}

public void start(Duration gossip) {
if (!started.compareAndSet(false, true)) {
return;
}
reconcile(Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), gossip);
}

private Biff populate(long l, CombinedIntervals keyIntervals, double fpr) {
return null;
}

private Update reconcile(ReconciliationClient link, Integer ring) {
if (member.equals(link.getMember())) {
return null;
}
CombinedIntervals keyIntervals = null;
log.trace("Interval reconciliation on ring: {} with: {} on: {} intervals: {}", ring, link.getMember(),
member.getId(), keyIntervals);
return link.reconcile(Intervals.newBuilder()
.setRing(ring)
.addAllIntervals(keyIntervals.toIntervals())
.setHave(populate(Entropy.nextBitsStreamLong(), keyIntervals, fpr))
.build());
}

private void reconcile(Optional<Update> result,
RingCommunications.Destination<Member, ReconciliationClient> destination,
ScheduledExecutorService scheduler, Duration duration) {
if (!started.get()) {
return;
}
if (!result.isEmpty()) {
try {
return Binding.parseFrom(b);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
Update update = result.get();
log.trace("Received: {} events in interval reconciliation from: {} on: {}", update.getBindingsCount(),
destination.member().getId(), member.getId());
update(update.getBindingsList());
} catch (NoSuchElementException e) {
log.debug("null interval reconciliation with {} : {} on: {}", destination.member().getId(),
member.getId(), e.getCause());
}
})));
} else {
log.trace("Received no events in interval reconciliation from: {} on: {}", destination.member().getId(),
member.getId());
}
if (started.get()) {
scheduler.schedule(() -> reconcile(scheduler, duration), duration.toMillis(), TimeUnit.MILLISECONDS);
}
}

private void reconcile(ScheduledExecutorService scheduler, Duration duration) {
if (!started.get()) {
return;
}
reconcile.execute((link, ring) -> reconcile(link, ring),
(futureSailor, destination) -> reconcile(futureSailor, destination, scheduler, duration));

}

private void update(List<Binding> bindings) {
}

private class Reconciled implements ReconciliationService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import com.google.protobuf.Empty;
import com.salesforce.apollo.archipelago.RoutableService;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.leyden.proto.Intervals;
import com.salesforce.apollo.leyden.proto.ReconciliationGrpc;
import com.salesforce.apollo.leyden.proto.Update;
import com.salesforce.apollo.leyden.proto.Updating;
import com.salesforce.apollo.protocols.ClientIdentity;
import com.salesforce.apollo.thoth.proto.Intervals;
import com.salesforce.apollo.thoth.proto.ReconciliationGrpc;
import com.salesforce.apollo.thoth.proto.Update;
import com.salesforce.apollo.thoth.proto.Updating;
import io.grpc.stub.StreamObserver;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.salesforce.apollo.leyden.comm.reconcile;

import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.thoth.proto.Intervals;
import com.salesforce.apollo.thoth.proto.Update;
import com.salesforce.apollo.thoth.proto.Updating;
import com.salesforce.apollo.leyden.proto.Intervals;
import com.salesforce.apollo.leyden.proto.Update;
import com.salesforce.apollo.leyden.proto.Updating;

/**
* @author hal.hildebrand
Expand Down

0 comments on commit 9eb2ec4

Please sign in to comment.