Skip to content

Commit

Permalink
Merge pull request #848 from DyfanJones/handler_stream
Browse files Browse the repository at this point in the history
Handler stream
  • Loading branch information
DyfanJones authored Dec 9, 2024
2 parents 95adbf6 + 1db1cae commit 57a369f
Show file tree
Hide file tree
Showing 27 changed files with 971 additions and 264 deletions.
113 changes: 113 additions & 0 deletions docs/streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Streaming

As of `paws v-0.8.0+` streaming is supported in `paws`.


## Basic usage:

Example taken from: https://docs.aws.amazon.com/code-library/latest/ug/python_3_bedrock-runtime_code_examples.html


### Internal function:

Paws allows for a function to be passed to the returning stream for processing.

```r
library(paws)

client <- bedrockruntime()

# Set the model ID, e.g., Titan Text Premier.
model_id <- "amazon.titan-text-premier-v1:0"

# Start a conversation with the user message.
user_message <- "Describe the purpose of a 'hello world' program in one line."
conversation <- list(
list(
role = "user",
content = list(list(text= user_message)),
)
)

resp <- client.converse_stream(
modelId=model_id,
messages=conversation,
inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9)
)

resp$stream(\(chunk) chunk$contentBlockDelta$delta$text)
```

### paws_connection:

`paws` allows for the raw connection to be retrieved. The connection is a sub class of `httr2::httr2_response` class.
This allows paws_connection to be handle both a paws parser or httr2 stream parser.

```r
library(paws)

client <- bedrockruntime()

# Set the model ID, e.g., Titan Text Premier.
model_id <- "amazon.titan-text-premier-v1:0"

# Start a conversation with the user message.
user_message <- "Describe the purpose of a 'hello world' program in one line."
conversation <- list(
list(
role = "user",
content = list(list(text= user_message)),
)
)

resp <- client.converse_stream(
modelId=model_id,
messages=conversation,
inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9)
)

con <- resp$stream(.connection = TRUE)

while(!is.null(chunk <- paws_stream_parser(con))) {
print(chunk$contentBlockDelta$delta$text)
}
```

Note: the paws_stream_parser return the stream in the response syntax. In this case please check https://paws-r.github.io/docs/bedrockruntime_converse_stream/

