Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #2590 improve_joined: improve performance of *joined* functions #2599

Merged
merged 10 commits into from
Dec 13, 2024
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ importFrom(dplyr,first)
importFrom(dplyr,full_join)
importFrom(dplyr,group_by)
importFrom(dplyr,group_by_at)
importFrom(dplyr,group_split)
importFrom(dplyr,if_else)
importFrom(dplyr,lag)
importFrom(dplyr,mutate)
Expand Down Expand Up @@ -320,4 +321,5 @@ importFrom(tidyselect,matches)
importFrom(tidyselect,vars_select)
importFrom(utils,capture.output)
importFrom(utils,file.edit)
importFrom(utils,object.size)
importFrom(utils,str)
6 changes: 5 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## New Features

- New function `derive_vars_cat()` for deriving pairs of variables or more, e.g.
`AVALCATx` & `AVALCAxN`. (#2480)
`AVALCATy` & `AVALCAyN`. (#2480)
- New function `derive_vars_crit_flag()` for deriving criterion flag variables
(`CRITy`, `CRITyFL`, `CRITyFLN`). (#2468)
- New function `transform_range()` to transform values from a source range to a
Expand Down Expand Up @@ -45,6 +45,10 @@ or that the queries dataset contains duplicates. (#2543)

- In `get_summary_records()`, previously deprecated formal arguments `analysis_var` and `summary_fun` now removed from function, documentation, tests etc. (#2521)

- The functions `derive_vars_joined()`, `derive_var_joined_exist_flag()`,
`derive_extreme_event()`, and `filter_joined()` were updated to reduce their
memory consumption. (#2590)

## Breaking Changes

- The following function arguments are entering the next phase of the deprecation process: (#2487)
Expand Down
8 changes: 4 additions & 4 deletions R/admiral-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#' @importFrom cli cli_abort ansi_collapse cli_div cli_inform cli_text cli_warn
#' @importFrom dplyr across arrange between bind_cols bind_rows case_when
#' coalesce desc distinct ends_with everything filter first full_join group_by
#' group_by_at if_else mutate n n_distinct na_if pull rename rename_with
#' row_number select semi_join slice starts_with summarise summarise_all
#' tibble tribble ungroup union lag
#' group_by_at group_split if_else lag mutate n n_distinct na_if pull rename
#' rename_with row_number select semi_join slice starts_with summarise
#' summarise_all tibble tribble ungroup union
#' @importFrom hms as_hms
#' @importFrom lifecycle deprecate_warn deprecate_stop deprecated
#' @importFrom lubridate %--% as_datetime ceiling_date date days duration
Expand All @@ -30,6 +30,6 @@
#' str_trim
#' @importFrom tidyr crossing drop_na fill nest pivot_longer pivot_wider unnest
#' @importFrom tidyselect all_of any_of contains matches vars_select
#' @importFrom utils capture.output file.edit str
#' @importFrom utils capture.output file.edit object.size str
#'
"_PACKAGE"
103 changes: 88 additions & 15 deletions R/derive_joined.R
Original file line number Diff line number Diff line change
Expand Up @@ -806,17 +806,92 @@ get_joined_data <- function(dataset,

# join the input dataset with itself such that to each observation of the
# input dataset all following observations are joined
data_add_to_join <- select(
bms63 marked this conversation as resolved.
Show resolved Hide resolved
data_add,
!!!by_vars,
!!!replace_values_by_names(extract_vars(order)),
!!!replace_values_by_names(join_vars),
!!tmp_obs_nr_var
)

# split input dataset into smaller pieces and process them separately
# This reduces the memory consumption.
if (is.null(by_vars_left)) {
# create batches of about 1MB input data
obs_per_batch <- floor(1000000 / as.numeric(object.size(data) / nrow(data)))
tmp_batch_nr <- get_new_tmp_var(data, prefix = "tmp_batch_nr")
data_list <- data %>%
mutate(!!tmp_batch_nr := ceiling(row_number() / obs_per_batch)) %>%
group_by(!!tmp_batch_nr) %>%
group_split(.keep = FALSE)
data_add_list <- list(data_add_to_join)
} else {
data_nest <- nest(data, data = everything(), .by = vars2chr(unname(by_vars_left)))
data_add_nest <- nest(data_add, data_add = everything(), .by = vars2chr(unname(by_vars_left)))
data_all_nest <- inner_join(data_nest, data_add_nest, by = vars2chr(by_vars_left))
data_list <- data_all_nest$data
data_add_list <- data_all_nest$data_add
}

joined_data <- map2(
data_list,
data_add_list,
function(x, y) {
get_joined_sub_data(
x,
y,
by_vars = by_vars_left,
tmp_obs_nr_var = tmp_obs_nr_var,
tmp_obs_nr_left = tmp_obs_nr_left,
join_type = join_type,
first_cond_upper = first_cond_upper,
first_cond_lower = first_cond_lower,
filter_join = filter_join
)
}
)

bind_rows(joined_data) %>%
remove_tmp_vars() %>%
select(-!!tmp_obs_nr_var_join)
}

#' Join Data for "joined" functions
#'
#' The helper function joins the data for the "joined" functions. All `.join`
#' variables are included in the output dataset. It is called by
#' `get_joined_data()` to process each by group separately. This reduces the
#' memory consumption.
#'
#' @inheritParams get_joined_data
#'
#' @details
#'
#' 1. The input dataset (`dataset`) and the additional dataset (`dataset_add`)
#' are left joined by the grouping variables (`by_vars`). If no grouping
#' variables are specified, a full join is performed.
#'
#' 1. The joined dataset is restricted as specified by arguments `join_type`,
#' `first_cond_upper`, and `first_cond_lower`. See argument descriptions for
#' details.
#'
#' 1. The joined dataset is restricted by the `filter_join` condition.
#'
#' @keywords internal
get_joined_sub_data <- function(dataset,
bms63 marked this conversation as resolved.
Show resolved Hide resolved
dataset_add,
by_vars,
tmp_obs_nr_var,
tmp_obs_nr_left,
join_type,
first_cond_upper,
first_cond_lower,
filter_join) {
data_joined <-
left_join(
data,
select(
data_add,
!!!by_vars,
!!!replace_values_by_names(extract_vars(order)),
!!!replace_values_by_names(join_vars),
!!tmp_obs_nr_var
),
by = vars2chr(by_vars_left),
dataset,
dataset_add,
by = vars2chr(by_vars),
suffix = c("", ".join")
)

Expand All @@ -837,7 +912,7 @@ get_joined_data <- function(dataset,
# select all observations up to the first confirmation observation
data_joined <- filter_relative(
data_joined,
by_vars = expr_c(by_vars_left, tmp_obs_nr_var),
by_vars = expr_c(by_vars, tmp_obs_nr_var),
condition = !!first_cond_upper,
order = exprs(!!parse_expr(paste0(as_name(tmp_obs_nr_var), ".join"))),
mode = "first",
Expand All @@ -851,7 +926,7 @@ get_joined_data <- function(dataset,
# select all observations up to the first confirmation observation
data_joined <- filter_relative(
data_joined,
by_vars = expr_c(by_vars_left, tmp_obs_nr_var),
by_vars = expr_c(by_vars, tmp_obs_nr_var),
condition = !!first_cond_lower,
order = exprs(!!parse_expr(paste0("desc(", as_name(tmp_obs_nr_var), ".join)"))),
mode = "first",
Expand All @@ -862,9 +937,7 @@ get_joined_data <- function(dataset,
}
# apply confirmation condition, which may include summary functions
data_joined %>%
group_by(!!!by_vars_left, !!tmp_obs_nr_left) %>%
group_by(!!!by_vars, !!tmp_obs_nr_left) %>%
filter_if(filter_join) %>%
ungroup() %>%
remove_tmp_vars() %>%
select(-!!tmp_obs_nr_var_join)
ungroup()
}
118 changes: 118 additions & 0 deletions man/get_joined_sub_data.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading