Skip to content

Commit

Permalink
basic skeleton of Binder and Reconciliation service infra
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Dec 10, 2023
1 parent 4565621 commit 6d2ea93
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 25 deletions.
8 changes: 6 additions & 2 deletions grpc/src/main/proto/leyden.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package leyden;

service Binder {
rpc bind(Binding) returns(google.protobuf.Empty) {}
rpc unbind(stereotomy.Ident) returns(google.protobuf.Empty) {}
rpc unbind(Key_) returns(google.protobuf.Empty) {}
}

service Reconciliation {
Expand Down Expand Up @@ -46,8 +46,12 @@ message Interval {
crypto.Digeste end = 2;
}

message Bound {
message Key_ {
bytes key = 1;
}

message Bound {
Key_ key = 1;
bytes value = 2;
}

Expand Down
47 changes: 36 additions & 11 deletions leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java
Original file line number Diff line number Diff line change
@@ -1,34 +1,46 @@
package com.salesforce.apollo.leyden;

import com.salesforce.apollo.thoth.proto.Intervals;
import com.salesforce.apollo.thoth.proto.Update;
import com.salesforce.apollo.thoth.proto.Updating;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.leyden.comm.*;
import com.salesforce.apollo.leyden.comm.binding.*;
import com.salesforce.apollo.leyden.comm.reconcile.*;
import com.salesforce.apollo.leyden.proto.Binding;
import com.salesforce.apollo.leyden.proto.Key_;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.thoth.proto.Intervals;
import com.salesforce.apollo.thoth.proto.Update;
import com.salesforce.apollo.thoth.proto.Updating;

/**
* @author hal.hildebrand
**/
public class LeydenJar {

private final Context<Member> context;
private final RouterImpl.CommonCommunications<ReconciliationClient, ReconciliationService> comms;
private final RouterImpl.CommonCommunications<ReconciliationClient, ReconciliationService> reconComms;
private final RouterImpl.CommonCommunications<BinderClient, BinderService> binderComms;
private final double fpr;
private final SigningMember member;

public LeydenJar(SigningMember member, Context<Member> context, Router communications, double fpr,
ReconciliationMetrics metrics) {
ReconciliationMetrics metrics, BinderMetrics binderMetrics) {
this.context = context;
ReconciliationService service = new Reconciled();
this.member = member;
var recon = new Reconciled();
reconComms = communications.create(member, context.getId(), recon,
ReconciliationService.class.getCanonicalName(),
r -> new ReconciliationServer(r, communications.getClientIdentityProvider(),
metrics), c -> Reckoning.getCreate(c, metrics),
Reckoning.getLocalLoopback(recon, member));

comms = communications.create(member, context.getId(), service, service.getClass().getCanonicalName(),
r -> new ReconciliationServer(r, communications.getClientIdentityProvider(),
metrics), c -> Reckoning.getCreate(c, metrics),
Reckoning.getLocalLoopback(service, member));
var borders = new Borders();
binderComms = communications.create(member, context.getId(), borders, BinderService.class.getCanonicalName(),
r -> new BinderServer(r, communications.getClientIdentityProvider(),
binderMetrics), c -> Bind.getCreate(c, binderMetrics),
Bind.getLocalLoopback(borders, member));
this.fpr = fpr;
}

Expand All @@ -44,4 +56,17 @@ public void update(Updating request, Digest from) {

}
}

private class Borders implements BinderService {

@Override
public void bind(Binding request, Digest from) {

}

@Override
public void unbind(Key_ request, Digest from) {

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.salesforce.apollo.leyden.comm.binding;

import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.leyden.proto.BinderGrpc;
import com.salesforce.apollo.leyden.proto.Binding;
import com.salesforce.apollo.leyden.proto.Key_;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;

import java.io.IOException;

/**
* @author hal.hildebrand
**/
public class Bind implements BinderClient {
private final ManagedServerChannel channel;
private final BinderMetrics metrics;
private final BinderGrpc.BinderBlockingStub client;

public Bind(ManagedServerChannel channel, BinderMetrics metrics) {
this.channel = channel;
this.metrics = metrics;
this.client = BinderGrpc.newBlockingStub(channel);
}

public static BinderClient getCreate(ManagedServerChannel c, BinderMetrics binderMetrics) {
return new Bind(c, binderMetrics);
}

public static BinderClient getLocalLoopback(BinderService service, SigningMember member) {
return new BinderClient() {
@Override
public void bind(Binding binding) {
service.bind(binding, member.getId());
}

@Override
public void close() throws IOException {
// no op
}

@Override
public Member getMember() {
return member;
}

@Override
public void unbind(Key_ key) {
service.unbind(key, member.getId());
}
};
}

@Override
public void bind(Binding binding) {
client.bind(binding);
}

@Override
public void close() throws IOException {
channel.shutdown();
}

@Override
public Member getMember() {
return channel.getMember();
}

@Override
public void unbind(Key_ key) {
client.unbind(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.salesforce.apollo.leyden.comm.binding;

import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.leyden.proto.Binding;
import com.salesforce.apollo.leyden.proto.Key_;

/**
* @author hal.hildebrand
**/
public interface BinderClient extends Link {

void bind(Binding binding);

void unbind(Key_ key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.salesforce.apollo.leyden.comm.binding;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.salesforce.apollo.protocols.EndpointMetrics;

/**
* @author hal.hildebrand
**/
public interface BinderMetrics extends EndpointMetrics {
Histogram inboundBind();

Timer inboundBindTimer();

Histogram inboundUnbind();

Timer inboundUnbindTimer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.salesforce.apollo.leyden.comm.binding;

import com.codahale.metrics.Timer;
import com.google.protobuf.Empty;
import com.salesforce.apollo.archipelago.RoutableService;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.leyden.proto.BinderGrpc;
import com.salesforce.apollo.leyden.proto.Binding;
import com.salesforce.apollo.leyden.proto.Key_;
import com.salesforce.apollo.protocols.ClientIdentity;
import io.grpc.stub.StreamObserver;

/**
* @author hal.hildebrand
**/
public class BinderServer extends BinderGrpc.BinderImplBase {

private final RoutableService<BinderService> routing;
private final ClientIdentity identity;
private final BinderMetrics metrics;

public BinderServer(RoutableService<BinderService> r, ClientIdentity clientIdentityProvider,
BinderMetrics binderMetrics) {
routing = r;
this.identity = clientIdentityProvider;
this.metrics = binderMetrics;
}

@Override
public void bind(Binding request, StreamObserver<Empty> responseObserver) {
Timer.Context timer = metrics == null ? null : metrics.inboundBindTimer().time();
if (metrics != null) {
var serializedSize = request.getSerializedSize();
metrics.inboundBandwidth().mark(serializedSize);
metrics.inboundBind().update(serializedSize);
}
Digest from = identity.getFrom();
if (from == null) {
responseObserver.onError(new IllegalStateException("Member has been removed"));
return;
}
routing.evaluate(responseObserver, s -> {
try {
s.bind(request, from);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} finally {
if (timer != null) {
timer.stop();
}
}
});
}

@Override
public void unbind(Key_ request, StreamObserver<Empty> responseObserver) {
Timer.Context timer = metrics == null ? null : metrics.inboundUnbindTimer().time();
if (metrics != null) {
var serializedSize = request.getSerializedSize();
metrics.inboundBandwidth().mark(serializedSize);
metrics.inboundUnbind().update(serializedSize);
}
Digest from = identity.getFrom();
if (from == null) {
responseObserver.onError(new IllegalStateException("Member has been removed"));
return;
}
routing.evaluate(responseObserver, s -> {
try {
s.unbind(request, from);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} finally {
if (timer != null) {
timer.stop();
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.salesforce.apollo.leyden.comm.binding;

import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.leyden.proto.Binding;
import com.salesforce.apollo.leyden.proto.Key_;

/**
* @author hal.hildebrand
**/
public interface BinderService {
void bind(Binding request, Digest from);

void unbind(Key_ request, Digest from);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.salesforce.apollo.leyden.comm;
package com.salesforce.apollo.leyden.comm.reconcile;

import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.leyden.proto.Intervals;
import com.salesforce.apollo.leyden.proto.ReconciliationGrpc;
import com.salesforce.apollo.leyden.proto.Update;
import com.salesforce.apollo.leyden.proto.Updating;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;

Expand All @@ -25,7 +25,7 @@ public Reckoning(ManagedServerChannel channel, Member member, ReconciliationMetr
}

public static ReconciliationClient getCreate(ManagedServerChannel channel, ReconciliationMetrics metrics) {
return null;
return new Reckoning(channel, channel.getMember(), metrics);
}

public static ReconciliationClient getLocalLoopback(ReconciliationService service, SigningMember member) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.salesforce.apollo.leyden.comm;
package com.salesforce.apollo.leyden.comm.reconcile;

import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.leyden.proto.Intervals;
import com.salesforce.apollo.leyden.proto.Update;
import com.salesforce.apollo.leyden.proto.Updating;
import com.salesforce.apollo.archipelago.Link;

/**
* @author hal.hildebrand
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.salesforce.apollo.leyden.comm;
package com.salesforce.apollo.leyden.comm.reconcile;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.salesforce.apollo.leyden.comm;
package com.salesforce.apollo.leyden.comm.reconcile;

import com.codahale.metrics.Timer;
import com.google.protobuf.Empty;
import com.salesforce.apollo.archipelago.RoutableService;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.protocols.ClientIdentity;
import com.salesforce.apollo.thoth.proto.Intervals;
import com.salesforce.apollo.thoth.proto.ReconciliationGrpc;
import com.salesforce.apollo.thoth.proto.Update;
import com.salesforce.apollo.thoth.proto.Updating;
import com.salesforce.apollo.archipelago.RoutableService;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.protocols.ClientIdentity;
import io.grpc.stub.StreamObserver;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.salesforce.apollo.leyden.comm;
package com.salesforce.apollo.leyden.comm.reconcile;

import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.thoth.proto.Intervals;
import com.salesforce.apollo.thoth.proto.Update;
import com.salesforce.apollo.thoth.proto.Updating;
import com.salesforce.apollo.cryptography.Digest;

/**
* @author hal.hildebrand
Expand Down

0 comments on commit 6d2ea93

Please sign in to comment.