Skip to content

Commit

Permalink
mfdb_import_taxonomy: Use bulk_copy to speed up import #67
Browse files Browse the repository at this point in the history
Bulk copy the data in first, then do all the work in SQL instead of R.
  • Loading branch information
lentinj committed Mar 10, 2021
1 parent f9f9bdb commit c20ab7e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 148 deletions.
2 changes: 2 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Use bulk_copy to speed up large taxonomy imports

Schema changes:
* Remove taxonomy name restrictions

Expand Down
73 changes: 35 additions & 38 deletions R/mfdb_import_taxonomy.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,46 +29,43 @@ mfdb_import_taxonomy <- function (mdb, table_name, data_in, extra_cols = c('desc
}
}

# Fetch all existing ids, quit if all are there
existing <- mfdb_fetch(mdb,
"SELECT ", id_col, ", name, ", paste(extra_cols, collapse = ", "),
" FROM ", table_name,
" ORDER BY 1")

# Throw away rows which don't need updating
if (nrow(existing) > 0) {
# NB: This won't work if an extra_col is float (say latitude), but
# detecting this will probably use up just as much time as updating.
data_in <- data_in[!(data_in$name %in% merge(
existing[, c('name', extra_cols)],
data_in[, c('name', extra_cols)])$name), ]
if (nrow(data_in) == 0) {
mdb$logger$info(paste0("Taxonomy ", table_name ," up-to-date"))
return()
}
}
mdb$logger$info(paste0("Taxonomy ", table_name ," needs updating"))

mfdb_transaction(mdb, {
# New rows should be inserted
new_data <- data_in[data_in$name %in% setdiff(data_in$name, existing$name), ]

# If some ids appear in both new and existing, bump entire range up so we don't intersect
overlapping_ids <- intersect(new_data[[id_col]], existing[[id_col]])
mdb$logger$debug(paste0("new_data & existing overlap by ", length(overlapping_ids), " entries"))
if (length(overlapping_ids) > 0) {
mdb$logger$debug(paste0("upping new IDs by ", max(existing[[id_col]])))
new_data[[id_col]] <- max(existing[[id_col]]) + new_data[[id_col]]
mfdb_transaction(mdb, mfdb_bulk_copy(mdb, table_name, data_in, function (temp_tbl) {
# Remove rows where nothing changed, if we remove all of them, exit.
# NB: This won't work if an extra_col is float (e.g. latitude), but should only be an optimisation
matching_rows <- mfdb_send(mdb, "DELETE FROM ", temp_tbl, " WHERE ", id_col, " IN (",
"SELECT tmp.", id_col,
" FROM ", temp_tbl, " AS tmp",
" JOIN ", table_name, " AS cur ON cur.name = tmp.name",
" WHERE ", paste0("cur.", extra_cols, " = tmp.", extra_cols, collapse = " AND "),
")", result = "rowcount")
if (matching_rows >= nrow(data_in)) return(NULL)

# Update all rows where names match, then remove them from the temp table
mfdb_send(mdb, "UPDATE ", table_name, " AS cur",
" SET ", paste0(extra_cols, " = tmp.", extra_cols, collapse = ","),
" FROM ", temp_tbl, " AS tmp",
" WHERE cur.name = tmp.name")
mfdb_send(mdb, "DELETE FROM ", temp_tbl, " AS tmp WHERE tmp.name IN (SELECT name FROM ", table_name, ")")

# Renumber remaining entries if there's an overlap
has_overlap <- mfdb_fetch(mdb, "SELECT EXISTS(",
"SELECT 1 FROM ", table_name, " cur, ", temp_tbl, " tmp WHERE cur.", id_col, " = tmp.", id_col,
")")[1,1]
if (has_overlap) {
max_id <- mfdb_fetch(mdb, "SELECT MAX(", id_col, ") FROM ", table_name)[1,1]
mfdb_send(mdb,
"UPDATE ", temp_tbl,
" SET ", id_col, " = ", id_col, " + ", max_id)
}

mfdb_insert(mdb, table_name, new_data)

# Rows with matching names should be updated, but existing ids kept
if (nrow(existing) > 0) mfdb_update(mdb,
table_name,
merge(existing[, c(id_col, 'name')], data_in[, c('name', extra_cols)]),
where = c())
})
# Insert remaining rows into table
mfdb_send(mdb,
"INSERT INTO ", table_name,
" (", paste(c(id_col, 'name', extra_cols), collapse=","), ")",
" SELECT ", paste(c(id_col, 'name', extra_cols), collapse=","),
" FROM ", temp_tbl,
NULL)
}))

invisible(NULL)
}
Expand Down
110 changes: 0 additions & 110 deletions tests/test-mfdb_import.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,116 +14,6 @@ cap <- function(code) {
}), value = TRUE, invert = TRUE)
}

