Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WriteChunk API for HTTP1 #220

Merged
merged 10 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Source/AwsCommonRuntimeKit/http/HTTP1Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,52 @@ public class HTTP1Stream: HTTPStream {
self.httpConnection = httpConnection
super.init(rawValue: rawValue, callbackData: callbackData)
}

/// Submit a chunk of data to be sent on an HTTP/1.1 stream.
/// The stream must have specified "chunked" in a "transfer-encoding" header and no body.
/// activate() must be called before any chunks are submitted.
/// - Parameters:
/// - chunk: The chunk to write. If the chunk is empty, it will signify the end of the stream.
/// - endOfStream: Set it true to end the stream and prevent any further write. The last chunk must be send with the value true.
/// - Throws: CommonRunTimeError.crtError
public override func writeChunk(chunk: Data, endOfStream: Bool) async throws {
if endOfStream && !chunk.isEmpty {
/// The HTTP/1.1 does not support writing a chunk and sending the end of the stream simultaneously.
/// It requires sending an empty chunk at the end to finish the stream.
/// To maintain consistency with HTTP/2, if there is data and `endOfStream` is true, then send the data and the empty stream in two calls.
try await writeChunk(chunk: chunk, endOfStream: false)
return try await writeChunk(chunk: Data(), endOfStream: true)
}

var options = aws_http1_chunk_options()
options.on_complete = onWriteComplete
return try await withCheckedThrowingContinuation({ (continuation: CheckedContinuation<(), Error>) in
let continuationCore = ContinuationCore(continuation: continuation)
let stream = IStreamCore(
iStreamable: ByteBuffer(data: chunk))
options.chunk_data = stream.rawValue
options.chunk_data_size = UInt64(chunk.count)
options.user_data = continuationCore.passRetained()
guard aws_http1_stream_write_chunk(
rawValue,
&options) == AWS_OP_SUCCESS else {
continuationCore.release()
continuation.resume(throwing: CommonRunTimeError.crtError(.makeFromLastError()))
return
}
})
}
}

