Skip to content

Commit

Permalink
implement weighted cases in arrow approach
Browse files Browse the repository at this point in the history
  • Loading branch information
rafapereirabr committed Jan 7, 2025
1 parent 9257eb2 commit c145541
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 343 deletions.
29 changes: 20 additions & 9 deletions R/geocode_rafa.R
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ geocode_rafa <- function(addresses_table,
old = c('logradouro', 'bairro'),
new = c('logradouro_sem_numero', 'localidade'))

### temporary
input_padrao[, numero := as.numeric(numero)]

# downloading cnefe. we only need to download the states present in the
# addresses table, which may save us some time.
input_states <- unique(input_padrao$estado)
Expand All @@ -106,7 +109,8 @@ geocode_rafa <- function(addresses_table,
con <- create_geocodebr_db(n_cores = n_cores)

# Convert input data frame to DuckDB table
duckdb::dbWriteTable(con, "input_padrao_db", input_padrao, temporary = TRUE)
duckdb::dbWriteTable(con, "input_padrao_db", input_padrao,
overwrite = TRUE, temporary = TRUE)


# register cnefe data to db, but only include states and municipalities
Expand All @@ -121,15 +125,23 @@ geocode_rafa <- function(addresses_table,
filtered_cnefe <- arrow::open_dataset(get_cache_dir()) |>
dplyr::filter(estado %in% input_states) |>
dplyr::filter(municipio %in% input_municipio) |>
dplyr::mutate(numero = dplyr::if_else(numero=='S/N', NA, numero)) |>
dplyr::mutate(numero = as.numeric(numero)) |>
dplyr::compute()

# 6666
# this is necessary to use match with weighted cases
duckdb::duckdb_register_arrow(con, "filtered_cnefe", filtered_cnefe)
#filtered_cnefe <- dplyr::collect(filtered_cnefe)
#duckdb::dbWriteTable(con, "filtered_cnefe", filtered_cnefe,
# temporary = TRUE, overwrite = TRUE)

# #filtered_cnefe <- dplyr::collect(filtered_cnefe)
# #duckdb::dbWriteTable(con, "filtered_cnefe", filtered_cnefe,
# # temporary = TRUE, overwrite = TRUE)
#
#
# DBI::dbExecute(
# con,
# glue::glue("UPDATE filtered_cnefe SET numero = TRY_CAST(numero AS INTEGER);
# ALTER TABLE filtered_cnefe ALTER COLUMN numero TYPE INTEGER USING TRY_CAST(numero AS INTEGER);")
# )


# START DETERMINISTIC MATCHING -----------------------------------------------
Expand Down Expand Up @@ -159,8 +171,7 @@ geocode_rafa <- function(addresses_table,
}


# (case in c(1:4, 44, 5:12))
for (case in c(1:4, 5:12)) {
for (case in c(1:4, 44, 5:12)) {

relevant_cols <- get_relevant_cols_rafa(case)
formatted_case <- formatC(case, width = 2, flag = "0")
Expand All @@ -171,7 +182,7 @@ geocode_rafa <- function(addresses_table,
if (all(relevant_cols %in% names(input_padrao))) {

# select match function
match_fun <- ifelse(case != 44, match_aggregated_cases, match_aggregated_cases_weighted)
match_fun <- ifelse(case != 44, match_cases, match_weighted_cases)

n_rows_affected <- match_fun(
con,
Expand All @@ -193,7 +204,7 @@ geocode_rafa <- function(addresses_table,
# THIS could BE IMPROVED / optimized

# list all table outputs
all_possible_tables <- glue::glue("output_caso_{formatC(1:12, width = 2, flag = '0')}")
all_possible_tables <- glue::glue("output_caso_{formatC(c(1:12,44), width = 2, flag = '0')}")

# check which tables have been created
output_tables <- lapply(
Expand Down
24 changes: 12 additions & 12 deletions R/geocode_rafa_arrow.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ geocode_rafa_arrow <- function(addresses_table,
con <- create_geocodebr_db(n_cores = n_cores)

# Convert input data frame to DuckDB table
duckdb::dbWriteTable(con, "input_padrao_db", input_padrao, temporary = TRUE)
duckdb::dbWriteTable(con, "input_padrao_db", input_padrao,
overwrite = TRUE, temporary = TRUE)



Expand All @@ -136,13 +137,6 @@ geocode_rafa_arrow <- function(addresses_table,
# - case 11: match municipio, localidade
# - case 12: match municipio

if (progress) {
prog <- create_progress_bar(input_padrao)
n_rows_affected <- 0

message_looking_for_matches()
}

# determine geographical scope of the search
input_states <- unique(input_padrao$estado)
input_municipio <- unique(input_padrao$municipio)
Expand All @@ -151,8 +145,14 @@ geocode_rafa_arrow <- function(addresses_table,
if(is.null(input_municipio)){ input_municipio <- "*"}


# (case in c(1:4, 44, 5:12))
for (case in c(1:4, 5:12)) {
if (progress) {
prog <- create_progress_bar(input_padrao)
n_rows_affected <- 0

message_looking_for_matches()
}

for (case in c(1:4, 44, 5:12)) {

relevant_cols <- get_relevant_cols_rafa(case)
formatted_case <- formatC(case, width = 2, flag = "0")
Expand All @@ -163,7 +163,7 @@ geocode_rafa_arrow <- function(addresses_table,
if (all(relevant_cols %in% names(input_padrao))) {

# select match function
match_fun <- ifelse(case != 44, match_aggregated_cases_arrow, match_aggregated_cases_weighted)
match_fun <- ifelse(case != 44, match_cases_arrow, match_weighted_cases_arrow)

n_rows_affected <- match_fun(
con,
Expand All @@ -186,7 +186,7 @@ geocode_rafa_arrow <- function(addresses_table,
# THIS could BE IMPROVED / optimized

# list all table outputs
all_possible_tables <- glue::glue("output_caso_{formatC(1:12, width = 2, flag = '0')}")
all_possible_tables <- glue::glue("output_caso_{formatC(c(1:12,44), width = 2, flag = '0')}")

# check which tables have been created
output_tables <- lapply(
Expand Down
93 changes: 0 additions & 93 deletions R/match_aggregated_cases_weighted.R

This file was deleted.

47 changes: 19 additions & 28 deletions R/match_aggregated_cases.R → R/match_cases.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#' @return Writes the result of the left join as a new table in con
#'
#' @keywords internal
match_aggregated_cases <- function(con, x, y, output_tb, key_cols, match_type){
match_cases <- function(con, x, y, output_tb, key_cols, match_type){

# table table - 7.993404
# view table--- 7.047993
Expand All @@ -26,12 +26,12 @@ match_aggregated_cases <- function(con, x, y, output_tb, key_cols, match_type){
"CREATE OR REPLACE TEMPORARY VIEW pre_aggregated_cnefe AS
SELECT {cols_select} AVG(lon) AS lon, AVG(lat) AS lat
FROM {y}
WHERE {y}.numero != 'S/N'
WHERE {y}.numero IS NOT NULL
GROUP BY {cols_group};"
)

if (match_type %in% 5:12) {
query_aggregate <- gsub("WHERE filtered_cnefe.numero != 'S/N'", "", query_aggregate)
query_aggregate <- gsub("WHERE filtered_cnefe.numero IS NOT NULL", "", query_aggregate)
}

DBI::dbExecute(con, query_aggregate)
Expand All @@ -55,11 +55,11 @@ match_aggregated_cases <- function(con, x, y, output_tb, key_cols, match_type){
FROM {x}
LEFT JOIN pre_aggregated_cnefe
ON {join_condition}
WHERE {x}.numero != 'S/N' AND pre_aggregated_cnefe.lon IS NOT NULL;"
WHERE {x}.numero IS NOT NULL AND pre_aggregated_cnefe.lon IS NOT NULL;"
)

if (match_type %in% 5:12) {
query_match <- gsub("input_padrao_db.numero != 'S/N' AND", "", query_match)
query_match <- gsub("input_padrao_db.numero IS NOT NULL AND", "", query_match)
}

temp_n <- DBI::dbExecute(con, query_match)
Expand Down Expand Up @@ -96,20 +96,23 @@ match_aggregated_cases <- function(con, x, y, output_tb, key_cols, match_type){
#' @param output_tb Name of the new table to be written in con
#' @param key_cols Vector. Vector with the names of columns to perform left join
#' @param match_type Integer. An integer
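#' @param input_states Vector. States present in the input table, passed by the caller and used to filter the CNEFE data
#' @param input_municipio Vector. Municipalities present in the input table, passed by the caller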
#'
#' @return Writes the result of the left join as a new table in con
#'
#' @keywords internal
match_aggregated_cases_arrow <- function(con,
x,
y,
output_tb,
key_cols,
match_type,
input_states,
input_municipio
){

match_cases_arrow <- function(con,
x,
y,
output_tb,
key_cols,
match_type,
input_states,
input_municipio
){

# read corresponding parquet file
table_name <- paste(key_cols, collapse = "_")
table_name <- gsub('estado_municipio_logradouro_sem_numero', 'logradouro', table_name)
y <- table_name
Expand All @@ -119,7 +122,6 @@ match_aggregated_cases_arrow <- function(con,
# present in the input table, reducing the search scope and consequently
# reducing processing time and memory usage


# Load CNEFE data and write to DuckDB
filtered_cnefe <- arrow::open_dataset( path_to_parquet ) |>
dplyr::filter(estado %in% input_states) |>
Expand All @@ -139,10 +141,6 @@ match_aggregated_cases_arrow <- function(con,
# pulo do gato aqui 6666666666666
# join_condition <- gsub("= input_padrao_db.logradouro_sem_numero", "LIKE '%' || input_padrao_db.logradouro_sem_numero || '%'", join_condition)

# Build the dynamic select and group statement
cols_select <- paste0(paste(key_cols, collapse = ", "),",")
cols_group <- paste(key_cols, collapse = ", ")


# query for left join
query_match <- glue::glue(
Expand All @@ -159,13 +157,7 @@ match_aggregated_cases_arrow <- function(con,
}

temp_n <- DBI::dbExecute(con, query_match)

# # add match_type column to output
# add_precision_col(
# con,
# update_tb = output_tb,
# match_type = match_type
# )
duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")

# UPDATE input_padrao_db: Remove observations found in previous step
update_input_db(
Expand All @@ -174,7 +166,6 @@ match_aggregated_cases_arrow <- function(con,
reference_tb = output_tb
)

duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")

return(temp_n)
}
Loading

0 comments on commit c145541

Please sign in to comment.