diff --git a/Source/AwsCommonRuntimeKit/http/HTTP1Stream.swift b/Source/AwsCommonRuntimeKit/http/HTTP1Stream.swift index dd68131e0..089b3a8e3 100644 --- a/Source/AwsCommonRuntimeKit/http/HTTP1Stream.swift +++ b/Source/AwsCommonRuntimeKit/http/HTTP1Stream.swift @@ -20,4 +20,52 @@ public class HTTP1Stream: HTTPStream { self.httpConnection = httpConnection super.init(rawValue: rawValue, callbackData: callbackData) } + + /// Submit a chunk of data to be sent on an HTTP/1.1 stream. + /// The stream must have specified "chunked" in a "transfer-encoding" header and no body. + /// activate() must be called before any chunks are submitted. + /// - Parameters: + /// - chunk: The chunk to write. If the chunk is empty, it will signify the end of the stream. + /// - endOfStream: Set it true to end the stream and prevent any further write. The last chunk must be send with the value true. + /// - Throws: CommonRunTimeError.crtError + public override func writeChunk(chunk: Data, endOfStream: Bool) async throws { + if endOfStream && !chunk.isEmpty { + /// The HTTP/1.1 does not support writing a chunk and sending the end of the stream simultaneously. + /// It requires sending an empty chunk at the end to finish the stream. + /// To maintain consistency with HTTP/2, if there is data and `endOfStream` is true, then send the data and the empty stream in two calls. + try await writeChunk(chunk: chunk, endOfStream: false) + return try await writeChunk(chunk: Data(), endOfStream: true) + } + + var options = aws_http1_chunk_options() + options.on_complete = onWriteComplete + return try await withCheckedThrowingContinuation({ (continuation: CheckedContinuation<(), Error>) in + let continuationCore = ContinuationCore(continuation: continuation) + let stream = IStreamCore( + iStreamable: ByteBuffer(data: chunk)) + options.chunk_data = stream.rawValue + options.chunk_data_size = UInt64(chunk.count) + options.user_data = continuationCore.passRetained() + guard aws_http1_stream_write_chunk( + rawValue, + &options) == AWS_OP_SUCCESS else { + continuationCore.release() + continuation.resume(throwing: CommonRunTimeError.crtError(.makeFromLastError())) + return + } + }) + } +} + +private func onWriteComplete(stream: UnsafeMutablePointer?, + errorCode: Int32, + userData: UnsafeMutableRawPointer!) { + let continuation = Unmanaged>.fromOpaque(userData).takeRetainedValue().continuation + guard errorCode == AWS_OP_SUCCESS else { + continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode))) + return + } + + // SUCCESS + continuation.resume() } diff --git a/Source/AwsCommonRuntimeKit/http/HTTP2Stream.swift b/Source/AwsCommonRuntimeKit/http/HTTP2Stream.swift index c09c74b21..c51c24b14 100644 --- a/Source/AwsCommonRuntimeKit/http/HTTP2Stream.swift +++ b/Source/AwsCommonRuntimeKit/http/HTTP2Stream.swift @@ -44,15 +44,15 @@ public class HTTP2Stream: HTTPStream { /// - data: Data to write. It can be empty /// - endOfStream: Set it true to end the stream and prevent any further write. /// The last frame must be send with the value true. - /// - Throws: - public func writeData(data: Data, endOfStream: Bool) async throws { + /// - Throws: CommonRunTimeError.crtError + public override func writeChunk(chunk: Data, endOfStream: Bool) async throws { var options = aws_http2_stream_write_data_options() options.end_stream = endOfStream options.on_complete = onWriteComplete try await withCheckedThrowingContinuation({ (continuation: CheckedContinuation<(), Error>) in let continuationCore = ContinuationCore(continuation: continuation) let stream = IStreamCore( - iStreamable: ByteBuffer(data: data)) + iStreamable: ByteBuffer(data: chunk)) options.data = stream.rawValue options.user_data = continuationCore.passRetained() guard aws_http2_stream_write_data( diff --git a/Source/AwsCommonRuntimeKit/http/HTTPStream.swift b/Source/AwsCommonRuntimeKit/http/HTTPStream.swift index 79405f22c..6ee20ce66 100644 --- a/Source/AwsCommonRuntimeKit/http/HTTPStream.swift +++ b/Source/AwsCommonRuntimeKit/http/HTTPStream.swift @@ -3,7 +3,7 @@ import AwsCHttp import Foundation -/// An base class that represents a single Http Request/Response for both HTTP/1.1 and HTTP/2. +/// An base abstract class that represents a single Http Request/Response for both HTTP/1.1 and HTTP/2. /// Can be used to update the Window size, and get status code. public class HTTPStream { let rawValue: UnsafeMutablePointer @@ -41,7 +41,14 @@ public class HTTPStream { throw CommonRunTimeError.crtError(.makeFromLastError()) } } - + + /// This method must be overridden in each subclass because this function is specific to each subclass. + /// For HTTP/1.1 see ``HTTP1Stream/writeChunk(chunk:endOfStream:)`` + /// For HTTP2: see ``HTTP2Stream/writeChunk(chunk:endOfStream:)`` + public func writeChunk(chunk: Data, endOfStream: Bool) async throws { + fatalError("writeChunk is not implemented for HTTPStream base") + } + deinit { aws_http_stream_release(rawValue) } diff --git a/Test/AwsCommonRuntimeKitTests/http/HTTP2ClientConnectionTests.swift b/Test/AwsCommonRuntimeKitTests/http/HTTP2ClientConnectionTests.swift index dcb7a1a43..db6390efe 100644 --- a/Test/AwsCommonRuntimeKitTests/http/HTTP2ClientConnectionTests.swift +++ b/Test/AwsCommonRuntimeKitTests/http/HTTP2ClientConnectionTests.swift @@ -154,12 +154,11 @@ class HTTP2ClientConnectionTests: HTTPClientTestFixture { http2ManualDataWrites: true) let connection = try await connectionManager.acquireConnection() let streamBase = try connection.makeRequest(requestOptions: http2RequestOptions) - let stream = streamBase as! HTTP2Stream - try stream.activate() + try streamBase.activate() XCTAssertFalse(onCompleteCalled) let data = TEST_DOC_LINE.data(using: .utf8)! for chunk in data.chunked(into: 5) { - try await stream.writeData(data: chunk, endOfStream: false) + try await streamBase.writeChunk(chunk: chunk, endOfStream: false) XCTAssertFalse(onCompleteCalled) } @@ -167,7 +166,7 @@ class HTTP2ClientConnectionTests: HTTPClientTestFixture { // Sleep for 5 seconds to make sure onComplete is not triggerred until endOfStream is true try await Task.sleep(nanoseconds: 5_000_000_000) XCTAssertFalse(onCompleteCalled) - try await stream.writeData(data: Data(), endOfStream: true) + try await streamBase.writeChunk(chunk: Data(), endOfStream: true) semaphore.wait() XCTAssertTrue(onCompleteCalled) XCTAssertNil(httpResponse.error) diff --git a/Test/AwsCommonRuntimeKitTests/http/HTTP2StreamManagerTests.swift b/Test/AwsCommonRuntimeKitTests/http/HTTP2StreamManagerTests.swift index d07668359..5397a7372 100644 --- a/Test/AwsCommonRuntimeKitTests/http/HTTP2StreamManagerTests.swift +++ b/Test/AwsCommonRuntimeKitTests/http/HTTP2StreamManagerTests.swift @@ -127,7 +127,7 @@ class HTT2StreamManagerTests: HTTPClientTestFixture { XCTAssertFalse(onCompleteCalled) let data = TEST_DOC_LINE.data(using: .utf8)! for chunk in data.chunked(into: 5) { - try await stream.writeData(data: chunk, endOfStream: false) + try await stream.writeChunk(chunk: chunk, endOfStream: false) XCTAssertFalse(onCompleteCalled) } @@ -135,7 +135,7 @@ class HTT2StreamManagerTests: HTTPClientTestFixture { // Sleep for 5 seconds to make sure onComplete is not triggerred until endOfStream is true try await Task.sleep(nanoseconds: 5_000_000_000) XCTAssertFalse(onCompleteCalled) - try await stream.writeData(data: Data(), endOfStream: true) + try await stream.writeChunk(chunk: Data(), endOfStream: true) semaphore.wait() XCTAssertTrue(onCompleteCalled) XCTAssertNil(httpResponse.error) diff --git a/Test/AwsCommonRuntimeKitTests/http/HTTPClientTestFixture.swift b/Test/AwsCommonRuntimeKitTests/http/HTTPClientTestFixture.swift index 0c653e915..80b87f9d4 100644 --- a/Test/AwsCommonRuntimeKitTests/http/HTTPClientTestFixture.swift +++ b/Test/AwsCommonRuntimeKitTests/http/HTTPClientTestFixture.swift @@ -185,11 +185,17 @@ class HTTPClientTestFixture: XCBaseTestCase { headers: [HTTPHeader] = [HTTPHeader](), onResponse: HTTPRequestOptions.OnResponse? = nil, onBody: HTTPRequestOptions.OnIncomingBody? = nil, - onComplete: HTTPRequestOptions.OnStreamComplete? = nil + onComplete: HTTPRequestOptions.OnStreamComplete? = nil, + useChunkedEncoding: Bool = false ) throws -> HTTPRequestOptions { - let httpRequest: HTTPRequest = try HTTPRequest(method: method, path: path, body: ByteBuffer(data: body.data(using: .utf8)!)) + let httpRequest: HTTPRequest = try HTTPRequest(method: method, path: path, body: useChunkedEncoding ? nil : ByteBuffer(data: body.data(using: .utf8)!)) httpRequest.addHeader(header: HTTPHeader(name: "Host", value: endpoint)) - httpRequest.addHeader(header: HTTPHeader(name: "Content-Length", value: String(body.count))) + if (useChunkedEncoding) { + httpRequest.addHeader(header: HTTPHeader(name: "Transfer-Encoding", value: "chunked")) + } + else { + httpRequest.addHeader(header: HTTPHeader(name: "Content-Length", value: String(body.count))) + } httpRequest.addHeaders(headers: headers) return getRequestOptions( request: httpRequest, diff --git a/Test/AwsCommonRuntimeKitTests/http/HTTPTests.swift b/Test/AwsCommonRuntimeKitTests/http/HTTPTests.swift index 5b930444b..b811df57a 100644 --- a/Test/AwsCommonRuntimeKitTests/http/HTTPTests.swift +++ b/Test/AwsCommonRuntimeKitTests/http/HTTPTests.swift @@ -7,13 +7,13 @@ import AwsCCommon import AwsCHttp class HTTPTests: HTTPClientTestFixture { - let host = "httpbin.org" + let host = "postman-echo.com" let getPath = "/get" func testGetHTTPSRequest() async throws { let connectionManager = try await getHttpConnectionManager(endpoint: host, ssh: true, port: 443) _ = try await sendHTTPRequest(method: "GET", endpoint: host, path: getPath, connectionManager: connectionManager) - _ = try await sendHTTPRequest(method: "GET", endpoint: host, path: "/delete", expectedStatus: 405, connectionManager: connectionManager) + _ = try await sendHTTPRequest(method: "GET", endpoint: host, path: "/delete", expectedStatus: 404, connectionManager: connectionManager) } func testGetHTTPRequest() async throws { @@ -21,12 +21,12 @@ class HTTPTests: HTTPClientTestFixture { _ = try await sendHTTPRequest(method: "GET", endpoint: host, path: getPath, connectionManager: connectionManager) } - func testPutHttpRequest() async throws { + func testPutHTTPRequest() async throws { let connectionManager = try await getHttpConnectionManager(endpoint: host, ssh: true, port: 443) let response = try await sendHTTPRequest( method: "PUT", endpoint: host, - path: "/anything", + path: "/put", body: TEST_DOC_LINE, connectionManager: connectionManager) @@ -38,6 +38,97 @@ class HTTPTests: HTTPClientTestFixture { XCTAssertEqual(body.data, TEST_DOC_LINE) } + func testHTTPChunkTransferEncoding() async throws { + let connectionManager = try await getHttpConnectionManager(endpoint: host, alpnList: ["http/1.1"]) + let semaphore = DispatchSemaphore(value: 0) + var httpResponse = HTTPResponse() + var onCompleteCalled = false + let httpRequestOptions = try getHTTPRequestOptions( + method: "PUT", + endpoint: host, + path: "/put", + response: &httpResponse, + semaphore: semaphore, + onComplete: { _ in + onCompleteCalled = true + }, + useChunkedEncoding: true) + let connection = try await connectionManager.acquireConnection() + let streamBase = try connection.makeRequest(requestOptions: httpRequestOptions) + try streamBase.activate() + XCTAssertFalse(onCompleteCalled) + let data = TEST_DOC_LINE.data(using: .utf8)! + for chunk in data.chunked(into: 5) { + try await streamBase.writeChunk(chunk: chunk, endOfStream: false) + XCTAssertFalse(onCompleteCalled) + } + + XCTAssertFalse(onCompleteCalled) + // Sleep for 5 seconds to make sure onComplete is not triggerred + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTAssertFalse(onCompleteCalled) + try await streamBase.writeChunk(chunk: Data(), endOfStream: true) + semaphore.wait() + XCTAssertTrue(onCompleteCalled) + XCTAssertNil(httpResponse.error) + XCTAssertEqual(httpResponse.statusCode, 200) + + // Parse json body + struct Response: Codable { + let data: String + } + + let body: Response = try! JSONDecoder().decode(Response.self, from: httpResponse.body) + XCTAssertEqual(body.data, TEST_DOC_LINE) + } + + func testHTTPChunkTransferEncodingWithDataInLastChunk() async throws { + let connectionManager = try await getHttpConnectionManager(endpoint: host, alpnList: ["http/1.1"]) + let semaphore = DispatchSemaphore(value: 0) + var httpResponse = HTTPResponse() + var onCompleteCalled = false + let httpRequestOptions = try getHTTPRequestOptions( + method: "PUT", + endpoint: host, + path: "/put", + response: &httpResponse, + semaphore: semaphore, + onComplete: { _ in + onCompleteCalled = true + }, + useChunkedEncoding: true) + let connection = try await connectionManager.acquireConnection() + let streamBase = try connection.makeRequest(requestOptions: httpRequestOptions) + try streamBase.activate() + XCTAssertFalse(onCompleteCalled) + let data = TEST_DOC_LINE.data(using: .utf8)! + for chunk in data.chunked(into: 5) { + try await streamBase.writeChunk(chunk: chunk, endOfStream: false) + XCTAssertFalse(onCompleteCalled) + } + + XCTAssertFalse(onCompleteCalled) + // Sleep for 5 seconds to make sure onComplete is not triggerred + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTAssertFalse(onCompleteCalled) + + let lastChunkData = Data("last chunk data".utf8) + try await streamBase.writeChunk(chunk: lastChunkData, endOfStream: true) + semaphore.wait() + XCTAssertTrue(onCompleteCalled) + XCTAssertNil(httpResponse.error) + XCTAssertEqual(httpResponse.statusCode, 200) + + // Parse json body + struct Response: Codable { + let data: String + } + + let body: Response = try! JSONDecoder().decode(Response.self, from: httpResponse.body) + XCTAssertEqual(body.data, TEST_DOC_LINE + String(decoding: lastChunkData, as: UTF8.self)) + } + + func testHTTPStreamIsReleasedIfNotActivated() async throws { do { let httpRequestOptions = try getHTTPRequestOptions(method: "GET", endpoint: host, path: getPath)