Skip to content

Commit

Permalink
conditions for if this is first time creating the dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
jzwart committed Feb 5, 2024
1 parent 652f187 commit 2ff70e9
Showing 1 changed file with 44 additions and 23 deletions.
67 changes: 44 additions & 23 deletions drivers/update_stage3.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,65 @@ furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){
endpoint = config$endpoint,
bucket = driver_bucket)

stage3_df <- arrow::open_dataset(s3_stage3) |>
dplyr::filter(site_id == curr_site_id) |>
dplyr::collect()

max_date <- stage3_df |>
dplyr::summarise(max = as.character(lubridate::as_date(max(datetime)))) |>
dplyr::pull(max)
# Handle the first-time case: if the stage3 dataset has no files yet, there is no existing data to read, so max_date is set to NA
stage3_dataset <- arrow::open_dataset(s3_stage3)
if(length(stage3_dataset$files) > 0){
stage3_df <- stage3_dataset |>
dplyr::filter(site_id == curr_site_id) |>
dplyr::collect()

max_date <- stage3_df |>
dplyr::summarise(max = as.character(lubridate::as_date(max(datetime)))) |>
dplyr::pull(max)
}else{
max_date <- NA
}

s3_pseudo <- gefs4cast::gefs_s3_dir(product = "pseudo",
path = driver_path,
endpoint = config$endpoint,
bucket = driver_bucket)

vars <- names(stage3_df)
if(length(stage3_dataset$files) > 0){
cut_off <- as.character(lubridate::as_date(max_date) - lubridate::days(3))
}

cut_off <- as.character(lubridate::as_date(max_date) - lubridate::days(3))
if(length(stage3_dataset$files) > 0){
pseudo_df <- arrow::open_dataset(s3_pseudo) |>
dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
dplyr::filter(site_id == curr_site_id,
reference_datetime >= cut_off) |>
dplyr::collect()
}else{
pseudo_df <- arrow::open_dataset(s3_pseudo) |>
dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
dplyr::filter(site_id == curr_site_id) |>
dplyr::collect()
}

pseudo_df <- arrow::open_dataset(s3_pseudo) |>
dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
dplyr::filter(site_id == curr_site_id,
reference_datetime >= cut_off) |>
dplyr::collect()

if(nrow(psuedo_df) > 0){
if(nrow(pseudo_df) > 0){

df2 <- psuedo_df |>
df2 <- pseudo_df |>
to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude),
use_solar_geom = TRUE,
psuedo = TRUE) |>
pseudo = TRUE) |>
dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |>
dplyr::rename(parameter = ensemble)

stage3_df_update <- stage3_df |>
dplyr::filter(datetime < min(df2$datetime))
if(length(stage3_dataset$files) > 0){
stage3_df_update <- stage3_df |>
dplyr::filter(datetime < min(df2$datetime))

df2 |>
dplyr::bind_rows(stage3_df_update) |>
dplyr::arrange(variable, datetime, parameter) |>
arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
}else{
df2 |>
dplyr::arrange(variable, datetime, parameter) |>
arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
}

df2 |>
dplyr::bind_rows(stage3_df_update) |>
dplyr::arrange(variable, datetime, parameter) |>
arrow::write_dataset(path = s3_stage3, partitioning = "site_id")
}
})

0 comments on commit 2ff70e9

Please sign in to comment.