diff --git a/NAMESPACE b/NAMESPACE index 58d568c0ab..baed6524ae 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -181,6 +181,7 @@ importFrom(dplyr,first) importFrom(dplyr,full_join) importFrom(dplyr,group_by) importFrom(dplyr,group_by_at) +importFrom(dplyr,group_split) importFrom(dplyr,if_else) importFrom(dplyr,lag) importFrom(dplyr,mutate) @@ -320,4 +321,5 @@ importFrom(tidyselect,matches) importFrom(tidyselect,vars_select) importFrom(utils,capture.output) importFrom(utils,file.edit) +importFrom(utils,object.size) importFrom(utils,str) diff --git a/NEWS.md b/NEWS.md index 6a941b2dd3..682d9d586c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -3,7 +3,7 @@ ## New Features - New function `derive_vars_cat()` for deriving pairs of variables or more, e.g. -`AVALCATx` & `AVALCAxN`. (#2480) +`AVALCATy` & `AVALCAyN`. (#2480) - New function `derive_vars_crit_flag()` for deriving criterion flag variables (`CRITy`, `CRITyFL`, `CRITyFLN`). (#2468) - New function `transform_range()` to transform values from a source range to a @@ -45,6 +45,10 @@ or that the queries dataset contains duplicates. (#2543) - In `get_summary_records()`, previously deprecated formal arguments `analysis_var` and `summary_fun` now removed from function, documentation, tests etc. (#2521) +- The functions `derive_vars_joined()`, `derive_var_joined_exist_flag()`, +`derive_extreme_event()`, and `filter_joined()` were updated to reduce their +memory consumption. (#2590) + ## Breaking Changes - The following function arguments are entering the next phase of the deprecation process: (#2487) diff --git a/R/admiral-package.R b/R/admiral-package.R index f6866180ff..9b91b084bc 100644 --- a/R/admiral-package.R +++ b/R/admiral-package.R @@ -4,9 +4,9 @@ #' @importFrom cli cli_abort ansi_collapse cli_div cli_inform cli_text cli_warn #' @importFrom dplyr across arrange between bind_cols bind_rows case_when #' coalesce desc distinct ends_with everything filter first full_join group_by -#' group_by_at if_else mutate n n_distinct na_if pull rename rename_with -#' row_number select semi_join slice starts_with summarise summarise_all -#' tibble tribble ungroup union lag +#' group_by_at group_split if_else lag mutate n n_distinct na_if pull rename +#' rename_with row_number select semi_join slice starts_with summarise +#' summarise_all tibble tribble ungroup union #' @importFrom hms as_hms #' @importFrom lifecycle deprecate_warn deprecate_stop deprecated #' @importFrom lubridate %--% as_datetime ceiling_date date days duration @@ -30,6 +30,6 @@ #' str_trim #' @importFrom tidyr crossing drop_na fill nest pivot_longer pivot_wider unnest #' @importFrom tidyselect all_of any_of contains matches vars_select -#' @importFrom utils capture.output file.edit str +#' @importFrom utils capture.output file.edit object.size str #' "_PACKAGE" diff --git a/R/derive_joined.R b/R/derive_joined.R index 5af5edf546..566bbc64fc 100644 --- a/R/derive_joined.R +++ b/R/derive_joined.R @@ -806,17 +806,92 @@ get_joined_data <- function(dataset, # join the input dataset with itself such that to each observation of the # input dataset all following observations are joined + data_add_to_join <- select( + data_add, + !!!by_vars, + !!!replace_values_by_names(extract_vars(order)), + !!!replace_values_by_names(join_vars), + !!tmp_obs_nr_var + ) + + # split input dataset into smaller pieces and process them separately + # This reduces the memory consumption. + if (is.null(by_vars_left)) { + # create batches of about 1MB input data + obs_per_batch <- floor(1000000 / as.numeric(object.size(data) / nrow(data))) + tmp_batch_nr <- get_new_tmp_var(data, prefix = "tmp_batch_nr") + data_list <- data %>% + mutate(!!tmp_batch_nr := ceiling(row_number() / obs_per_batch)) %>% + group_by(!!tmp_batch_nr) %>% + group_split(.keep = FALSE) + data_add_list <- list(data_add_to_join) + } else { + data_nest <- nest(data, data = everything(), .by = vars2chr(unname(by_vars_left))) + data_add_nest <- nest(data_add, data_add = everything(), .by = vars2chr(unname(by_vars_left))) + data_all_nest <- inner_join(data_nest, data_add_nest, by = vars2chr(by_vars_left)) + data_list <- data_all_nest$data + data_add_list <- data_all_nest$data_add + } + + joined_data <- map2( + data_list, + data_add_list, + function(x, y) { + get_joined_sub_data( + x, + y, + by_vars = by_vars_left, + tmp_obs_nr_var = tmp_obs_nr_var, + tmp_obs_nr_left = tmp_obs_nr_left, + join_type = join_type, + first_cond_upper = first_cond_upper, + first_cond_lower = first_cond_lower, + filter_join = filter_join + ) + } + ) + + bind_rows(joined_data) %>% + remove_tmp_vars() %>% + select(-!!tmp_obs_nr_var_join) +} + +#' Join Data for "joined" functions +#' +#' The helper function joins the data for the "joined" functions. All `.join` +#' variables are included in the output dataset. It is called by +#' `get_joined_data()` to process each by group separately. This reduces the +#' memory consumption. +#' +#' @inheritParams get_joined_data +#' +#' @details +#' +#' 1. The input dataset (`dataset`) and the additional dataset (`dataset_add`) +#' are left joined by the grouping variables (`by_vars`). If no grouping +#' variables are specified, a full join is performed. +#' +#' 1. The joined dataset is restricted as specified by arguments `join_type`, +#' `first_cond_upper`, and `first_cond_lower`. See argument descriptions for +#' details. +#' +#' 1. The joined dataset is restricted by the `filter_join` condition. +#' +#' @keywords internal +get_joined_sub_data <- function(dataset, + dataset_add, + by_vars, + tmp_obs_nr_var, + tmp_obs_nr_left, + join_type, + first_cond_upper, + first_cond_lower, + filter_join) { data_joined <- left_join( - data, - select( - data_add, - !!!by_vars, - !!!replace_values_by_names(extract_vars(order)), - !!!replace_values_by_names(join_vars), - !!tmp_obs_nr_var - ), - by = vars2chr(by_vars_left), + dataset, + dataset_add, + by = vars2chr(by_vars), suffix = c("", ".join") ) @@ -837,7 +912,7 @@ get_joined_data <- function(dataset, # select all observations up to the first confirmation observation data_joined <- filter_relative( data_joined, - by_vars = expr_c(by_vars_left, tmp_obs_nr_var), + by_vars = expr_c(by_vars, tmp_obs_nr_var), condition = !!first_cond_upper, order = exprs(!!parse_expr(paste0(as_name(tmp_obs_nr_var), ".join"))), mode = "first", @@ -851,7 +926,7 @@ get_joined_data <- function(dataset, # select all observations up to the first confirmation observation data_joined <- filter_relative( data_joined, - by_vars = expr_c(by_vars_left, tmp_obs_nr_var), + by_vars = expr_c(by_vars, tmp_obs_nr_var), condition = !!first_cond_lower, order = exprs(!!parse_expr(paste0("desc(", as_name(tmp_obs_nr_var), ".join)"))), mode = "first", @@ -862,9 +937,7 @@ get_joined_data <- function(dataset, } # apply confirmation condition, which may include summary functions data_joined %>% - group_by(!!!by_vars_left, !!tmp_obs_nr_left) %>% + group_by(!!!by_vars, !!tmp_obs_nr_left) %>% filter_if(filter_join) %>% - ungroup() %>% - remove_tmp_vars() %>% - select(-!!tmp_obs_nr_var_join) + ungroup() } diff --git a/man/get_joined_sub_data.Rd b/man/get_joined_sub_data.Rd new file mode 100644 index 0000000000..70b2f1a4ec --- /dev/null +++ b/man/get_joined_sub_data.Rd @@ -0,0 +1,118 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/derive_joined.R +\name{get_joined_sub_data} +\alias{get_joined_sub_data} +\title{Join Data for "joined" functions} +\usage{ +get_joined_sub_data( + dataset, + dataset_add, + by_vars, + tmp_obs_nr_var, + tmp_obs_nr_left, + join_type, + first_cond_upper, + first_cond_lower, + filter_join +) +} +\arguments{ +\item{dataset}{Input dataset + +The variables specified by the \code{by_vars} argument are expected to be in the dataset.} + +\item{dataset_add}{Additional dataset + +The variables specified by the \code{by_vars}, the \code{new_vars}, the \code{join_vars}, +and the \code{order} argument are expected.} + +\item{by_vars}{Grouping variables + +The two datasets are joined by the specified variables. + +Variables can be renamed by naming the element, i.e. +\verb{by_vars = exprs( = )}, similar to the \code{dplyr} joins. + +\emph{Permitted Values}: list of variables created by \code{exprs()} +e.g. \code{exprs(USUBJID, VISIT)}} + +\item{tmp_obs_nr_var}{Temporary observation number + +The specified variable is added to the input dataset (\code{dataset}) and the +additional dataset (\code{dataset_add}). It is set to the observation number +with respect to \code{order}. For each by group (\code{by_vars}) the observation +number starts with \code{1}. The variable can be used in the conditions +(\code{filter_join}, \code{first_cond_upper}, \code{first_cond_lower}). It can also be +used to select consecutive observations or the last observation.} + +\item{join_type}{Observations to keep after joining + +The argument determines which of the joined observations are kept with +respect to the original observation. For example, if \code{join_type = "after"} +is specified all observations after the original observations are kept. + +For example for confirmed response or BOR in the oncology setting or +confirmed deterioration in questionnaires the confirmatory assessment must +be after the assessment. Thus \code{join_type = "after"} could be used. + +Whereas, sometimes you might allow for confirmatory observations to occur +prior to the observation. For example, to identify AEs occurring on or +after seven days before a COVID AE. Thus \code{join_type = "all"} could be used. + +\emph{Permitted Values:} \code{"before"}, \code{"after"}, \code{"all"}} + +\item{first_cond_upper}{Condition for selecting range of data (after) + +If this argument is specified, the other observations are restricted up to +the first observation where the specified condition is fulfilled. If the +condition is not fulfilled for any of the other observations, no +observations are considered, i.e., the observation is not flagged. + +This argument should be specified if \code{filter_join} contains summary +functions which should not apply to all observations but only up to the +confirmation assessment.} + +\item{first_cond_lower}{Condition for selecting range of data (before) + +If this argument is specified, the other observations are restricted from +the first observation before the current observation where the specified +condition is fulfilled up to the current observation. If the condition is +not fulfilled for any of the other observations, no observations are +considered, i.e., the observation is not flagged. + +This argument should be specified if \code{filter_join} contains summary +functions which should not apply to all observations but only from a +certain observation before the current observation up to the current +observation.} + +\item{filter_join}{Filter for the joined dataset + +The specified condition is applied to the joined dataset. Therefore +variables from both datasets \code{dataset} and \code{dataset_add} can be used. + +Variables created by \code{order} or \code{new_vars} arguments can be used in the +condition. + +The condition can include summary functions like \code{all()} or \code{any()}. The +joined dataset is grouped by the original observations. + +\emph{Permitted Values}: a condition} +} +\description{ +The helper function joins the data for the "joined" functions. All \code{.join} +variables are included in the output dataset. It is called by +\code{get_joined_data()} to process each by group separately. This reduces the +memory consumption. +} +\details{ +\enumerate{ +\item The input dataset (\code{dataset}) and the additional dataset (\code{dataset_add}) +are left joined by the grouping variables (\code{by_vars}). If no grouping +variables are specified, a full join is performed. +\item The joined dataset is restricted as specified by arguments \code{join_type}, +\code{first_cond_upper}, and \code{first_cond_lower}. See argument descriptions for +details. +\item The joined dataset is restricted by the \code{filter_join} condition. +} +} +\keyword{internal}