From e3b65f1b752961a9bd8e587d88d50a631df13a31 Mon Sep 17 00:00:00 2001 From: Zwart Date: Mon, 5 Feb 2024 10:16:24 -0800 Subject: [PATCH 1/3] adding to hourly function because there are neon specific code in neon4cast version --- drivers/to_hourly.R | 136 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 drivers/to_hourly.R diff --git a/drivers/to_hourly.R b/drivers/to_hourly.R new file mode 100644 index 0000000000..b79e609949 --- /dev/null +++ b/drivers/to_hourly.R @@ -0,0 +1,136 @@ + +#' +#' @param df dataframe of stage1 NEON GEFS forecasts for sites to forecast at +#' @param site_list a dataframe of the latitude and longitude for all site_ids in df +#' @param use_solar_geom logical for using solar geometry for daily SW calculation +#' @param psuedo logical for something... +to_hourly <- function(df, + site_list, + use_solar_geom = TRUE, + psuedo = FALSE){ + + if(!psuedo){ + reference_datetime <- lubridate::as_datetime(df$reference_datetime)[1] + }else{ + reference_datetime <- NA + } + + var_order <- names(df) + + ensemble_maxtime <- df |> + dplyr::group_by(site_id, family, ensemble) |> + dplyr::summarise(max_time = max(datetime), .groups = "drop") + + ensembles <- unique(df$ensemble) + datetime <- seq(min(df$datetime), max(df$datetime), by = "1 hour") + variables <- unique(df$variable) + sites <- unique(df$site_id) + + full_time <- expand.grid(sites, ensembles, datetime, variables) |> + dplyr::rename(site_id = Var1, + ensemble = Var2, + datetime = Var3, + variable = Var4) |> + dplyr::mutate(datetime = lubridate::as_datetime(datetime)) |> + dplyr::arrange(site_id, ensemble, variable, datetime) |> + dplyr::left_join(ensemble_maxtime, by = c("site_id","ensemble")) |> + dplyr::filter(datetime <= max_time) |> + dplyr::select(-c("max_time")) |> + dplyr::distinct() + + states <- df |> + dplyr::select(site_id, family, horizon, ensemble, datetime, variable, prediction) |> + dplyr::filter(!psuedo | (psuedo & horizon != "006") | (psuedo & datetime == max(df$datetime))) |> + dplyr::select(-horizon) |> + dplyr::group_by(site_id, family, ensemble, variable) |> + dplyr::right_join(full_time, by = c("site_id", "ensemble", "datetime", "family", "variable")) |> + dplyr::filter(variable %in% c("PRES", "RH", "TMP", "UGRD", "VGRD")) |> + dplyr::arrange(site_id, family, ensemble, datetime) |> + dplyr::mutate(prediction = imputeTS::na_interpolation(prediction, option = "linear")) |> + dplyr::mutate(prediction = ifelse(variable == "TMP", prediction + 273, prediction)) |> + dplyr::mutate(prediction = ifelse(variable == "RH", prediction/100, prediction)) |> + dplyr::ungroup() + + fluxes <- df |> + dplyr::select(site_id, family, horizon, ensemble, datetime, variable, prediction) |> + dplyr::filter(horizon != "003") |> + dplyr::select(-horizon) |> + dplyr::group_by(site_id, family, ensemble, variable) |> + dplyr::right_join(full_time, by = c("site_id", "ensemble", "datetime", "family", "variable")) |> + dplyr::filter(variable %in% c("APCP","DSWRF","DLWRF")) |> + dplyr::arrange(site_id, family, ensemble, datetime) |> + tidyr::fill(prediction, .direction = "up") |> + dplyr::mutate(prediction = ifelse(variable == "APCP", prediction / (6 * 60 * 60), prediction), + variable = ifelse(variable == "APCP", "PRATE", variable)) |> + dplyr::ungroup() + + if(use_solar_geom){ + + fluxes <- fluxes |> + dplyr::left_join(site_list, by = "site_id") |> + dplyr::mutate(hour = lubridate::hour(datetime), + date = lubridate::as_date(datetime), + doy = lubridate::yday(datetime) + hour/24, + longitude = ifelse(longitude < 0, 360 + longitude, longitude), + rpot = downscale_solar_geom(doy, longitude, latitude)) |> # hourly sw flux calculated using solar geometry + dplyr::group_by(site_id, family, ensemble, date, variable) |> + dplyr::mutate(avg.rpot = mean(rpot, na.rm = TRUE), + avg.SW = mean(prediction, na.rm = TRUE))|> # daily sw mean from solar geometry + dplyr::ungroup() |> + dplyr::mutate(prediction = ifelse(variable == "DSWRF" & avg.rpot > 0.0, rpot * (avg.SW/avg.rpot),prediction)) |> + dplyr::select(any_of(var_order)) + } + + hourly_df <- dplyr::bind_rows(states, fluxes) |> + dplyr::arrange(site_id, family, ensemble, datetime) |> + dplyr::mutate(variable = ifelse(variable == "TMP", "air_temperature", variable), + variable = ifelse(variable == "PRES", "air_pressure", variable), + variable = ifelse(variable == "RH", "relative_humidity", variable), + variable = ifelse(variable == "DLWRF", "surface_downwelling_longwave_flux_in_air", variable), + variable = ifelse(variable == "DSWRF", "surface_downwelling_shortwave_flux_in_air", variable), + variable = ifelse(variable == "PRATE", "precipitation_flux", variable), + variable = ifelse(variable == "VGRD", "eastward_wind", variable), + variable = ifelse(variable == "UGRD", "northward_wind", variable), + variable = ifelse(variable == "APCP", "precipitation_amount", variable), + reference_datetime = reference_datetime) |> + dplyr::select(any_of(var_order)) + + return(hourly_df) + +} + +cos_solar_zenith_angle <- function(doy, lat, lon, dt, hr) { + et <- equation_of_time(doy) + merid <- floor(lon / 15) * 15 + merid[merid < 0] <- merid[merid < 0] + 15 + lc <- (lon - merid) * -4/60 ## longitude correction + tz <- merid / 360 * 24 ## time zone + midbin <- 0.5 * dt / 86400 * 24 ## shift calc to middle of bin + t0 <- 12 + lc - et - tz - midbin ## solar time + h <- pi/12 * (hr - t0) ## solar hour + dec <- -23.45 * pi / 180 * cos(2 * pi * (doy + 10) / 365) ## declination + cosz <- sin(lat * pi / 180) * sin(dec) + cos(lat * pi / 180) * cos(dec) * cos(h) + cosz[cosz < 0] <- 0 + return(cosz) +} + +equation_of_time <- function(doy) { + stopifnot(doy <= 367) + f <- pi / 180 * (279.5 + 0.9856 * doy) + et <- (-104.7 * sin(f) + 596.2 * sin(2 * f) + 4.3 * + sin(4 * f) - 429.3 * cos(f) - 2 * + cos(2 * f) + 19.3 * cos(3 * f)) / 3600 # equation of time -> eccentricity and obliquity + return(et) +} + +downscale_solar_geom <- function(doy, lon, lat) { + + dt <- median(diff(doy)) * 86400 # average number of seconds in time interval + hr <- (doy - floor(doy)) * 24 # hour of day for each element of doy + + ## calculate potential radiation + cosz <- cos_solar_zenith_angle(doy, lat, lon, dt, hr) + rpot <- 1366 * cosz + return(rpot) +} + From 223c9ca4a4326257dc67ca1c4544c26c1f925695 Mon Sep 17 00:00:00 2001 From: Zwart Date: Mon, 5 Feb 2024 10:17:06 -0800 Subject: [PATCH 2/3] site list to new to hourly function --- drivers/generate_stage2.R | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/drivers/generate_stage2.R b/drivers/generate_stage2.R index 31d77718d0..2d10efcf4b 100644 --- a/drivers/generate_stage2.R +++ b/drivers/generate_stage2.R @@ -1,7 +1,8 @@ ## setup library(gdalcubes) library(gefs4cast) -source("https://raw.githubusercontent.com/eco4cast/neon4cast/main/R/to_hourly.R") +# need to source to_hourly.R instead of from neon4cast because there are neon-specific code in neon4cast +source("drivers/to_hourly.R") Sys.setenv("GEFS_VERSION"="v12") @@ -21,19 +22,28 @@ s3_stage2 <- gefs4cast::gefs_s3_dir(product = "stage2", endpoint = config$endpoint, bucket = driver_bucket) -df <- arrow::open_dataset(s3_stage2) |> - dplyr::distinct(reference_datetime) |> - dplyr::collect() - +# if there aren't any data (i.e., this is the first time we're creating this dataset), +# then skip the distinct(reference_datetime) filter +df <- arrow::open_dataset(s3_stage2) +if(length(df$files) > 0){ + df <- arrow::open_dataset(s3_stage2) |> + dplyr::distinct(reference_datetime) |> + dplyr::collect() +} curr_date <- Sys.Date() last_week <- dplyr::tibble(reference_datetime = as.character(seq(curr_date - lubridate::days(7), curr_date - lubridate::days(1), by = "1 day"))) -missing_dates <- dplyr::anti_join(last_week, df, - by = "reference_datetime") |> - dplyr::pull(reference_datetime) +if(length(df$files) > 0){ + missing_dates <- dplyr::anti_join(last_week, df, + by = "reference_datetime") |> + dplyr::pull(reference_datetime) +}else{ + missing_dates <- dplyr::pull(last_week, reference_datetime) +} + if(length(missing_dates) > 0){ for(i in 1:length(missing_dates)){ @@ -55,6 +65,7 @@ if(length(missing_dates) > 0){ dplyr::mutate(reference_datetime = missing_dates[i]) hourly_df <- to_hourly(site_df, + site_list = dplyr::select(site_list, site_id, latitude, longitude), use_solar_geom = TRUE, psuedo = FALSE) |> dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5)), From d8ea4677959617edfc47a7dab5a589a650feaa4d Mon Sep 17 00:00:00 2001 From: Zwart Date: Mon, 5 Feb 2024 10:26:49 -0800 Subject: [PATCH 3/3] update to new to hourly function usage --- drivers/generate_stage3.R | 11 ++++++----- drivers/update_stage3.R | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/drivers/generate_stage3.R b/drivers/generate_stage3.R index d70a426f70..7635372077 100644 --- a/drivers/generate_stage3.R +++ b/drivers/generate_stage3.R @@ -2,7 +2,7 @@ library(minioclient) library(gdalcubes) library(gefs4cast) -source("https://raw.githubusercontent.com/eco4cast/neon4cast/main/R/to_hourly.R") +source("drivers/to_hourly.R") config <- yaml::read_yaml("challenge_configuration.yaml") driver_bucket <- stringr::word(config$driver_bucket, 1, sep = "/") @@ -21,8 +21,7 @@ df <- arrow::open_dataset("pseudo") |> site_list <- readr::read_csv(paste0("https://github.com/eco4cast/usgsrc4cast-ci/", "raw/prod/USGS_site_metadata.csv"), - show_col_types = FALSE) |> - dplyr::pull(site_id) + show_col_types = FALSE) s3_stage3 <- gefs4cast::gefs_s3_dir(product = "stage3", @@ -32,7 +31,7 @@ s3_stage3 <- gefs4cast::gefs_s3_dir(product = "stage3", future::plan("future::multisession", workers = 8) -furrr::future_walk(site_list, function(curr_site_id){ +furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){ df <- arrow::open_dataset("pseudo") |> dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |> @@ -46,7 +45,9 @@ furrr::future_walk(site_list, function(curr_site_id){ print(curr_site_id) df |> - to_hourly(use_solar_geom = TRUE, psuedo = TRUE) |> + to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude), + use_solar_geom = TRUE, + psuedo = TRUE) |> dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |> dplyr::rename(parameter = ensemble) |> arrow::write_dataset(path = s3, partitioning = "site_id") diff --git a/drivers/update_stage3.R b/drivers/update_stage3.R index 05d86b0fea..bd5625a0a1 100644 --- a/drivers/update_stage3.R +++ b/drivers/update_stage3.R @@ -1,11 +1,10 @@ library(gdalcubes) library(gefs4cast) -source("https://raw.githubusercontent.com/eco4cast/neon4cast/main/R/to_hourly.R") +source("drivers/to_hourly.R") site_list <- readr::read_csv(paste0("https://github.com/eco4cast/usgsrc4cast-ci/", "raw/prod/USGS_site_metadata.csv"), - show_col_types = FALSE) |> - dplyr::pull(site_id) + show_col_types = FALSE) Sys.setenv("GEFS_VERSION"="v12") @@ -15,7 +14,7 @@ driver_path <- stringr::word(config$driver_bucket, 2, -1, sep = "/") future::plan("future::multisession", workers = 8) -furrr::future_walk(site_list, function(curr_site_id){ +furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){ print(curr_site_id) @@ -50,7 +49,9 @@ furrr::future_walk(site_list, function(curr_site_id){ if(nrow(psuedo_df) > 0){ df2 <- psuedo_df |> - to_hourly(use_solar_geom = TRUE, psuedo = TRUE) |> + to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude), + use_solar_geom = TRUE, + psuedo = TRUE) |> dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |> dplyr::rename(parameter = ensemble)