Skip to content

Commit

Permalink
Merge pull request #1844 from digitallyinduced/nicolas/fix-openai-stream
Browse files Browse the repository at this point in the history
fix(openai): parsing of streaming completions
  • Loading branch information
mpscholten authored Oct 17, 2023
2 parents 4e9fc9c + 81a3139 commit e66f966
Showing 1 changed file with 49 additions and 38 deletions.
87 changes: 49 additions & 38 deletions ihp-openai/IHP/OpenAI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,45 +120,56 @@ streamCompletion secretKey completionRequest' onStart callback = do

streamCompletionWithoutRetry :: ByteString -> CompletionRequest -> IO () -> (Text -> IO ()) -> IO (Either Text Text)
streamCompletionWithoutRetry secretKey completionRequest' onStart callback = do
let completionRequest = enableStream completionRequest'
modifyContextSSL (\context -> do
SSL.contextSetVerificationMode context SSL.VerifyNone
pure context
)
withOpenSSL do
withConnection (establishConnection "https://api.openai.com/v1/chat/completions") \connection -> do
let q = buildRequest1 do
http POST "/v1/chat/completions"
setContentType "application/json"
Network.Http.Client.setHeader "Authorization" ("Bearer " <> secretKey)

sendRequest connection q (jsonBody completionRequest)
let completionRequest = enableStream completionRequest'
modifyContextSSL (\context -> do
SSL.contextSetVerificationMode context SSL.VerifyNone
pure context
)
withOpenSSL do
withConnection (establishConnection "https://api.openai.com/v1/chat/completions") \connection -> do
let q = buildRequest1 do
http POST "/v1/chat/completions"
setContentType "application/json"
Network.Http.Client.setHeader "Authorization" ("Bearer " <> secretKey)
sendRequest connection q (jsonBody completionRequest)
onStart
receiveResponse connection handler

let handler = \p i -> do
let status = getStatusCode p
if status == 200
then do
x <- Streams.foldM (parseResponseChunk callback) ("", "") i
return (Right (snd x))
else do
x <- Streams.fold mappend mempty i
return (Left $ "an error happend: " <> Text.pack (show x))

onStart
receiveResponse connection handler
where
parseResponseChunk :: (Text -> IO ()) -> (ByteString, Text) -> ByteString -> IO (ByteString, Text)
parseResponseChunk callback (curBuffer, chunk) input = do
case ByteString.stripPrefix "data: " (ByteString.strip (curBuffer <> input)) of
Just json -> do
case decodeStrict json of
Just CompletionResult { choices } -> do
let tokens :: Text = mconcat $ map (.text) choices
callback tokens
pure ("", chunk <> tokens)
otherwise -> do
pure (curBuffer <> json, chunk)
Nothing -> pure (curBuffer <> input, chunk)
handler :: Response -> Streams.InputStream ByteString -> IO (Either Text Text)
handler response stream = do
let status = getStatusCode response
if status == 200
then do
{-
parse stream line by line as event stream format according to API spec:
https://platform.openai.com/docs/api-reference/chat/create#chat/create-stream
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
-}
(_, _, output) <- Streams.lines stream >>= Streams.foldM (parseResponseChunk callback) ("", False, "")
return (Right output)
else do
x :: ByteString <- Streams.fold mappend mempty stream
return (Left $ "an error happend: " <> Text.pack (show x))

parseResponseChunk :: (Text -> IO ()) -> (ByteString, Bool, Text) -> ByteString -> IO (ByteString, Bool, Text)
parseResponseChunk callback (curBuffer, emptyLineFound, chunk) input
-- input line is empty, but previous was not, append newline to buffer
| ByteString.null input && not emptyLineFound = pure (curBuffer <> "\n", True, chunk)
-- input line is empty, previous line was already empty: message ended, clear buffer
| ByteString.null input && emptyLineFound = pure ("", True, chunk)
-- lines starting with : are comments, ignore
| ":" `ByteString.isPrefixOf` input = pure (curBuffer, False, chunk)
-- try to parse line together with buffer otherwise
| otherwise = case ByteString.stripPrefix "data: " (ByteString.strip (curBuffer <> input)) of
Just json -> do
case eitherDecodeStrict json of
Right CompletionResult { choices } -> do
let tokens :: Text = mconcat $ map (.text) choices
callback tokens
pure ("", False, chunk <> tokens)
Left err -> pure (curBuffer <> json, False, chunk)
Nothing -> pure (curBuffer <> input, False, chunk)


fetchCompletion :: ByteString -> CompletionRequest -> IO Text
Expand Down Expand Up @@ -192,4 +203,4 @@ fetchCompletionWithoutRetry secretKey completionRequest = do
pure (mconcat $ map (.text) completionResult.choices)

enableStream :: CompletionRequest -> CompletionRequest
enableStream completionRequest = completionRequest { stream = True }
enableStream completionRequest = completionRequest { stream = True }

0 comments on commit e66f966

Please sign in to comment.