diff --git a/.github/workflows/submissions.yaml b/.github/workflows/submissions.yaml index c3fdc8c185..860042a9e7 100644 --- a/.github/workflows/submissions.yaml +++ b/.github/workflows/submissions.yaml @@ -20,6 +20,8 @@ jobs: container: eco4cast/rocker-neon4cast:latest steps: - uses: actions/checkout@v3 + with: + ref: submissions_processing # TODO: change to prod when happy - name: Process submissions shell: Rscript {0} diff --git a/submission_processing/process_submissions.R b/submission_processing/process_submissions.R index 1ad913c8ee..a07dbd57af 100644 --- a/submission_processing/process_submissions.R +++ b/submission_processing/process_submissions.R @@ -1,6 +1,6 @@ -library(neon4cast) #project_specific - +library(read4cast) +library(score4cast) library(readr) library(dplyr) library(arrow) @@ -11,14 +11,15 @@ library(tools) library(fs) library(stringr) library(lubridate) +source("R/eco4cast-helpers/forecast_output_validator.R") install_mc() config <- yaml::read_yaml("challenge_configuration.yaml") -sites <- readr::read_csv(config$site_table,show_col_types = FALSE) |> - select(field_site_id, latitude, longitude) |> - rename(site_id = field_site_id) +sites <- readr::read_csv(config$site_table, + show_col_types = FALSE) |> + select(site_id, latitude, longitude) minioclient::mc_alias_set("s3_store", config$endpoint, @@ -38,10 +39,14 @@ fs::dir_create(local_dir) message("Downloading forecasts ...") -minioclient::mc_mirror(from = paste0("submit/",config$submissions_bucket), to = local_dir) +minioclient::mc_mirror(from = fs::path("submit", config$submissions_bucket, config$project_id), + to = local_dir) -submissions <- fs::dir_ls(local_dir, recurse = TRUE, type = "file") +submissions <- fs::dir_ls(local_dir, + recurse = TRUE, + type = "file") submissions_filenames <- basename(submissions) +print(submissions) if(length(submissions) > 0){ @@ -67,7 +72,9 @@ if(length(submissions) > 0){ s3_inventory$CreateDir(paste0("inventory/catalog/forecasts/project_id=", config$project_id)) - s3_inventory <- arrow::s3_bucket(paste0(config$inventory_bucket,"/catalog/forecasts/project_id=", config$project_id), + s3_inventory <- arrow::s3_bucket(paste0(config$inventory_bucket, + "/catalog/forecasts/project_id=", + config$project_id), endpoint_override = config$endpoint, access_key = Sys.getenv("OSN_KEY"), secret_key = Sys.getenv("OSN_SECRET")) @@ -76,6 +83,8 @@ if(length(submissions) > 0){ time_stamp <- format(Sys.time(), format = "%Y%m%d%H%M%S") + print(inventory_df) + for(i in 1:length(submissions)){ curr_submission <- basename(submissions[i]) @@ -91,26 +100,29 @@ if(length(submissions) > 0){ if(valid){ + # still OK to use read4cast as there aren't challenge-specific things + # in the package, other than list of all potential target variables, + # which could be updated if we forecast new variables (but for usgsrc4cast we're forecasting chla) fc <- read4cast::read_forecast(submissions[i]) pub_datetime <- strftime(Sys.time(), format = "%Y-%m-%d %H:%M:%S", tz = "UTC") if(!"duration" %in% names(fc)){ - if(theme == "terrestrial_30min"){ - fc <- fc |> dplyr::mutate(duration = "PT30M") - }else if(theme %in% c("ticks","beetles")){ - fc <- fc |> dplyr::mutate(duration = "P1W") - }else if(theme %in% c("aquatics","phenology","terrestrial_daily")){ - fc <- fc |> dplyr::mutate(duration = "P1D") - }else{ - if(stringr::str_detect(fc$datetime[1], ":")){ - fc <- fc |> dplyr::mutate(duration = "P1H") - }else{ - fc <- fc |> dplyr::mutate(duration = "P1D") - } - } + # if(theme == "terrestrial_30min"){ + # fc <- fc |> dplyr::mutate(duration = "PT30M") + # }else if(theme %in% c("ticks","beetles")){ + # fc <- fc |> dplyr::mutate(duration = "P1W") + # }else if(theme %in% c("aquatics","phenology","terrestrial_daily")){ + # fc <- fc |> dplyr::mutate(duration = "P1D") + # }else{ + # if(stringr::str_detect(fc$datetime[1], ":")){ + # fc <- fc |> dplyr::mutate(duration = "P1H") + # }else{ + fc <- fc |> dplyr::mutate(duration = "P1D") # currently only have "P1D" duration for usgsrc4cast + # } } + if(!("model_id" %in% colnames(fc))){ fc <- fc |> mutate(model_id = file_name_model_id) }else if(fc$model_id[1] == "null"){ @@ -128,7 +140,7 @@ if(length(submissions) > 0){ reference_datetime = lubridate::as_datetime(reference_datetime), reference_date = lubridate::as_date(reference_datetime), parameter = as.character(parameter), - project_id = "neon4cast") |> + project_id = config$project_id) |> dplyr::filter(datetime >= reference_datetime) print(head(fc)) @@ -142,8 +154,12 @@ if(length(submissions) > 0){ s3$CreateDir(paste0("summaries")) fc |> - dplyr::summarise(prediction = mean(prediction), .by = dplyr::any_of(c("site_id", "datetime", "reference_datetime", "family", "depth_m", "duration", "model_id", - "parameter", "pub_datetime", "reference_date", "variable", "project_id"))) |> + dplyr::summarise(prediction = mean(prediction), + .by = dplyr::any_of(c("site_id", "datetime", + "reference_datetime", "family", + "depth_m", "duration", "model_id", + "parameter", "pub_datetime", + "reference_date", "variable", "project_id"))) |> score4cast::summarize_forecast(extra_groups = c("duration", "project_id", "depth_m")) |> dplyr::mutate(reference_date = lubridate::as_date(reference_datetime)) |> arrow::write_dataset(s3$path("summaries"), format = 'parquet', @@ -173,12 +189,18 @@ if(length(submissions) > 0){ submission_timestamp <- paste0(submission_dir,"/T", time_stamp, "_", basename(submissions[i])) fs::file_copy(submissions[i], submission_timestamp) - raw_bucket_object <- paste0("s3_store/",config$forecasts_bucket,"/raw/",basename(submission_timestamp)) + raw_bucket_object <- paste0("s3_store/", + config$forecasts_bucket, + "/raw/project_id=", config$project_id, "/", + basename(submission_timestamp)) minioclient::mc_cp(submission_timestamp, paste0(dirname(raw_bucket_object),"/", basename(submission_timestamp))) if(length(minioclient::mc_ls(raw_bucket_object)) > 0){ - minioclient::mc_rm(file.path("submit",config$submissions_bucket,curr_submission)) + minioclient::mc_rm(file.path("submit", + config$submissions_bucket, + config$project_id, + curr_submission)) } rm(fc) @@ -188,12 +210,18 @@ if(length(submissions) > 0){ submission_timestamp <- paste0(submission_dir,"/T", time_stamp, "_", basename(submissions[i])) fs::file_copy(submissions[i], submission_timestamp) - raw_bucket_object <- paste0("s3_store/",config$forecasts_bucket,"/raw/",basename(submission_timestamp)) + raw_bucket_object <- paste0("s3_store/", + config$forecasts_bucket, + "/raw/project_id=", config$project_id, "/", + basename(submission_timestamp)) minioclient::mc_cp(submission_timestamp, paste0(dirname(raw_bucket_object),"/", basename(submission_timestamp))) if(length(minioclient::mc_ls(raw_bucket_object)) > 0){ - minioclient::mc_rm(file.path("submit",config$submissions_bucket,curr_submission)) + minioclient::mc_rm(file.path("submit", + config$submissions_bucket, + config$project_id, + curr_submission)) } }