update querying method for catalog
addelany committed Dec 20, 2024
1 parent b519b4f commit c365260
Showing 3 changed files with 201 additions and 119 deletions.
103 changes: 65 additions & 38 deletions catalog/forecasts/forecast_models.R
@@ -36,31 +36,36 @@ forecast_description_create <- data.frame(datetime = 'datetime of the forecasted
 # model_id <- 'climatology'

 print('FIND FORECAST TABLE SCHEMA')
-forecast_theme_df <- arrow::open_dataset(arrow::s3_bucket(config$forecasts_bucket, endpoint_override = config$endpoint, anonymous = TRUE)) #|>
+forecast_theme_df <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) #|>

 print('FIND INVENTORY BUCKET')
 # forecast_s3 <- arrow::s3_bucket(glue::glue("{config$inventory_bucket}/catalog/forecasts/project_id={config$project_id}"),
 #                                 endpoint_override = "sdsc.osn.xsede.org",
 #                                 anonymous=TRUE)

-forecast_s3 <- arrow::s3_bucket(glue::glue("{config$forecasts_bucket}/bundled-parquet/project_id={config$project_id}"),
-                                endpoint_override = "sdsc.osn.xsede.org",
-                                anonymous=TRUE)
+# forecast_s3 <- arrow::s3_bucket(glue::glue("{config$forecasts_bucket}/bundled-parquet/project_id={config$project_id}"),
+#                                 endpoint_override = "sdsc.osn.xsede.org",
+#                                 anonymous=TRUE)

-print('OPEN INVENTORY BUCKET')
-forecast_data_df <- arrow::open_dataset(forecast_s3) |>
-  #filter(project_id == config$project_id) |>
-  collect()
+#print('OPEN INVENTORY BUCKET')
+# forecast_data_df <- arrow::open_dataset(forecast_s3) |>
+#   #filter(project_id == config$project_id) |>
+#   collect()

+theme_models <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
+  distinct(model_id) |>
+  collect()

-theme_models <- forecast_data_df |>
-  distinct(model_id)
+forecast_sites <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
+  distinct(site_id) |>
+  collect()

-forecast_sites <- forecast_data_df |>
-  distinct(site_id)
+forecast_date_range <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
+  summarize(across(all_of(c('datetime')), list(min = min, max = max))) |>
+  collect()
+forecast_min_date <- forecast_date_range$datetime_min
+forecast_max_date <- forecast_date_range$datetime_max

-forecast_date_range <- forecast_data_df |> dplyr::summarise(min(date),max(date))
-forecast_min_date <- forecast_date_range$`min(date)`
-forecast_max_date <- forecast_date_range$`max(date)`

 build_description <- paste0("Forecasts are the raw forecasts that includes all ensemble members or distribution parameters. Due to the size of the raw forecasts, we recommend accessing the scores (summaries of the forecasts) to analyze forecasts (unless you need the individual ensemble members). You can access the forecasts at the top level of the dataset where all models, variables, and dates that forecasts were produced (reference_datetime) are available. The code to access the entire dataset is provided as an asset. Given the size of the forecast catalog, it can be time-consuming to access the data at the full dataset level. For quicker access to the forecasts for a particular model (model_id), we also provide the code to access the data at the model_id level as an asset for each model.")
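A minimal sketch of the kind of access code that description refers to (not the actual published asset), assuming the same config$forecasts_bucket and config$endpoint values used in this file; the model_id value is illustrative:

library(arrow)
library(dplyr)

# Open the full bundled-parquet forecast dataset; arrow scans it lazily,
# so nothing is downloaded until collect() is called.
all_forecasts <- arrow::open_dataset(
  arrow::s3_bucket(paste0(config$forecasts_bucket, '/bundled-parquet'),
                   endpoint_override = config$endpoint, anonymous = TRUE))

# Quicker model-level access: push the filter down to the parquet scan and
# materialize only one model's forecasts.
one_model <- all_forecasts |>
  filter(model_id == 'climatology') |>  # illustrative model_id
  collect()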

@@ -106,8 +111,11 @@ for (i in 1:length(config$variable_groups)){ ## organize variable groups
 print(names(config$variable_groups)[i])

 # check data and skip if no data found
-var_group_data_check <- forecast_data_df |>
-  filter(variable %in% config$variable_groups[[i]]$variable)
+var_group_data_check <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'),
+                                            endpoint_override = config$endpoint, anonymous=TRUE)) |>
+  filter(variable %in% c(config$variable_groups[[i]]$variable)) |>
+  summarise(n = n()) |>
+  collect()

 if (nrow(var_group_data_check) == 0){
   print('No data available for group')
@@ -135,9 +143,10 @@ for (i in 1:length(config$variable_groups)){ ## organize variable groups
 group_description <- paste0('All variables for the ',names(config$variable_groups[i]),' group.')

 ## find group sites
-find_group_sites <- forecast_data_df |>
+find_group_sites <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
   filter(variable %in% var_values) |>
-  distinct(site_id)
+  distinct(site_id) |>
+  collect()

 ## create empty vector to track publication information
 citation_build <- c()
@@ -167,8 +176,11 @@ for (i in 1:length(config$variable_groups)){ ## organize variable groups
 var_formal_name <- paste0(duration_value,'_',var_name_full[j])

 # check data and skip if no data found
-var_data_check <- forecast_data_df |>
-  filter(variable == var_name)
+var_data_check <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'),
+                                      endpoint_override = config$endpoint, anonymous = TRUE)) |>
+  filter(variable == var_name, duration == duration_name) |>
+  summarise(n = n()) |>
+  collect()

 if (nrow(var_data_check) == 0){
   print('No data available for variable')
@@ -179,22 +191,30 @@ for (i in 1:length(config$variable_groups)){ ## organize variable groups
   dir.create(file.path(catalog_config$forecast_path,names(config$variable_groups)[i],var_formal_name))
 }

-var_data <- forecast_data_df |>
+# var_data <- forecast_data_df |>
+#   filter(variable == var_name,
+#          duration == duration_name)
+
+var_date_range <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
   filter(variable == var_name,
-         duration == duration_name)
+         duration == duration_name) |>
+  summarize(across(all_of(c('datetime')), list(min = min, max = max))) |>
+  collect()

-var_date_range <- var_data |> dplyr::summarise(min(date),max(date))
-var_min_date <- var_date_range$`min(date)`
-var_max_date <- var_date_range$`max(date)`
+var_min_date <- var_date_range$datetime_min
+var_max_date <- var_date_range$datetime_max

-var_models <- var_data |>
+var_models <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
+  filter(variable == var_name, duration == duration_name) |>
   distinct(model_id) |>
+  collect() |>
   filter(model_id %in% registered_model_id$model_id,
          !grepl("example",model_id))

-find_var_sites <- forecast_data_df |>
+find_var_sites <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE))|>
   filter(variable == var_name) |>
-  distinct(site_id)
+  distinct(site_id) |>
+  collect()

 var_metadata <- variable_gsheet |>
   filter(`"official" targets name` == var_name,
@@ -258,23 +278,26 @@ for (i in 1:length(config$variable_groups)){ ## organize variable groups
 }

 print(m)
-model_date_range <- forecast_data_df |>
+
+model_date_range <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
   filter(model_id == m,
          variable == var_name,
          duration == duration_name) |>
-  dplyr::summarise(min(date),max(date),max(reference_date),max(pub_date))
+  summarize(across(all_of(c('datetime','reference_date','pub_datetime')), list(min = min, max = max))) |>
+  collect()

-model_min_date <- model_date_range$`min(date)`
-model_max_date <- model_date_range$`max(date)`
+model_min_date <- model_date_range$datetime_min
+model_max_date <- model_date_range$datetime_max

-model_reference_date <- model_date_range$`max(reference_date)`
-model_pub_date <- model_date_range$`max(pub_date)`
+model_reference_date <- model_date_range$reference_date_max
+model_pub_date <- model_date_range$pub_datetime_max

-model_var_duration_df <- forecast_data_df |>
+model_var_duration_df <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'), endpoint_override = config$endpoint, anonymous = TRUE)) |>
   filter(model_id == m,
          variable == var_name,
          duration == duration_name) |>
   distinct(variable,duration, project_id) |>
+  collect() |>
   mutate(duration_name = ifelse(duration == 'P1D', 'Daily', duration)) |>
   mutate(duration_name = ifelse(duration == 'PT1H', 'Hourly', duration_name)) |>
   mutate(duration_name = ifelse(duration == 'PT30M', '30min', duration_name)) |>
@@ -285,19 +308,23 @@ for (i in 1:length(config$variable_groups)){ ## organize variable groups
   select(variable = `"official" targets name`, full_name = `Variable name`) |>
   distinct(variable, .keep_all = TRUE)), by = c('variable'))

-model_sites <- forecast_data_df |>
+model_sites <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'),
+                                   endpoint_override = config$endpoint, anonymous=TRUE)) |>
   filter(model_id == m,
          variable == var_name,
          duration == duration_name) |>
-  distinct(site_id)
+  distinct(site_id) |>
+  collect()

 model_site_text <- paste(as.character(model_sites$site_id), sep="' '", collapse=", ")

-model_vars <- forecast_data_df |>
+model_vars <- arrow::open_dataset(arrow::s3_bucket(paste0(config$forecasts_bucket,'/bundled-parquet'),
+                                  endpoint_override = config$endpoint, anonymous=TRUE)) |>
   filter(model_id == m,
          variable == var_name,
          duration == duration_name) |>
   distinct(variable) |>
+  collect() |>
   left_join(model_var_full_name, by = 'variable')

 model_vars$var_duration_name <- paste0(model_vars$duration_name, " ", model_vars$full_name)
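Taken together, the recurring change in this commit swaps one querying pattern for another: instead of collecting the whole inventory into an in-memory data frame (forecast_data_df) and querying it with dplyr, each query now opens the bundled-parquet store, pushes filter/distinct/summarize down to the arrow scan, and collects only the small result. A condensed sketch of the before/after pattern, using the bucket path and column names from the diff above:

library(arrow)
library(dplyr)

# Before: materialize everything once, then query in memory.
# forecast_data_df <- arrow::open_dataset(forecast_s3) |> collect()
# theme_models <- forecast_data_df |> distinct(model_id)

# After: keep the dataset remote and collect only the query result.
ds <- arrow::open_dataset(
  arrow::s3_bucket(paste0(config$forecasts_bucket, '/bundled-parquet'),
                   endpoint_override = config$endpoint, anonymous = TRUE))

theme_models <- ds |> distinct(model_id) |> collect()

# summarize(across(...)) names its outputs "<column>_<function>", which is why
# the old `min(date)` accessors become datetime_min / datetime_max in the diff.
date_range <- ds |>
  summarize(across(all_of(c('datetime')), list(min = min, max = max))) |>
  collect()
date_range$datetime_min
date_range$datetime_max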