Skip to content

Commit

Permalink
get code back to passing all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hunterjackson committed Dec 18, 2023
1 parent 8bab6be commit 997ba46
Show file tree
Hide file tree
Showing 17 changed files with 435 additions and 28 deletions.
2 changes: 1 addition & 1 deletion CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Examples of unacceptable behavior by participants include:

## Our Responsibilities

Project maintainers are responsible for clarifying the standards of acceptable
Project maintainers are responsible for clarifying the standards of acceptor
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.

Expand Down
43 changes: 43 additions & 0 deletions src/main/java/com/meta/cp4m/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
import com.meta.cp4m.llm.LLMPlugin;
import com.meta.cp4m.message.Message;
import com.meta.cp4m.message.MessageHandler;
import com.meta.cp4m.message.RequestProcessor;
import com.meta.cp4m.message.ThreadState;
import com.meta.cp4m.routing.Acceptor;
import com.meta.cp4m.routing.Handler;
import com.meta.cp4m.routing.Route;
import com.meta.cp4m.store.ChatStore;
import io.javalin.Javalin;
import io.javalin.http.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,6 +55,25 @@ void handle(Context ctx) {
}
}

<IN> void handler(Context ctx, IN in, RequestProcessor<IN, T> processor) {
List<T> messages = null;
try {
messages = processor.process(ctx, in);
} catch (Exception e) {
LOGGER
.atError()
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.setMessage("unable to process request")
.log();
}
// TODO: once we have a non-volatile store, on startup send stored but not replied to messages
for (T m : messages) {
ThreadState<T> thread = store.add(m);
executorService.submit(() -> execute(thread));
}
}

public void register(Javalin app) {
handler.handlers().forEach(m -> app.addHandler(m, path, this::handle));
}
Expand Down Expand Up @@ -79,4 +104,22 @@ private void execute(ThreadState<T> thread) {
LOGGER.error("an error occurred while attempting to respond", e);
}
}

private <E> Route<E> toRoute(MessageHandler.RouteDetails<E, T> routeDetails) {
return new Route<>(
path,
routeDetails.handlerType(),
routeDetails.acceptor(),
(ctx, in) -> handler(ctx, in, routeDetails.requestProcessor()));
}

List<Route<?>> routes() {
List<MessageHandler.RouteDetails<?, T>> routeDetails = handler.routeDetails();
List<Route<?>> routes = new ArrayList<>(routeDetails.size());
for (MessageHandler.RouteDetails<?, T> routeDetail : routeDetails) {
Route<?> route = toRoute(routeDetail);
routes.add(route);
}
return routes;
}
}
74 changes: 66 additions & 8 deletions src/main/java/com/meta/cp4m/ServicesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
package com.meta.cp4m;

import com.google.common.base.Preconditions;
import com.meta.cp4m.routing.Route;
import io.javalin.Javalin;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import java.util.*;

import io.javalin.http.BadRequestResponse;
import io.javalin.http.Context;
import io.javalin.http.HandlerType;
import org.checkerframework.common.returnsreceiver.qual.This;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServicesRunner implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(ServicesRunner.class);
private final Javalin app = Javalin.create();
private final Set<Service<?>> services = new HashSet<>();
private final Set<Service<?>> services = new LinkedHashSet<>();
private boolean started = false;
private int port = 8080;

Expand All @@ -28,7 +35,59 @@ public static ServicesRunner newInstance() {
return new ServicesRunner();
}

private <T> boolean didAcceptAndHandle(Context ctx, Route<T> route) {
Optional<T> acceptorOutput = route.acceptor().accept(ctx);
if (acceptorOutput.isPresent()) {
try {
route.handler().handle(ctx, acceptorOutput.get());
} catch (Exception e) {
throw new BadRequestResponse("Unable to process request");
}
return true;
}
return false;
}

