From 4c7dbc063b5bfa925b93579da1720af6b280f089 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Fri, 11 Aug 2023 12:38:54 -0700 Subject: [PATCH 1/2] tidymodels process update --- DESCRIPTION | 2 +- NEWS.md | 6 + R/ensemble_models.R | 406 +++++++++------------------------ R/train_models.R | 537 ++++++++++++++------------------------------ R/utility.R | 3 +- 5 files changed, 281 insertions(+), 673 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 49f635a3..793e54ea 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: finnts Title: Microsoft Finance Time Series Forecasting Framework -Version: 0.3.0 +Version: 0.3.0.9000 Authors@R: c(person(given = "Mike", family = "Tokic", diff --git a/NEWS.md b/NEWS.md index e63e653f..2973340a 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,9 @@ +# finnts 0.3.0.9000 (DEVELOPMENT VERSION) + +## Improvements + +- Tidymodels speed up + # finnts 0.3.0 ## Improvements diff --git a/R/ensemble_models.R b/R/ensemble_models.R index f3a9551b..94d88c06 100644 --- a/R/ensemble_models.R +++ b/R/ensemble_models.R @@ -263,7 +263,7 @@ ensemble_models <- function(run_info, } # get hyperparameters - hyperparameters_tbl <- tibble::tibble() + model_hyperparameters_tbl <- tibble::tibble() for (x in model_workflow_tbl %>% dplyr::group_split(dplyr::row_number(), .keep = FALSE)) { model <- x %>% @@ -299,7 +299,7 @@ ensemble_models <- function(run_info, tibble::rowid_to_column("Hyperparameter_Combo") %>% dplyr::mutate(Model = model) - hyperparameters_tbl <- rbind(hyperparameters_tbl, hyperparameters_temp) + model_hyperparameters_tbl <- rbind(model_hyperparameters_tbl, hyperparameters_temp) } if (inner_parallel) { @@ -311,24 +311,6 @@ ensemble_models <- function(run_info, negative_forecast <- negative_forecast } - # tune hyperparameters - tune_iter_list <- model_train_test_tbl %>% - dplyr::mutate(Combo = x) %>% - dplyr::filter(Run_Type == "Validation") %>% - dplyr::select(Combo, Train_Test_ID) %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE) %>% - purrr::map(.f = function(x) { - hyperparameters_tbl %>% - dplyr::select(Hyperparameter_Combo, Model) %>% - dplyr::rename(Hyperparameter_ID = Hyperparameter_Combo) %>% - dplyr::mutate( - Combo = x$Combo, - Train_Test_ID = x$Train_Test_ID - ) - }) %>% - dplyr::bind_rows() %>% - dplyr::select(Combo, Model, Train_Test_ID, Hyperparameter_ID) - par_info <- par_start( run_info = run_info, parallel_processing = if (inner_parallel) { @@ -337,320 +319,134 @@ ensemble_models <- function(run_info, NULL }, num_cores = num_cores, - task_length = nrow(tune_iter_list) + task_length = num_cores ) inner_cl <- par_info$cl inner_packages <- par_info$packages - `%op%` <- par_info$foreach_operator - tune_output_tbl <- foreach::foreach( - x = tune_iter_list %>% + model_tbl <- foreach::foreach( + model_run = model_workflow_tbl %>% + dplyr::select(Model_Name) %>% dplyr::group_split(dplyr::row_number(), .keep = FALSE), .combine = "rbind", - .packages = inner_packages, .errorhandling = "remove", .verbose = FALSE, .inorder = FALSE, .multicombine = TRUE, .noexport = NULL - ) %op% - { - # run input values - param_combo <- x %>% - dplyr::pull(Hyperparameter_ID) - - model <- x %>% - dplyr::pull(Model) - - data_split <- x %>% - dplyr::pull(Train_Test_ID) - - combo <- x %>% - dplyr::pull(Combo) - - train_end_date <- model_train_test_tbl %>% - dplyr::filter(Train_Test_ID == data_split) %>% - dplyr::pull(Train_End) - - test_end_date <- model_train_test_tbl %>% - dplyr::filter(Train_Test_ID == data_split) %>% - dplyr::pull(Test_End) - - # get train/test data - full_data <- prep_ensemble_tbl %>% - dplyr::mutate(Date_index.num = 0) - - training <- full_data %>% - dplyr::filter(Date <= train_end_date) %>% - dplyr::select(-Train_Test_ID) - - testing <- full_data %>% - dplyr::filter( - Date > train_end_date, - Date <= test_end_date, - Train_Test_ID == data_split - ) - - # get hyperparameters - hyperparameters <- hyperparameters_tbl %>% - dplyr::filter( - Model == model, - Hyperparameter_Combo == param_combo - ) %>% - dplyr::select(Hyperparameters) %>% - tidyr::unnest(Hyperparameters) - - # get workflow - workflow <- model_workflow_tbl %>% - dplyr::filter(Model_Name == model) - - workflow_final <- workflow$Model_Workflow[[1]] %>% - tune::finalize_workflow(parameters = hyperparameters) - - # fit model - set.seed(seed) - - model_fit <- workflow_final %>% - generics::fit(data = training) - - # create prediction - set.seed(seed) - - model_prediction <- testing %>% - dplyr::bind_cols( - predict(model_fit, new_data = testing) - ) %>% - dplyr::select(Combo, Date, Target, .pred) %>% - dplyr::rename(Forecast = .pred) %>% - negative_fcst_adj(negative_forecast) - - # finalize output tbl - final_tbl <- tibble::tibble( - Combo = combo, - Model = model, - Train_Test_ID = data_split, - Hyperparameter_ID = param_combo, - Model_Fit = list(model_fit), - Prediction = list(model_prediction) - ) + ) %do% { - return(final_tbl) - } %>% - base::suppressPackageStartupMessages() + # get initial run info + model <- model_run %>% + dplyr::pull(Model_Name) - par_end(inner_cl) + workflow <- model_workflow_tbl %>% + dplyr::filter(Model_Name == model) %>% + dplyr::select(Model_Workflow) - final_tune_iter_list <- model_train_test_tbl %>% - dplyr::mutate(Combo = x) %>% - dplyr::filter(Run_Type == "Validation") %>% - dplyr::select(Combo, Train_Test_ID) %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE) %>% - purrr::map(.f = function(x) { - hyperparameters_tbl %>% - dplyr::select(Hyperparameter_Combo, Model) %>% - dplyr::rename(Hyperparameter_ID = Hyperparameter_Combo) %>% - dplyr::mutate( - Combo = x$Combo, - Train_Test_ID = x$Train_Test_ID - ) - }) %>% - dplyr::bind_rows() %>% - dplyr::select(Combo, Model) %>% - dplyr::distinct() - - final_tune_output_tbl <- foreach::foreach( - x = final_tune_iter_list %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE), - .combine = "rbind", - .packages = NULL, - .errorhandling = "stop", - .verbose = FALSE, - .inorder = FALSE, - .multicombine = TRUE, - .noexport = NULL - ) %do% - { - combo <- x %>% - dplyr::pull(Combo) - - model <- x %>% - dplyr::pull(Model) - - test_tbl <- tune_output_tbl %>% - dplyr::filter( - Model == model - ) %>% - dplyr::select(Model, Hyperparameter_ID, Train_Test_ID, Prediction, Model_Fit) - - best_param <- test_tbl %>% - dplyr::select(-Model_Fit) %>% - tidyr::unnest(Prediction) %>% - dplyr::mutate(Combo = combo) %>% - dplyr::group_by(Combo, Model, Hyperparameter_ID) %>% - yardstick::rmse( - truth = Target, - estimate = Forecast, - na_rm = TRUE - ) %>% - dplyr::ungroup() %>% - dplyr::arrange(.estimate) %>% - dplyr::slice(1) %>% - dplyr::pull(Hyperparameter_ID) - - best_model_fit <- test_tbl %>% - dplyr::filter(Hyperparameter_ID == best_param) %>% - dplyr::slice(1) - - best_model_fit <- best_model_fit$Model_Fit[[1]] - - final_predictions <- test_tbl %>% - dplyr::filter(Hyperparameter_ID == best_param) %>% - dplyr::select(-Model_Fit) %>% - tidyr::unnest(Prediction) %>% - dplyr::select(Combo, Date, Train_Test_ID, Target, Forecast) - - return(tibble::tibble( - Combo = unique(final_predictions$Combo), - Model = model, - Hyperparameter_ID = best_param, - Model_Fit = list(best_model_fit), - Prediction = list(final_predictions) - )) - } %>% - base::suppressPackageStartupMessages() - - # refit models - refit_iter_list <- model_train_test_tbl %>% - dplyr::filter(Run_Type %in% c("Future_Forecast", "Back_Test")) %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE) %>% - purrr::map(.f = function(x) { - final_tune_output_tbl %>% - dplyr::mutate( - Run_Type = x %>% dplyr::pull(Run_Type), - Train_Test_ID = x %>% dplyr::pull(Train_Test_ID), - Train_End = x %>% dplyr::pull(Train_End), - Test_End = x %>% dplyr::pull(Test_End) - ) %>% - dplyr::select(-Model_Fit, -Prediction) - }) %>% - dplyr::bind_rows() + workflow <- workflow$Model_Workflow[[1]] - par_info <- par_start( - run_info = run_info, - parallel_processing = if (inner_parallel) { - "local_machine" + hyperparameters <- model_hyperparameters_tbl %>% + dplyr::filter(Model == model) %>% + dplyr::select(Hyperparameter_Combo, Hyperparameters) %>% + tidyr::unnest(Hyperparameters) + + # tune hyperparameters + set.seed(seed) + + tune_results <- tune::tune_grid( + object = workflow, + resamples = create_splits(prep_ensemble_tbl, model_train_test_tbl %>% dplyr::filter(Run_Type == "Validation")), + grid = hyperparameters %>% dplyr::select(-Hyperparameter_Combo), + control = tune::control_grid( + allow_par = inner_parallel, + pkgs = inner_packages, + parallel_over = "everything" + ) + ) %>% + base::suppressWarnings() + + best_param <- tune::select_best(tune_results, metric = "rmse") + + if (length(colnames(best_param)) == 1) { + hyperparameter_id <- 1 } else { - NULL - }, - num_cores = num_cores, - task_length = nrow(refit_iter_list) - ) + hyperparameter_id <- hyperparameters %>% + dplyr::inner_join(best_param) %>% + dplyr::select(Hyperparameter_Combo) %>% + dplyr::pull() %>% + base::suppressMessages() + } - inner_cl <- par_info$cl - inner_packages <- par_info$packages - `%op%` <- par_info$foreach_operator + final_wflow <- tune::finalize_workflow(workflow, best_param) + set.seed(seed) + wflow_fit <- generics::fit(final_wflow, prep_ensemble_tbl %>% tidyr::drop_na(Target)) - refit_tbl <- foreach::foreach( - x = refit_iter_list %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE), - .combine = "rbind", - .packages = inner_packages, - .errorhandling = "remove", - .verbose = FALSE, - .inorder = FALSE, - .multicombine = TRUE, - .noexport = NULL - ) %op% - { - combo <- x %>% - dplyr::pull(Combo) - - model <- x %>% - dplyr::pull(Model) - - model_fit <- final_tune_output_tbl %>% - dplyr::filter( - Model == model, - Combo == combo - ) - - final_hyperparameters <- unique(model_fit$Hyperparameter_ID) - - model_fit <- model_fit$Model_Fit[[1]] - - run_type <- x %>% - dplyr::pull(Run_Type) - - run_id <- x %>% - dplyr::pull(Train_Test_ID) - - train_end <- x %>% - dplyr::pull(Train_End) - - test_end <- x %>% - dplyr::pull(Test_End) - - full_data <- prep_ensemble_tbl %>% - dplyr::filter(Combo == combo) %>% - dplyr::mutate(Date_index.num = 0) - - training <- full_data %>% - dplyr::filter(Date <= train_end) %>% - dplyr::select(-Train_Test_ID) - - testing <- full_data %>% - dplyr::filter( - Date > train_end, - Date <= test_end, - Train_Test_ID == run_id - ) - - # fit model - set.seed(seed) - - model_fit <- model_fit %>% - generics::fit(data = training) - - # create prediction - set.seed(seed) - - model_prediction <- testing %>% - dplyr::bind_cols( - predict(model_fit, new_data = testing) - ) %>% - dplyr::select(Combo, Date, Target, .pred) %>% - dplyr::rename(Forecast = .pred) %>% - negative_fcst_adj(negative_forecast) - - # finalize output tbl - final_tbl <- tibble::tibble( - Combo_ID = combo, - Model_Name = model, - Model_Type = "local", - Recipe_ID = "Ensemble", - Train_Test_ID = run_id, - Hyperparameter_ID = final_hyperparameters, - Model_Fit = list(model_fit), - Prediction = list(model_prediction) + # refit on all train test splits + set.seed(seed) + + refit_tbl <- tune::fit_resamples( + object = final_wflow, + resamples = create_splits(prep_ensemble_tbl, model_train_test_tbl %>% dplyr::filter(Run_Type %in% c("Back_Test", "Future_Forecast"))), + metrics = NULL, + control = tune::control_resamples( + allow_par = inner_parallel, + save_pred = TRUE, + pkgs = inner_packages, + parallel_over = "everything" ) + ) - return(final_tbl) - } %>% - base::suppressPackageStartupMessages() + final_fcst <- tune::collect_predictions(refit_tbl) %>% + dplyr::rename( + Forecast = .pred, + Train_Test_ID = id + ) %>% + dplyr::mutate(Train_Test_ID = as.numeric(Train_Test_ID)) %>% + dplyr::left_join(model_train_test_tbl %>% + dplyr::select(Run_Type, Train_Test_ID), + by = "Train_Test_ID" + ) %>% + dplyr::left_join( + prep_ensemble_tbl %>% + dplyr::mutate(.row = dplyr::row_number()) %>% + dplyr::select(Combo, Date, .row), + by = ".row" + ) %>% + dplyr::mutate(Hyperparameter_ID = hyperparameter_id) %>% + dplyr::select(-.row, -.config) %>% + negative_fcst_adj(negative_forecast) + + combo_id <- unique(final_fcst$Combo) + + final_return_tbl <- tibble::tibble( + Combo_ID = combo_id, + Model_Name = model, + Model_Type = "local", + Recipe_ID = "ensemble", + Forecast_Tbl = list(final_fcst), + Model_Fit = list(wflow_fit) + ) + + return(final_return_tbl) + } par_end(inner_cl) + # ensure at least one model ran successfully + if (nrow(model_tbl) < 1) { + stop("All models failed to train") + } + # get final combined results and final fitted models - final_model_fit_tbl <- refit_tbl %>% - dplyr::mutate(Train_Test_ID = as.numeric(Train_Test_ID)) %>% - dplyr::filter(Train_Test_ID == 1) %>% + final_model_fit_tbl <- model_tbl %>% tidyr::unite(col = "Model_ID", c("Model_Name", "Model_Type", "Recipe_ID"), sep = "--", remove = FALSE) %>% dplyr::select(Combo_ID, Model_ID, Model_Name, Model_Type, Recipe_ID, Model_Fit) - final_ensemble_results_tbl <- refit_tbl %>% + final_ensemble_results_tbl <- model_tbl %>% dplyr::select(-Model_Fit) %>% - tidyr::unnest(Prediction) %>% + tidyr::unnest(Forecast_Tbl) %>% tidyr::unite(col = "Model_ID", c("Model_Name", "Model_Type", "Recipe_ID"), sep = "--", remove = FALSE) %>% dplyr::group_by(Combo_ID, Model_ID, Train_Test_ID) %>% dplyr::mutate(Horizon = dplyr::row_number()) %>% diff --git a/R/train_models.R b/R/train_models.R index 296050d5..a0215fb3 100644 --- a/R/train_models.R +++ b/R/train_models.R @@ -269,37 +269,6 @@ train_models <- function(run_info, negative_forecast <- negative_forecast } - # tune models - tune_iter_list <- model_train_test_tbl %>% - dplyr::mutate(Combo = x) %>% - dplyr::filter(Run_Type == "Validation") %>% - dplyr::select(Combo, Train_Test_ID) %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE) %>% - purrr::map(.f = function(x) { - temp <- model_hyperparameter_tbl %>% - dplyr::select(Hyperparameter_Combo, Model, Recipe) %>% - dplyr::rename( - Hyperparameter_ID = Hyperparameter_Combo, - Recipe_ID = Recipe - ) %>% - dplyr::mutate( - Combo = x$Combo, - Train_Test_ID = x$Train_Test_ID - ) - - if (x$Combo == "All-Data") { - temp <- temp %>% - dplyr::filter( - Model %in% global_model_list, - Recipe_ID %in% global_model_recipes - ) - } - - return(temp) - }) %>% - dplyr::bind_rows() %>% - dplyr::select(Combo, Model, Recipe_ID, Train_Test_ID, Hyperparameter_ID) - par_info <- par_start( run_info = run_info, parallel_processing = if (inner_parallel) { @@ -308,362 +277,142 @@ train_models <- function(run_info, NULL }, num_cores = num_cores, - task_length = nrow(tune_iter_list) + task_length = num_cores ) inner_cl <- par_info$cl inner_packages <- par_info$packages - `%op%` <- par_info$foreach_operator - initial_tune_tbl <- foreach::foreach( - x = tune_iter_list %>% + model_tbl <- foreach::foreach( + model_run = model_workflow_tbl %>% + dplyr::select(Model_Name, Model_Recipe) %>% dplyr::group_split(dplyr::row_number(), .keep = FALSE), .combine = "rbind", - .packages = inner_packages, .errorhandling = "remove", .verbose = FALSE, .inorder = FALSE, .multicombine = TRUE, .noexport = NULL - ) %op% - { - - # run input values - param_combo <- x %>% - dplyr::pull(Hyperparameter_ID) - - model <- x %>% - dplyr::pull(Model) - - data_split <- x %>% - dplyr::pull(Train_Test_ID) - - data_prep_recipe <- x %>% - dplyr::pull(Recipe_ID) - - combo <- x %>% - dplyr::pull(Combo) - - train_end_date <- model_train_test_tbl %>% - dplyr::filter(Train_Test_ID == data_split) %>% - dplyr::pull(Train_End) - - test_end_date <- model_train_test_tbl %>% - dplyr::filter(Train_Test_ID == data_split) %>% - dplyr::pull(Test_End) - - # get train/test data - full_data <- model_recipe_tbl %>% - dplyr::filter(Recipe == data_prep_recipe) %>% - dplyr::select(Data) %>% - tidyr::unnest(Data) - - if (combo == "All-Data") { - full_data <- full_data %>% - tidyr::separate( - col = Combo, - into = combo_variables, - sep = "---", - remove = FALSE - ) - } - - training <- full_data %>% - dplyr::filter(Date <= train_end_date) - - testing <- full_data %>% - dplyr::filter( - Date > train_end_date, - Date <= test_end_date - ) - - if (data_prep_recipe == "R2") { - train_origin_max <- training %>% - dplyr::filter(Horizon == 1) - - testing <- testing %>% - dplyr::filter(Origin == max(train_origin_max$Origin) + 1) - } - - # get hyperparameters - hyperparameters <- model_hyperparameter_tbl %>% - dplyr::filter( - Model == model, - Recipe == data_prep_recipe, - Hyperparameter_Combo == param_combo - ) %>% - dplyr::select(Hyperparameters) %>% - tidyr::unnest(Hyperparameters) - - # get workflow - workflow <- model_workflow_tbl %>% - dplyr::filter( - Model_Name == model, - Model_Recipe == data_prep_recipe - ) - - workflow_final <- workflow$Model_Workflow[[1]] %>% - tune::finalize_workflow(parameters = hyperparameters) - - # fit model - set.seed(seed) - - if (nrow(hyperparameters) > 0) { - model_fit <- workflow_final %>% - generics::fit(data = training) - } else { - model_fit <- workflow_final %>% - generics::fit(data = training) - } - - # create prediction - set.seed(seed) - - model_prediction <- testing %>% - dplyr::bind_cols( - predict(model_fit, new_data = testing) - ) %>% - dplyr::select(Combo, Date, Target, .pred) %>% - dplyr::rename(Forecast = .pred) %>% - negative_fcst_adj(negative_forecast) - - # finalize output tbl - final_tbl <- tibble::tibble( - Combo_ID = combo, - Model_Name = model, - Model_Type = ifelse(combo == "All-Data", "global", "local"), - Recipe_ID = data_prep_recipe, - Train_Test_ID = data_split, - Hyperparameter_ID = param_combo, - Prediction = list(model_prediction) + ) %do% { + + # get initial run info + model <- model_run %>% + dplyr::pull(Model_Name) + + data_prep_recipe <- model_run %>% + dplyr::pull(Model_Recipe) + + prep_data <- model_recipe_tbl %>% + dplyr::filter(Recipe == data_prep_recipe) %>% + dplyr::select(Data) %>% + tidyr::unnest(Data) + + workflow <- model_workflow_tbl %>% + dplyr::filter( + Model_Name == model, + Model_Recipe == data_prep_recipe + ) %>% + dplyr::select(Model_Workflow) + + workflow <- workflow$Model_Workflow[[1]] + + hyperparameters <- model_hyperparameter_tbl %>% + dplyr::filter( + Model == model, + Recipe == data_prep_recipe + ) %>% + dplyr::select(Hyperparameter_Combo, Hyperparameters) %>% + tidyr::unnest(Hyperparameters) + + # tune hyperparameters + set.seed(seed) + + tune_results <- tune::tune_grid( + object = workflow, + resamples = create_splits(prep_data, model_train_test_tbl %>% dplyr::filter(Run_Type == "Validation")), + grid = hyperparameters %>% dplyr::select(-Hyperparameter_Combo), + control = tune::control_grid( + allow_par = inner_parallel, + pkgs = inner_packages, + parallel_over = "everything" ) + ) %>% + base::suppressWarnings() - return(final_tbl) - } %>% - base::suppressPackageStartupMessages() - - par_end(inner_cl) + best_param <- tune::select_best(tune_results, metric = "rmse") - # check if tuning failed - if (is.null(initial_tune_tbl)) { - if (combo_hash == "All-Data") { - combo_name <- "Global-Model" + if (length(colnames(best_param)) == 1) { + hyperparameter_id <- 1 } else { - combo_name <- model_recipe_tbl %>% - dplyr::slice(1) %>% - dplyr::select(Data) %>% - tidyr::unnest(Data) %>% - dplyr::select(Combo) %>% - dplyr::pull(Combo) %>% - unique() + hyperparameter_id <- hyperparameters %>% + dplyr::inner_join(best_param) %>% + dplyr::select(Hyperparameter_Combo) %>% + dplyr::pull() %>% + base::suppressMessages() } - stop(paste0( - "All models failed during hyperparameter tuning process for time series combo: '", - combo_name, "'" - ), - call. = FALSE + final_wflow <- tune::finalize_workflow(workflow, best_param) + set.seed(seed) + wflow_fit <- generics::fit(final_wflow, prep_data %>% tidyr::drop_na(Target)) + + # refit on all train test splits + set.seed(seed) + + refit_tbl <- tune::fit_resamples( + object = final_wflow, + resamples = create_splits(prep_data, model_train_test_tbl), + metrics = NULL, + control = tune::control_resamples( + allow_par = inner_parallel, + save_pred = TRUE, + pkgs = inner_packages, + parallel_over = "everything" + ) ) - } - - # select best hyperparamters - best_param <- initial_tune_tbl %>% - tidyr::unnest(Prediction) %>% - dplyr::mutate(SE = (Target - Forecast)^2) %>% - dplyr::group_by(Combo_ID, Model_Name, Model_Type, Recipe_ID, Hyperparameter_ID) %>% - dplyr::summarise(RMSE = sqrt(mean(SE, na.rm = TRUE))) %>% - dplyr::arrange(RMSE) %>% - dplyr::slice(1) %>% - dplyr::ungroup() - - model_tune_tbl <- initial_tune_tbl %>% - dplyr::select(Model_Name, Model_Type, Recipe_ID, Hyperparameter_ID, Train_Test_ID, Prediction) %>% - dplyr::right_join(best_param, by = c("Model_Name", "Model_Type", "Recipe_ID", "Hyperparameter_ID")) %>% - tidyr::unnest(Prediction) %>% - dplyr::mutate( - Combo_Hash = Combo_ID, - Combo_ID = ifelse(Combo_ID == "All-Data", "All-Data", Combo) - ) %>% - dplyr::select(Combo_Hash, Combo_ID, Model_Name, Model_Type, Recipe_ID, Train_Test_ID, Hyperparameter_ID, Combo, Date, Forecast, Target) - - # refit models - refit_iter_list <- model_train_test_tbl %>% - dplyr::filter(Run_Type %in% c("Future_Forecast", "Back_Test", "Ensemble")) %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE) %>% - purrr::map(.f = function(x) { - model_tune_tbl %>% - dplyr::mutate( - Run_Type = x %>% dplyr::pull(Run_Type), - Train_Test_ID = x %>% dplyr::pull(Train_Test_ID), - Train_End = x %>% dplyr::pull(Train_End), - Test_End = x %>% dplyr::pull(Test_End) - ) %>% - dplyr::select( - Combo_ID, Run_Type, Train_Test_ID, Recipe_ID, - Hyperparameter_ID, Train_End, Test_End, Model_Name, Model_Type - ) %>% - dplyr::distinct() - }) %>% - dplyr::bind_rows() - - par_info <- par_start( - run_info = run_info, - parallel_processing = if (inner_parallel) { - "local_machine" - } else { - NULL - }, - num_cores = num_cores, - task_length = nrow(refit_iter_list) - ) - inner_cl <- par_info$cl - inner_packages <- par_info$packages - `%op%` <- par_info$foreach_operator - - refit_tbl <- foreach::foreach( - x = refit_iter_list %>% - dplyr::group_split(dplyr::row_number(), .keep = FALSE), - .combine = "rbind", - .packages = inner_packages, - .errorhandling = "remove", - .verbose = FALSE, - .inorder = FALSE, - .multicombine = TRUE, - .noexport = NULL - ) %op% - { - combo <- x %>% - dplyr::pull(Combo_ID) - - model <- x %>% - dplyr::pull(Model_Name) - - recipe <- x %>% - dplyr::pull(Recipe_ID) - - param <- x %>% - dplyr::pull(Hyperparameter_ID) - - run_type <- x %>% - dplyr::pull(Run_Type) - - run_id <- x %>% - dplyr::pull(Train_Test_ID) - - train_end <- x %>% - dplyr::pull(Train_End) - - test_end <- x %>% - dplyr::pull(Test_End) - - if (combo != "All-Data") { - recipe_data <- model_recipe_tbl %>% - dplyr::filter( - Recipe == recipe, - ) %>% - dplyr::select(Data) %>% - tidyr::unnest(Data) - } else { - recipe_data <- model_recipe_tbl %>% - dplyr::filter(Recipe == recipe) %>% - dplyr::select(Data) %>% - tidyr::unnest(Data) %>% - tidyr::separate( - col = Combo, - into = combo_variables, - sep = "---", - remove = FALSE - ) - } - - training <- recipe_data %>% - dplyr::filter(Date <= train_end) - - testing <- recipe_data %>% - dplyr::filter( - Date > train_end, - Date <= test_end - ) - - if (recipe == "R2") { - train_origin_max <- training %>% - dplyr::filter(Horizon == 1) - - testing <- testing %>% - dplyr::filter(Origin == max(train_origin_max$Origin) + 1) - } - - # get hyperparameters - hyperparameters <- model_hyperparameter_tbl %>% - dplyr::filter( - Model == model, - Recipe == recipe, - Hyperparameter_Combo == param - ) %>% - dplyr::select(Hyperparameters) %>% - tidyr::unnest(Hyperparameters) - - # get workflow - workflow <- model_workflow_tbl %>% - dplyr::filter( - Model_Name == model, - Model_Recipe == recipe - ) - - workflow_final <- workflow$Model_Workflow[[1]] %>% - tune::finalize_workflow(parameters = hyperparameters) - - # fit model - set.seed(seed) - - if (nrow(hyperparameters) > 0) { - model_fit <- workflow_final %>% - generics::fit(data = training) - } else { - model_fit <- workflow_final %>% - generics::fit(data = training) - } - - # create prediction - set.seed(seed) - - model_prediction <- testing %>% - dplyr::bind_cols( - predict(model_fit, new_data = testing) - ) %>% - dplyr::select(Combo, Date, Target, .pred) %>% - dplyr::rename(Forecast = .pred) %>% - negative_fcst_adj(negative_forecast) - - # finalize output tbl - if (run_id == 1) { - model_fit <- model_fit - } else { - model_fit <- NULL - } - - final_tbl <- tibble::tibble( - Combo_ID = combo, - Model_Name = model, - Model_Type = ifelse(combo == "All-Data", "global", "local"), - Recipe_ID = recipe, - Train_Test_ID = run_id, - Hyperparameter_ID = param, - Model_Fit = list(model_fit), - Prediction = list(model_prediction) - ) + final_fcst <- tune::collect_predictions(refit_tbl) %>% + dplyr::rename( + Forecast = .pred, + Train_Test_ID = id + ) %>% + dplyr::mutate(Train_Test_ID = as.numeric(Train_Test_ID)) %>% + dplyr::left_join(model_train_test_tbl %>% + dplyr::select(Run_Type, Train_Test_ID), + by = "Train_Test_ID" + ) %>% + dplyr::left_join( + prep_data %>% + dplyr::mutate(.row = dplyr::row_number()) %>% + dplyr::select(Combo, Date, .row), + by = ".row" + ) %>% + dplyr::mutate(Hyperparameter_ID = hyperparameter_id) %>% + dplyr::select(-.row, -.config) %>% + negative_fcst_adj(negative_forecast) + + combo_id <- ifelse(x == "All-Data", "All-Data", unique(final_fcst$Combo)) + + final_return_tbl <- tibble::tibble( + Combo_ID = combo_id, + Model_Name = model, + Model_Type = ifelse(combo_id == "All-Data", "global", "local"), + Recipe_ID = data_prep_recipe, + Forecast_Tbl = list(final_fcst), + Model_Fit = list(wflow_fit) + ) - return(final_tbl) - } %>% - base::suppressPackageStartupMessages() + return(final_return_tbl) + } par_end(inner_cl) + # ensure at least one model ran successfully + if (nrow(model_tbl) < 1) { + stop("All models failed to train") + } + # write outputs - fitted_models <- refit_tbl %>% - dplyr::mutate(Train_Test_ID = as.numeric(Train_Test_ID)) %>% - dplyr::filter(Train_Test_ID == 1) %>% + fitted_models <- model_tbl %>% tidyr::unite(col = "Model_ID", c("Model_Name", "Model_Type", "Recipe_ID"), sep = "--", remove = FALSE) %>% dplyr::select(Combo_ID, Model_ID, Model_Name, Model_Type, Recipe_ID, Model_Fit) @@ -676,11 +425,9 @@ train_models <- function(run_info, suffix = "-single_models" ) - final_forecast_tbl <- refit_tbl %>% + final_forecast_tbl <- model_tbl %>% dplyr::select(-Model_Fit) %>% - tidyr::unnest(Prediction) %>% - rbind(model_tune_tbl %>% - dplyr::select(-Combo_Hash)) %>% + tidyr::unnest(Forecast_Tbl) %>% dplyr::arrange(Train_Test_ID) %>% tidyr::unite(col = "Model_ID", c("Model_Name", "Model_Type", "Recipe_ID"), sep = "--", remove = FALSE) %>% dplyr::group_by(Combo_ID, Model_ID, Train_Test_ID) %>% @@ -807,3 +554,61 @@ negative_fcst_adj <- function(data, return(fcst_final) } + +#' Function to get train test splits in rsample format +#' +#' @param data data frame +#' @param train_test_splits list of finnts train test splits df +#' +#' @return tbl with train test splits +#' @noRd +create_splits <- function(data, train_test_splits) { + + # Create the rsplit object + analysis_split <- function(data, train_indices, test_indices) { + rsplit_object <- rsample::make_splits( + x = list(analysis = train_indices, assessment = test_indices), + data = data + ) + } + + # Create a list to store the splits and a vector to store the IDs + splits <- list() + ids <- character() + + # Loop over the rows of the split data frame + for (i in seq_len(nrow(train_test_splits))) { + # Get the train and test end dates + train_end <- train_test_splits$Train_End[i] + test_end <- train_test_splits$Test_End[i] + + + + # Create the train and test indices + train_indices <- which(data$Date <= train_end) + + if ("Horizon" %in% colnames(data)) { + # adjust for the horizon in R2 recipe data + train_data <- data %>% + dplyr::filter( + Horizon == 1, + Date <= train_end + ) + + test_indices <- which(data$Date > train_end & data$Date <= test_end & data$Origin == max(train_data$Origin) + 1) + } else { + test_indices <- which(data$Date > train_end & data$Date <= test_end) + } + + # Create the split and add it to the list + splits[[i]] <- analysis_split(data, train_indices, test_indices) + + # Add the ID to the vector + ids[i] <- as.character(train_test_splits$Train_Test_ID[i]) + } + + # Create the resamples + resamples <- rsample::manual_rset(splits = splits, ids = ids) + + return(resamples) +} diff --git a/R/utility.R b/R/utility.R index 5dbf8e79..9585d974 100644 --- a/R/utility.R +++ b/R/utility.R @@ -10,7 +10,8 @@ utils::globalVariables(c( "Sum", "Target", "Test_End", "Train_End", "Train_Test_ID", "Type", "Variable", "as2", "combo_list", "data", "get_export_packages", "hi_80", "hi_95", "i", "lo_80", "lo_95", "model_spec_1", "name", "path_ext", "predict", "read.csv", "sc", "weighted_MAPE", "where", - "x", "num_cores", "run_info", "negative_forecast", "Forecast_Adj", "Final_Col", "lag_val", "libs" + "x", "num_cores", "run_info", "negative_forecast", "Forecast_Adj", "Final_Col", "lag_val", "libs", + ".config", "Forecast_Tbl", "Model_Workflow", "id", "model_run" )) #' @importFrom magrittr %>% From 5a77ea765bcca4e5b5af033da665e0e10aa85efc Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Fri, 11 Aug 2023 13:15:47 -0700 Subject: [PATCH 2/2] remove unused dependency --- DESCRIPTION | 3 +-- R/utility.R | 2 +- cran-comments.md | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 793e54ea..b2d25618 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -59,8 +59,7 @@ Imports: timetk, tune, vroom, - workflows, - yardstick + workflows Suggests: arrow (>= 8.0.0), AzureStor, diff --git a/R/utility.R b/R/utility.R index 9585d974..9ed25b81 100644 --- a/R/utility.R +++ b/R/utility.R @@ -10,7 +10,7 @@ utils::globalVariables(c( "Sum", "Target", "Test_End", "Train_End", "Train_Test_ID", "Type", "Variable", "as2", "combo_list", "data", "get_export_packages", "hi_80", "hi_95", "i", "lo_80", "lo_95", "model_spec_1", "name", "path_ext", "predict", "read.csv", "sc", "weighted_MAPE", "where", - "x", "num_cores", "run_info", "negative_forecast", "Forecast_Adj", "Final_Col", "lag_val", "libs", + "x", "num_cores", "run_info", "negative_forecast", "Forecast_Adj", "Final_Col", "lag_val", "libs", ".config", "Forecast_Tbl", "Model_Workflow", "id", "model_run" )) diff --git a/cran-comments.md b/cran-comments.md index 3a4672f3..7073a8b5 100644 --- a/cran-comments.md +++ b/cran-comments.md @@ -4,7 +4,7 @@ There were no ERRORs or WARNINGs. There was 1 NOTE: * checking dependencies in R code ... NOTE - Imports includes 34 non-default packages. + Imports includes 32 non-default packages. Importing from so many packages makes the package vulnerable to any of them becoming unavailable. Move as many as possible to Suggests and use conditionally.