Skip to content

Commit

Permalink
fix: response serialization failed
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinhermawan committed Mar 31, 2024
1 parent e9ea77c commit 33a7bec
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 42 deletions.
56 changes: 35 additions & 21 deletions Sources/OllamaKit/OllamaKit+Chat.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,29 @@ extension OllamaKit {
public func chat(data: OKChatRequestData) -> AsyncThrowingStream<OKChatResponse, Error> {
AsyncThrowingStream { continuation in
let request = AF.streamRequest(router.chat(data: data)).validate()
var buffer = Data()

request.responseStreamDecodable(of: OKChatResponse.self, using: decoder) { stream in
request.responseStream { stream in
switch stream.event {
case .stream(let result):
switch result {
case .success(let response):
continuation.yield(response)
case .success(let data):
buffer.append(data)

while let chunk = extractNextJSON(from: &buffer) {
do {
let response = try decoder.decode(OKChatResponse.self, from: chunk)
continuation.yield(response)
} catch {
continuation.finish(throwing: error)
return
}
}
case .failure(let error):
continuation.finish(throwing: error)
}
case .complete(let completion):
if let error = completion.error {
continuation.finish(throwing: error)
} else {
continuation.finish()
}
case .complete(_):
continuation.finish()
}
}
}
Expand All @@ -73,26 +80,33 @@ extension OllamaKit {
/// ```
///
/// - Parameter data: The ``OKChatRequestData`` used to initiate the chat streaming from the Ollama API.
/// - Returns: An `AnyPublisher<OKChatResponse, AFError>` emitting the live stream of chat responses from the Ollama API.
public func chat(data: OKChatRequestData) -> AnyPublisher<OKChatResponse, AFError> {
let subject = PassthroughSubject<OKChatResponse, AFError>()
/// - Returns: An `AnyPublisher<OKChatResponse, Error>` emitting the live stream of chat responses from the Ollama API.
public func chat(data: OKChatRequestData) -> AnyPublisher<OKChatResponse, Error> {
let subject = PassthroughSubject<OKChatResponse, Error>()
let request = AF.streamRequest(router.chat(data: data)).validate()
var buffer = Data()

request.responseStreamDecodable(of: OKChatResponse.self, using: decoder) { stream in
request.responseStream { stream in
switch stream.event {
case .stream(let result):
switch result {
case .success(let response):
subject.send(response)
case .success(let data):
buffer.append(data)

while let chunk = extractNextJSON(from: &buffer) {
do {
let response = try decoder.decode(OKChatResponse.self, from: chunk)
subject.send(response)
} catch {
subject.send(completion: .failure(error))
return
}
}
case .failure(let error):
subject.send(completion: .failure(error))
}
case .complete(let completion):
if completion.error != nil {
subject.send(completion: .failure(completion.error!))
} else {
subject.send(completion: .finished)
}
case .complete(_):
subject.send(completion: .finished)
}
}

Expand Down
56 changes: 35 additions & 21 deletions Sources/OllamaKit/OllamaKit+Generate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,29 @@ extension OllamaKit {
public func generate(data: OKGenerateRequestData) -> AsyncThrowingStream<OKGenerateResponse, Error> {
AsyncThrowingStream { continuation in
let request = AF.streamRequest(router.generate(data: data)).validate()
var buffer = Data()

request.responseStreamDecodable(of: OKGenerateResponse.self, using: decoder) { stream in
request.responseStream { stream in
switch stream.event {
case .stream(let result):
switch result {
case .success(let response):
continuation.yield(response)
case .success(let data):
buffer.append(data)

while let chunk = extractNextJSON(from: &buffer) {
do {
let response = try decoder.decode(OKGenerateResponse.self, from: chunk)
continuation.yield(response)
} catch {
continuation.finish(throwing: error)
return
}
}
case .failure(let error):
continuation.finish(throwing: error)
}
case .complete(let completion):
if let error = completion.error {
continuation.finish(throwing: error)
} else {
continuation.finish()
}
case .complete(_):
continuation.finish()
}
}
}
Expand All @@ -73,26 +80,33 @@ extension OllamaKit {
/// ```
///
/// - Parameter data: The ``OKGenerateRequestData`` used to initiate the streaming from the Ollama API.
/// - Returns: An `AnyPublisher<OKGenerateResponse, AFError>` emitting the live stream of responses from the Ollama API.
public func generate(data: OKGenerateRequestData) -> AnyPublisher<OKGenerateResponse, AFError> {
let subject = PassthroughSubject<OKGenerateResponse, AFError>()
/// - Returns: An `AnyPublisher<OKGenerateResponse, Error>` emitting the live stream of responses from the Ollama API.
public func generate(data: OKGenerateRequestData) -> AnyPublisher<OKGenerateResponse, Error> {
let subject = PassthroughSubject<OKGenerateResponse, Error>()
let request = AF.streamRequest(router.generate(data: data)).validate()
var buffer = Data()

request.responseStreamDecodable(of: OKGenerateResponse.self, using: decoder) { stream in
request.responseStream { stream in
switch stream.event {
case .stream(let result):
switch result {
case .success(let response):
subject.send(response)
case .success(let data):
buffer.append(data)

while let chunk = extractNextJSON(from: &buffer) {
do {
let response = try decoder.decode(OKGenerateResponse.self, from: chunk)
subject.send(response)
} catch {
subject.send(completion: .failure(error))
return
}
}
case .failure(let error):
subject.send(completion: .failure(error))
}
case .complete(let completion):
if completion.error != nil {
subject.send(completion: .failure(completion.error!))
} else {
subject.send(completion: .finished)
}
case .complete(_):
subject.send(completion: .finished)
}
}

Expand Down
40 changes: 40 additions & 0 deletions Sources/OllamaKit/OllamaKit.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,44 @@ public struct OllamaKit {

self.router = router
}

internal func extractNextJSON(from buffer: inout Data) -> Data? {
var isEscaped = false
var isWithinString = false
var nestingDepth = 0
var objectStartIndex = buffer.startIndex

for (index, byte) in buffer.enumerated() {
let character = Character(UnicodeScalar(byte))

if isEscaped {
isEscaped = false
} else if character == "\\" {
isEscaped = true
} else if character == "\"" {
isWithinString.toggle()
} else if !isWithinString {
switch character {
case "{":
nestingDepth += 1
if nestingDepth == 1 {
objectStartIndex = index
}
case "}":
nestingDepth -= 1
if nestingDepth == 0 {
let range = objectStartIndex..<buffer.index(after: index)
let jsonObject = buffer.subdata(in: range)
buffer.removeSubrange(range)

return jsonObject
}
default:
break
}
}
}

return nil
}
}

0 comments on commit 33a7bec

Please sign in to comment.