For full flexibility you can use [httr2::resp_stream_aws](https://httr2.r-lib.org/reference/req_perform_stream.html?search-input=resp_stream_aws) to get the raw response from AWS.

```r
library(paws)

client <- bedrockruntime()

# Set the model ID, e.g., Titan Text Premier.
model_id <- "amazon.titan-text-premier-v1:0"

# Start a conversation with the user message.
user_message <- "Describe the purpose of a 'hello world' program in one line."
conversation <- list(
list(
role = "user",
content = list(list(text= user_message)),
)
)

resp <- client.converse_stream(
modelId=model_id,
messages=conversation,
inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9)
)

con <- resp$stream(.connection = TRUE)
repeat{
event <- resp_stream_aws(con)
if (is.null(event)) {
close(con)
break
}

str(event)
}
```
2 changes: 1 addition & 1 deletion make.paws/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: make.paws
Type: Package
Title: Generate Paws AWS SDKs for R
Version: 0.9.1
Version: 0.9.2
Authors@R: c(
person("David", "Kretch", email = "[email protected]", role = "aut"),
person("Adam", "Banker", email = "[email protected]", role = "aut"),
Expand Down
2 changes: 1 addition & 1 deletion make.paws/R/cran_category.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#' @include package.R service.R
NULL

.paws.common.import.version <- "paws.common (>= 0.7.5)"
.paws.common.import.version <- "paws.common (>= 0.8.0)"

# Make all category-level packages.
make_categories <- function(sdk_dir, out_dir, categories, service_names) {
Expand Down
3 changes: 2 additions & 1 deletion make.paws/R/custom/s3.R
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ s3_download_file <- function(Bucket, Key, Filename, IfMatch = NULL, IfModifiedSi
name = "GetObject",
http_method = "GET",
http_path = "/{Bucket}/{Key+}",
paginator = list()
paginator = list(),
stream_api = FALSE
)
input <- .s3$get_object_input(Bucket = Bucket, IfMatch = IfMatch, IfModifiedSince = IfModifiedSince, IfNoneMatch = IfNoneMatch, IfUnmodifiedSince = IfUnmodifiedSince, Key = Key, Range = Range, ResponseCacheControl = ResponseCacheControl, ResponseContentDisposition = ResponseContentDisposition, ResponseContentEncoding = ResponseContentEncoding, ResponseContentLanguage = ResponseContentLanguage, ResponseContentType = ResponseContentType, ResponseExpires = ResponseExpires, VersionId = VersionId, SSECustomerAlgorithm = SSECustomerAlgorithm, SSECustomerKey = SSECustomerKey, SSECustomerKeyMD5 = SSECustomerKeyMD5, RequestPayer = RequestPayer, PartNumber = PartNumber, ExpectedBucketOwner = ExpectedBucketOwner)
output <- .s3$get_object_output()
Expand Down
10 changes: 8 additions & 2 deletions make.paws/R/operations.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ operation_template <- template(
http_method = ${http_method},
http_path = ${http_path},
host_prefix = ${host_prefix},
paginator = ${paginator}
paginator = ${paginator},
stream_api = ${stream_api}
)
input <- .${service}$${operation_input}
output <- .${service}$${operation_output}
Expand Down Expand Up @@ -66,7 +67,8 @@ make_operation <- function(operation, api, doc_maker) {
http_method = quoted(operation$http$method),
http_path = quoted(operation$http$requestUri),
host_prefix = quoted(operation[["endpoint"]][["hostPrefix"]] %||% ""),
paginator = set_paginator(operation$paginators)
paginator = set_paginator(operation$paginators),
stream_api = set_stream_api(operation)
)
}

Expand All @@ -91,6 +93,10 @@ set_paginator <- function(paginator) {
}
}

set_stream_api <- function(operation) {
as.character(operation$eventstream %||% FALSE)
}

# Override operation name from extdata/operation_name_override.yml
operation_name_override <- function(operation_name) {
path <- system_file(
Expand Down
35 changes: 19 additions & 16 deletions make.paws/R/process_api.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ TEST_DIR <- "tests/testthat"
#'
#' @keywords internal
make_sdk_for_api <- function(api_name, in_dir) {
result <- list()
api <- read_api(api_name, in_dir)
result$name <- package_name(api)
result$code <- make_code_files(api)
result$tests <- make_tests_files(api)
result$docs <- make_docs_files(api)
result <- list(
name = package_name(api),
code = make_code_files(api),
tests = make_tests_files(api),
docs = make_docs_files(api)
)
return(result)
}

#-------------------------------------------------------------------------------

# Write code for a given API.
make_code_files <- function(api) {
result <- list()
result$operations <- make_operations_files(api, doc_maker = make_docs_short)
result$interfaces <- make_interfaces_files(api)
result$service <- make_service_files(api)
result$custom <- make_custom_operations_files(api)
result$reexports <- make_reexports()
result <- list(
operations = make_operations_files(api, doc_maker = make_docs_short),
interfaces = make_interfaces_files(api),
service = make_service_files(api),
custom = make_custom_operations_files(api),
reexports = make_reexports()
)
return(result)
}

Expand Down Expand Up @@ -90,11 +92,12 @@ make_reexports <- function() {
}

make_docs_files <- function(api) {
result <- list()
result$operations <- make_operations_files(api, doc_maker = make_docs_long)
result$service <- make_service_files(api)
result$custom <- make_custom_operations_files(api)
result$reexports <- make_reexports()
result <- list(
operations = make_operations_files(api, doc_maker = make_docs_long),
service = make_service_files(api),
custom = make_custom_operations_files(api),
reexports = make_reexports()
)
return(result)
}

Expand Down
21 changes: 21 additions & 0 deletions make.paws/R/read_api.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Read a given API's definition and documentation files.
# aws-sdk-js deprecated and apis is not being updated
# TODO: short term migrate to botocore jsons
read_api <- function(api_name, path) {
api_path <- file.path(path, "apis")
region_config_path <- file.path(path, "lib/region_config_data.json")
Expand All @@ -18,6 +20,7 @@ read_api <- function(api_name, path) {
paginators <- jsonlite::read_json(files$paginators)
api <- merge_paginators(api, paginators$pagination)
}
api <- merge_eventstream(api)
region_config <- jsonlite::read_json(region_config_path)
api <- merge_region_config(api, region_config)
api <- fix_region_config(api)
Expand Down Expand Up @@ -62,6 +65,24 @@ merge_paginators <- function(api, paginators) {
return(api)
}

merge_eventstream <- function(api) {
flat_shape <- unlist(api$shapes)
eventstream <- flat_shape[endsWith(names(flat_shape),"eventstream")]
names(eventstream) <- stringr::str_extract(names(eventstream), "([a-zA-Z]+)")

shape <- flat_shape[endsWith(names(flat_shape), "shape")]
shape <- shape[shape %in% names(eventstream)]
names(shape) <- gsub(
"Output$|Response$", "", stringr::str_extract(names(shape), "([a-zA-Z]+)")
)
names(eventstream) <- names(shape)

for (nms in names(eventstream)) {
api$operations[[nms]]$eventstream <- eventstream[nms]
}
return(api)
}

# Returns an API object with region config info attached. Region config info
# lists endpoints for each service and region, if different from the default.
merge_region_config <- function(api, region_config) {
Expand Down
4 changes: 4 additions & 0 deletions make.paws/inst/templates/reexports_paws.common.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ paws.common::credentials
#' @importFrom paws.common creds
#' @export
paws.common::creds

#' @importFrom paws.common paws_stream_parser
#' @export
paws.common::paws_stream_parser
3 changes: 2 additions & 1 deletion make.paws/tests/testthat/test_operations.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ test_that("make_operation", {
http_method = \"POST\",
http_path = \"/abc\",
host_prefix = \"\",
paginator = list()
paginator = list(),
stream_api = FALSE
)
input <- .api$operation_input(Input1 = Input1, Input2 = Input2, Input3 = Input3)
output <- .api$operation_output()
Expand Down
5 changes: 4 additions & 1 deletion make.paws/tests/testthat/test_read_api.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ test_that("read_api", {

write_json(list(foo = "examples"), file.path(api_path, "foo-2018-11-01.examples.json"))
write_json(list(foo = "min"), file.path(api_path, "foo-2018-11-01.min.json"))
write_json(list(foo = "normal", name = "foo", metadata = list(endpointPrefix = "baz")), file.path(api_path, "foo-2018-11-01.normal.json"))
write_json(
list(foo = "normal", name = "foo", metadata = list(endpointPrefix = "baz"), shapes = list(foo = list(eventstream = "TRUE"))),
file.path(api_path, "foo-2018-11-01.normal.json")
)
write_json(list(foo = "paginators"), file.path(api_path, "foo-2018-11-01.paginators.json"))

write_json(list(foo = "wrong1"), file.path(api_path, "foo-2017-11-01.examples.json"))
Expand Down
8 changes: 4 additions & 4 deletions paws.common/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: paws.common
Type: Package
Title: Paws Low-Level Amazon Web Services API
Version: 0.7.7.9000
Version: 0.8.0
Authors@R: c(
person("David", "Kretch", email = "[email protected]", role = "aut"),
person("Adam", "Banker", email = "[email protected]", role = "aut"),
Expand Down Expand Up @@ -64,17 +64,17 @@ Collate:
'head_bucket.R'
'http_status.R'
'error.R'
'tags.R'
'xmlutil.R'
'stream.R'
'custom_s3.R'
'handlers_core.R'
'handlers_ec2query.R'
'handlers_jsonrpc.R'
'handlers_query.R'
'handlers_rest.R'
'handlers_restjson.R'
'tags.R'
'xmlutil.R'
'handlers_restxml.R'
'handlers_stream.R'
'idempotency.R'
'jsonutil.R'
'onLoad.R'
Expand Down
5 changes: 4 additions & 1 deletion paws.common/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ S3method(is_empty_xml,character)
S3method(is_empty_xml,default)
S3method(is_empty_xml,list)
S3method(is_empty_xml,raw)
S3method(print,PawsStreamHandler)
export(config)
export(credentials)
export(creds)
Expand All @@ -31,6 +32,7 @@ export(paginate_lapply)
export(paginate_sapply)
export(paws_config_log)
export(paws_reset_cache)
export(paws_stream_parser)
export(populate)
export(send_request)
export(set_config)
Expand All @@ -42,9 +44,10 @@ export(tag_has)
export(type)
importFrom(Rcpp,evalCpp)
importFrom(curl,curl_unescape)
importFrom(httr2,req_body_raw)
importFrom(digest,digest)
importFrom(httr2,req_options)
importFrom(httr2,req_perform)
importFrom(httr2,req_perform_connection)
importFrom(httr2,request)
importFrom(stats,runif)
importFrom(utils,flush.console)
Expand Down
5 changes: 4 additions & 1 deletion paws.common/NEWS.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# paws.common 0.7.7.9000
# paws.common 0.8.0
* migrate backend from httr to httr2
* enrich sso message (#844). Thanks to @hadley for raising issue.
* attempt to automatically set/refresh `sso` credentials by calling `aws cli` (#844)
* moved api log level to `debug` and `trace`. This is to prevent `info` level being saturated by api calls.
* migrate backend `httr` to `httr2`
* new `PawsStreamHandler`, allows paws to handle aws stream event (#842). Thankyou to @hadley for developing the initial solution in `httr2`.
* deprecated custom handler for `s3_unmarshal_select_object_content`

# paws.common 0.7.7
* fix unix time expiration check
Expand Down
Loading

0 comments on commit 57a369f

Please sign in to comment.