diff --git a/docs/streaming.md b/docs/streaming.md new file mode 100644 index 000000000..629d9df4f --- /dev/null +++ b/docs/streaming.md @@ -0,0 +1,113 @@ +# Streaming + +As of `paws` v0.8.0, streaming is supported in `paws`. + + +## Basic usage: + +Example taken from: https://docs.aws.amazon.com/code-library/latest/ug/python_3_bedrock-runtime_code_examples.html + + +### Internal function: + +`paws` lets you pass a function to the returned stream; the function is applied to each chunk of the stream. + +```r +library(paws) + +client <- bedrockruntime() + +# Set the model ID, e.g., Titan Text Premier. +model_id <- "amazon.titan-text-premier-v1:0" + +# Start a conversation with the user message. +user_message <- "Describe the purpose of a 'hello world' program in one line." +conversation <- list( + list( + role = "user", + content = list(list(text= user_message)) + ) +) + +resp <- client$converse_stream( + modelId=model_id, + messages=conversation, + inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9) +) + +resp$stream(\(chunk) chunk$contentBlockDelta$delta$text) +``` + +### paws_connection: + +`paws` allows for the raw connection to be retrieved. The connection is a subclass of the `httr2::httr2_response` class. +This allows a `paws_connection` to be handled by either the paws stream parser or an httr2 stream parser. + +```r +library(paws) + +client <- bedrockruntime() + +# Set the model ID, e.g., Titan Text Premier. +model_id <- "amazon.titan-text-premier-v1:0" + +# Start a conversation with the user message. +user_message <- "Describe the purpose of a 'hello world' program in one line." 
+conversation <- list( + list( + role = "user", + content = list(list(text= user_message)) + ) +) + +resp <- client$converse_stream( + modelId=model_id, + messages=conversation, + inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9) +) + +con <- resp$stream(.connection = TRUE) + +while(!is.null(chunk <- paws_stream_parser(con))) { + print(chunk$contentBlockDelta$delta$text) +} +``` + +Note: `paws_stream_parser` returns each event of the stream in the operation's response syntax. For this example, please check https://paws-r.github.io/docs/bedrockruntime_converse_stream/ + +For full flexibility you can use [httr2::resp_stream_aws](https://httr2.r-lib.org/reference/req_perform_stream.html?search-input=resp_stream_aws) to get the raw response from AWS. + +```r +library(paws) + +client <- bedrockruntime() + +# Set the model ID, e.g., Titan Text Premier. +model_id <- "amazon.titan-text-premier-v1:0" + +# Start a conversation with the user message. +user_message <- "Describe the purpose of a 'hello world' program in one line." 
+conversation <- list( + list( + role = "user", + content = list(list(text= user_message)) + ) +) + +resp <- client$converse_stream( + modelId=model_id, + messages=conversation, + inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9) +) + +con <- resp$stream(.connection = TRUE) +repeat{ + event <- resp_stream_aws(con) + if (is.null(event)) { + close(con) + break + } + + str(event) +} +``` diff --git a/make.paws/DESCRIPTION b/make.paws/DESCRIPTION index 22e405b2a..79ef2600a 100644 --- a/make.paws/DESCRIPTION +++ b/make.paws/DESCRIPTION @@ -1,7 +1,7 @@ Package: make.paws Type: Package Title: Generate Paws AWS SDKs for R -Version: 0.9.1 +Version: 0.9.2 Authors@R: c( person("David", "Kretch", email = "david.kretch@gmail.com", role = "aut"), person("Adam", "Banker", email = "adam.banker39@gmail.com", role = "aut"), diff --git a/make.paws/R/cran_category.R b/make.paws/R/cran_category.R index da51cd653..6cea60332 100644 --- a/make.paws/R/cran_category.R +++ b/make.paws/R/cran_category.R @@ -1,7 +1,7 @@ #' @include package.R service.R NULL -.paws.common.import.version <- "paws.common (>= 0.7.5)" +.paws.common.import.version <- "paws.common (>= 0.8.0)" # Make all category-level packages. 
make_categories <- function(sdk_dir, out_dir, categories, service_names) { diff --git a/make.paws/R/custom/s3.R b/make.paws/R/custom/s3.R index ffd2b7636..3d27698be 100644 --- a/make.paws/R/custom/s3.R +++ b/make.paws/R/custom/s3.R @@ -120,7 +120,8 @@ s3_download_file <- function(Bucket, Key, Filename, IfMatch = NULL, IfModifiedSi name = "GetObject", http_method = "GET", http_path = "/{Bucket}/{Key+}", - paginator = list() + paginator = list(), + stream_api = FALSE ) input <- .s3$get_object_input(Bucket = Bucket, IfMatch = IfMatch, IfModifiedSince = IfModifiedSince, IfNoneMatch = IfNoneMatch, IfUnmodifiedSince = IfUnmodifiedSince, Key = Key, Range = Range, ResponseCacheControl = ResponseCacheControl, ResponseContentDisposition = ResponseContentDisposition, ResponseContentEncoding = ResponseContentEncoding, ResponseContentLanguage = ResponseContentLanguage, ResponseContentType = ResponseContentType, ResponseExpires = ResponseExpires, VersionId = VersionId, SSECustomerAlgorithm = SSECustomerAlgorithm, SSECustomerKey = SSECustomerKey, SSECustomerKeyMD5 = SSECustomerKeyMD5, RequestPayer = RequestPayer, PartNumber = PartNumber, ExpectedBucketOwner = ExpectedBucketOwner) output <- .s3$get_object_output() diff --git a/make.paws/R/operations.R b/make.paws/R/operations.R index 7d929d173..21ca38779 100644 --- a/make.paws/R/operations.R +++ b/make.paws/R/operations.R @@ -38,7 +38,8 @@ operation_template <- template( http_method = ${http_method}, http_path = ${http_path}, host_prefix = ${host_prefix}, - paginator = ${paginator} + paginator = ${paginator}, + stream_api = ${stream_api} ) input <- .${service}$${operation_input} output <- .${service}$${operation_output} @@ -66,7 +67,8 @@ make_operation <- function(operation, api, doc_maker) { http_method = quoted(operation$http$method), http_path = quoted(operation$http$requestUri), host_prefix = quoted(operation[["endpoint"]][["hostPrefix"]] %||% ""), - paginator = set_paginator(operation$paginators) + paginator = 
set_paginator(operation$paginators), + stream_api = set_stream_api(operation) ) } @@ -91,6 +93,10 @@ set_paginator <- function(paginator) { } } +set_stream_api <- function(operation) { + as.character(operation$eventstream %||% FALSE) +} + # Override operation name from extdata/operation_name_override.yml operation_name_override <- function(operation_name) { path <- system_file( diff --git a/make.paws/R/process_api.R b/make.paws/R/process_api.R index 7c7aacac4..0c63916bf 100644 --- a/make.paws/R/process_api.R +++ b/make.paws/R/process_api.R @@ -8,12 +8,13 @@ TEST_DIR <- "tests/testthat" #' #' @keywords internal make_sdk_for_api <- function(api_name, in_dir) { - result <- list() api <- read_api(api_name, in_dir) - result$name <- package_name(api) - result$code <- make_code_files(api) - result$tests <- make_tests_files(api) - result$docs <- make_docs_files(api) + result <- list( + name = package_name(api), + code = make_code_files(api), + tests = make_tests_files(api), + docs = make_docs_files(api) + ) return(result) } @@ -21,12 +22,13 @@ make_sdk_for_api <- function(api_name, in_dir) { # Write code for a given API. 
make_code_files <- function(api) { - result <- list() - result$operations <- make_operations_files(api, doc_maker = make_docs_short) - result$interfaces <- make_interfaces_files(api) - result$service <- make_service_files(api) - result$custom <- make_custom_operations_files(api) - result$reexports <- make_reexports() + result <- list( + operations = make_operations_files(api, doc_maker = make_docs_short), + interfaces = make_interfaces_files(api), + service = make_service_files(api), + custom = make_custom_operations_files(api), + reexports = make_reexports() + ) return(result) } @@ -90,11 +92,12 @@ make_reexports <- function() { } make_docs_files <- function(api) { - result <- list() - result$operations <- make_operations_files(api, doc_maker = make_docs_long) - result$service <- make_service_files(api) - result$custom <- make_custom_operations_files(api) - result$reexports <- make_reexports() + result <- list( + operations = make_operations_files(api, doc_maker = make_docs_long), + service = make_service_files(api), + custom = make_custom_operations_files(api), + reexports = make_reexports() + ) return(result) } diff --git a/make.paws/R/read_api.R b/make.paws/R/read_api.R index 4e5de4a51..b8b56008d 100644 --- a/make.paws/R/read_api.R +++ b/make.paws/R/read_api.R @@ -1,4 +1,6 @@ # Read a given API's definition and documentation files. 
+# aws-sdk-js deprecated and apis is not being updated +# TODO: short term migrate to botocore jsons read_api <- function(api_name, path) { api_path <- file.path(path, "apis") region_config_path <- file.path(path, "lib/region_config_data.json") @@ -18,6 +20,7 @@ read_api <- function(api_name, path) { paginators <- jsonlite::read_json(files$paginators) api <- merge_paginators(api, paginators$pagination) } + api <- merge_eventstream(api) region_config <- jsonlite::read_json(region_config_path) api <- merge_region_config(api, region_config) api <- fix_region_config(api) @@ -62,6 +65,24 @@ merge_paginators <- function(api, paginators) { return(api) } +merge_eventstream <- function(api) { + flat_shape <- unlist(api$shapes) + eventstream <- flat_shape[endsWith(names(flat_shape),"eventstream")] + names(eventstream) <- stringr::str_extract(names(eventstream), "([a-zA-Z]+)") + + shape <- flat_shape[endsWith(names(flat_shape), "shape")] + shape <- shape[shape %in% names(eventstream)] + names(shape) <- gsub( + "Output$|Response$", "", stringr::str_extract(names(shape), "([a-zA-Z]+)") + ) + names(eventstream) <- names(shape) + + for (nms in names(eventstream)) { + api$operations[[nms]]$eventstream <- eventstream[nms] + } + return(api) +} + # Returns an API object with region config info attached. Region config info # lists endpoints for each service and region, if different from the default. 
merge_region_config <- function(api, region_config) { diff --git a/make.paws/inst/templates/reexports_paws.common.R b/make.paws/inst/templates/reexports_paws.common.R index e5cd3c22e..ef495c3c5 100644 --- a/make.paws/inst/templates/reexports_paws.common.R +++ b/make.paws/inst/templates/reexports_paws.common.R @@ -25,3 +25,7 @@ paws.common::credentials #' @importFrom paws.common creds #' @export paws.common::creds + +#' @importFrom paws.common paws_stream_parser +#' @export +paws.common::paws_stream_parser diff --git a/make.paws/tests/testthat/test_operations.R b/make.paws/tests/testthat/test_operations.R index 84772d0ec..e8671d77b 100644 --- a/make.paws/tests/testthat/test_operations.R +++ b/make.paws/tests/testthat/test_operations.R @@ -91,7 +91,8 @@ test_that("make_operation", { http_method = \"POST\", http_path = \"/abc\", host_prefix = \"\", - paginator = list() + paginator = list(), + stream_api = FALSE ) input <- .api$operation_input(Input1 = Input1, Input2 = Input2, Input3 = Input3) output <- .api$operation_output() diff --git a/make.paws/tests/testthat/test_read_api.R b/make.paws/tests/testthat/test_read_api.R index 454851aa0..dc58dd9c7 100644 --- a/make.paws/tests/testthat/test_read_api.R +++ b/make.paws/tests/testthat/test_read_api.R @@ -7,7 +7,10 @@ test_that("read_api", { write_json(list(foo = "examples"), file.path(api_path, "foo-2018-11-01.examples.json")) write_json(list(foo = "min"), file.path(api_path, "foo-2018-11-01.min.json")) - write_json(list(foo = "normal", name = "foo", metadata = list(endpointPrefix = "baz")), file.path(api_path, "foo-2018-11-01.normal.json")) + write_json( + list(foo = "normal", name = "foo", metadata = list(endpointPrefix = "baz"), shapes = list(foo = list(eventstream = "TRUE"))), + file.path(api_path, "foo-2018-11-01.normal.json") + ) write_json(list(foo = "paginators"), file.path(api_path, "foo-2018-11-01.paginators.json")) write_json(list(foo = "wrong1"), file.path(api_path, "foo-2017-11-01.examples.json")) diff --git 
a/paws.common/DESCRIPTION b/paws.common/DESCRIPTION index d9b380d91..6c66cdca9 100644 --- a/paws.common/DESCRIPTION +++ b/paws.common/DESCRIPTION @@ -1,7 +1,7 @@ Package: paws.common Type: Package Title: Paws Low-Level Amazon Web Services API -Version: 0.7.7.9000 +Version: 0.8.0 Authors@R: c( person("David", "Kretch", email = "david.kretch@gmail.com", role = "aut"), person("Adam", "Banker", email = "adam.banker39@gmail.com", role = "aut"), @@ -64,9 +64,6 @@ Collate: 'head_bucket.R' 'http_status.R' 'error.R' - 'tags.R' - 'xmlutil.R' - 'stream.R' 'custom_s3.R' 'handlers_core.R' 'handlers_ec2query.R' @@ -74,7 +71,10 @@ Collate: 'handlers_query.R' 'handlers_rest.R' 'handlers_restjson.R' + 'tags.R' + 'xmlutil.R' 'handlers_restxml.R' + 'handlers_stream.R' 'idempotency.R' 'jsonutil.R' 'onLoad.R' diff --git a/paws.common/NAMESPACE b/paws.common/NAMESPACE index cf206d3ca..259f5ac49 100644 --- a/paws.common/NAMESPACE +++ b/paws.common/NAMESPACE @@ -13,6 +13,7 @@ S3method(is_empty_xml,character) S3method(is_empty_xml,default) S3method(is_empty_xml,list) S3method(is_empty_xml,raw) +S3method(print,PawsStreamHandler) export(config) export(credentials) export(creds) @@ -31,6 +32,7 @@ export(paginate_lapply) export(paginate_sapply) export(paws_config_log) export(paws_reset_cache) +export(paws_stream_parser) export(populate) export(send_request) export(set_config) @@ -42,9 +44,10 @@ export(tag_has) export(type) importFrom(Rcpp,evalCpp) importFrom(curl,curl_unescape) -importFrom(httr2,req_body_raw) +importFrom(digest,digest) importFrom(httr2,req_options) importFrom(httr2,req_perform) +importFrom(httr2,req_perform_connection) importFrom(httr2,request) importFrom(stats,runif) importFrom(utils,flush.console) diff --git a/paws.common/NEWS.md b/paws.common/NEWS.md index 1380ac9b0..6d85b9eff 100644 --- a/paws.common/NEWS.md +++ b/paws.common/NEWS.md @@ -1,8 +1,11 @@ -# paws.common 0.7.7.9000 +# paws.common 0.8.0 * migrate backend from httr to httr2 * enrich sso message (#844). 
Thanks to @hadley for raising issue. * attempt to automatically set/refresh `sso` credentials by calling `aws cli` (#844) * moved api log level to `debug` and `trace`. This is to prevent `info` level being saturated by api calls. +* migrate backend `httr` to `httr2` +* new `PawsStreamHandler`, which allows paws to handle AWS event streams (#842). Thank you to @hadley for developing the initial solution in `httr2`. +* removed the custom handler `s3_unmarshal_select_object_content` # paws.common 0.7.7 * fix unix time expiration check diff --git a/paws.common/R/custom_s3.R b/paws.common/R/custom_s3.R index d90df9a6e..eb9268210 100644 --- a/paws.common/R/custom_s3.R +++ b/paws.common/R/custom_s3.R @@ -1,5 +1,4 @@ #' @include service.R -#' @include stream.R #' @include util.R #' @include error.R #' @include head_bucket.R @@ -236,18 +235,6 @@ content_md5 <- function(request) { ################################################################################ -s3_unmarshal_select_object_content <- function(request) { - if (request$operation$name != "SelectObjectContent") { - return(request) - } - payload <- stream_decode(request$http_response$body) - request$data <- populate(list(Payload = payload), request$data) - request$http_response$body <- raw() - return(request) -} - -################################################################################ - s3_unmarshal_get_bucket_location <- function(request) { if (request$operation$name != "GetBucketLocation") { return(request) @@ -269,6 +256,9 @@ s3_unmarshal_get_bucket_location <- function(request) { ################################################################################ s3_unmarshal_error <- function(request) { + request$http_response$body <- get_connection_error( + request$http_response$body, request$operation$stream_api + ) data <- tryCatch( decode_xml(request$http_response$body), error = function(e) NULL @@ -349,13 +339,15 @@ s3_redirect_from_error <- function(request) { return(request) } error_code <- 
request$http_response$status_code - # Exit s3_redirect_from_error function if initial request is successful # https://docs.aws.amazon.com/waf/latest/developerguide/customizing-the-response-status-codes.html http_success_code <- c(200, 201, 202, 204, 206) if (error_code %in% http_success_code) { return(request) } + request$http_response$body <- get_connection_error( + request$http_response$body, request$operation$stream_api + ) error <- decode_xml(request$http_response$body)$Error if (!can_be_redirected(request, error_code, error)) { return(request) @@ -484,7 +476,6 @@ set_request_url <- function(original_endpoint, ################################################################################ handle_copy_source_param <- function(request) { - if (!(request$operation$name %in% c("CopyObject", "CopyPart")) | isTRUE(request$context$s3_redirect)) { return(request) } @@ -554,10 +545,6 @@ customizations$s3 <- function(handlers) { content_md5 ) handlers$send <- handlers_add_back(handlers$send, s3_redirect_from_error) - handlers$unmarshal <- handlers_add_front( - handlers$unmarshal, - s3_unmarshal_select_object_content - ) handlers$unmarshal <- handlers_add_back( handlers$unmarshal, s3_unmarshal_get_bucket_location diff --git a/paws.common/R/handlers_jsonrpc.R b/paws.common/R/handlers_jsonrpc.R index 6604ca0c0..188006496 100644 --- a/paws.common/R/handlers_jsonrpc.R +++ b/paws.common/R/handlers_jsonrpc.R @@ -34,14 +34,26 @@ jsonrpc_unmarshal_meta <- function(request) { jsonrpc_unmarshal <- function(request) { body <- request$http_response$body if (data_filled(request) && length(body) > 0) { - data <- decode_json(body) - request$data <- tag_del(json_parse(data, request$data)) + if (request$operation$stream_api) { + request$data <- stream_unmarshal(request, body, json_parse_stream) + } else { + data <- decode_json(body) + request$data <- tag_del(json_parse(data, request$data)) + } } return(request) } +json_parse_stream <- function(bytes, format) { + payload <- 
decode_json(bytes) + json_parse(payload, format) +} + # Unmarshal errors from a JSON RPC response. jsonrpc_unmarshal_error <- function(request) { + request$http_response$body <- get_connection_error( + request$http_response$body, request$operation$stream_api + ) error <- decode_json(request$http_response$body) if (length(error) == 0) { return(request) diff --git a/paws.common/R/handlers_restjson.R b/paws.common/R/handlers_restjson.R index 7f27ecf30..7ac3747d7 100644 --- a/paws.common/R/handlers_restjson.R +++ b/paws.common/R/handlers_restjson.R @@ -50,8 +50,11 @@ restjson_unmarshal <- function(request) { # Unmarshal errors from a REST JSON protocol API response. restjson_unmarshal_error <- function(request) { + if (request$operation$stream_api) { + con <- request$http_response$body + request$http_response$body <- stream_raw(con$body) + } error <- decode_json(request$http_response$body) - code <- request$http_response$header[["X-Amzn-Errortype"]] if (is.null(code)) code <- error$code if (is.null(code)) code <- error$`__type` diff --git a/paws.common/R/handlers_restxml.R b/paws.common/R/handlers_restxml.R index c8d37f06b..01a4e2544 100644 --- a/paws.common/R/handlers_restxml.R +++ b/paws.common/R/handlers_restxml.R @@ -1,3 +1,5 @@ +#' @include xmlutil.R + # Build the request body for the REST XML protocol. 
restxml_build <- function(request) { request <- rest_build(request) @@ -19,9 +21,13 @@ restxml_unmarshal <- function(request) { t <- rest_payload_type(request$data) if (t == "structure" || t == "") { data <- request$http_response$body - interface <- request$data - result_name <- paste0(request$operation$name, "Result") - request$data <- xml_unmarshal(data, interface, result_name) + if (request$operation$stream_api) { + request$data <- stream_unmarshal(request, data, xml_parse_stream) + } else { + interface <- request$data + result_name <- paste0(request$operation$name, "Result") + request$data <- xml_unmarshal(data, interface, result_name) + } } else { request <- rest_unmarshal(request) } @@ -30,6 +36,28 @@ restxml_unmarshal <- function(request) { # Unmarshal errors from a REST XML response. restxml_unmarshal_error <- function(request) { + request$http_response$body <- get_connection_error( + request$http_response$body, request$operation$stream_api + ) request <- query_unmarshal_error(request) return(request) } + +xml_parse_stream <- function(bytes, interface) { + if (isTRUE(tag_get(interface, "event"))) { + for (nms in names(interface)) { + inter <- interface[[nms]] + if (isTRUE(tag_get(inter, "eventpayload"))) { + type <- tag_get(inter, "type") + if (type == "blob") { + interface[[nms]] <- bytes + } else if (type == "string") { + interface[[nms]] <- raw_to_utf8(bytes) + } else { + interface <- xml_stream(bytes, interface) + } + } + } + } + return(interface) +} diff --git a/paws.common/R/handlers_stream.R b/paws.common/R/handlers_stream.R new file mode 100644 index 000000000..84ff4342d --- /dev/null +++ b/paws.common/R/handlers_stream.R @@ -0,0 +1,330 @@ +#' @importFrom digest digest + + +.PAYLOAD_KB <- 1024 * 65 + +#' @title Iterate over AWS Event Stream connection +#' @param FUN function to iterate over stream connection. +#' @param con A streaming response created by \code{paws_stream_handler}. 
+#' @param .connection return \code{paws_connection} object a subclass of \code{httr2::req_perform_connection} (default \code{FALSE}) +#' @name paws_stream +#' @return list of responses from the operation or a \code{paws_connection} object +#' @examples +#' \dontrun{ +#' # Developed from: +#' # https://docs.aws.amazon.com/code-library/latest/ug/python_3_bedrock-runtime_code_examples.html +#' library(paws) +#' +#' # Create a Bedrock Runtime client in the AWS Region you want to use. +#' client <- bedrockruntime(region = "us-east-1") +#' +#' # Set the model ID, e.g., Titan Text Premier. +#' model_id <- "amazon.titan-text-premier-v1:0" +#' +#' # Start a conversation with the user message. +#' user_message = "Describe the purpose of a 'hello world' program in one line." +#' conversation <- list( +#' list( +#' role = "user", +#' content = list(list(text=user_message)) +#' ) +#' ) +#' +#' resp <- client$converse_stream( +#' modelId = model_id, +#' messages = conversation, +#' inferenceConfig = list(maxTokens=512, temperature=0.5, topP=0.9) +#' ) +#' resp$stream(\(chunk) chunk$contentBlockDelta$delta$text) +#' +#' # Or handle stream utilising paws_stream_parser +#' while(!is.null(event <- paws_stream_parser(con))) { +#' print(chunk$contentBlockDelta$delta$text) +#' } +#' +#' # or return httr2 resp_perform_connection +#' con <- resp$stream(.connection = T) +#' +#' while (!is.null(event <- resp_stream_aws(con))) { +#' str(event) +#' } +#' close(con) +#' +#' } +NULL + +#' @name paws_stream +paws_stream_handler <- function(FUN, .connection = FALSE) { + stop("This is an abstract function please don't call directly", .call = FALSE) +} + +StreamHandler <- function(body, unmarshal, interface, metadata) { + con <- paws_con(body, unmarshal, interface) + paws_stream_handler <- function(FUN, .connection = FALSE) { + if (isTRUE(.connection)) { + return(con) + } + result <- list() + while(!is.null(resp <- paws_stream_parser(con))) { + result[[length(result) + 1]] <- FUN(resp) + } + 
return(result) + } + class(paws_stream_handler) <- "PawsStreamHandler" + return(paws_stream_handler) +} + +paws_con <- function(con, unmarshal, interface) { + con$paws_metadata <- list(unmarshal=unmarshal, interface=interface) + class(con) <- c("paws_connection", class(con)) + return(con) +} + +#' @export +print.PawsStreamHandler <- function(x, ...) { + metadata <- environment(x)$metadata + op_name <- tolower(gsub("(.)([A-Z])", "\\1_\\2", metadata$operation_name)) + msg <- sprintf(c( + "", + "Please check return object for: %1$s_%2$s", + "https://www.paws-r-sdk.com/docs/%1$s_%2$s/" + ), + metadata$service_name, + op_name + ) + cat(msg, file = stdout(), sep = "\n") +} + +#' @name paws_stream +#' @export +paws_stream_parser <- function(con) { + if (!isIncomplete(con$body)) { + close(con) + return(NULL) + } + buffer <- readBin(con$body, raw(), n = .PAYLOAD_KB) + if (is.null(boundary <- aws_boundary(buffer))) { + close(con) + return(NULL) + } + return(eventstream_parser( + buffer, + unmarshal = con$paws_metadata$unmarshal, + interface = con$paws_metadata$interface, + boundary = boundary + ) + ) +} + +################ stream unmarshal ################ +stream_unmarshal <- function(request, body, unmarshal) { + payload <- tag_get(request$data, "payload") + shape <- tag_del(request$data) + shape[[payload]] <- StreamHandler( + body, + unmarshal, + request$data[[payload]], + list( + operation_name = request$operation$name, + service_name = request$client_info$service_name + ) + ) + return(shape) +} + +############## stream error ############## +get_connection_error <- function(payload, stream_api) { + if (stream_api && !is.raw(payload)) { + return(stream_raw(payload$body)) + } + return(payload) +} + +stream_raw <- function(con) { + on.exit(close(con)) + total <- raw() + while (isIncomplete(con)) { + total <- c(total, readBin(con, raw(), n = .PAYLOAD_KB)) + } + return(total) +} + +################ parse event stream ################ +eventstream_parser <- function(buffer, 
unmarshal, interface, boundary) { + # chunk loop + while (!is.null(boundary)) { + result <- split_buffer(buffer, boundary) + data <- parse_aws_event(result$matched) + (nms <- data$headers[[":event-type"]]) + interface[[nms]] <- unmarshal( + data$payload, interface[[nms]] + ) + buffer <- result$remaining + boundary <- aws_boundary(buffer) + } + return(tag_del(interface)) +} + +aws_boundary <- function(buffer) { + len <- length(buffer) + # No valid AWS event message is less than 16 bytes + if (len < 16) { + return(NULL) + } + # Read first 4 bytes as a big endian number + event_size <- parse_int32(buffer[1:4]) + if (event_size > len) { + return(NULL) + } + event_size + 1 +} + +# Modified from httr2: +# https://github.com/r-lib/httr2/blob/e972770199f674eca4c64ca8161235e5745683dd/R/utils.R#L314C1-L326C2 +split_buffer <- function (buffer, split_at) { + list( + matched = slice(buffer, end = split_at), + remaining = slice(buffer, start = split_at) + ) +} + +slice <- function(vector, start = 1, end = length(vector) + 1) { + if (start == end) { + vector[FALSE] + } else { + vector[start:(end - 1)] + } +} + +################ aws parsers ################ +# Implementation from https://github.com/lifion/lifion-aws-event-stream/blob/develop/lib/index.js +# Modified from httr2: https://github.com/r-lib/httr2/blob/main/R/resp-stream.R +parse_aws_event <- function(bytes) { + i <- 1 + read_bytes <- function(n) { + if (n == 0) { + return(raw()) + } + out <- bytes[i:(i + n - 1)] + i <<- i + n + out + } + + # Parse prelude + # The prelude for an event stream message has the following format: + # [total_length][header_length][prelude_crc] + prelude_bytes <- read_bytes(8) + tot_hd <- parse_int32(prelude_bytes, 8) + total_length <- tot_hd[1] + header_length <- tot_hd[2] + + # validate perlude checksum + validate_checksum(prelude_bytes, paste(read_bytes(4), collapse = "")) + + if (total_length != length(bytes)) { + stop("AWS event metadata doesn't match supplied bytes") + } + + # Parse 
headers + headers <- list() + while(i <= 12 + header_length) { + name_length <- parse_int8(read_bytes(1), 1) + name <- rawToChar(read_bytes(name_length)) + type <- parse_int8(read_bytes(1), 1) + delayedAssign("len", parse_int16(read_bytes(2), 2)) + value <- switch( + type_enum(type), + 'TRUE' = TRUE, + 'FALSE' = FALSE, + BYTE = parse_int8(read_bytes(1), 1), + SHORT = parse_int16(read_bytes(2), 2), + INTEGER = parse_int32(read_bytes(4), 4), + LONG = parse_int64(read_bytes(8)), + BYTE_ARRAY = read_bytes(len), + CHARACTER = rawToChar(read_bytes(len)), + TIMESTAMP = parse_int64(read_bytes(8)), + UUID = paste(read_bytes(16), collapse = ""), + ) + headers[[name]] <- value + } + + # Parse message + payload_raw <- read_bytes(total_length - i - 4 + 1) + + # validate the message checksum + validate_checksum( + bytes[1:(total_length-4)], paste(read_bytes(4), collapse = "") + ) + + list( + total_length = total_length, + header_length = header_length, + payload = payload_raw, + headers = headers + ) +} + +################ Helpers ################ +type_enum <- function(value) { + if (value < 0 || value > 10) { + stopf("Unsupported type %s.", value) + } + switch(value + 1, + "TRUE", + "FALSE", + "BYTE", + "SHORT", + "INTEGER", + "LONG", + "BYTE_ARRAY", + "CHARACTER", + "TIMESTAMP", + "UUID", + ) +} + +big_endian <- function(vec) { + c( + vec[8:1], vec[16:9], vec[24:17], vec[32:25], + vec[40:33], vec[48:41], vec[56:49], vec[64:57] + ) +} + +# Convert raw vector into integers with big-endian +parse_int64 <- function(x) { + bits <- as.integer(big_endian(rawToBits(x))) + sum(bits[-1] * 2^(62:0)) - bits[[1]] * 2^63 +} + +parse_int32 <- function(x, len=length(x)) { + readBin(x, "integer", n=len, size=4, endian = "big") +} + +parse_int16 <- function(x, len=length(x)) { + readBin(x, "integer", n=len, size=2, endian = "big") +} + +parse_int8 <- function(x, len=length(x)) { + readBin(x, "integer", n=len, size=1, endian = "big") +} + +hex_to_raw <- function(x) { + x <- gsub("(\\s|\n)+", 
"", x) + + pairs <- substring(x, seq(1, nchar(x), by = 2), seq(2, nchar(x), by = 2)) + as.raw(strtoi(pairs, 16L)) +} + +# Get the CRC of a raw vector. +crc32 <- function(raw) { + return(digest(raw, algo = "crc32", serialize = FALSE)) +} + +validate_checksum <- function(data, crc) { + computed_checksum <- crc32(data) + if (computed_checksum != crc) { + stopf( + 'Checksum mismatch: expected %s, calculated %s', crc, computed_checksum + ) + } +} diff --git a/paws.common/R/net.R b/paws.common/R/net.R index 2cea12445..f1aa0a938 100644 --- a/paws.common/R/net.R +++ b/paws.common/R/net.R @@ -1,4 +1,4 @@ -#' @importFrom httr2 request req_options req_perform +#' @importFrom httr2 request req_options req_perform req_perform_connection #' @include struct.R #' @include url.R @@ -31,7 +31,8 @@ HttpRequest <- struct( timeout = NULL, response = NULL, ctx = list(), - dest = NULL + dest = NULL, + stream_api = FALSE ) # Construct an HTTP response object. @@ -43,7 +44,6 @@ HttpResponse <- struct( proto_minor = NA, header = list(), body = NULL, - connection = NULL, content_length = NA, transfer_encoding = list(), close = logical(0), @@ -63,7 +63,7 @@ HttpResponse <- struct( # @param timeout Timeout for the entire request. 
# @param dest Control where the response body is written # @param header list of HTTP headers to add to the request -new_http_request <- function(method, url, body = NULL, close = FALSE, connect_timeout = NULL, timeout = NULL, dest = NULL, header = list()) { +new_http_request <- function(method, url, body = NULL, close = FALSE, connect_timeout = NULL, timeout = NULL, dest = NULL, stream_api = FALSE, header = list()) { if (method == "") { method <- "GET" } @@ -84,7 +84,8 @@ new_http_request <- function(method, url, body = NULL, close = FALSE, connect_ti close = close, connect_timeout = connect_timeout, timeout = timeout, - dest = dest + dest = dest, + stream_api = stream_api ) return(req) } @@ -120,7 +121,7 @@ issue <- function(http_request) { header = resp$headers, content_length = as.integer(resp$headers$`content-length`), # Prevent reading in data when output is set - body = resp_body(resp, http_request$dest) + body = resp_body(resp, http_request$dest, http_request$stream_api) ) # Decode gzipped response bodies that are not automatically decompressed @@ -137,7 +138,9 @@ request_aws <- function(url, http_request) { req$method <- http_request$method req$headers <- http_request$header req$policies$error_is_error <- function(resp) FALSE - req$body <- list(data = http_request$body, type = "raw", content_type = "", params = list()) + if (!is.null(http_request$body)) { + req$body <- list(data = http_request$body, type = "raw", content_type = "", params = list()) + } req <- req_options( .req = req, timeout_ms = http_request$timeout * 1000, @@ -145,11 +148,17 @@ request_aws <- function(url, http_request) { debugfunction = paws_debug, verbose = isTRUE(getOption("paws.log_level") >= 3L) ) - return(req_perform(req, path = http_request$dest)) + if (http_request$stream_api) { + return(req_perform_connection(req)) + } else { + return(req_perform(req, path = http_request$dest)) + } } -resp_body <- function(resp, path) { - if (is.null(path)) { +resp_body <- function(resp, path, 
stream_api) { + if (stream_api) { + body <- resp + } else if (is.null(path)) { body <- resp$body # return error message if call has failed or needs redirecting } else if (resp$status_code %in% c(301, 400)) { diff --git a/paws.common/R/request.R b/paws.common/R/request.R index 4b2596c75..38ea57ce4 100644 --- a/paws.common/R/request.R +++ b/paws.common/R/request.R @@ -9,6 +9,7 @@ Operation <- struct( http_path = "", host_prefix = "", paginator = list(), + stream_api = FALSE, before_presign_fn = function() {} ) @@ -22,7 +23,8 @@ Operation <- struct( #' @param http_method The HTTP method, e.g. `"GET"` or `"POST"`. #' @param http_path The HTTP path. #' @param host_prefix The HTTP prefix -#' @param paginator Currently unused. +#' @param paginator List input_token and output_token. +#' @param stream_api Set if operation is stream api or not #' @param before_presign_fn Currently unused. #' #' @family API request functions @@ -37,7 +39,7 @@ Operation <- struct( #' ) #' #' @export -new_operation <- function(name, http_method, http_path, host_prefix, paginator, before_presign_fn = NULL) { +new_operation <- function(name, http_method, http_path, host_prefix, paginator, stream_api = FALSE, before_presign_fn = NULL) { args <- as.list(environment()) args[lengths(args) == 0] <- NULL return(do.call(Operation, args)) @@ -111,7 +113,8 @@ new_request <- function(client, operation, params, data, dest = NULL) { body = NULL, close = client$config$close_connection, connect_timeout = client$config$connect_timeout, - dest = dest + dest = dest, + stream_api = operation$stream_api ) http_req$url <- parse_url( @@ -120,7 +123,7 @@ new_request <- function(client, operation, params, data, dest = NULL) { http_req <- sanitize_host_for_header(http_req) - r <- Request( + req <- Request( config = client$config, client_info = client$client_info, handlers = client$handlers, @@ -137,7 +140,7 @@ new_request <- function(client, operation, params, data, dest = NULL) { # TODO: Custom initialization. 
- return(r) + return(req) } #' Send a request and handle the response diff --git a/paws.common/R/stream.R b/paws.common/R/stream.R deleted file mode 100644 index 1cc217526..000000000 --- a/paws.common/R/stream.R +++ /dev/null @@ -1,110 +0,0 @@ -#' @include util.R -#' @include xmlutil.R -NULL - -# Decode all messages in a HTTP response body using the format specfied in -# https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html -stream_decode <- function(data) { - return(stream_decode_messages(data)) -} - -# Decode a series of variable-length messages and return them as a list. -stream_decode_messages <- function(messages) { - result <- list() - offset <- 1 - records <- "" - while (offset < length(messages)) { - message <- stream_decode_message(messages, offset = offset) - message_length <- message$total_length - event_type <- message$header[["event-type"]] - if (event_type == "Records") { - records <- paste0(records, message$payload) - } else if (event_type %in% c("Stats", "Progress")) { - result[[event_type]] <- list(Details = xml_to_list(message$payload)[[1]]) - } - offset <- offset + message_length - } - result[["Records"]] <- list(Payload = records) - return(result) -} - -# Decode a single message starting at `offset`. 
-stream_decode_message <- function(messages, offset) { - template <- list( - list(name = "message_length", type = "number", length = 4), - list(name = "header_length", type = "number", length = 4), - list(name = "prelude_crc", type = "binary", length = 4), - list(name = "header", type = "binary", length = "header_length"), - list(name = "payload", type = "string", length = "message_length - header_length - 16"), - list(name = "message_crc", type = "binary", length = 4) - ) - start <- offset - message_length <- stream_decode_number(messages[start:(start + 4 - 1)]) - stop <- start + message_length - result <- stream_decode_chunk(messages[start:stop], template) - result$header <- stream_decode_headers(result$header) - return(result) -} - -# Decode a message's headers and return them as a list. -stream_decode_headers <- function(headers) { - result <- new.env() - offset <- 1 - while (offset < length(headers)) { - header <- stream_decode_header(headers, offset) - offset <- offset + header$total_length - name <- substring(header$name, 2) - result[[name]] <- header$value - } - return(as.list(result)) -} - -# Decode a single header. -stream_decode_header <- function(header, offset) { - template <- list( - list(name = "name_length", type = "number", length = 1), - list(name = "name", type = "string", length = "name_length"), - list(name = "value_type", type = "number", length = 1), - list(name = "value_length", type = "number", length = 2), - list(name = "value", type = "string", length = "value_length") - ) - result <- stream_decode_chunk(header[offset:length(header)], template) - return(result) -} - -# Decode a binary message given `template`. 
-stream_decode_chunk <- function(message, template) { - start <- 1 - result <- new.env() - for (element in template) { - name <- element$name - length <- element$length - if (is.character(length)) length <- eval(str2expression(length), envir = result) - stop <- start + length - 1 - data <- message[start:stop] - data <- switch(element$type, - "binary" = data, - "number" = stream_decode_number(data), - "string" = stream_decode_string(data) - ) - result[[name]] <- data - start <- stop + 1 - } - result$total_length <- start - 1 - return(as.list(result)) -} - -# Convert big-endian binary data to a number. -stream_decode_number <- function(raw) { - return(readBin(raw, what = "integer", endian = "big", size = length(raw))) -} - -# Convery binary data to a string. -stream_decode_string <- function(raw) { - return(rawToChar(raw)) -} - -# Get the CRC of a raw vector. -crc <- function(raw) { - return(digest::digest(raw, algo = "crc32", serialize = FALSE)) -} diff --git a/paws.common/R/xmlutil.R b/paws.common/R/xmlutil.R index 38cb5ecae..c0a96e44e 100644 --- a/paws.common/R/xmlutil.R +++ b/paws.common/R/xmlutil.R @@ -468,3 +468,23 @@ transpose <- function(x) { } .mapply(list, x, NULL) } + +############## stream ############## +xml_stream <- function(bytes, interface) { + payload <- xml2::read_xml(bytes) + xml_nms <- xml2::xml_name(payload) + + in_nms <- names(interface) + if (xml_nms %in% in_nms) { + return(xml_parse(payload, interface, xml_nms)) + } + root <- xml2::xml_contents(payload) + xml_nms <- xml2::xml_name(root) + + if (any(xml_nms %in% in_nms)) { + return(xml_parse(root, interface, xml_nms)) + } + result <- list(xml_parse(root, interface[[in_nms]], xml_nms)) + names(result) <- in_nms + return(result) +} diff --git a/paws.common/man/new_operation.Rd b/paws.common/man/new_operation.Rd index f159f0d4a..5228d7dbd 100644 --- a/paws.common/man/new_operation.Rd +++ b/paws.common/man/new_operation.Rd @@ -10,6 +10,7 @@ new_operation( http_path, host_prefix, paginator, + 
stream_api = FALSE, before_presign_fn = NULL ) } @@ -22,7 +23,9 @@ new_operation( \item{host_prefix}{The HTTP prefix} -\item{paginator}{Currently unused.} +\item{paginator}{List input_token and output_token.} + +\item{stream_api}{Set if operation is stream api or not} \item{before_presign_fn}{Currently unused.} } diff --git a/paws.common/man/paws_config_log.Rd b/paws.common/man/paws_config_log.Rd index ee4b34284..950a9488a 100644 --- a/paws.common/man/paws_config_log.Rd +++ b/paws.common/man/paws_config_log.Rd @@ -13,6 +13,7 @@ paws_config_log( \arguments{ \item{level}{(integer) to determine the level logging threshold. \itemize{ +\item \code{5L}: TRACE \item \code{4L}: DEBUG \item \code{3L}: INFO \item \code{2L}: WARNING diff --git a/paws.common/man/paws_stream.Rd b/paws.common/man/paws_stream.Rd new file mode 100644 index 000000000..ff86a1bab --- /dev/null +++ b/paws.common/man/paws_stream.Rd @@ -0,0 +1,68 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/handlers_stream.R +\name{paws_stream} +\alias{paws_stream} +\alias{paws_stream_handler} +\alias{paws_stream_parser} +\title{Iterate over AWS Event Stream connection} +\usage{ +paws_stream_handler(FUN, .connection = FALSE) + +paws_stream_parser(con) +} +\arguments{ +\item{FUN}{function to iterate over stream connection.} + +\item{.connection}{return a \code{paws_connection} object, a subclass of \code{httr2::httr2_response} (default \code{FALSE})} + +\item{con}{A streaming response created by \code{paws_stream_handler}.} +} +\value{ +list of responses from the operation or a \code{paws_connection} object +} +\description{ +Iterate over AWS Event Stream connection +} +\examples{ +\dontrun{ +# Developed from: +# https://docs.aws.amazon.com/code-library/latest/ug/python_3_bedrock-runtime_code_examples.html +library(paws) + +# Create a Bedrock Runtime client in the AWS Region you want to use. 
+client <- bedrockruntime(region = "us-east-1") + +# Set the model ID, e.g., Titan Text Premier. +model_id <- "amazon.titan-text-premier-v1:0" + +# Start a conversation with the user message. +user_message = "Describe the purpose of a 'hello world' program in one line." +conversation <- list( + list( + role = "user", + content = list(list(text=user_message)) + ) +) + +resp <- client$converse_stream( + modelId = model_id, + messages = conversation, + inferenceConfig = list(maxTokens=512, temperature=0.5, topP=0.9) +) +resp$stream(\(chunk) chunk$contentBlockDelta$delta$text) +# Or handle stream utilising paws_stream_parser +con <- resp$stream(.connection = TRUE) +while(!is.null(event <- paws_stream_parser(con))) { + print(event$contentBlockDelta$delta$text) +} + +# or return httr2 req_perform_connection +con <- resp$stream(.connection = TRUE) + +while (!is.null(event <- resp_stream_aws(con))) { + str(event) +} +close(con) + +} +} diff --git a/paws.common/tests/testthat/test_custom_s3.R b/paws.common/tests/testthat/test_custom_s3.R index 7c0f9db24..61084e4bc 100644 --- a/paws.common/tests/testthat/test_custom_s3.R +++ b/paws.common/tests/testthat/test_custom_s3.R @@ -161,90 +161,6 @@ test_that("s3_unmarshal_get_bucket_location", { expect_equal(out$LocationConstraint, "eu-west-1") }) -test_that("s3_unmarshal_select_object_content", { - op <- Operation(name = "SelectObjectContent") - svc <- Client() - svc$handlers$unmarshal <- HandlerList( - s3_unmarshal_select_object_content, - restxml_unmarshal - ) - - op_output2 <- Structure( - Payload = Structure( - Records = Structure( - Payload = Scalar(.tags = list(eventpayload = TRUE, type = "blob")) - ), - Stats = Structure( - Details = Structure( - BytesScanned = Scalar(.tags = list(type = "long")), - BytesProcessed = Scalar(.tags = list(type = "long")), - BytesReturned = Scalar(.tags = list(type = "long")), - .tags = list(eventpayload = TRUE) - ) - ), - Progress = Structure( - Details = Structure( - BytesScanned = Scalar(.tags = list(type = "long")), - 
BytesProcessed = Scalar(.tags = list(type = "long")), - BytesReturned = Scalar(.tags = list(type = "long")), - .tags = list(eventpayload = TRUE) - ) - ), - Cont = Scalar(.tags = list(event = TRUE)), - End = Scalar(.tags = list(event = TRUE)) - ), - .tags = list(payload = "Payload") - ) - - body <- as.raw( - c( - 0x00, 0x00, 0x00, 0x6b, 0x00, 0x00, 0x00, 0x55, 0x90, - 0xc1, 0x3c, 0x4e, 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x0b, 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, - 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x07, 0x52, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x73, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, - 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x18, 0x61, 0x70, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x6f, - 0x63, 0x74, 0x65, 0x74, 0x2d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a, 0x60, 0x17, 0xc3, 0x4c, 0x00, - 0x00, 0x00, 0xcd, 0x00, 0x00, 0x00, 0x43, 0x9b, 0x72, 0xe3, 0x29, - 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2d, 0x74, - 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, - 0x0b, 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, - 0x65, 0x07, 0x00, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x0d, 0x3a, - 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, - 0x65, 0x07, 0x00, 0x08, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x78, 0x6d, - 0x6c, 0x3c, 0x53, 0x74, 0x61, 0x74, 0x73, 0x20, 0x78, 0x6d, 0x6c, - 0x6e, 0x73, 0x3d, 0x22, 0x22, 0x3e, 0x3c, 0x42, 0x79, 0x74, 0x65, - 0x73, 0x53, 0x63, 0x61, 0x6e, 0x6e, 0x65, 0x64, 0x3e, 0x31, 0x30, - 0x3c, 0x2f, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x63, 0x61, 0x6e, - 0x6e, 0x65, 0x64, 0x3e, 0x3c, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, - 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x3e, 0x31, 0x30, - 0x3c, 0x2f, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x63, - 0x65, 0x73, 0x73, 0x65, 0x64, 0x3e, 0x3c, 0x42, 0x79, 0x74, 0x65, 
- 0x73, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x65, 0x64, 0x3e, 0x36, - 0x3c, 0x2f, 0x42, 0x79, 0x74, 0x65, 0x73, 0x52, 0x65, 0x74, 0x75, - 0x72, 0x6e, 0x65, 0x64, 0x3e, 0x3c, 0x2f, 0x53, 0x74, 0x61, 0x74, - 0x73, 0x3e, 0x40, 0xc6, 0x94, 0x33, 0x00, 0x00, 0x00, 0x38, 0x00, - 0x00, 0x00, 0x28, 0xc1, 0xc6, 0x84, 0xd4, 0x0d, 0x3a, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, - 0x00, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x0b, 0x3a, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x03, - 0x45, 0x6e, 0x64, 0xcf, 0x97, 0xd3, 0x92 - ) - ) - - req <- new_request(svc, op, NULL, op_output2) - req$http_response <- HttpResponse( - status_code = 200, - body = body - ) - req <- unmarshal(req) - out <- req$data - expect_equal(out$Payload$Records$Payload, "1\n2\n3\n", ignore_attr = TRUE) -}) - test_that("S3 access points", { access_point_arn <- "arn:aws:s3:us-west-2:123456789012:accesspoint/test" host <- "test-123456789012.s3-accesspoint.us-west-2.amazonaws.com" diff --git a/paws.common/tests/testthat/test_handlers_stream.R b/paws.common/tests/testthat/test_handlers_stream.R new file mode 100644 index 000000000..4aecb640f --- /dev/null +++ b/paws.common/tests/testthat/test_handlers_stream.R @@ -0,0 +1,279 @@ +test_that("StreamHandler", { + op <- Operation(name = "SelectObjectContent") + svc <- Client() + svc$client_info$service_name = "s3" + svc$handlers$unmarshal <- HandlerList( + restxml_unmarshal + ) + op_output <- Structure( + Payload = Structure( + Records = Structure( + Payload = Scalar(.tags = list(eventpayload = TRUE, type = "blob")) + ), + Stats = Structure( + Details = Structure( + BytesScanned = Scalar(.tags = list(type = "long")), + BytesProcessed = Scalar(.tags = list(type = "long")), + BytesReturned = Scalar(.tags = list(type = "long")), + .tags = list(eventpayload = TRUE) + ) + ), + Progress = Structure( + Details = Structure( + BytesScanned = Scalar(.tags = list(type = "long")), + BytesProcessed = 
Scalar(.tags = list(type = "long")), + BytesReturned = Scalar(.tags = list(type = "long")), + .tags = list(eventpayload = TRUE) + ) + ), + Cont = Scalar(.tags = list(event = TRUE)), + End = Scalar(.tags = list(event = TRUE)) + ), + .tags = list(payload = "Payload") + ) + + req <- new_request(svc, op, NULL, op_output) + req$operation$stream_api = TRUE + req$http_response <- HttpResponse( + status_code = 200, + body = list() + ) + req <- unmarshal(req) + expect_named(req$data, "Payload") + expect_s3_class(req$data$Payload, "PawsStreamHandler") + expect_equal( + capture.output(req$data$Payload), + c( + "", + "Please check return object for: s3_select_object_content", + "https://www.paws-r-sdk.com/docs/s3_select_object_content/" + ) + ) +}) + + +test_that("check xml paws_stream_parser", { + body <- as.raw( + c( + 0x00, 0x00, 0x00, 0x6b, 0x00, 0x00, 0x00, 0x55, 0x90, + 0xc1, 0x3c, 0x4e, 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x0b, 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, + 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x07, 0x52, 0x65, 0x63, 0x6f, + 0x72, 0x64, 0x73, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, + 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x18, 0x61, 0x70, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x6f, + 0x63, 0x74, 0x65, 0x74, 0x2d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a, 0x60, 0x17, 0xc3, 0x4c, 0x00, + 0x00, 0x00, 0xcd, 0x00, 0x00, 0x00, 0x43, 0x9b, 0x72, 0xe3, 0x29, + 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2d, 0x74, + 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x0b, 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, + 0x65, 0x07, 0x00, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x0d, 0x3a, + 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, + 0x65, 0x07, 0x00, 0x08, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x78, 0x6d, + 0x6c, 0x3c, 0x53, 0x74, 0x61, 
0x74, 0x73, 0x20, 0x78, 0x6d, 0x6c, + 0x6e, 0x73, 0x3d, 0x22, 0x22, 0x3e, 0x3c, 0x42, 0x79, 0x74, 0x65, + 0x73, 0x53, 0x63, 0x61, 0x6e, 0x6e, 0x65, 0x64, 0x3e, 0x31, 0x30, + 0x3c, 0x2f, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x63, 0x61, 0x6e, + 0x6e, 0x65, 0x64, 0x3e, 0x3c, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, + 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x3e, 0x31, 0x30, + 0x3c, 0x2f, 0x42, 0x79, 0x74, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x63, + 0x65, 0x73, 0x73, 0x65, 0x64, 0x3e, 0x3c, 0x42, 0x79, 0x74, 0x65, + 0x73, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x65, 0x64, 0x3e, 0x36, + 0x3c, 0x2f, 0x42, 0x79, 0x74, 0x65, 0x73, 0x52, 0x65, 0x74, 0x75, + 0x72, 0x6e, 0x65, 0x64, 0x3e, 0x3c, 0x2f, 0x53, 0x74, 0x61, 0x74, + 0x73, 0x3e, 0x40, 0xc6, 0x94, 0x33, 0x00, 0x00, 0x00, 0x38, 0x00, + 0x00, 0x00, 0x28, 0xc1, 0xc6, 0x84, 0xd4, 0x0d, 0x3a, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, + 0x00, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x0b, 0x3a, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x03, + 0x45, 0x6e, 0x64, 0xcf, 0x97, 0xd3, 0x92 + ) + ) + + interface <- structure(list(Records = structure(list(Payload = structure(logical(0), tags = list(eventpayload = TRUE, type = "blob"))), tags = list(type = "structure", event = TRUE)), Stats = structure(list(Details = structure(list(BytesScanned = structure(logical(0), tags = list(type = "long", box = TRUE)), BytesProcessed = structure(logical(0), tags = list(type = "long", box = TRUE)), BytesReturned = structure(logical(0), tags = list(type = "long", box = TRUE))), tags = list(eventpayload = TRUE, type = "structure"))), tags = list(type = "structure", event = TRUE)), Progress = structure(list(Details = structure(list(BytesScanned = structure(logical(0), tags = list(type = "long", box = TRUE)), BytesProcessed = structure(logical(0), tags = list(type = "long", box = TRUE)), BytesReturned = structure(logical(0), tags = list(type = "long", box = TRUE))), tags = list(eventpayload 
= TRUE, type = "structure"))), tags = list(type = "structure", event = TRUE)), Cont = structure(list(), tags = list(type = "structure", event = TRUE)), End = structure(list(), tags = list(type = "structure", event = TRUE))), tags = list(type = "structure", eventstream = TRUE)) + + # mock con object + con <- list( + body = rawConnection(body), + paws_metadata = list( + unmarshal = xml_parse_stream, + interface = interface + ) + ) + + mock_isIncomplete <- mock2(TRUE) + mockery::stub(paws_stream_parser, "isIncomplete", mock_isIncomplete) + + actual <- paws_stream_parser(con) + close(con$body) + + expect_equal(rawToChar(actual$Records$Payload), "1\n2\n3\n") + expect_equal( + actual$Stats, list( + Details = list( + BytesScanned = 10, BytesProcessed = 10, BytesReturned = 6 + ) + ) + ) +}) + + +test_that("check json paws_stream_parser", { + body <- as.raw( + c( + 0x00, 0x00, 0x00, 0xaa, 0x00, 0x00, 0x00, 0x52, 0xda, 0xd0, 0x68, + 0x86, 0x0b, 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, + 0x70, 0x65, 0x07, 0x00, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, + 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, + 0x10, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2f, 0x6a, 0x73, 0x6f, 0x6e, 0x0d, 0x3a, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, + 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x7b, 0x22, 0x70, 0x22, 0x3a, + 0x22, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, + 0x6b, 0x6c, 0x6d, 0x6e, 0x6f, 0x70, 0x71, 0x72, 0x73, 0x74, 0x75, + 0x76, 0x77, 0x78, 0x79, 0x7a, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, + 0x47, 0x48, 0x49, 0x4a, 0x4b, 0x4c, 0x4d, 0x4e, 0x4f, 0x50, 0x51, + 0x52, 0x53, 0x22, 0x2c, 0x22, 0x72, 0x6f, 0x6c, 0x65, 0x22, 0x3a, + 0x22, 0x61, 0x73, 0x73, 0x69, 0x73, 0x74, 0x61, 0x6e, 0x74, 0x22, + 0x7d, 0x72, 0x03, 0x3e, 0x7b, 0x00, 0x00, 0x01, 0x5d, 0x00, 0x00, + 0x00, 0x57, 0x9b, 0xf2, 0xc3, 0xe1, 0x0b, 
0x3a, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x11, 0x63, + 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x44, 0x65, 0x6c, 0x74, 0x61, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, 0x74, + 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x10, + 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x2f, 0x6a, 0x73, 0x6f, 0x6e, 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x7b, 0x22, 0x63, 0x6f, 0x6e, 0x74, + 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x6e, 0x64, + 0x65, 0x78, 0x22, 0x3a, 0x30, 0x2c, 0x22, 0x64, 0x65, 0x6c, 0x74, + 0x61, 0x22, 0x3a, 0x7b, 0x22, 0x74, 0x65, 0x78, 0x74, 0x22, 0x3a, + 0x22, 0x49, 0x20, 0x61, 0x6d, 0x20, 0x41, 0x6d, 0x61, 0x7a, 0x6f, + 0x6e, 0x20, 0x54, 0x69, 0x74, 0x61, 0x6e, 0x2c, 0x20, 0x61, 0x20, + 0x6c, 0x61, 0x72, 0x67, 0x65, 0x20, 0x6c, 0x61, 0x6e, 0x67, 0x75, + 0x61, 0x67, 0x65, 0x20, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x20, 0x62, + 0x75, 0x69, 0x6c, 0x74, 0x20, 0x62, 0x79, 0x20, 0x41, 0x57, 0x53, + 0x2e, 0x20, 0x49, 0x20, 0x77, 0x61, 0x73, 0x20, 0x64, 0x65, 0x73, + 0x69, 0x67, 0x6e, 0x65, 0x64, 0x20, 0x74, 0x6f, 0x20, 0x61, 0x73, + 0x73, 0x69, 0x73, 0x74, 0x20, 0x79, 0x6f, 0x75, 0x20, 0x77, 0x69, + 0x74, 0x68, 0x20, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x20, 0x6f, 0x72, + 0x20, 0x61, 0x6e, 0x73, 0x77, 0x65, 0x72, 0x20, 0x61, 0x6e, 0x79, + 0x20, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x20, + 0x79, 0x6f, 0x75, 0x20, 0x6d, 0x61, 0x79, 0x20, 0x68, 0x61, 0x76, + 0x65, 0x2e, 0x20, 0x48, 0x6f, 0x77, 0x20, 0x6d, 0x61, 0x79, 0x20, + 0x49, 0x20, 0x68, 0x65, 0x6c, 0x70, 0x20, 0x79, 0x6f, 0x75, 0x3f, + 0x22, 0x7d, 0x2c, 0x22, 0x70, 0x22, 0x3a, 0x22, 0x61, 0x62, 0x63, + 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, 0x6d, 0x6e, + 0x6f, 0x70, 0x71, 0x72, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, + 0x7a, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x4a, + 
0x4b, 0x4c, 0x4d, 0x4e, 0x4f, 0x50, 0x51, 0x22, 0x7d, 0x08, 0x31, + 0x0d, 0x02, 0x00, 0x00, 0x00, 0xb7, 0x00, 0x00, 0x00, 0x56, 0x45, + 0xcd, 0xff, 0xac, 0x0b, 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, + 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x10, 0x63, 0x6f, 0x6e, 0x74, + 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x53, 0x74, 0x6f, + 0x70, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, + 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x10, 0x61, 0x70, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x6a, 0x73, 0x6f, + 0x6e, 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2d, + 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x7b, 0x22, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x42, + 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x22, 0x3a, + 0x30, 0x2c, 0x22, 0x70, 0x22, 0x3a, 0x22, 0x61, 0x62, 0x63, 0x64, + 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, 0x6d, 0x6e, 0x6f, + 0x70, 0x71, 0x72, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, 0x7a, + 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x4a, 0x4b, + 0x4c, 0x4d, 0x4e, 0x4f, 0x50, 0x51, 0x52, 0x53, 0x54, 0x55, 0x56, + 0x57, 0x58, 0x59, 0x22, 0x7d, 0x4c, 0xb1, 0xd4, 0x91, 0x00, 0x00, + 0x00, 0xb4, 0x00, 0x00, 0x00, 0x51, 0x9c, 0x09, 0x10, 0xdf, 0x0b, + 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, + 0x07, 0x00, 0x0b, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x53, + 0x74, 0x6f, 0x70, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, + 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x10, 0x61, 0x70, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x6a, + 0x73, 0x6f, 0x6e, 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x7b, 0x22, 0x70, 0x22, 0x3a, 0x22, 0x61, 0x62, + 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, 0x6d, + 0x6e, 0x6f, 0x70, 0x71, 0x72, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, + 0x79, 0x7a, 0x41, 0x42, 
0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, + 0x4a, 0x4b, 0x4c, 0x4d, 0x4e, 0x4f, 0x50, 0x51, 0x52, 0x53, 0x54, + 0x55, 0x56, 0x57, 0x58, 0x59, 0x22, 0x2c, 0x22, 0x73, 0x74, 0x6f, + 0x70, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x3a, 0x22, 0x65, + 0x6e, 0x64, 0x5f, 0x74, 0x75, 0x72, 0x6e, 0x22, 0x7d, 0xbe, 0x58, + 0x0a, 0xc7, 0x00, 0x00, 0x00, 0xfe, 0x00, 0x00, 0x00, 0x4e, 0x03, + 0x42, 0x5d, 0xc2, 0x0b, 0x3a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2d, + 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x08, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, 0x74, 0x65, + 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x10, 0x61, + 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, + 0x6a, 0x73, 0x6f, 0x6e, 0x0d, 0x3a, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x7b, 0x22, 0x6d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x22, 0x3a, 0x7b, 0x22, 0x6c, 0x61, 0x74, 0x65, 0x6e, + 0x63, 0x79, 0x4d, 0x73, 0x22, 0x3a, 0x31, 0x34, 0x32, 0x30, 0x7d, + 0x2c, 0x22, 0x70, 0x22, 0x3a, 0x22, 0x61, 0x62, 0x63, 0x22, 0x2c, + 0x22, 0x75, 0x73, 0x61, 0x67, 0x65, 0x22, 0x3a, 0x7b, 0x22, 0x63, + 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x61, 0x64, 0x49, 0x6e, 0x70, + 0x75, 0x74, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x43, 0x6f, 0x75, 0x6e, + 0x74, 0x22, 0x3a, 0x30, 0x2c, 0x22, 0x63, 0x61, 0x63, 0x68, 0x65, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x54, + 0x6f, 0x6b, 0x65, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x3a, + 0x30, 0x2c, 0x22, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x54, 0x6f, 0x6b, + 0x65, 0x6e, 0x73, 0x22, 0x3a, 0x38, 0x2c, 0x22, 0x6f, 0x75, 0x74, + 0x70, 0x75, 0x74, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x22, 0x3a, + 0x33, 0x39, 0x2c, 0x22, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x54, 0x6f, + 0x6b, 0x65, 0x6e, 0x73, 0x22, 0x3a, 0x34, 0x37, 0x7d, 0x7d, 0xff, + 0xaa, 0x2c, 0xb4 + ) + ) + interface <- structure(list( + contentBlockDelta = structure(list(delta = 
structure(list(text = structure(logical(0), tags = list(type = "string")), toolUse = structure(list(input = structure(logical(0), tags = list(type = "string"))), tags = list(type = "structure"))), tags = list(type = "structure", union = TRUE)), contentBlockIndex = structure(logical(0), tags = list(type = "integer", box = TRUE))), tags = list(type = "structure", event = TRUE)), + messageStop = structure(list(stopReason = structure(logical(0), tags = list(type = "string")), additionalModelResponseFields = structure(list(), tags = list(type = "structure", document = TRUE))), tags = list(type = "structure", event = TRUE)) + ) + ) + + # mock con object + con <- list( + body = rawConnection(body), + paws_metadata = list( + unmarshal = json_parse_stream, + interface = interface + ) + ) + + mock_isIncomplete <- mock2(TRUE) + mockery::stub(paws_stream_parser, "isIncomplete", mock_isIncomplete) + + actual <- paws_stream_parser(con) + close(con$body) + + expect_equal( + actual$contentBlockDelta$delta$text, + "I am Amazon Titan, a large language model built by AWS. I was designed to assist you with tasks or answer any questions you may have. How may I help you?" + ) + expect_equal(actual$messageStop$stopReason, "end_turn") +}) + +test_that("check stream_raw", { + mock_isIncomplete <- mock2(TRUE, FALSE) + mockery::stub(stream_raw, "isIncomplete", mock_isIncomplete) + + bytes <- as.raw(as.integer(runif(100) * 100)) + con <- rawConnection(bytes) + + actual <- stream_raw(con) + + expect_equal(actual, bytes) +}) + +test_that("check validate_checksum failure", { + data <- as.raw(c(0x1c, 0x48, 0x35, 0x5a)) + expect_error( + validate_checksum(data, "05321e05"), + "Checksum mismatch" + ) +}) +