Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(openai): parsing of streaming completions #1844

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 49 additions & 38 deletions ihp-openai/IHP/OpenAI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,45 +120,56 @@ streamCompletion secretKey completionRequest' onStart callback = do

-- | Sends one streaming chat-completion request and folds the streamed response into a single 'Text'.
--
-- Arguments, in order: the secret key placed in the @Authorization: Bearer@ header, the
-- request (sent with streaming switched on via 'enableStream'), an action run once after
-- the request has been sent and before the response is read, and a callback invoked with
-- the tokens of every successfully decoded result.
--
-- Returns @Right@ with the concatenation of all tokens handed to the callback when the
-- status code is 200, otherwise @Left@ with an error message that embeds the shown body.
--
-- NOTE(review): this span reads like a rendered diff whose +/- markers and indentation were
-- stripped, so an earlier and a later revision of the body are interleaved below. The
-- "earlier"/"later" labels in the comments are inferred from the hunk header — confirm.
streamCompletionWithoutRetry :: ByteString -> CompletionRequest -> IO () -> (Text -> IO ()) -> IO (Either Text Text)
streamCompletionWithoutRetry secretKey completionRequest' onStart callback = do
-- Earlier revision of the request setup (presumably the removed side of the diff).
let completionRequest = enableStream completionRequest'
-- NOTE(review): SSL.VerifyNone presumably turns off peer certificate verification — confirm this is intended.
modifyContextSSL (\context -> do
SSL.contextSetVerificationMode context SSL.VerifyNone
pure context
)
withOpenSSL do
withConnection (establishConnection "https://api.openai.com/v1/chat/completions") \connection -> do
-- POST with a JSON content type and the secret key as bearer token.
let q = buildRequest1 do
http POST "/v1/chat/completions"
setContentType "application/json"
Network.Http.Client.setHeader "Authorization" ("Bearer " <> secretKey)

sendRequest connection q (jsonBody completionRequest)
-- Later revision of the same setup (presumably the added side of the diff); it additionally
-- runs onStart and reads the response right after sending, using the where-bound handler.
let completionRequest = enableStream completionRequest'
-- NOTE(review): SSL.VerifyNone presumably turns off peer certificate verification — confirm this is intended.
modifyContextSSL (\context -> do
SSL.contextSetVerificationMode context SSL.VerifyNone
pure context
)
withOpenSSL do
withConnection (establishConnection "https://api.openai.com/v1/chat/completions") \connection -> do
let q = buildRequest1 do
http POST "/v1/chat/completions"
setContentType "application/json"
Network.Http.Client.setHeader "Authorization" ("Bearer " <> secretKey)
sendRequest connection q (jsonBody completionRequest)
-- Notify the caller that the request is out, then hand the response to handler.
onStart
receiveResponse connection handler

-- Earlier revision: handler defined inline as a lambda over the response (p) and its body stream (i).
let handler = \p i -> do
let status = getStatusCode p
if status == 200
then do
-- Fold every received ByteString through parseResponseChunk; state is (buffer, text so far).
x <- Streams.foldM (parseResponseChunk callback) ("", "") i
return (Right (snd x))
else do
-- Non-200: concatenate the whole body and report it via show.
-- NOTE(review): "happend" is a typo for "happened"; left untouched because it is a runtime string.
x <- Streams.fold mappend mempty i
return (Left $ "an error happend: " <> Text.pack (show x))

onStart
receiveResponse connection handler
where
-- Earlier revision of the chunk parser. State: (unparsed buffer, text accumulated so far).
parseResponseChunk :: (Text -> IO ()) -> (ByteString, Text) -> ByteString -> IO (ByteString, Text)
parseResponseChunk callback (curBuffer, chunk) input = do
-- ByteString.strip presumably trims surrounding whitespace — confirm against the imported module.
case ByteString.stripPrefix "data: " (ByteString.strip (curBuffer <> input)) of
Just json -> do
case decodeStrict json of
Just CompletionResult { choices } -> do
-- Decoded: emit the tokens, reset the buffer and append the tokens to the result.
let tokens :: Text = mconcat $ map (.text) choices
callback tokens
pure ("", chunk <> tokens)
-- `otherwise` is a variable pattern here, so this branch catches every remaining case (failed decode).
otherwise -> do
pure (curBuffer <> json, chunk)
-- No "data: " prefix: keep buffering the raw input.
Nothing -> pure (curBuffer <> input, chunk)
-- Later revision: handler as a where-bound function taking the response and its body stream.
handler :: Response -> Streams.InputStream ByteString -> IO (Either Text Text)
handler response stream = do
let status = getStatusCode response
if status == 200
then do
{-
parse stream line by line as event stream format according to API spec:
https://platform.openai.com/docs/api-reference/chat/create#chat/create-stream
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
-}
-- Fold state is (buffer, emptyLineFound, text so far); only the accumulated text is returned.
(_, _, output) <- Streams.lines stream >>= Streams.foldM (parseResponseChunk callback) ("", False, "")
return (Right output)
else do
-- Non-200: concatenate the whole body and report it via show.
-- NOTE(review): "happend" is a typo for "happened"; left untouched because it is a runtime string.
x :: ByteString <- Streams.fold mappend mempty stream
return (Left $ "an error happend: " <> Text.pack (show x))

-- Later revision of the parser, called once per line.
-- State: (unparsed buffer, whether the previous line was empty, text accumulated so far).
parseResponseChunk :: (Text -> IO ()) -> (ByteString, Bool, Text) -> ByteString -> IO (ByteString, Bool, Text)
parseResponseChunk callback (curBuffer, emptyLineFound, chunk) input
-- input line is empty, but previous was not, append newline to buffer
| ByteString.null input && not emptyLineFound = pure (curBuffer <> "\n", True, chunk)
-- input line is empty, previous line was already empty: message ended, clear buffer
| ByteString.null input && emptyLineFound = pure ("", True, chunk)
-- lines starting with : are comments, ignore
| ":" `ByteString.isPrefixOf` input = pure (curBuffer, False, chunk)
-- try to parse line together with buffer otherwise
| otherwise = case ByteString.stripPrefix "data: " (ByteString.strip (curBuffer <> input)) of
Just json -> do
case eitherDecodeStrict json of
Right CompletionResult { choices } -> do
-- Decoded: emit the tokens, reset the buffer and append the tokens to the result.
let tokens :: Text = mconcat $ map (.text) choices
callback tokens
pure ("", False, chunk <> tokens)
-- Decoding failed: the payload is kept in the buffer with "data: " already stripped; err is unused.
-- NOTE(review): since the buffer then no longer starts with "data: ", it looks like later lines
-- cannot match the prefix again until two consecutive empty lines clear the buffer — confirm.
Left err -> pure (curBuffer <> json, False, chunk)
-- No "data: " prefix: keep buffering the raw line.
Nothing -> pure (curBuffer <> input, False, chunk)


fetchCompletion :: ByteString -> CompletionRequest -> IO Text
Expand Down Expand Up @@ -192,4 +203,4 @@ fetchCompletionWithoutRetry secretKey completionRequest = do
pure (mconcat $ map (.text) completionResult.choices)

-- | Returns the given request with its @stream@ field set to 'True'; other fields are left as they are.
enableStream :: CompletionRequest -> CompletionRequest
-- NOTE(review): the equation appears twice, presumably the removed/added pair of a rendered
-- diff whose only difference is not visible here (e.g. a trailing newline) — confirm.
enableStream completionRequest = completionRequest { stream = True }
enableStream completionRequest = completionRequest { stream = True }
Loading