-
Notifications
You must be signed in to change notification settings - Fork 7
/
README.Rmd
223 lines (160 loc) · 7.53 KB
/
README.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
---
output: github_document
---
# arkdb <img src="man/figures/logo.svg" align="right" alt="" width="120" />
[![R build status](https://github.com/ropensci/arkdb/workflows/R-CMD-check/badge.svg)](https://github.com/ropensci/arkdb/actions)
[![Travis build status](https://app.travis-ci.com/ropensci/arkdb.svg?branch=master)](https://app.travis-ci.com/ropensci/arkdb)
[![Coverage status](https://codecov.io/gh/ropensci/arkdb/branch/master/graph/badge.svg)](https://app.codecov.io/github/ropensci/arkdb?branch=master)
[![CRAN_Status_Badge](http://www.r-pkg.org/badges/version/arkdb)](https://cran.r-project.org/package=arkdb)
[![](https://badges.ropensci.org/224_status.svg)](https://github.com/ropensci/software-review/issues/224)
[![lifecycle](https://img.shields.io/badge/lifecycle-stable-brightgreen.svg)](https://lifecycle.r-lib.org/articles/stages.html)
[![CRAN RStudio mirror downloads](http://cranlogs.r-pkg.org/badges/grand-total/arkdb)](https://CRAN.R-project.org/package=arkdb)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1343943.svg)](https://doi.org/10.5281/zenodo.1343943)
<!-- badges: end -->
<!-- README.md is generated from README.Rmd. Please edit that file -->
```{r, echo = FALSE}
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>",
fig.path = "README-"
)
```
The goal of `arkdb` is to provide a convenient way to move data from large compressed text files (tsv, csv, etc) into any DBI-compliant database connection (e.g. MYSQL, Postgres, SQLite; see [DBI](https://solutions.rstudio.com/db/r-packages/DBI/)), and move tables out of such databases into text files. The key feature of `arkdb` is that files are moved between databases and text files in chunks of a fixed size, allowing the package functions to work with tables that would be much too large to read into memory all at once. There is also functionality for filtering and applying transformation to data as it is extracted from the database.
The `arkdb` package is easily extended to use custom read and write methods allowing you to dictate your own output formats. See `R/streamable_table.R` for examples that include using:
- Base c/tsv
- Apache arrow's parquet
- The `readr` package for c/tsv
## Links
- A more detailed introduction to package design and use can be found in the package [Vignette](https://docs.ropensci.org/arkdb/articles/arkdb.html)
- [Online versions of package documentation](https://docs.ropensci.org/arkdb/)
## Installation
You can install arkdb from GitHub with:
```{r gh-installation, eval = FALSE}
# install.packages("devtools")
devtools::install_github("cboettig/arkdb")
```
# Basic use
```{r message = FALSE}
library(arkdb)
# additional libraries just for this demo
library(dbplyr)
library(dplyr)
library(fs)
```
## Creating an archive of a database
Consider the `nycflights` database in SQLite:
```{r example}
tmp <- tempdir() # Or can be your working directory, "."
db <- dbplyr::nycflights13_sqlite(tmp)
```
Create an archive of the database:
```{r}
dir <- fs::dir_create(fs::path(tmp, "nycflights"))
ark(db, dir, lines = 50000)
```
## Unarchive
Import a list of compressed tabular files (i.e. `*.csv.bz2`) into a local SQLite database:
```{r}
files <- fs::dir_ls(dir)
new_db <- DBI::dbConnect(RSQLite::SQLite(), fs::path(tmp, "local.sqlite"))
unark(files, new_db, lines = 50000)
```
```{r include=FALSE}
disconnect <- function(db){
## Cleanup
if(inherits(db, "DBIConnection")){
DBI::dbDisconnect(db)
} else {
DBI::dbDisconnect(db$con)
}
}
DBI::dbDisconnect(db)
DBI::dbDisconnect(new_db)
codemeta::write_codemeta()
```
## Using filters
This package can also be used to generate slices of data that are required for analytical or operational purposes. In the example below we archive to disk only the flight data that occurred in the month of December. It is recommended to use filters on a single table at a time.
```{r, eval=FALSE}
ark(db, dir, lines = 50000, tables = "flights", filter_statement = "WHERE month = 12")
```
## Using callbacks
It is possible to use a callback to perform just-in-time data transformations before ark writes your data object to disk in your preferred format. In the example below, we write a simple transformation to convert the flights data `arr_delay` field, from minutes, to hours. It is recommended to use callbacks on a single table at a time. A callback function can be anything you can imagine so long as it returns a data.frame that can be written to disk.
```{r, eval=FALSE}
mins_to_hours <- function(data) {
data$arr_delay <- data$arr_delay/60
data
}
ark(db, dir, lines = 50000, tables = "flights", callback = mins_to_hours)
```
## ark() in parallel
There are two strategies for using `ark` in parallel. One is to loop over the tables, re-using the ark function per table in parallel. The other, introduced in 0.0.15, is to use the "window-parallel" method which loops over chunks of your table. This is particularly useful if your tables are very large and can speed up the process significantly.
Note: `window-parallel` currently only works in conjunction with `streamable_parquet`
```{r, eval = FALSE}
# Strategy 1: Parallel over tables
library(arkdb)
library(future.apply)
plan(multisession)
# Any streamable_table method is acceptable
future_lapply(vector_of_tables, function(x) ark(db, dir, lines, tables = x))
# Strategy 2: Parallel over chunks of a table
library(arkdb)
library(future.apply)
plan(multisession)
ark(
db,
dir,
streamable_table = streamable_parquet(), # required for window-parallel
lines = 50000,
tables = "flights",
method = "window-parallel"
)
# Strategy 3: Parallel over tables and chunks of tables
library(arkdb)
library(future.apply)
# 16 core machine for example
plan(list(tweak(multisession, n = 4), tweak(multisession, n = 4)))
# 4 tables at a time, 4 threads per table
future_lapply(vector_of_tables, function(x) {
ark(
db,
dir,
streamable_table = streamable_parquet(), # required for window-parallel
lines = 50000,
tables = x,
method = "window-parallel")
}
)
```
## ETLs with arkdb
The `arkdb` package can also be used to create a number of ETL pipelines involving text archives or databases given its ability to filter, and use callbacks. In the example below, we leverage `duckdb` to read a fictional folder of files by US state, filter by `var_filtered`, apply a callback transformation `transform_fun` to `var_transformed` save as parquet, and then load a folder of parquet files for analysis with Apache Arrow.
```{r, eval = FALSE}
library(arrow)
library(duckdb)
db <- dbConnect(duckdb::duckdb())
transform_fun <- function(data) {
data$var_transformed <- sqrt(data$var_transformed)
data
}
for(state in c("DC", state.abb)) {
path <- paste0("path/to/archives/", state, ".gz")
ark(
db,
dir = paste0("output/", state),
streamable_table = streamable_parquet(), # parquet files of nline rows
lines = 100000,
# See: https://duckdb.org/docs/data/csv
tables = sprintf("read_csv_auto('%s')", path),
compress = "none", # Compression meaningless for parquet as it's already compressed
overwrite = T,
filenames = state, # Overload tablename
filter_statement = "WHERE var_filtered = 1",
callback = transform_fun
)
}
# The result is trivial to read in with arrow
ds <- open_dataset("output", partitioning = "state")
```
-----
Please note that this project is released with a [Contributor Code of Conduct](https://ropensci.org/code-of-conduct/).
By participating in this project you agree to abide by its terms.
[![ropensci_footer](https://ropensci.org/public_images/ropensci_footer.png)](https://ropensci.org)