diff --git a/src/main/java/com/epam/aidial/core/log/GfLogStore.java b/src/main/java/com/epam/aidial/core/log/GfLogStore.java index 767fcae3c..22d1793c7 100644 --- a/src/main/java/com/epam/aidial/core/log/GfLogStore.java +++ b/src/main/java/com/epam/aidial/core/log/GfLogStore.java @@ -9,6 +9,10 @@ import com.epam.deltix.gflog.api.LogFactory; import com.epam.deltix.gflog.api.LogLevel; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.buffer.ByteBufInputStream; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; @@ -22,6 +26,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Scanner; @Slf4j public class GfLogStore implements LogStore { @@ -92,6 +97,10 @@ private void append(ProxyContext context, LogEntry entry) throws JsonProcessingE append(entry, "}", false); } + append(entry, ",\"deployment\":\"", false); + append(entry, context.getDeployment().getName(), true); + append(entry, "\"", false); + String sourceDeployment = context.getSourceDeployment(); if (sourceDeployment != null) { append(entry, ",\"parent_deployment\":\"", false); @@ -105,6 +114,15 @@ private void append(ProxyContext context, LogEntry entry) throws JsonProcessingE append(entry, ProxyUtil.MAPPER.writeValueAsString(executionPath), false); } + append(entry, ",\"assembled_response\":\"", false); + Buffer responseBody = context.getResponseBody(); + if (isStreamingResponse(responseBody)) { + append(entry, assembleStreamingResponse(responseBody), true); + } else { + append(entry, responseBody); + } + append(entry, "\"", false); + append(entry, ",\"trace\":{\"trace_id\":\"", false); append(entry, context.getTraceId(), true); @@ -146,7 +164,6 @@ private void append(ProxyContext context, LogEntry entry) throws JsonProcessingE append(entry, "\"}}", false); } - private static void append(LogEntry entry, Buffer buffer) { if (buffer != null) { byte[] bytes = buffer.getBytes(); @@ -199,4 +216,139 @@ private static String formatTimestamp(long timestamp) { return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.of("UTC")) .format(DateTimeFormatter.ISO_DATE_TIME); } + + /** + * Assembles streaming response into a single one. + * The assembling process merges chunks of the streaming response one by one using separator: \n*data: * + * + * @param response byte array response to be assembled. + * @return assembled streaming response + */ + static String assembleStreamingResponse(Buffer response) { + try (Scanner scanner = new Scanner(new ByteBufInputStream(response.getByteBuf()))) { + StringBuilder content = new StringBuilder(); + ObjectNode last = null; + ObjectNode choice = ProxyUtil.MAPPER.createObjectNode(); + ObjectNode message = ProxyUtil.MAPPER.createObjectNode(); + choice.set("message", message); + JsonNode usage = null; + JsonNode statistics = null; + JsonNode systemFingerprint = null; + JsonNode model = null; + // each chunk is separated by one or multiple new lines with the prefix: 'data:' + scanner.useDelimiter("\n*data: *"); + while (scanner.hasNext()) { + String chunk = scanner.next(); + if (chunk.startsWith("[DONE]")) { + break; + } + ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(chunk); + if (tree.get("usage") != null) { + usage = tree.get("usage"); + } + if (tree.get("statistics") != null) { + statistics = tree.get("statistics"); + } + if (tree.get("system_fingerprint") != null) { + systemFingerprint = tree.get("system_fingerprint"); + } + if (model == null && tree.get("model") != null) { + model = tree.get("model"); + } + last = tree; + ArrayNode choices = (ArrayNode) tree.get("choices"); + if (choices == null) { + // skip error message + continue; + } + JsonNode curChoice = choices.get(0); + choice.set("finish_reason", curChoice.get("finish_reason")); + JsonNode delta = curChoice.get("delta"); + if (delta.get("custom_content") != null) { + message.set("custom_content", delta.get("custom_content")); + } + if (delta.get("tool_calls") != null) { + message.set("tool_calls", delta.get("tool_calls")); + } + if (delta.get("function_call") != null) { + message.set("function_call", delta.get("function_call")); + } + JsonNode contentNode = delta.get("content"); + if (contentNode != null) { + content.append(contentNode.textValue()); + } + } + + if (last == null) { + log.warn("no chunk is found in streaming response"); + return "{}"; + } + + ObjectNode result = ProxyUtil.MAPPER.createObjectNode(); + result.set("id", last.get("id")); + result.put("object", "chat.completion"); + result.set("created", last.get("created")); + result.set("model", model); + + if (usage != null) { + result.set("usage", usage); + } + if (statistics != null) { + result.set("statistics", statistics); + } + if (systemFingerprint != null) { + result.set("system_fingerprint", systemFingerprint); + } + + if (content.isEmpty()) { + // error + return ProxyUtil.convertToString(result); + } + + ArrayNode choices = ProxyUtil.MAPPER.createArrayNode(); + result.set("choices", choices); + choices.add(choice); + choice.put("index", 0); + message.put("role", "assistant"); + message.put("content", content.toString()); + + return ProxyUtil.convertToString(result); + } catch (Throwable e) { + log.warn("Can't assemble streaming response", e); + return "{}"; + } + } + + /** + * Determines if the given response is streaming. + *

+ * Streaming response is spitted into chunks. Each chunk starts with a new line and has a prefix: 'data:'. + * For example
+ * + * data: {content: "some text"} + * \n\ndata: {content: "some text"} + * \ndata: [DONE] + * + *

+ * + * @param response byte array response. + * @return true is the response is streaming. + */ + static boolean isStreamingResponse(Buffer response) { + int i = 0; + for (; i < response.length(); i++) { + byte b = response.getByte(i); + if (!Character.isWhitespace(b)) { + break; + } + } + String dataToken = "data:"; + int j = 0; + for (; i < response.length() && j < dataToken.length(); i++, j++) { + if (dataToken.charAt(j) != response.getByte(i)) { + break; + } + } + return j == dataToken.length(); + } } diff --git a/src/test/java/com/epam/aidial/core/log/GfLogStoreTest.java b/src/test/java/com/epam/aidial/core/log/GfLogStoreTest.java new file mode 100644 index 000000000..307a3b850 --- /dev/null +++ b/src/test/java/com/epam/aidial/core/log/GfLogStoreTest.java @@ -0,0 +1,172 @@ +package com.epam.aidial.core.log; + +import io.vertx.core.buffer.Buffer; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("checkstyle:LineLength") +public class GfLogStoreTest { + + @Test + public void testIsStreamingResponse() { + String batchResponse = """ + { + "id": "chatcmpl-7VfMTgj3ljKdGKS2BEIwloII3IoO0", + "object": "chat.completion", + "created": 1687781517, + "model": "gpt-35-turbo", + "choices": [ + { + "index": 0, + "finish_reason": "stop", + "message": { + "role": "assistant", + "content": "As an AI language model, I do not have emotions like humans. However, I am functioning well and ready to assist you. How can I help you today?" + } + } + ], + "usage" \t\r : \t\r { + "junk_string": "junk", + "junk_integer" : 1, + "junk_float" : 1.0, + "junk_null" : null, + "junk_true" : true, + "junk_false" : false, + "completion_tokens": 33, + "prompt_tokens": 19, + "total_tokens": 52 + } + } + """; + assertFalse(GfLogStore.isStreamingResponse(Buffer.buffer(batchResponse))); + String streamingResponse = """ + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"role":"assistant"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":"As"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":"stop","delta":{}}], + "usage" \n\t\r : \n\t\r { + "junk_string": "junk", + "junk_integer" : 1, + "junk_float" : 1.0, + "junk_null" : null, + "junk_true" : true, + "junk_false" : false, + "completion_tokens": 10, + "prompt_tokens": 20, + "total_tokens": 30 + } + } + data: [DONE] + """; + assertTrue(GfLogStore.isStreamingResponse(Buffer.buffer(streamingResponse))); + } + + @Test + public void testAssembleStreamingResponse() { + String streamingResponse = """ + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"role":"assistant"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":"As"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" an"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" AI"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" language"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" model"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":","}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" I"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" don"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":"'t"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" have"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" emotions"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":","}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" but"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" I"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":"'m"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" functioning"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" perfectly"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" well"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":"."}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" How"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" can"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" I"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" assist"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" you"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":" today"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":null,"delta":{"content":"?"}}],"usage":null} + + data: {"id":"chatcmpl-7VfCSOSOS1gYQbDFiEMyh71RJSy1m","object":"chat.completion.chunk","created":1687780896,"model":"gpt-35-turbo","choices":[{"index":0,"finish_reason":"stop","delta":{}}], + "usage" \n\t\r : \n\t\r { + "junk_string": "junk", + "junk_integer" : 1, + "junk_float" : 1.0, + "junk_null" : null, + "junk_true" : true, + "junk_false" : false, + "completion_tokens": 10, + "prompt_tokens": 20, + "total_tokens": 30 + } + } + data: + { + "id": "1d84aa54-e476-405d-9713-386bdfc85993", + "object": "chat.completion.chunk", + "created": "1687222196", + "statistics": { + "usage_per_model": [ + { + "index": 0, + "name": "text-embedding-ada-002", + "prompt_tokens": 23, + "total_tokens": 23 + }, + { + "index": 1, + "name": "gpt-4", + "prompt_tokens": 123, + "completion_tokens": 17, + "total_tokens": 140 + } + ] + } + } + + data: [DONE] + + """; + String res = GfLogStore.assembleStreamingResponse(Buffer.buffer(streamingResponse)); + assertNotNull(res); + String expected = """ + {"id":"1d84aa54-e476-405d-9713-386bdfc85993","object":"chat.completion","created":"1687222196","model":"gpt-35-turbo","usage":{"junk_string":"junk","junk_integer":1,"junk_float":1.0,"junk_null":null,"junk_true":true,"junk_false":false,"completion_tokens":10,"prompt_tokens":20,"total_tokens":30},"statistics":{"usage_per_model":[{"index":0,"name":"text-embedding-ada-002","prompt_tokens":23,"total_tokens":23},{"index":1,"name":"gpt-4","prompt_tokens":123,"completion_tokens":17,"total_tokens":140}]},"choices":[{"message":{"role":"assistant","content":"As an AI language model, I don't have emotions, but I'm functioning perfectly well. How can I assist you today?"},"finish_reason":"stop","index":0}]}"""; + assertEquals(expected, res); + } +}