Skip to content

Commit

Permalink
Fernet token auth (#189)
Browse files Browse the repository at this point in the history
* Fernet token auth

* delete unused

* add api for additional server interceptors, api for fernet token evaluation, E2E fernet token testing

* Consolidate duplication

* Consolidate duplication

* Fernet token AuthNZ available on all Apollo services. Token call credentials supplier is optionally available in the creation of the ServerConnectionCache. Clients now call the supplied ManagedServerChannel to wrap interceptors and call credentials appropriately.

* Provide optional furnet token validation for services. Router level validation, or individual service level validation provided.

* Provide optional furnet token validation on router creation

* npe protection

* debug log and fail with unauthenticated when null token
  • Loading branch information
Hellblazer authored Feb 19, 2024
1 parent a240c76 commit e817e30
Show file tree
Hide file tree
Showing 60 changed files with 1,163 additions and 315 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class TerminalClient implements Terminal {

public TerminalClient(ManagedServerChannel channel, ChoamMetrics metrics) {
this.channel = channel;
this.client = TerminalGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(TerminalGrpc.newBlockingStub(channel));
this.metrics = metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class TxnSubmitClient implements TxnSubmission {

public TxnSubmitClient(ManagedServerChannel channel, ChoamMetrics metrics) {
this.channel = channel;
this.client = TransactionSubmissionGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(TransactionSubmissionGrpc.newBlockingStub(channel));
}

public static CreateClientCommunications<TxnSubmission> getCreate(ChoamMetrics metrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.salesforce.apollo.ethereal.memberships;

import com.codahale.metrics.Timer;
import com.macasaet.fernet.Token;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.cryptography.Digest;
Expand Down Expand Up @@ -36,6 +37,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import static com.salesforce.apollo.ethereal.memberships.comm.GossiperClient.getCreate;

Expand All @@ -47,15 +49,16 @@
*/
public class ChRbcGossip {

private static final Logger log = LoggerFactory.getLogger(
private static final Logger log = LoggerFactory.getLogger(
ChRbcGossip.class);
private final CommonCommunications<Gossiper, GossiperService> comm;
private final Context<Member> context;
private final SigningMember member;
private final EtherealMetrics metrics;
private final Processor processor;
private final RingCommunications<Member, Gossiper> ring;
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean started = new AtomicBoolean();
private final Terminal terminal = new Terminal();
private volatile ScheduledFuture<?> scheduled;

public ChRbcGossip(Context<Member> context, SigningMember member, Processor processor, Router communications,
Expand All @@ -64,7 +67,7 @@ public ChRbcGossip(Context<Member> context, SigningMember member, Processor proc
this.context = context;
this.member = member;
this.metrics = m;
comm = communications.create((Member) member, context.getId(), new Terminal(), getClass().getCanonicalName(),
comm = communications.create(member, context.getId(), terminal, getClass().getCanonicalName(),
r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r),
getCreate(metrics), Gossiper.getLocalLoopback(member));
ring = new RingCommunications<>(context, member, this.comm);
Expand All @@ -78,12 +81,19 @@ public Context<Member> getContext() {
* Start the receiver's gossip
*/
public void start(Duration duration) {
start(duration, null);
}

/**
* Start the receiver's gossip
*/
public void start(Duration duration, Predicate<Token> validator) {
if (!started.compareAndSet(false, true)) {
return;
}
Duration initialDelay = duration.plusMillis(Entropy.nextBitsStreamLong(duration.toMillis()));
log.trace("Starting GossipService[{}] on: {}", context.getId(), member.getId());
comm.register(context.getId(), new Terminal());
comm.register(context.getId(), terminal, validator);
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
Expand Down Expand Up @@ -121,11 +131,11 @@ private Update gossipRound(Gossiper link, int ring) {
try {
return link.gossip(processor.gossip(context.getId(), ring));
} catch (StatusRuntimeException e) {
log.debug("gossiping[{}] failed with: {} with {} ring: {} on {}", context.getId(), e.getMessage(),
log.debug("gossiping[{}] failed: {} with: {} with {} ring: {} on {}", context.getId(), e.getMessage(),
member.getId(), ring, link.getMember().getId(), member.getId(), e);
return null;
} catch (Throwable e) {
log.warn("gossiping[{}] failed from {} with {} ring: {} on {}", context.getId(), member.getId(), ring,
log.warn("gossiping[{}] failed: {} from {} with {} ring: {} on {}", context.getId(), member.getId(), ring,
link.getMember().getId(), ring, member.getId(), e);
return null;
}
Expand Down Expand Up @@ -168,7 +178,7 @@ private void handle(Optional<Update> result, RingCommunications.Destination<Memb
.setUpdate(processor.update(update))
.build());
} catch (StatusRuntimeException e) {
log.debug("gossiping[{}] failed with: {} with {} ring: {} on {}", context.getId(), e.getMessage(),
log.debug("gossiping[{}] failed: {} with: {} with {} ring: {} on {}", context.getId(), e.getMessage(),
member.getId(), ring, destination.member().getId(), member.getId(), e);
}
} finally {
Expand All @@ -191,7 +201,7 @@ private void oneRound(Duration duration, ScheduledExecutorService scheduler) {
return;
}
var timer = metrics == null ? null : metrics.gossipRoundDuration().time();
ring.execute((link, ring) -> gossipRound(link, ring),
ring.execute(this::gossipRound,
(result, destination) -> handle(result, destination, duration, scheduler, timer));
}

Expand All @@ -204,7 +214,8 @@ public Update gossip(Gossip request, Digest from) {
Member predecessor = context.ring(request.getRing()).predecessor(member);
if (predecessor == null || !from.equals(predecessor.getId())) {
log.debug("Invalid inbound gossip on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(),
member, from, request.getRing(), predecessor.getId());
member.getId(), from, request.getRing(),
predecessor == null ? "<null>" : predecessor.getId());
return Update.getDefaultInstance();
}
final var update = processor.gossip(request);
Expand All @@ -218,10 +229,11 @@ public void update(ContextUpdate request, Digest from) {
Member predecessor = context.ring(request.getRing()).predecessor(member);
if (predecessor == null || !from.equals(predecessor.getId())) {
log.debug("Invalid inbound update on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(),
member.getId(), from, request.getRing(), predecessor.getId());
member.getId(), from, request.getRing(),
predecessor == null ? "<null>" : predecessor.getId());
return;
}
log.trace("gossip update with {} on: {}", from, member);
log.trace("gossip update with {} on: {}", from, member.getId());
processor.updateFrom(request.getUpdate());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
package com.salesforce.apollo.ethereal.memberships.comm;

import com.codahale.metrics.Timer.Context;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.ethereal.proto.ContextUpdate;
import com.salesforce.apollo.ethereal.proto.Gossip;
import com.salesforce.apollo.ethereal.proto.GossiperGrpc;
import com.salesforce.apollo.ethereal.proto.Update;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.membership.Member;

/**
Expand All @@ -27,7 +27,7 @@ public class GossiperClient implements Gossiper {

public GossiperClient(ManagedServerChannel channel, EtherealMetrics metrics) {
this.channel = channel;
this.client = GossiperGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(GossiperGrpc.newBlockingStub(channel));
this.metrics = metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class EntranceClient implements Entrance {

public EntranceClient(ManagedServerChannel channel, FireflyMetrics metrics) {
this.channel = channel;
this.client = EntranceGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(EntranceGrpc.newBlockingStub(channel));
this.metrics = metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
package com.salesforce.apollo.fireflies.comm.gossip;

import com.codahale.metrics.Timer.Context;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.fireflies.FireflyMetrics;
import com.salesforce.apollo.fireflies.proto.FirefliesGrpc;
import com.salesforce.apollo.fireflies.proto.Gossip;
import com.salesforce.apollo.fireflies.proto.SayWhat;
import com.salesforce.apollo.fireflies.proto.State;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.fireflies.FireflyMetrics;
import com.salesforce.apollo.membership.Member;

/**
Expand All @@ -28,7 +28,7 @@ public class FfClient implements Fireflies {

public FfClient(ManagedServerChannel channel, FireflyMetrics metrics) {
this.channel = channel;
this.client = FirefliesGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(FirefliesGrpc.newBlockingStub(channel));
this.metrics = metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
*/
package com.salesforce.apollo.gorgoneion.client.client.comm;

import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.gorgoneion.proto.AdmissionsGrpc;
import com.salesforce.apollo.gorgoneion.proto.Credentials;
import com.salesforce.apollo.gorgoneion.proto.SignedNonce;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import com.salesforce.apollo.stereotomy.event.proto.Validations;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.membership.Member;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,7 +29,7 @@ public class AdmissionsClient implements Admissions {

public AdmissionsClient(ManagedServerChannel channel, GorgoneionClientMetrics metrics) {
this.channel = channel;
this.client = AdmissionsGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(AdmissionsGrpc.newBlockingStub(channel));
this.metrics = metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
*/
package com.salesforce.apollo.gorgoneion.comm.endorsement;

import com.salesforce.apollo.gorgoneion.proto.*;
import com.salesforce.apollo.stereotomy.event.proto.Validation_;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.gorgoneion.comm.GorgoneionMetrics;
import com.salesforce.apollo.gorgoneion.proto.*;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.stereotomy.event.proto.Validation_;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -28,7 +28,7 @@ public class EndorsementClient implements Endorsement {

public EndorsementClient(ManagedServerChannel channel, GorgoneionMetrics metrics) {
this.channel = channel;
this.client = EndorsementGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(EndorsementGrpc.newBlockingStub(channel));
this.metrics = metrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
*/
package com.salesforce.apollo.gorgoneion;

import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.gorgoneion.proto.AdmissionsGrpc;
import com.salesforce.apollo.gorgoneion.proto.Credentials;
import com.salesforce.apollo.gorgoneion.proto.SignedNonce;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import com.salesforce.apollo.stereotomy.event.proto.Validations;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.membership.Member;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,7 +28,7 @@ public class AdmissionsClient implements Admissions {

public AdmissionsClient(ManagedServerChannel channel) {
this.channel = channel;
this.client = AdmissionsGrpc.newBlockingStub(channel).withCompression("gzip");
this.client = channel.wrap(AdmissionsGrpc.newBlockingStub(channel));
}

public static CreateClientCommunications<Admissions> getCreate() {
Expand Down
7 changes: 3 additions & 4 deletions grpc/src/main/proto/leyden.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ package leyden;

service Binder {
rpc bind(Binding) returns(google.protobuf.Empty) {}
rpc unbind(KeyAndToken) returns(google.protobuf.Empty) {}
rpc get(KeyAndToken) returns(Bound) {}
rpc unbind(Key) returns(google.protobuf.Empty) {}
rpc get(Key) returns(Bound) {}
}

service Reconciliation {
rpc reconcile (Intervals) returns (Update) {}
rpc update (Updating) returns (google.protobuf.Empty) {}
}

message KeyAndToken {
message Key {
bytes key = 1;
bytes token = 2;
}

message Update {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
return new SimpleForwardingClientCall<ReqT, RespT>(newCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(Router.METADATA_CLIENT_ID_KEY, qb64(ctx));
headers.put(Constants.METADATA_CLIENT_ID_KEY, qb64(ctx));
super.start(responseListener, headers);
}
};
Expand Down Expand Up @@ -199,11 +199,11 @@ private ManagedChannel handler(DomainSocketAddress address) {
.build();
}

public static interface TestIt {
public interface TestIt {
void ping(Any request, StreamObserver<Any> responseObserver);
}

public static interface TestItService extends Link {
public interface TestItService extends Link {
Any ping(Any request);
}

Expand All @@ -226,7 +226,7 @@ public static class TestItClient implements TestItService {

public TestItClient(ManagedServerChannel c) {
this.connection = c;
client = TestItGrpc.newBlockingStub(c);
client = c.wrap(TestItGrpc.newBlockingStub(c));
}

@Override
Expand Down
Loading

0 comments on commit e817e30

Please sign in to comment.