diff --git a/.github/workflows/drivers_stage1.yaml b/.github/workflows/drivers_stage1.yaml
index 8e9140f947..0302ead383 100644
--- a/.github/workflows/drivers_stage1.yaml
+++ b/.github/workflows/drivers_stage1.yaml
@@ -31,7 +31,7 @@ jobs:
       - name: Update GEFS
         shell: Rscript {0}
         run: |
-          source("drivers/download_stage1_psuedo.R")
+          source("drivers/download_stage1_pseudo.R")
 
       - name: Generate stage 2
         shell: Rscript {0}
diff --git a/.github/workflows/drivers_stage3.yaml b/.github/workflows/drivers_stage3.yaml
index f1f6f78399..94c6542176 100644
--- a/.github/workflows/drivers_stage3.yaml
+++ b/.github/workflows/drivers_stage3.yaml
@@ -9,6 +9,7 @@ name: gefs_osn_stage3
 
 jobs:
   docker:
+    timeout-minutes: 2880
     runs-on: [self-hosted]
     env:
       GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
diff --git a/drivers/download_stage1_psuedo.R b/drivers/download_stage1_pseudo.R
similarity index 100%
rename from drivers/download_stage1_psuedo.R
rename to drivers/download_stage1_pseudo.R
diff --git a/drivers/generate_stage2.R b/drivers/generate_stage2.R
index 2d10efcf4b..7d23a2a3c9 100644
--- a/drivers/generate_stage2.R
+++ b/drivers/generate_stage2.R
@@ -67,7 +67,7 @@ if(length(missing_dates) > 0){
     hourly_df <- to_hourly(site_df,
                            site_list = dplyr::select(site_list, site_id, latitude, longitude),
                            use_solar_geom = TRUE,
-                           psuedo = FALSE) |>
+                           pseudo = FALSE) |>
       dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5)),
                     reference_datetime = lubridate::as_date(reference_datetime)) |>
       dplyr::rename(parameter = ensemble)
diff --git a/drivers/generate_stage3.R b/drivers/generate_stage3.R
index 7635372077..e4f588c379 100644
--- a/drivers/generate_stage3.R
+++ b/drivers/generate_stage3.R
@@ -47,8 +47,8 @@ furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){
   df |>
     to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude),
               use_solar_geom = TRUE,
-              psuedo = TRUE) |>
+              pseudo = TRUE) |>
     dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |>
     dplyr::rename(parameter = ensemble) |>
-    arrow::write_dataset(path = s3, partitioning = "site_id")
+    arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
 })
diff --git a/drivers/to_hourly.R b/drivers/to_hourly.R
index b79e609949..53442dbbdc 100644
--- a/drivers/to_hourly.R
+++ b/drivers/to_hourly.R
@@ -3,13 +3,13 @@
 #' @param df dataframe of stage1 NEON GEFS forecasts for sites to forecast at
 #' @param site_list a dataframe of the latitude and longitude for all site_ids in df
 #' @param use_solar_geom logical for using solar geometry for daily SW calculation
-#' @param psuedo logical for something...
+#' @param pseudo logical for something...
 to_hourly <- function(df,
                       site_list,
                       use_solar_geom = TRUE,
-                      psuedo = FALSE){
+                      pseudo = FALSE){
 
-  if(!psuedo){
+  if(!pseudo){
     reference_datetime <- lubridate::as_datetime(df$reference_datetime)[1]
   }else{
     reference_datetime <- NA
@@ -40,7 +40,7 @@ to_hourly <- function(df,
 
   states <- df |>
     dplyr::select(site_id, family, horizon, ensemble, datetime, variable, prediction) |>
-    dplyr::filter(!psuedo | (psuedo & horizon != "006") | (psuedo & datetime == max(df$datetime))) |>
+    dplyr::filter(!pseudo | (pseudo & horizon != "006") | (pseudo & datetime == max(df$datetime))) |>
     dplyr::select(-horizon) |>
     dplyr::group_by(site_id, family, ensemble, variable) |>
     dplyr::right_join(full_time, by = c("site_id", "ensemble", "datetime", "family", "variable")) |>
diff --git a/drivers/update_stage3.R b/drivers/update_stage3.R
index bd5625a0a1..51a6e2ad69 100644
--- a/drivers/update_stage3.R
+++ b/drivers/update_stage3.R
@@ -23,44 +23,65 @@ furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){
                                       endpoint = config$endpoint,
                                       bucket = driver_bucket)
 
-  stage3_df <- arrow::open_dataset(s3_stage3) |>
-    dplyr::filter(site_id == curr_site_id) |>
-    dplyr::collect()
-
-  max_date <- stage3_df |>
-    dplyr::summarise(max = as.character(lubridate::as_date(max(datetime)))) |>
-    dplyr::pull(max)
+  # case for if this is the first time creating stage3 drivers
+  stage3_dataset <- arrow::open_dataset(s3_stage3)
+  if(length(stage3_dataset$files) > 0){
+    stage3_df <- stage3_dataset |>
+      dplyr::filter(site_id == curr_site_id) |>
+      dplyr::collect()
+
+    max_date <- stage3_df |>
+      dplyr::summarise(max = as.character(lubridate::as_date(max(datetime)))) |>
+      dplyr::pull(max)
+  }else{
+    max_date <- NA
+  }
 
   s3_pseudo <- gefs4cast::gefs_s3_dir(product = "pseudo",
                                       path = driver_path,
                                       endpoint = config$endpoint,
                                       bucket = driver_bucket)
 
-  vars <- names(stage3_df)
+  if(length(stage3_dataset$files) > 0){
+    cut_off <- as.character(lubridate::as_date(max_date) - lubridate::days(3))
+  }
 
-  cut_off <- as.character(lubridate::as_date(max_date) - lubridate::days(3))
+  if(length(stage3_dataset$files) > 0){
+    pseudo_df <- arrow::open_dataset(s3_pseudo) |>
+      dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
+      dplyr::filter(site_id == curr_site_id,
+                    reference_datetime >= cut_off) |>
+      dplyr::collect()
+  }else{
+    pseudo_df <- arrow::open_dataset(s3_pseudo) |>
+      dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
+      dplyr::filter(site_id == curr_site_id) |>
+      dplyr::collect()
+  }
 
-  pseudo_df <- arrow::open_dataset(s3_pseudo) |>
-    dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
-    dplyr::filter(site_id == curr_site_id,
-                  reference_datetime >= cut_off) |>
-    dplyr::collect()
 
-  if(nrow(psuedo_df) > 0){
+  if(nrow(pseudo_df) > 0){
 
-    df2 <- psuedo_df |>
+    df2 <- pseudo_df |>
       to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude),
                 use_solar_geom = TRUE,
-                psuedo = TRUE) |>
+                pseudo = TRUE) |>
      dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |>
      dplyr::rename(parameter = ensemble)
 
-    stage3_df_update <- stage3_df |>
-      dplyr::filter(datetime < min(df2$datetime))
+    if(length(stage3_dataset$files) > 0){
+      stage3_df_update <- stage3_df |>
+        dplyr::filter(datetime < min(df2$datetime))
+
+      df2 |>
+        dplyr::bind_rows(stage3_df_update) |>
+        dplyr::arrange(variable, datetime, parameter) |>
+        arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
+    }else{
+      df2 |>
+        dplyr::arrange(variable, datetime, parameter) |>
+        arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
+    }
 
-    df2 |>
-      dplyr::bind_rows(stage3_df_update) |>
-      dplyr::arrange(variable, datetime, parameter) |>
-      arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
   }
 })
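Note on the update_stage3.R change above: the guard on length(stage3_dataset$files) lets the same script bootstrap a brand-new stage3 archive (no files yet, so the full pseudo product is processed with no cut-off) and incrementally update an existing one (only the last three days are rebuilt and bound onto the retained rows). Below is a minimal, standalone R sketch of that guard; the temporary directory, the toy tibble, and the hard-coded "ABBY" site (standing in for curr_site_id) are illustrative assumptions, not part of the patch.

# Minimal sketch of the first-run guard used in update_stage3.R.
# A local temp directory stands in for the s3_stage3 bucket location;
# the toy data frame below is illustrative only.
library(arrow)
library(dplyr)
library(lubridate)

stage3_dir <- file.path(tempdir(), "stage3_sketch")

# Pretend an earlier run already wrote a partitioned stage3 archive.
tibble::tibble(site_id    = "ABBY",
               datetime   = lubridate::as_datetime("2023-09-01 00:00:00") + lubridate::dhours(0:47),
               variable   = "air_temperature",
               parameter  = 1,
               prediction = rnorm(48)) |>
  arrow::write_dataset(path = stage3_dir, partitioning = "site_id")

stage3_dataset <- arrow::open_dataset(stage3_dir)

if (length(stage3_dataset$files) > 0) {
  # Existing archive: find the newest datetime already stored for the site,
  # and only rebuild the last three days of the pseudo product from there.
  stage3_df <- stage3_dataset |>
    dplyr::filter(site_id == "ABBY") |>
    dplyr::collect()

  max_date <- stage3_df |>
    dplyr::summarise(max = lubridate::as_date(max(datetime))) |>
    dplyr::pull(max)

  cut_off <- as.character(max_date - lubridate::days(3))
} else {
  # First run: nothing to append to, so the full pseudo archive would be
  # processed and written out without a reference_datetime cut-off.
  cut_off <- NA
}

cut_off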