Skip to content

Commit

Permalink
updating inventory
Browse files Browse the repository at this point in the history
  • Loading branch information
rqthomas committed Nov 10, 2023
1 parent 724d7b6 commit 6b3f657
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 6 deletions.
33 changes: 32 additions & 1 deletion R/rebuild_inventory.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ inventory_df <- arrow::open_dataset(s3) |>
distinct(duration, model_id, site_id, reference_date, variable, date, project_id) |>
collect() |>
mutate(path = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}"),
path_full = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}/model_id={model_id}/reference_date={reference_date}/part-0.parquet"),
endpoint =config$endpoint)

s3_inventory <- arrow::s3_bucket(config$inventory_bucket,
endpoint_override = config$endpoint,
access_key = Sys.getenv("OSN_KEY"),
secret_key = Sys.getenv("OSN_SECRET"))

arrow::write_dataset(inventory_df, path = s3_inventory$path("catalog"))
arrow::write_dataset(inventory_df, path = s3_inventory$path(glue::glue("catalog/forecasts/project_id={config$project_id}")))

s3_inventory <- arrow::s3_bucket(config$inventory_bucket,
endpoint_override = config$endpoint,
Expand All @@ -26,3 +27,33 @@ s3_inventory <- arrow::s3_bucket(config$inventory_bucket,

inventory_df |> distinct(model_id, project_id) |>
arrow::write_csv_arrow(s3_inventory$path("model_id/model_id-project_id-inventory.csv"))

###

library(tidyverse)

# Scores inventory ----
# Build a table of the distinct score records found in the scores bucket and
# write it to the inventory bucket under catalog/scores for this project.
config <- yaml::read_yaml("challenge_configuration.yaml")
bucket <- config$scores_bucket

# Anonymous handle on the parquet area of the scores bucket.
s3 <- arrow::s3_bucket(
  paste0(config$scores_bucket, "/parquet"),
  endpoint_override = config$endpoint,
  anonymous = TRUE
)

# Reduce to one row per distinct key combination before collecting, then
# attach the partition path, the full parquet file path, and the endpoint.
inventory_df <- arrow::open_dataset(s3) |>
  mutate(
    reference_date = lubridate::as_date(reference_datetime),
    date = lubridate::as_date(datetime)
  ) |>
  distinct(duration, model_id, site_id, reference_date, variable, date, project_id) |>
  collect() |>
  mutate(
    path = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}"),
    path_full = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}/model_id={model_id}/date={date}/part-0.parquet"),
    endpoint = config$endpoint
  )

# Authenticated handle on the inventory bucket; credentials are read from the
# OSN_KEY / OSN_SECRET environment variables.
s3_inventory <- arrow::s3_bucket(
  config$inventory_bucket,
  endpoint_override = config$endpoint,
  access_key = Sys.getenv("OSN_KEY"),
  secret_key = Sys.getenv("OSN_SECRET")
)

arrow::write_dataset(
  inventory_df,
  path = s3_inventory$path(glue::glue("catalog/scores/project_id={config$project_id}"))
)