private func onWriteComplete(stream: UnsafeMutablePointer<aws_http_stream>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer!) {
let continuation = Unmanaged<ContinuationCore<()>>.fromOpaque(userData).takeRetainedValue().continuation
guard errorCode == AWS_OP_SUCCESS else {
continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
return
}

// SUCCESS
continuation.resume()
}
6 changes: 3 additions & 3 deletions Source/AwsCommonRuntimeKit/http/HTTP2Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public class HTTP2Stream: HTTPStream {
/// - data: Data to write. It can be empty
/// - endOfStream: Set it true to end the stream and prevent any further write.
/// The last frame must be send with the value true.
/// - Throws:
public func writeData(data: Data, endOfStream: Bool) async throws {
/// - Throws: CommonRunTimeError.crtError
public override func writeChunk(chunk: Data, endOfStream: Bool) async throws {
var options = aws_http2_stream_write_data_options()
options.end_stream = endOfStream
options.on_complete = onWriteComplete
try await withCheckedThrowingContinuation({ (continuation: CheckedContinuation<(), Error>) in
let continuationCore = ContinuationCore(continuation: continuation)
let stream = IStreamCore(
iStreamable: ByteBuffer(data: data))
iStreamable: ByteBuffer(data: chunk))
options.data = stream.rawValue
options.user_data = continuationCore.passRetained()
guard aws_http2_stream_write_data(
Expand Down
11 changes: 9 additions & 2 deletions Source/AwsCommonRuntimeKit/http/HTTPStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import AwsCHttp
import Foundation

/// An base class that represents a single Http Request/Response for both HTTP/1.1 and HTTP/2.
/// An base abstract class that represents a single Http Request/Response for both HTTP/1.1 and HTTP/2.
/// Can be used to update the Window size, and get status code.
public class HTTPStream {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about turning this class into a protocol to avoid the abstract class workaround with fatalAssert, but that would require making rawValue and callbackData variables public.

let rawValue: UnsafeMutablePointer<aws_http_stream>
Expand Down Expand Up @@ -41,7 +41,14 @@ public class HTTPStream {
throw CommonRunTimeError.crtError(.makeFromLastError())
}
}


/// This method must be overridden in each subclass because this function is specific to each subclass.
/// For HTTP/1.1 see ``HTTP1Stream/writeChunk(chunk:endOfStream:)``
/// For HTTP2: see ``HTTP2Stream/writeChunk(chunk:endOfStream:)``
public func writeChunk(chunk: Data, endOfStream: Bool) async throws {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mildly annoyed by calling this "chunk"
"chunk" is an HTTP/1-specific term

fatalError("writeChunk is not implemented for HTTPStream base")
}

deinit {
aws_http_stream_release(rawValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,19 @@ class HTTP2ClientConnectionTests: HTTPClientTestFixture {
http2ManualDataWrites: true)
let connection = try await connectionManager.acquireConnection()
let streamBase = try connection.makeRequest(requestOptions: http2RequestOptions)
let stream = streamBase as! HTTP2Stream
try stream.activate()
try streamBase.activate()
XCTAssertFalse(onCompleteCalled)
let data = TEST_DOC_LINE.data(using: .utf8)!
for chunk in data.chunked(into: 5) {
try await stream.writeData(data: chunk, endOfStream: false)
try await streamBase.writeChunk(chunk: chunk, endOfStream: false)
XCTAssertFalse(onCompleteCalled)
}

XCTAssertFalse(onCompleteCalled)
// Sleep for 5 seconds to make sure onComplete is not triggerred until endOfStream is true
try await Task.sleep(nanoseconds: 5_000_000_000)
XCTAssertFalse(onCompleteCalled)
try await stream.writeData(data: Data(), endOfStream: true)
try await streamBase.writeChunk(chunk: Data(), endOfStream: true)
semaphore.wait()
XCTAssertTrue(onCompleteCalled)
XCTAssertNil(httpResponse.error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ class HTT2StreamManagerTests: HTTPClientTestFixture {
XCTAssertFalse(onCompleteCalled)
let data = TEST_DOC_LINE.data(using: .utf8)!
for chunk in data.chunked(into: 5) {
try await stream.writeData(data: chunk, endOfStream: false)
try await stream.writeChunk(chunk: chunk, endOfStream: false)
XCTAssertFalse(onCompleteCalled)
}

XCTAssertFalse(onCompleteCalled)
// Sleep for 5 seconds to make sure onComplete is not triggerred until endOfStream is true
try await Task.sleep(nanoseconds: 5_000_000_000)
XCTAssertFalse(onCompleteCalled)
try await stream.writeData(data: Data(), endOfStream: true)
try await stream.writeChunk(chunk: Data(), endOfStream: true)
semaphore.wait()
XCTAssertTrue(onCompleteCalled)
XCTAssertNil(httpResponse.error)
Expand Down
12 changes: 9 additions & 3 deletions Test/AwsCommonRuntimeKitTests/http/HTTPClientTestFixture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,17 @@ class HTTPClientTestFixture: XCBaseTestCase {
headers: [HTTPHeader] = [HTTPHeader](),
onResponse: HTTPRequestOptions.OnResponse? = nil,
onBody: HTTPRequestOptions.OnIncomingBody? = nil,
onComplete: HTTPRequestOptions.OnStreamComplete? = nil
onComplete: HTTPRequestOptions.OnStreamComplete? = nil,
useChunkedEncoding: Bool = false
) throws -> HTTPRequestOptions {
let httpRequest: HTTPRequest = try HTTPRequest(method: method, path: path, body: ByteBuffer(data: body.data(using: .utf8)!))
let httpRequest: HTTPRequest = try HTTPRequest(method: method, path: path, body: useChunkedEncoding ? nil : ByteBuffer(data: body.data(using: .utf8)!))
httpRequest.addHeader(header: HTTPHeader(name: "Host", value: endpoint))
httpRequest.addHeader(header: HTTPHeader(name: "Content-Length", value: String(body.count)))
if (useChunkedEncoding) {
httpRequest.addHeader(header: HTTPHeader(name: "Transfer-Encoding", value: "chunked"))
}
else {
httpRequest.addHeader(header: HTTPHeader(name: "Content-Length", value: String(body.count)))
}
httpRequest.addHeaders(headers: headers)
return getRequestOptions(
request: httpRequest,
Expand Down
99 changes: 95 additions & 4 deletions Test/AwsCommonRuntimeKitTests/http/HTTPTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ import AwsCCommon
import AwsCHttp

class HTTPTests: HTTPClientTestFixture {
let host = "httpbin.org"
let host = "postman-echo.com"
let getPath = "/get"

func testGetHTTPSRequest() async throws {
let connectionManager = try await getHttpConnectionManager(endpoint: host, ssh: true, port: 443)
_ = try await sendHTTPRequest(method: "GET", endpoint: host, path: getPath, connectionManager: connectionManager)
_ = try await sendHTTPRequest(method: "GET", endpoint: host, path: "/delete", expectedStatus: 405, connectionManager: connectionManager)
_ = try await sendHTTPRequest(method: "GET", endpoint: host, path: "/delete", expectedStatus: 404, connectionManager: connectionManager)
}

func testGetHTTPRequest() async throws {
let connectionManager = try await getHttpConnectionManager(endpoint: host, ssh: false, port: 80)
_ = try await sendHTTPRequest(method: "GET", endpoint: host, path: getPath, connectionManager: connectionManager)
}

func testPutHttpRequest() async throws {
func testPutHTTPRequest() async throws {
let connectionManager = try await getHttpConnectionManager(endpoint: host, ssh: true, port: 443)
let response = try await sendHTTPRequest(
method: "PUT",
endpoint: host,
path: "/anything",
path: "/put",
body: TEST_DOC_LINE,
connectionManager: connectionManager)

Expand All @@ -38,6 +38,97 @@ class HTTPTests: HTTPClientTestFixture {
XCTAssertEqual(body.data, TEST_DOC_LINE)
}

func testHTTPChunkTransferEncoding() async throws {
let connectionManager = try await getHttpConnectionManager(endpoint: host, alpnList: ["http/1.1"])
let semaphore = DispatchSemaphore(value: 0)
var httpResponse = HTTPResponse()
var onCompleteCalled = false
let httpRequestOptions = try getHTTPRequestOptions(
method: "PUT",
endpoint: host,
path: "/put",
response: &httpResponse,
semaphore: semaphore,
onComplete: { _ in
onCompleteCalled = true
},
useChunkedEncoding: true)
let connection = try await connectionManager.acquireConnection()
let streamBase = try connection.makeRequest(requestOptions: httpRequestOptions)
try streamBase.activate()
XCTAssertFalse(onCompleteCalled)
let data = TEST_DOC_LINE.data(using: .utf8)!
for chunk in data.chunked(into: 5) {
try await streamBase.writeChunk(chunk: chunk, endOfStream: false)
XCTAssertFalse(onCompleteCalled)
}

XCTAssertFalse(onCompleteCalled)
// Sleep for 5 seconds to make sure onComplete is not triggerred
try await Task.sleep(nanoseconds: 5_000_000_000)
XCTAssertFalse(onCompleteCalled)
try await streamBase.writeChunk(chunk: Data(), endOfStream: true)
semaphore.wait()
XCTAssertTrue(onCompleteCalled)
XCTAssertNil(httpResponse.error)
XCTAssertEqual(httpResponse.statusCode, 200)

// Parse json body
struct Response: Codable {
let data: String
}

let body: Response = try! JSONDecoder().decode(Response.self, from: httpResponse.body)
XCTAssertEqual(body.data, TEST_DOC_LINE)
}

func testHTTPChunkTransferEncodingWithDataInLastChunk() async throws {
let connectionManager = try await getHttpConnectionManager(endpoint: host, alpnList: ["http/1.1"])
let semaphore = DispatchSemaphore(value: 0)
var httpResponse = HTTPResponse()
var onCompleteCalled = false
let httpRequestOptions = try getHTTPRequestOptions(
method: "PUT",
endpoint: host,
path: "/put",
response: &httpResponse,
semaphore: semaphore,
onComplete: { _ in
onCompleteCalled = true
},
useChunkedEncoding: true)
let connection = try await connectionManager.acquireConnection()
let streamBase = try connection.makeRequest(requestOptions: httpRequestOptions)
try streamBase.activate()
XCTAssertFalse(onCompleteCalled)
let data = TEST_DOC_LINE.data(using: .utf8)!
for chunk in data.chunked(into: 5) {
try await streamBase.writeChunk(chunk: chunk, endOfStream: false)
XCTAssertFalse(onCompleteCalled)
}

XCTAssertFalse(onCompleteCalled)
// Sleep for 5 seconds to make sure onComplete is not triggerred
try await Task.sleep(nanoseconds: 5_000_000_000)
XCTAssertFalse(onCompleteCalled)

let lastChunkData = Data("last chunk data".utf8)
try await streamBase.writeChunk(chunk: lastChunkData, endOfStream: true)
semaphore.wait()
XCTAssertTrue(onCompleteCalled)
XCTAssertNil(httpResponse.error)
XCTAssertEqual(httpResponse.statusCode, 200)

// Parse json body
struct Response: Codable {
let data: String
}

let body: Response = try! JSONDecoder().decode(Response.self, from: httpResponse.body)
XCTAssertEqual(body.data, TEST_DOC_LINE + String(decoding: lastChunkData, as: UTF8.self))
}


func testHTTPStreamIsReleasedIfNotActivated() async throws {
do {
let httpRequestOptions = try getHTTPRequestOptions(method: "GET", endpoint: host, path: getPath)
Expand Down