/**
* Find the first route that will accept this payload and then handle the payload
*
* @param ctx context from Javalin
* @param routes the routes to check for acceptability and process if accepted
*/
private void routeSelectorAndHandler(Context ctx, List<Route<?>> routes) {
for (Route<?> route : routes) {
if (didAcceptAndHandle(ctx, route)) {
return;
}
}
LOGGER
.atError()
.setMessage("Unable to handle incoming webhook")
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.log();
throw new BadRequestResponse("unable to handle webhook");
}

public @This ServicesRunner start() {
record RouteGroup(String path, HandlerType handlerType) {}
Map<RouteGroup, List<Route<?>>> routeGroups = new HashMap<>();
for (Service<?> s : services) { // this is not a stream because order matters here
s.routes()
.forEach(
r ->
routeGroups
.computeIfAbsent(
new RouteGroup(r.path(), r.handlerType()), k -> new ArrayList<>())
.add(r));
}
routeGroups.forEach(
(routeGroup, routes) ->
app.addHandler(
routeGroup.handlerType(),
routeGroup.path(),
ctx -> this.routeSelectorAndHandler(ctx, routes)));

if (!started) {
started = true;
app.start(port);
Expand All @@ -38,9 +97,8 @@ public static ServicesRunner newInstance() {

public @This ServicesRunner service(Service<?> service) {
Preconditions.checkState(!started, "cannot add service, server already started");
if (services.add(service)) {
service.register(app);
}

services.add(service);
return this;
}

Expand Down
85 changes: 84 additions & 1 deletion src/main/java/com/meta/cp4m/message/FBMessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package com.meta.cp4m.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -88,7 +89,7 @@ public List<FBMessage> processRequest(Context ctx) {
} catch (JsonProcessingException | NullPointerException e) {
LOGGER
.atWarn()
.setMessage("Unable to parse message form Meta webhook")
.setMessage("Unable to parse message from Meta webhook")
.setCause(e)
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
Expand Down Expand Up @@ -183,6 +184,59 @@ private List<FBMessage> postHandler(Context ctx) throws JsonProcessingException
return output;
}

private List<FBMessage> postHandler(Context ctx, JsonNode body) {
JsonNode entries = body.get("entry");
ArrayList<FBMessage> output = new ArrayList<>();
for (JsonNode entry : entries) {
@Nullable JsonNode messaging = entry.get("messaging");
if (messaging == null) {
continue;
}
for (JsonNode message : messaging) {

Identifier senderId = Identifier.from(message.get("sender").get("id").asLong());
Identifier recipientId = Identifier.from(message.get("recipient").get("id").asLong());
Instant timestamp = Instant.ofEpochMilli(message.get("timestamp").asLong());
@Nullable JsonNode messageObject = message.get("message");
if (messageObject != null) {
// https://developers.facebook.com/docs/messenger-platform/reference/webhook-events/messages
Identifier messageId = Identifier.from(messageObject.get("mid").textValue());
if (messageDeduplicator.addAndGetIsDuplicate(messageId)) {
continue;
}

@Nullable JsonNode textObject = messageObject.get("text");
if (textObject != null && textObject.isTextual()) {
FBMessage m =
new FBMessage(
timestamp,
messageId,
senderId,
recipientId,
textObject.textValue(),
Message.Role.USER);
output.add(m);
} else {
LOGGER
.atWarn()
.setMessage("received message without text, unable to handle this")
.addKeyValue("body", body)
.log();
}
} else {
LOGGER
.atWarn()
.setMessage(
"received a message without a 'message' key, unable to handle this message type")
.addKeyValue("body", body)
.log();
}
}
}

return output;
}

@TestOnly
public @This FBMessageHandler baseURLFactory(Function<Identifier, URI> baseURLFactory) {
this.baseURLFactory = Objects.requireNonNull(baseURLFactory);
Expand Down Expand Up @@ -237,4 +291,33 @@ private void send(String message, Identifier recipient, Identifier sender) throw
public Collection<HandlerType> handlers() {
return List.of(HandlerType.GET, HandlerType.POST);
}

@Override
public List<RouteDetails<?, FBMessage>> routeDetails() {
RouteDetails<JsonNode, FBMessage> postDetails =
new RouteDetails<>(
HandlerType.POST,
ctx -> {
@Nullable String contentType = ctx.contentType();
if (contentType != null
&& ContentType.parse(contentType).isSameMimeType(ContentType.APPLICATION_JSON)
&& MetaHandlerUtils.postHeaderValid(ctx, appSecret)) {
JsonNode body;
try {
body = MAPPER.readTree(ctx.body());
} catch (JsonProcessingException e) {
throw new BadRequestResponse("unable to parse body");
}
// TODO: need better validation
@Nullable JsonNode objectNode = body.get("object");
if (objectNode != null && objectNode.textValue().equals("page")) {
return Optional.of(body);
}
}
return Optional.empty();
},
this::postHandler);

return List.of(MetaHandlerUtils.subscriptionVerificationRouteDetails(verifyToken), postDetails);
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/meta/cp4m/message/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@

package com.meta.cp4m.message;

import com.meta.cp4m.routing.Acceptor;
import io.javalin.http.Context;
import io.javalin.http.HandlerType;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

public interface MessageHandler<T extends Message> {
record RouteDetails<IN, OUT extends Message>(
HandlerType handlerType, Acceptor<IN> acceptor, RequestProcessor<IN, OUT> requestProcessor) {}

/**
* Process incoming requests from the messaging service, including messages from the user.
Expand All @@ -35,4 +38,6 @@ public interface MessageHandler<T extends Message> {
* @return The different {@link HandlerType}s that this handler expects to receive
*/
Collection<HandlerType> handlers();

List<RouteDetails<?, T>> routeDetails();
}
42 changes: 42 additions & 0 deletions src/main/java/com/meta/cp4m/message/MetaHandlerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

import io.javalin.http.HandlerType;
import org.apache.hc.client5.http.utils.Hex;
import org.checkerframework.checker.nullness.qual.Nullable;

class MetaHandlerUtils {
static void subscriptionVerification(Context ctx, String verifyToken) {
Expand All @@ -27,6 +34,26 @@ static void subscriptionVerification(Context ctx, String verifyToken) {
ctx.result(String.valueOf(challenge));
}

static <T extends Message>
MessageHandler.RouteDetails<Integer, T> subscriptionVerificationRouteDetails(
String verifyToken) {
return new MessageHandler.RouteDetails<>(
HandlerType.GET,
ctx ->
// validateSubscription handles putting challenge into context response if it succeeds
{
if (Objects.equals(ctx.queryParam("hub.mode"), "subscribe")
&& Objects.equals(ctx.queryParam("hub.verify_token"), verifyToken)) {
return Optional.of(ctx.queryParamAsClass("hub.challenge", Integer.class).get());
}
return Optional.empty();
},
(ctx, challenge) -> {
ctx.result(String.valueOf(challenge));
return List.of();
});
}

static String hmac(String body, String appSecret) {
Mac sha256HMAC;
SecretKeySpec secretKey;
Expand Down Expand Up @@ -65,4 +92,19 @@ static void postHeaderValidator(Context ctx, String appSecret) {
"X-Hub-Signature-256 could not be validated")
.getOrThrow(ignored -> new ForbiddenResponse("X-Hub-Signature-256 could not be validated"));
}

static boolean postHeaderValid(Context ctx, String appSecret) {
@Nullable String sig = ctx.headerMap().get("X-Hub-Signature-256");
if (sig == null) {
return false;
}

String[] hashParts = sig.strip().split("=");
if (hashParts.length != 2) {
return false;
}

String calculatedHmac = hmac(ctx.body(), appSecret);
return hashParts[1].equals(calculatedHmac);
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/meta/cp4m/message/RequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.meta.cp4m.message;

import com.meta.cp4m.routing.Acceptor;
import io.javalin.http.HandlerType;

public record RequestHandler<IN, OUT extends Message>(
HandlerType type, Acceptor<IN> acceptor, RequestProcessor<IN, OUT> processor) {}
Loading

0 comments on commit 997ba46

Please sign in to comment.