2 changes: 1 addition & 1 deletion catalog/forecasts/forecast_models.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ forecast_theme_df <- arrow::open_dataset(arrow::s3_bucket(config$forecasts_bucke
#(USE strsplit(forecast_theme_df$ToString(), "\n") INSTEAD OF strsplit(forecast_theme_df[[1]]$ToString(), "\n"))

## identify model ids from bucket -- used in generate model items function
forecast_data_df <- duckdbfs::open_dataset(glue::glue("s3://{config$inventory_bucket}/catalog"),
# Read this project's forecast inventory (catalog/forecasts/project_id=...)
# from the inventory bucket anonymously and pull it into memory.
forecast_data_df <- duckdbfs::open_dataset(glue::glue("s3://{config$inventory_bucket}/catalog/forecasts/project_id={config$project_id}"),
s3_endpoint = config$endpoint, anonymous=TRUE) |>
collect()

Expand Down
2 changes: 1 addition & 1 deletion catalog/scores/scores_models.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ scores_theme_df <- arrow::open_dataset(arrow::s3_bucket(config$scores_bucket, en
#filter(model_id == model_id, site_id = site_id, reference_datetime = reference_datetime)

## identify model ids from bucket -- used in generate model items function
scores_data_df <- duckdbfs::open_dataset(glue::glue("s3://{config$inventory_bucket}/catalog"),
# Read this project's *scores* inventory from the inventory bucket and pull it
# into memory. R/rebuild_inventory.R writes the scores inventory under
# catalog/scores/project_id=..., so read that prefix here rather than the
# forecasts inventory; otherwise the scores catalog would list forecast models.
scores_data_df <- duckdbfs::open_dataset(
  glue::glue("s3://{config$inventory_bucket}/catalog/scores/project_id={config$project_id}"),
  s3_endpoint = config$endpoint,
  anonymous = TRUE
) |>
  collect()

Expand Down
2 changes: 1 addition & 1 deletion dashboard/index.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ library(leaflet)
library(tidyverse)
config <- yaml::read_yaml("../challenge_configuration.yaml")
sites <- suppressMessages(sf::st_read("sites.json"))
s3_inventory <- arrow::s3_bucket(paste0(config$inventory_bucket, "/catalog"), endpoint_override = config$endpoint, anonymous = TRUE)
# Anonymous handle on this project's forecast inventory
# (<inventory_bucket>/catalog/forecasts/project_id=<project_id>).
s3_inventory <- arrow::s3_bucket(paste0(config$inventory_bucket, "/catalog/forecasts/project_id=", config$project_id), endpoint_override = config$endpoint, anonymous = TRUE)
# Number of distinct (model_id, reference_date) combinations in the inventory.
n_forecasts <- arrow::open_dataset(s3_inventory) |> distinct(model_id, reference_date) |> collect() |> nrow()
# Number of distinct model_id values in the inventory.
n_models <- arrow::open_dataset(s3_inventory) |> distinct(model_id) |> collect() |> nrow()
Expand Down
5 changes: 5 additions & 0 deletions scoring/scoring.R
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ furrr::future_walk(1:nrow(variable_duration), function(k, variable_duration, con

new_prov <- purrr::map_dfr(1:nrow(groupings), function(j, groupings, prov_df, s3_scores_path, curr_variable){

for(j in 1:nrow(groupings)){

print(j)

group <- groupings[j,]
ref <- group$date

Expand Down Expand Up @@ -139,6 +143,7 @@ furrr::future_walk(1:nrow(variable_duration), function(k, variable_duration, con
}else{
curr_prov <- NULL
}
}
},
groupings, prov_df, s3_scores_path,curr_variable
)
Expand Down
5 changes: 3 additions & 2 deletions submission_processing/process_submissions.R
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ if(length(submissions) > 0){
access_key = Sys.getenv("OSN_KEY"),
secret_key = Sys.getenv("OSN_SECRET"))

s3_inventory$CreateDir("inventory/catalog")
s3_inventory$CreateDir(paste0("inventory/catalog/forecasts/project_id=", config$project_id))

s3_inventory <- arrow::s3_bucket(paste0(config$inventory_bucket,"/catalog"),
s3_inventory <- arrow::s3_bucket(paste0(config$inventory_bucket,"/catalog/forecasts/project_id=", config$project_id),
endpoint_override = config$endpoint,
access_key = Sys.getenv("OSN_KEY"),
secret_key = Sys.getenv("OSN_SECRET"))
Expand Down Expand Up @@ -131,6 +131,7 @@ if(length(submissions) > 0){
# Build the inventory rows for the forecast just processed: one row per
# distinct key combination, with the partition path, the full parquet file
# path, and the endpoint attached.
curr_inventory <- fc |>
  dplyr::mutate(
    date = lubridate::as_date(datetime),
    path = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}"),
    path_full = glue::glue("{bucket}/parquet/project_id={project_id}/duration={duration}/variable={variable}/model_id={model_id}/reference_date={reference_date}/part-0.parquet"),
    endpoint = config$endpoint
  ) |>
  # distinct() keeps only the columns it is given, so path_full must be listed
  # or it is silently dropped. It is fully determined by the other keys, so
  # including it does not change the number of rows.
  dplyr::distinct(
    project_id, duration, model_id, site_id, reference_date, variable, date,
    path, path_full, endpoint
  )

Expand Down

0 comments on commit 6b3f657

Please sign in to comment.