Skip to content

Commit

Permalink
Merge pull request #25 from eco4cast/stage3-patch
Browse files Browse the repository at this point in the history
correct s3 bucket object
  • Loading branch information
jzwart authored Feb 5, 2024
2 parents 63eba13 + 2ff70e9 commit 75efc1d
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/drivers_stage1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Update GEFS
shell: Rscript {0}
run: |
source("drivers/download_stage1_psuedo.R")
source("drivers/download_stage1_pseudo.R")
- name: Generate stage 2
shell: Rscript {0}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/drivers_stage3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ name: gefs_osn_stage3

jobs:
docker:
timeout-minutes: 2880
runs-on: [self-hosted]
env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion drivers/generate_stage2.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ if(length(missing_dates) > 0){
hourly_df <- to_hourly(site_df,
site_list = dplyr::select(site_list, site_id, latitude, longitude),
use_solar_geom = TRUE,
psuedo = FALSE) |>
pseudo = FALSE) |>
dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5)),
reference_datetime = lubridate::as_date(reference_datetime)) |>
dplyr::rename(parameter = ensemble)
Expand Down
4 changes: 2 additions & 2 deletions drivers/generate_stage3.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){
df |>
to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude),
use_solar_geom = TRUE,
psuedo = TRUE) |>
pseudo = TRUE) |>
dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |>
dplyr::rename(parameter = ensemble) |>
arrow::write_dataset(path = s3, partitioning = "site_id")
arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
})
8 changes: 4 additions & 4 deletions drivers/to_hourly.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
#' @param df dataframe of stage1 NEON GEFS forecasts for sites to forecast at
#' @param site_list a dataframe of the latitude and longitude for all site_ids in df
#' @param use_solar_geom logical for using solar geometry for daily SW calculation
#' @param psuedo logical for something...
#' @param pseudo logical; if TRUE, treat input as pseudo-forecasts: reference_datetime is set to NA and rows are filtered by horizon/max datetime (TODO: confirm intended semantics)
to_hourly <- function(df,
site_list,
use_solar_geom = TRUE,
psuedo = FALSE){
pseudo = FALSE){

if(!psuedo){
if(!pseudo){
reference_datetime <- lubridate::as_datetime(df$reference_datetime)[1]
}else{
reference_datetime <- NA
Expand Down Expand Up @@ -40,7 +40,7 @@ to_hourly <- function(df,

states <- df |>
dplyr::select(site_id, family, horizon, ensemble, datetime, variable, prediction) |>
dplyr::filter(!psuedo | (psuedo & horizon != "006") | (psuedo & datetime == max(df$datetime))) |>
dplyr::filter(!pseudo | (pseudo & horizon != "006") | (pseudo & datetime == max(df$datetime))) |>
dplyr::select(-horizon) |>
dplyr::group_by(site_id, family, ensemble, variable) |>
dplyr::right_join(full_time, by = c("site_id", "ensemble", "datetime", "family", "variable")) |>
Expand Down
67 changes: 44 additions & 23 deletions drivers/update_stage3.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,65 @@ furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){
endpoint = config$endpoint,
bucket = driver_bucket)

stage3_df <- arrow::open_dataset(s3_stage3) |>
dplyr::filter(site_id == curr_site_id) |>
dplyr::collect()

max_date <- stage3_df |>
dplyr::summarise(max = as.character(lubridate::as_date(max(datetime)))) |>
dplyr::pull(max)
# case for if this is the first time creating stage3 drivers
stage3_dataset <- arrow::open_dataset(s3_stage3)
if(length(stage3_dataset$files) > 0){
stage3_df <- stage3_dataset |>
dplyr::filter(site_id == curr_site_id) |>
dplyr::collect()

max_date <- stage3_df |>
dplyr::summarise(max = as.character(lubridate::as_date(max(datetime)))) |>
dplyr::pull(max)
}else{
max_date <- NA
}

s3_pseudo <- gefs4cast::gefs_s3_dir(product = "pseudo",
path = driver_path,
endpoint = config$endpoint,
bucket = driver_bucket)

vars <- names(stage3_df)
if(length(stage3_dataset$files) > 0){
cut_off <- as.character(lubridate::as_date(max_date) - lubridate::days(3))
}

cut_off <- as.character(lubridate::as_date(max_date) - lubridate::days(3))
if(length(stage3_dataset$files) > 0){
pseudo_df <- arrow::open_dataset(s3_pseudo) |>
dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
dplyr::filter(site_id == curr_site_id,
reference_datetime >= cut_off) |>
dplyr::collect()
}else{
pseudo_df <- arrow::open_dataset(s3_pseudo) |>
dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
dplyr::filter(site_id == curr_site_id) |>
dplyr::collect()
}

pseudo_df <- arrow::open_dataset(s3_pseudo) |>
dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
dplyr::filter(site_id == curr_site_id,
reference_datetime >= cut_off) |>
dplyr::collect()

if(nrow(psuedo_df) > 0){
if(nrow(pseudo_df) > 0){

df2 <- psuedo_df |>
df2 <- pseudo_df |>
to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude),
use_solar_geom = TRUE,
psuedo = TRUE) |>
pseudo = TRUE) |>
dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |>
dplyr::rename(parameter = ensemble)

stage3_df_update <- stage3_df |>
dplyr::filter(datetime < min(df2$datetime))
if(length(stage3_dataset$files) > 0){
stage3_df_update <- stage3_df |>
dplyr::filter(datetime < min(df2$datetime))

df2 |>
dplyr::bind_rows(stage3_df_update) |>
dplyr::arrange(variable, datetime, parameter) |>
arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
}else{
df2 |>
dplyr::arrange(variable, datetime, parameter) |>
arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
}

df2 |>
dplyr::bind_rows(stage3_df_update) |>
dplyr::arrange(variable, datetime, parameter) |>
arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
}
})

0 comments on commit 75efc1d

Please sign in to comment.