Skip to content

Commit

Permalink
Merge pull request #24 from eco4cast/main
Browse files Browse the repository at this point in the history
main into prod
  • Loading branch information
jzwart authored Feb 5, 2024
2 parents 1887fe3 + 63eba13 commit 1d027ee
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 18 deletions.
27 changes: 19 additions & 8 deletions drivers/generate_stage2.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
## setup
library(gdalcubes)
library(gefs4cast)
source("https://raw.githubusercontent.com/eco4cast/neon4cast/main/R/to_hourly.R")
# need to source to_hourly.R instead of from neon4cast because there are neon-specific code in neon4cast
source("drivers/to_hourly.R")

Sys.setenv("GEFS_VERSION"="v12")

Expand All @@ -21,19 +22,28 @@ s3_stage2 <- gefs4cast::gefs_s3_dir(product = "stage2",
endpoint = config$endpoint,
bucket = driver_bucket)

df <- arrow::open_dataset(s3_stage2) |>
dplyr::distinct(reference_datetime) |>
dplyr::collect()

# if there aren't any data (i.e., this is the first time we're creating this dataset),
# then skip the distinct(reference_datetime) filter
df <- arrow::open_dataset(s3_stage2)
if(length(df$files) > 0){
df <- arrow::open_dataset(s3_stage2) |>
dplyr::distinct(reference_datetime) |>
dplyr::collect()
}

curr_date <- Sys.Date()
last_week <- dplyr::tibble(reference_datetime = as.character(seq(curr_date - lubridate::days(7),
curr_date - lubridate::days(1),
by = "1 day")))

missing_dates <- dplyr::anti_join(last_week, df,
by = "reference_datetime") |>
dplyr::pull(reference_datetime)
if(length(df$files) > 0){
missing_dates <- dplyr::anti_join(last_week, df,
by = "reference_datetime") |>
dplyr::pull(reference_datetime)
}else{
missing_dates <- dplyr::pull(last_week, reference_datetime)
}


if(length(missing_dates) > 0){
for(i in 1:length(missing_dates)){
Expand All @@ -55,6 +65,7 @@ if(length(missing_dates) > 0){
dplyr::mutate(reference_datetime = missing_dates[i])

hourly_df <- to_hourly(site_df,
site_list = dplyr::select(site_list, site_id, latitude, longitude),
use_solar_geom = TRUE,
psuedo = FALSE) |>
dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5)),
Expand Down
11 changes: 6 additions & 5 deletions drivers/generate_stage3.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
library(minioclient)
library(gdalcubes)
library(gefs4cast)
source("https://raw.githubusercontent.com/eco4cast/neon4cast/main/R/to_hourly.R")
source("drivers/to_hourly.R")

config <- yaml::read_yaml("challenge_configuration.yaml")
driver_bucket <- stringr::word(config$driver_bucket, 1, sep = "/")
Expand All @@ -21,8 +21,7 @@ df <- arrow::open_dataset("pseudo") |>

site_list <- readr::read_csv(paste0("https://github.com/eco4cast/usgsrc4cast-ci/",
"raw/prod/USGS_site_metadata.csv"),
show_col_types = FALSE) |>
dplyr::pull(site_id)
show_col_types = FALSE)


