diff --git a/src/main/java/com/meta/chatbridge/FBID.java b/src/main/java/com/meta/chatbridge/FBID.java deleted file mode 100644 index 575a880..0000000 --- a/src/main/java/com/meta/chatbridge/FBID.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -package com.meta.chatbridge; - -import java.util.Objects; - -public class FBID { - - private final long value; - - private FBID(long value) { - this.value = value; - } - - public static FBID from(long value) { - return new FBID(value); - } - - public static FBID from(String value) { - Objects.requireNonNull(value); - long longValue = Long.parseLong(value); - return new FBID(longValue); - } - - public long longValue() { - return value; - } - - @Override - public String toString() { - return Long.toString(value); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FBID fbid = (FBID) o; - return value == fbid.value; - } - - @Override - public int hashCode() { - return Objects.hash(value); - } -} diff --git a/src/main/java/com/meta/chatbridge/Identifier.java b/src/main/java/com/meta/chatbridge/Identifier.java new file mode 100644 index 0000000..9218358 --- /dev/null +++ b/src/main/java/com/meta/chatbridge/Identifier.java @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.meta.chatbridge; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Comparator; + +public class Identifier implements Comparator { + + private final byte[] id; + + private Identifier(byte[] id) { + this.id = id; + } + + public static Identifier from(String id) { + return new Identifier(id.getBytes(StandardCharsets.UTF_8)); + } + + public static Identifier from(long id) { + return new Identifier(Long.toString(id).getBytes(StandardCharsets.UTF_8)); + } + + @Override + public String toString() { + return new String(id, StandardCharsets.UTF_8); + } + + @Override + public int compare(Identifier o1, Identifier o2) { + return Comparator.naturalOrder().compare(o1.toString(), o2.toString()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Identifier that = (Identifier) o; + return Arrays.equals(id, that.id); + } + + @Override + public int hashCode() { + return Arrays.hashCode(id); + } +} diff --git a/src/main/java/com/meta/chatbridge/Pipeline.java b/src/main/java/com/meta/chatbridge/Pipeline.java index 59abe1f..afb4180 100644 --- a/src/main/java/com/meta/chatbridge/Pipeline.java +++ b/src/main/java/com/meta/chatbridge/Pipeline.java @@ -25,8 +25,6 @@ public class Pipeline { private static final Logger LOGGER = LoggerFactory.getLogger(Pipeline.class); private final ExecutorService executorService = Executors.newCachedThreadPool(); - private final ScheduledExecutorService scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(); private final MessageHandler handler; private final ChatStore store; private final LLMHandler llmHandler; @@ -70,9 +68,7 @@ private void execute(MessageStack stack) { } catch (Exception e) { LOGGER.error("failed to respond to user", e); // TODO: create transactional store add - // TODO: implement exponential backoff - scheduledExecutorService.schedule( - () -> executorService.submit(() -> stack), 30, TimeUnit.SECONDS); + // TODO: implement retry with exponential backoff } } } diff --git a/src/main/java/com/meta/chatbridge/message/FBMessage.java b/src/main/java/com/meta/chatbridge/message/FBMessage.java index 01a5a2d..230f6da 100644 --- a/src/main/java/com/meta/chatbridge/message/FBMessage.java +++ b/src/main/java/com/meta/chatbridge/message/FBMessage.java @@ -8,14 +8,14 @@ package com.meta.chatbridge.message; -import com.meta.chatbridge.FBID; +import com.meta.chatbridge.Identifier; import java.time.Instant; public record FBMessage( Instant timestamp, - String instanceId, - FBID senderId, - FBID recipientId, + Identifier instanceId, + Identifier senderId, + Identifier recipientId, String message, Role role) implements Message {} diff --git a/src/main/java/com/meta/chatbridge/message/FBMessageHandler.java b/src/main/java/com/meta/chatbridge/message/FBMessageHandler.java index 8a7b991..6c5e6f9 100644 --- a/src/main/java/com/meta/chatbridge/message/FBMessageHandler.java +++ b/src/main/java/com/meta/chatbridge/message/FBMessageHandler.java @@ -12,7 +12,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.meta.chatbridge.FBID; +import com.meta.chatbridge.Identifier; import io.javalin.http.BadRequestResponse; import io.javalin.http.Context; import io.javalin.http.HandlerType; @@ -34,6 +34,7 @@ import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.net.URIBuilder; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.common.returnsreceiver.qual.This; import org.jetbrains.annotations.TestOnly; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ public class FBMessageHandler implements MessageHandler { private final String accessToken; private final Deduplicator bodyDeduplicator = new Deduplicator<>(10_000); - private Function baseURLFactory = + private Function baseURLFactory = pageId -> { try { return new URIBuilder() @@ -72,7 +73,8 @@ public FBMessageHandler(String verifyToken, String pageAccessToken, String appSe } @TestOnly - FBMessageHandler baseURLFactory(Function baseURLFactory) { + @This + FBMessageHandler baseURLFactory(Function baseURLFactory) { this.baseURLFactory = Objects.requireNonNull(baseURLFactory); return this; } @@ -170,12 +172,12 @@ private List postHandler(Context ctx) throws JsonProcessingException continue; } - FBID senderId = FBID.from(message.get("sender").get("id").asLong()); - FBID recipientId = FBID.from(message.get("recipient").get("id").asLong()); + Identifier senderId = Identifier.from(message.get("sender").get("id").asLong()); + Identifier recipientId = Identifier.from(message.get("recipient").get("id").asLong()); Instant timestamp = Instant.ofEpochMilli(message.get("timestamp").asLong()); @Nullable JsonNode messageObject = message.get("message"); if (messageObject != null) { - String messageId = messageObject.get("mid").textValue(); + Identifier messageId = Identifier.from(messageObject.get("mid").textValue()); String messageText = messageObject.get("text").textValue(); FBMessage m = new FBMessage( diff --git a/src/main/java/com/meta/chatbridge/message/Message.java b/src/main/java/com/meta/chatbridge/message/Message.java index 263df3b..e632871 100644 --- a/src/main/java/com/meta/chatbridge/message/Message.java +++ b/src/main/java/com/meta/chatbridge/message/Message.java @@ -8,20 +8,19 @@ package com.meta.chatbridge.message; -import com.meta.chatbridge.FBID; +import com.meta.chatbridge.Identifier; import java.time.Instant; public interface Message { - record ConversationId(FBID recipientId, FBID senderId) {} Instant timestamp(); - String instanceId(); + Identifier instanceId(); - FBID senderId(); + Identifier senderId(); - FBID recipientId(); + Identifier recipientId(); String message(); @@ -32,4 +31,11 @@ enum Role { USER, SYSTEM } + + default Identifier conversationId() { + if (senderId().compare(senderId(), recipientId()) >= 0) { + return Identifier.from(senderId().toString() + recipientId()); + } + return Identifier.from(recipientId().toString() + senderId()); + } } diff --git a/src/main/java/com/meta/chatbridge/store/MemoryStore.java b/src/main/java/com/meta/chatbridge/store/MemoryStore.java index 6ef94d0..36863f4 100644 --- a/src/main/java/com/meta/chatbridge/store/MemoryStore.java +++ b/src/main/java/com/meta/chatbridge/store/MemoryStore.java @@ -10,19 +10,19 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.meta.chatbridge.Identifier; import com.meta.chatbridge.message.Message; -import com.meta.chatbridge.message.Message.ConversationId; import java.time.Duration; public class MemoryStore implements ChatStore { - private final Cache> store; + private final Cache> store; public MemoryStore() { this.store = CacheBuilder.newBuilder() .expireAfterWrite(Duration.ofDays(3)) .maximumWeight((long) (10 * Math.pow(2, 20))) // 10 megabytes - .>weigher( + .>weigher( (k, v) -> v.messages().stream().map(m -> m.message().length()).reduce(0, Integer::sum)) .build(); @@ -33,7 +33,7 @@ public MessageStack add(T message) { return this.store .asMap() .compute( - new ConversationId(message.recipientId(), message.senderId()), + message.conversationId(), (k, v) -> { if (v == null) { return MessageStack.of(message); diff --git a/src/test/java/com/meta/chatbridge/llm/DummyFBMessageLLMHandler.java b/src/test/java/com/meta/chatbridge/llm/DummyFBMessageLLMHandler.java index fed79a3..5548b6e 100644 --- a/src/test/java/com/meta/chatbridge/llm/DummyFBMessageLLMHandler.java +++ b/src/test/java/com/meta/chatbridge/llm/DummyFBMessageLLMHandler.java @@ -8,6 +8,7 @@ package com.meta.chatbridge.llm; +import com.meta.chatbridge.Identifier; import com.meta.chatbridge.message.FBMessage; import com.meta.chatbridge.message.Message; import com.meta.chatbridge.store.MessageStack; @@ -53,7 +54,7 @@ public FBMessage handle(MessageStack messageStack) { messageStack.messages().stream().filter(m -> m.role() == Message.Role.USER).findAny().get(); return new FBMessage( Instant.now(), - "test_message", + Identifier.from("test_message"), inbound.recipientId(), inbound.senderId(), dummyLLMResponse, diff --git a/src/test/java/com/meta/chatbridge/message/FBMessageHandlerTest.java b/src/test/java/com/meta/chatbridge/message/FBMessageHandlerTest.java index 9e0ebf9..258c864 100644 --- a/src/test/java/com/meta/chatbridge/message/FBMessageHandlerTest.java +++ b/src/test/java/com/meta/chatbridge/message/FBMessageHandlerTest.java @@ -16,7 +16,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.collect.ImmutableMap; -import com.meta.chatbridge.FBID; +import com.meta.chatbridge.Identifier; import com.meta.chatbridge.Pipeline; import com.meta.chatbridge.PipelinesRunner; import com.meta.chatbridge.llm.DummyFBMessageLLMHandler; @@ -165,7 +165,7 @@ private static Request createMessageRequest(String body, PipelinesRunner runner) return createMessageRequest(body, runner, true); } - private Function testURLFactoryFactory(FBID pageId) { + private Function testURLFactoryFactory(Identifier pageId) { return p -> { assertThat(p).isEqualTo(pageId); try { @@ -310,7 +310,7 @@ void invalidMessage( int timesToSendMessage) throws Exception { String path = "/testfbmessage"; - FBID pageId = FBID.from(106195825075770L); + Identifier pageId = Identifier.from(106195825075770L); String token = "243af3c6-9994-4869-ae13-ad61a38323f5"; // this is fake don't worry String secret = "f74a638462f975e9eadfcbb84e4aa06b"; // it's been rolled don't worry FBMessageHandler messageHandler = new FBMessageHandler("0", token, secret); @@ -341,13 +341,14 @@ void invalidMessage( JsonNode messageObject = PARSED_SAMPLE_MESSAGE.get("entry").get(0).get("messaging").get(0); String messageText = messageObject.get("message").get("text").textValue(); String mid = messageObject.get("message").get("mid").textValue(); - FBID recipientId = FBID.from(messageObject.get("recipient").get("id").textValue()); - FBID senderId = FBID.from(messageObject.get("sender").get("id").textValue()); + Identifier recipientId = + Identifier.from(messageObject.get("recipient").get("id").textValue()); + Identifier senderId = Identifier.from(messageObject.get("sender").get("id").textValue()); Instant timestamp = Instant.ofEpochMilli(messageObject.get("timestamp").longValue()); assertThat(stack.messages()) .hasSize(1) .allSatisfy(m -> assertThat(m.message()).isEqualTo(messageText)) - .allSatisfy(m -> assertThat(m.instanceId()).isEqualTo(mid)) + .allSatisfy(m -> assertThat(m.instanceId().toString()).isEqualTo(mid)) .allSatisfy(m -> assertThat(m.role()).isSameAs(Role.USER)) .allSatisfy(m -> assertThat(m.timestamp()).isEqualTo(timestamp)) .allSatisfy(m -> assertThat(m.recipientId()).isEqualTo(recipientId)) @@ -360,7 +361,7 @@ void invalidMessage( .allSatisfy(t -> assertThat(t).isEqualTo(token)); JsonNode body = MAPPER.readTree(r.body); assertThat(body.get("messaging_type").textValue()).isEqualTo("RESPONSE"); - assertThat(body.get("recipient").get("id").asLong()).isEqualTo(senderId.longValue()); + assertThat(body.get("recipient").get("id").textValue()).isEqualTo(senderId.toString()); assertThat(body.get("message").get("text").textValue()).isEqualTo(llmHandler.dummyResponse()); } } diff --git a/src/test/java/com/meta/chatbridge/store/MessageStackTest.java b/src/test/java/com/meta/chatbridge/store/MessageStackTest.java index fc337e4..0ce1f94 100644 --- a/src/test/java/com/meta/chatbridge/store/MessageStackTest.java +++ b/src/test/java/com/meta/chatbridge/store/MessageStackTest.java @@ -11,7 +11,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.Lists; -import com.meta.chatbridge.FBID; +import com.meta.chatbridge.Identifier; import com.meta.chatbridge.message.Message; import java.time.Instant; import java.util.List; @@ -22,18 +22,18 @@ class MessageStackTest { record TestMessage(Instant timestamp) implements Message { @Override - public String instanceId() { - return "0"; + public Identifier instanceId() { + return Identifier.from("0"); } @Override - public FBID senderId() { - return FBID.from(0); + public Identifier senderId() { + return Identifier.from(0); } @Override - public FBID recipientId() { - return FBID.from(0); + public Identifier recipientId() { + return Identifier.from(0); } @Override