# Test group for mfdb_import_taxonomy, exercised against the 'gear' taxonomy.
# Every case sets mdb$ret_rows, runs the import wrapped in cap(), and compares
# the result with the expected statement strings followed by the
# "Return Value:" lines.
# NOTE(review): mdb and cap() are defined outside this block; mdb$ret_rows is
# presumably what the stub connection hands back as the table's current
# contents, and cap() presumably captures the issued statements plus the
# return value -- confirm against their definitions.
ok_group("mfdb_import_taxonomy", {
# Pretend table is empty
mdb$ret_rows <- data.frame()

# Case 1: zero-row input -- no statements expected, only the NULL return value.
ok(cmp(cap(mfdb:::mfdb_import_taxonomy(mdb, 'gear', data.frame(
name = c(),
description = c(),
stringsAsFactors = TRUE))), c(
"Return Value:",
" NULL",
NULL)), "No input data, nothing to do")

# Case 2: three input rows, no existing rows -- expect the SELECT of existing
# rows and then one multi-row INSERT. The input "id" column shows up as
# gear_id, and t_group (absent from the input) is written as NULL.
ok(cmp(cap(mfdb:::mfdb_import_taxonomy(mdb, 'gear', data.frame(
id = 1:3,
name = c('RES', 'FEZ', 'DES'),
description = c('Research', 'Tommy Cooper', 'Encryption'),
stringsAsFactors = TRUE))), c(
"SELECT gear_id, name, t_group, description FROM gear ORDER BY 1",
"INSERT INTO gear (gear_id,name,t_group,description) VALUES (1,'RES',NULL,'Research'),(2,'FEZ',NULL,'Tommy Cooper'),(3,'DES',NULL,'Encryption')",
"Return Value:",
" NULL",
NULL)), "Insert 3 rows into empty table")

# Case 3: existing rows have the same ids, names and descriptions as the
# input -- only the SELECT is expected, no INSERT or UPDATE.
mdb$ret_rows <- data.frame(
gear_id = 1:3,
name = c('RES', 'FEZ', 'DES'),
t_group = NA,
description = c('Research', 'Tommy Cooper', 'Encryption'),
stringsAsFactors = FALSE)
ok(cmp(cap(mfdb:::mfdb_import_taxonomy(mdb, 'gear', data.frame(
id = 1:3,
name = c('RES', 'FEZ', 'DES'),
description = c('Research', 'Tommy Cooper', 'Encryption'),
stringsAsFactors = TRUE))), c(
"SELECT gear_id, name, t_group, description FROM gear ORDER BY 1",
"Return Value:",
" NULL",
NULL)), "Identical, nothing happens")

# Case 4: existing rows carry ids 11:13 while the input uses 1:3, with names
# and descriptions equal -- still only the SELECT is expected, so an id
# difference alone does not trigger a write.
mdb$ret_rows <- data.frame(
gear_id = 11:13,
name = c('RES', 'FEZ', 'DES'),
t_group = NA,
description = c('Research', 'Tommy Cooper', 'Encryption'),
stringsAsFactors = FALSE)
ok(cmp(cap(mfdb:::mfdb_import_taxonomy(mdb, 'gear', data.frame(
id = 1:3,
name = c('RES', 'FEZ', 'DES'),
description = c('Research', 'Tommy Cooper', 'Encryption'),
stringsAsFactors = TRUE))), c(
"SELECT gear_id, name, t_group, description FROM gear ORDER BY 1",
"Return Value:",
" NULL",
NULL)), "Differing IDs don't phase us")

# Case 5: 'FEZ' exists with a different description and 'DES' does not exist
# -- expect an INSERT for 'DES' (id 3) and an UPDATE of 'FEZ' addressed by
# the existing gear_id 2. The unchanged 'RES' row produces no statement.
mdb$ret_rows <- data.frame(
gear_id = 1:2,
name = c('RES', 'FEZ'),
t_group = NA,
description = c('Research', 'Hat'),
stringsAsFactors = FALSE)
ok(cmp(cap(mfdb:::mfdb_import_taxonomy(mdb, 'gear', data.frame(
id = 1:3,
name = c('RES', 'FEZ', 'DES'),
description = c('Research', 'Tommy Cooper', 'Encryption'),
stringsAsFactors = TRUE))), c(
"SELECT gear_id, name, t_group, description FROM gear ORDER BY 1",
"INSERT INTO gear (gear_id,name,t_group,description) VALUES (3,'DES',NULL,'Encryption')",
"UPDATE gear SET name='FEZ',t_group=NULL,description='Tommy Cooper' WHERE gear_id = 2",
"Return Value:",
" NULL",
NULL)), "Update 1 row, insert a new row")

# Case 6: same existing rows as case 5, but the input ids are 4:6 -- the new
# 'DES' row is inserted with its input id 6, while the UPDATE of 'FEZ' still
# targets the existing gear_id 2.
mdb$ret_rows <- data.frame(
gear_id = 1:2,
name = c('RES', 'FEZ'),
t_group = NA,
description = c('Research', 'Hat'),
stringsAsFactors = FALSE)
ok(cmp(cap(mfdb:::mfdb_import_taxonomy(mdb, 'gear', data.frame(
id = 4:6,
name = c('RES', 'FEZ', 'DES'),
description = c('Research', 'Tommy Cooper', 'Encryption'),
stringsAsFactors = TRUE))), c(
"SELECT gear_id, name, t_group, description FROM gear ORDER BY 1",
"INSERT INTO gear (gear_id,name,t_group,description) VALUES (6,'DES',NULL,'Encryption')",
"UPDATE gear SET name='FEZ',t_group=NULL,description='Tommy Cooper' WHERE gear_id = 2",
"Return Value:",
" NULL",
NULL)), "Differing IDs only affect adding new rows")

# Case 7: t_group is supplied in the input and equals the existing values, and
# the descriptions match too -- only the new 'DES' row is inserted (id 6,
# t_group 'RES'); no UPDATE is expected.
mdb$ret_rows <- data.frame(
gear_id = 1:2,
name = c('RES', 'FEZ'),
t_group = c('RES', 'RES'),
description = c('Research', 'Hat'),
stringsAsFactors = FALSE)
ok(cmp(cap(mfdb:::mfdb_import_taxonomy(mdb, 'gear', data.frame(
id = 4:6,
name = c('RES', 'FEZ', 'DES'),
t_group = c('RES', 'RES', 'RES'),
description = c('Research', 'Hat', 'Encryption'),
stringsAsFactors = TRUE))), c(
"SELECT gear_id, name, t_group, description FROM gear ORDER BY 1",
"INSERT INTO gear (gear_id,name,t_group,description) VALUES (6,'DES','RES','Encryption')",
"Return Value:",
" NULL",
NULL)), "Differing IDs only affect adding new rows")
})

ok_group("sanitise_col", local({
# Case insensitive matching of column name (but no partial matching)
ok(cmp(sanitise_col(mdb, data.frame(aaaa=1,b=2,aa=3), 'aaaa'), 1), "Picked correct column (aaaa)")
Expand Down

0 comments on commit c20ab7e

Please sign in to comment.