s3_stage3 <- gefs4cast::gefs_s3_dir(product = "stage3",
Expand All @@ -32,7 +31,7 @@ s3_stage3 <- gefs4cast::gefs_s3_dir(product = "stage3",

future::plan("future::multisession", workers = 8)

furrr::future_walk(site_list, function(curr_site_id){
furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){

df <- arrow::open_dataset("pseudo") |>
dplyr::filter(variable %in% c("PRES","TMP","RH","UGRD","VGRD","APCP","DSWRF","DLWRF")) |>
Expand All @@ -46,7 +45,9 @@ furrr::future_walk(site_list, function(curr_site_id){

print(curr_site_id)
df |>
to_hourly(use_solar_geom = TRUE, psuedo = TRUE) |>
to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude),
use_solar_geom = TRUE,
psuedo = TRUE) |>
dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |>
dplyr::rename(parameter = ensemble) |>
arrow::write_dataset(path = s3, partitioning = "site_id")
Expand Down
136 changes: 136 additions & 0 deletions drivers/to_hourly.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@

#'
#' @param df dataframe of stage1 NEON GEFS forecasts for sites to forecast at
#' @param site_list a dataframe of the latitude and longitude for all site_ids in df
#' @param use_solar_geom logical for using solar geometry for daily SW calculation
#' @param psuedo logical for something...
to_hourly <- function(df,
site_list,
use_solar_geom = TRUE,
psuedo = FALSE){

if(!psuedo){
reference_datetime <- lubridate::as_datetime(df$reference_datetime)[1]
}else{
reference_datetime <- NA
}

var_order <- names(df)

ensemble_maxtime <- df |>
dplyr::group_by(site_id, family, ensemble) |>
dplyr::summarise(max_time = max(datetime), .groups = "drop")

ensembles <- unique(df$ensemble)
datetime <- seq(min(df$datetime), max(df$datetime), by = "1 hour")
variables <- unique(df$variable)
sites <- unique(df$site_id)

full_time <- expand.grid(sites, ensembles, datetime, variables) |>
dplyr::rename(site_id = Var1,
ensemble = Var2,
datetime = Var3,
variable = Var4) |>
dplyr::mutate(datetime = lubridate::as_datetime(datetime)) |>
dplyr::arrange(site_id, ensemble, variable, datetime) |>
dplyr::left_join(ensemble_maxtime, by = c("site_id","ensemble")) |>
dplyr::filter(datetime <= max_time) |>
dplyr::select(-c("max_time")) |>
dplyr::distinct()

states <- df |>
dplyr::select(site_id, family, horizon, ensemble, datetime, variable, prediction) |>
dplyr::filter(!psuedo | (psuedo & horizon != "006") | (psuedo & datetime == max(df$datetime))) |>
dplyr::select(-horizon) |>
dplyr::group_by(site_id, family, ensemble, variable) |>
dplyr::right_join(full_time, by = c("site_id", "ensemble", "datetime", "family", "variable")) |>
dplyr::filter(variable %in% c("PRES", "RH", "TMP", "UGRD", "VGRD")) |>
dplyr::arrange(site_id, family, ensemble, datetime) |>
dplyr::mutate(prediction = imputeTS::na_interpolation(prediction, option = "linear")) |>
dplyr::mutate(prediction = ifelse(variable == "TMP", prediction + 273, prediction)) |>
dplyr::mutate(prediction = ifelse(variable == "RH", prediction/100, prediction)) |>
dplyr::ungroup()

fluxes <- df |>
dplyr::select(site_id, family, horizon, ensemble, datetime, variable, prediction) |>
dplyr::filter(horizon != "003") |>
dplyr::select(-horizon) |>
dplyr::group_by(site_id, family, ensemble, variable) |>
dplyr::right_join(full_time, by = c("site_id", "ensemble", "datetime", "family", "variable")) |>
dplyr::filter(variable %in% c("APCP","DSWRF","DLWRF")) |>
dplyr::arrange(site_id, family, ensemble, datetime) |>
tidyr::fill(prediction, .direction = "up") |>
dplyr::mutate(prediction = ifelse(variable == "APCP", prediction / (6 * 60 * 60), prediction),
variable = ifelse(variable == "APCP", "PRATE", variable)) |>
dplyr::ungroup()

if(use_solar_geom){

fluxes <- fluxes |>
dplyr::left_join(site_list, by = "site_id") |>
dplyr::mutate(hour = lubridate::hour(datetime),
date = lubridate::as_date(datetime),
doy = lubridate::yday(datetime) + hour/24,
longitude = ifelse(longitude < 0, 360 + longitude, longitude),
rpot = downscale_solar_geom(doy, longitude, latitude)) |> # hourly sw flux calculated using solar geometry
dplyr::group_by(site_id, family, ensemble, date, variable) |>
dplyr::mutate(avg.rpot = mean(rpot, na.rm = TRUE),
avg.SW = mean(prediction, na.rm = TRUE))|> # daily sw mean from solar geometry
dplyr::ungroup() |>
dplyr::mutate(prediction = ifelse(variable == "DSWRF" & avg.rpot > 0.0, rpot * (avg.SW/avg.rpot),prediction)) |>
dplyr::select(any_of(var_order))
}

hourly_df <- dplyr::bind_rows(states, fluxes) |>
dplyr::arrange(site_id, family, ensemble, datetime) |>
dplyr::mutate(variable = ifelse(variable == "TMP", "air_temperature", variable),
variable = ifelse(variable == "PRES", "air_pressure", variable),
variable = ifelse(variable == "RH", "relative_humidity", variable),
variable = ifelse(variable == "DLWRF", "surface_downwelling_longwave_flux_in_air", variable),
variable = ifelse(variable == "DSWRF", "surface_downwelling_shortwave_flux_in_air", variable),
variable = ifelse(variable == "PRATE", "precipitation_flux", variable),
variable = ifelse(variable == "VGRD", "eastward_wind", variable),
variable = ifelse(variable == "UGRD", "northward_wind", variable),
variable = ifelse(variable == "APCP", "precipitation_amount", variable),
reference_datetime = reference_datetime) |>
dplyr::select(any_of(var_order))

return(hourly_df)

}

cos_solar_zenith_angle <- function(doy, lat, lon, dt, hr) {
et <- equation_of_time(doy)
merid <- floor(lon / 15) * 15
merid[merid < 0] <- merid[merid < 0] + 15
lc <- (lon - merid) * -4/60 ## longitude correction
tz <- merid / 360 * 24 ## time zone
midbin <- 0.5 * dt / 86400 * 24 ## shift calc to middle of bin
t0 <- 12 + lc - et - tz - midbin ## solar time
h <- pi/12 * (hr - t0) ## solar hour
dec <- -23.45 * pi / 180 * cos(2 * pi * (doy + 10) / 365) ## declination
cosz <- sin(lat * pi / 180) * sin(dec) + cos(lat * pi / 180) * cos(dec) * cos(h)
cosz[cosz < 0] <- 0
return(cosz)
}

equation_of_time <- function(doy) {
stopifnot(doy <= 367)
f <- pi / 180 * (279.5 + 0.9856 * doy)
et <- (-104.7 * sin(f) + 596.2 * sin(2 * f) + 4.3 *
sin(4 * f) - 429.3 * cos(f) - 2 *
cos(2 * f) + 19.3 * cos(3 * f)) / 3600 # equation of time -> eccentricity and obliquity
return(et)
}

downscale_solar_geom <- function(doy, lon, lat) {

dt <- median(diff(doy)) * 86400 # average number of seconds in time interval
hr <- (doy - floor(doy)) * 24 # hour of day for each element of doy

## calculate potential radiation
cosz <- cos_solar_zenith_angle(doy, lat, lon, dt, hr)
rpot <- 1366 * cosz
return(rpot)
}

11 changes: 6 additions & 5 deletions drivers/update_stage3.R
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
library(gdalcubes)
library(gefs4cast)
source("https://raw.githubusercontent.com/eco4cast/neon4cast/main/R/to_hourly.R")
source("drivers/to_hourly.R")

site_list <- readr::read_csv(paste0("https://github.com/eco4cast/usgsrc4cast-ci/",
"raw/prod/USGS_site_metadata.csv"),
show_col_types = FALSE) |>
dplyr::pull(site_id)
show_col_types = FALSE)

Sys.setenv("GEFS_VERSION"="v12")

Expand All @@ -15,7 +14,7 @@ driver_path <- stringr::word(config$driver_bucket, 2, -1, sep = "/")

future::plan("future::multisession", workers = 8)

furrr::future_walk(site_list, function(curr_site_id){
furrr::future_walk(dplyr::pull(site_list, site_id), function(curr_site_id){

print(curr_site_id)

Expand Down Expand Up @@ -50,7 +49,9 @@ furrr::future_walk(site_list, function(curr_site_id){
if(nrow(psuedo_df) > 0){

df2 <- psuedo_df |>
to_hourly(use_solar_geom = TRUE, psuedo = TRUE) |>
to_hourly(site_list = dplyr::select(site_list, site_id, latitude, longitude),
use_solar_geom = TRUE,
psuedo = TRUE) |>
dplyr::mutate(ensemble = as.numeric(stringr::str_sub(ensemble, start = 4, end = 5))) |>
dplyr::rename(parameter = ensemble)

Expand Down

0 comments on commit 1d027ee

Please sign in to comment.