# 2_process.R
source('2_process/src/process_helpers.R')
source('2_process/src/summarize_sediment_byOutlet.R')
p2_process <- list(
##### Process classified tifs from HydroShare #####
# Convert classified rasters to binary - 0 for not a sediment class, 1 for sediment class
tar_target(p2_sedpresence_terraqs, {
classified_raster_to_sed_presence(
in_file = p1_hs_sedclass_tif_info$tif_fn,
      out_file = sprintf('2_process/tmp/%s', gsub('\\.tif$', '.qs', basename(p1_hs_sedclass_tif_info$tif_fn))))
},
pattern = map(p1_hs_sedclass_tif_info),
format = 'file'),
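  # The helpers above come from 2_process/src/process_helpers.R, which isn't
  # shown here. A minimal sketch of the save/load pair, assuming `terra` +
  # `qs` serialization; the sediment class codes (1 and 2) are placeholders,
  # so check the classification legend for the real values:
  # classified_raster_to_sed_presence <- function(in_file, out_file) {
  #   r <- terra::rast(in_file)
  #   # Reclassify: 1 if the cell is a sediment class, 0 otherwise
  #   sed <- terra::app(r, function(x) as.integer(x %in% c(1, 2)))
  #   # SpatRasters hold external pointers, so wrap before serializing
  #   qs::qsave(terra::wrap(sed), out_file)
  #   return(out_file)
  # }
  # load_terraqs <- function(qs_file) {
  #   terra::unwrap(qs::qread(qs_file))
  # }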
  # PER MISSION-GROUP, sum the binary sediment-presence cell values across
  # the raster files to create a heatmap. Note that the full mission files
  # were too big to process at once, so we batch and then recombine at the
  # end. To take advantage of tarchetypes batching, the Sentinel and Landsat
  # targets had to be separated first. Since there are only two missions,
  # it is simple enough to split them manually (see the sum_sed_presence()
  # sketch after the Landsat targets below).
tar_target(p2_terraqs_grp, tibble(terraqs_fn = p2_sedpresence_terraqs) %>%
mutate(terraqs_fn_hash = tools::md5sum(terraqs_fn), # Use hash to rebuild downstream targets if the files change
mission = ifelse(grepl('Sentinel', terraqs_fn),
yes = 'Sentinel', no = 'Landsat'))),
tarchetypes::tar_group_count(p2_terraqs_grp_ct_sentinel,
filter(p2_terraqs_grp, mission == "Sentinel"),
count = 50),
tarchetypes::tar_group_count(p2_terraqs_grp_ct_landsat,
filter(p2_terraqs_grp, mission == "Landsat"),
count = 50),
# Now map over Sentinel batches
tar_target(p2_sediment_heatmap_sentinel_batch_terraqs,
sum_sed_presence(p2_terraqs_grp_ct_sentinel$terraqs_fn,
sprintf('2_process/tmp/sediment_heatmap_%s_grp%02d.qs',
unique(p2_terraqs_grp_ct_sentinel$mission),
unique(p2_terraqs_grp_ct_sentinel$tar_group))),
pattern = map(p2_terraqs_grp_ct_sentinel),
format='file'),
  # Run the same function again, combining the group heatmaps into a single Sentinel one
tar_target(p2_sediment_heatmap_sentinel_terraqs,
sum_sed_presence(p2_sediment_heatmap_sentinel_batch_terraqs,
unique(gsub('tmp', 'out', gsub(
'_grp([0-9]+)', '',
p2_sediment_heatmap_sentinel_batch_terraqs)))),
format='file'),
# Now map over Landsat batches
tar_target(p2_sediment_heatmap_landsat_batch_terraqs,
sum_sed_presence(p2_terraqs_grp_ct_landsat$terraqs_fn,
sprintf('2_process/tmp/sediment_heatmap_%s_grp%02d.qs',
unique(p2_terraqs_grp_ct_landsat$mission),
unique(p2_terraqs_grp_ct_landsat$tar_group))),
pattern = map(p2_terraqs_grp_ct_landsat),
format='file'),
  # Run the same function again, combining the group heatmaps into a single Landsat one
tar_target(p2_sediment_heatmap_landsat_terraqs,
sum_sed_presence(p2_sediment_heatmap_landsat_batch_terraqs,
unique(gsub('tmp', 'out', gsub(
'_grp([0-9]+)', '',
p2_sediment_heatmap_landsat_batch_terraqs)))),
format='file'),
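  # A hedged sketch of sum_sed_presence(), which both the batch and combine
  # targets above call: stack the serialized rasters, sum them cell-wise,
  # and write the result back out (the real helper may differ):
  # sum_sed_presence <- function(in_files, out_file) {
  #   summed <- purrr::map(in_files, load_terraqs) %>%
  #     terra::rast() %>%      # stack the layers into one SpatRaster
  #     terra::app(fun = sum)  # cell-wise sum across layers
  #   qs::qsave(terra::wrap(summed), out_file)
  #   return(out_file)
  # }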
  # Summarize the sediment presence as a time series per outlet polygon
tar_target(p2_sediment_crs, terra::crs(load_terraqs(p2_sedpresence_terraqs[1]))),
tar_target(p2_outlet_list_sf,
create_bbox_sf(p1_river_outlet_bbox_tbl, crs=p2_sediment_crs),
pattern = map(p1_river_outlet_bbox_tbl),
iteration = 'list'),
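  # create_bbox_sf() is another process_helpers.R function; a sketch under
  # the assumption that each row of p1_river_outlet_bbox_tbl stores lat/long
  # bounds in columns named xmin/ymin/xmax/ymax (the names are hypothetical):
  # create_bbox_sf <- function(bbox_row, crs) {
  #   sf::st_bbox(c(xmin = bbox_row$xmin, ymin = bbox_row$ymin,
  #                 xmax = bbox_row$xmax, ymax = bbox_row$ymax),
  #               crs = sf::st_crs(4326)) %>%
  #     sf::st_as_sfc() %>%
  #     sf::st_sf() %>%
  #     sf::st_transform(crs)
  # }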
tar_target(p2_sediment_presence_summary_byOutlet,
summarize_sed_pct_byOutlet(p2_sedpresence_terraqs, p2_outlet_list_sf),
             pattern = map(p2_sedpresence_terraqs)),
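  # summarize_sed_pct_byOutlet() comes from the sourced
  # 2_process/src/summarize_sediment_byOutlet.R. A plausible core, assuming
  # it reports the fraction of sediment-flagged cells per outlet polygon
  # (parsing `date` and `mission` from the filename is omitted here):
  # summarize_sed_pct_byOutlet <- function(terraqs_fn, outlet_list_sf) {
  #   sed_rast <- load_terraqs(terraqs_fn)
  #   purrr::map_dfr(outlet_list_sf, function(outlet_sf) {
  #     cropped <- terra::crop(sed_rast, terra::vect(outlet_sf), mask = TRUE)
  #     tibble(terraqs_fn = terraqs_fn,
  #            pct_sediment = terra::global(cropped, 'mean', na.rm = TRUE)$mean)
  #   })
  # }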
tar_target(p2_sediment_presence_summary_byOutlet_ready,
p2_sediment_presence_summary_byOutlet %>%
               mutate(date = as.Date(date),
year = year(date),
year_mission = sprintf('%s_%s', year, mission),
season = ordered(month_to_season(month(date)),
levels = c('Winter', 'Spring', 'Summer', 'Fall')))),
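  # month_to_season() is assumed to bin month numbers into meteorological
  # seasons, e.g.:
  # month_to_season <- function(month_num) {
  #   dplyr::case_when(month_num %in% c(12, 1, 2) ~ 'Winter',
  #                    month_num %in% 3:5         ~ 'Spring',
  #                    month_num %in% 6:8         ~ 'Summer',
  #                    month_num %in% 9:11        ~ 'Fall')
  # }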
##### Load and process observed blooms spreadsheet #####
tar_target(p2_obs_blooms_details, clean_bloom_history(p1_obs_blooms_xlsx)),
tar_target(p2_obs_blooms_sf, {
p2_obs_blooms_details %>%
select(Year, `Start Date`, `End Date`, Latitude, Longitude, Verified_cyanos) %>%
st_as_sf(coords = c('Longitude', 'Latitude'), crs=4326) %>%
# Remove observations outside of our AOI
st_crop(p1_lake_superior_box_sf) %>%
# Transform to match other sf object projections
st_transform(crs = st_crs(p1_lake_superior_watershed_sf))
}),
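  # clean_bloom_history() isn't shown either; a minimal sketch, assuming the
  # spreadsheet columns match those selected above:
  # clean_bloom_history <- function(xlsx_file) {
  #   readxl::read_excel(xlsx_file) %>%
  #     mutate(`Start Date` = as.Date(`Start Date`),
  #            `End Date` = as.Date(`End Date`)) %>%
  #     filter(!is.na(Latitude), !is.na(Longitude))
  # }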
##### Read PRISM files and load into tibbles #####
  # TODO: some grid cells return NAs because the centroid falls over
  # the water (presumably there is some sort of water masking?).
  # Also, need to compare the grid cell size against the PRISM resolution
  # because some cells appear to return duplicate values.
tar_target(p2_lake_superior_watershed_filt, {
# Transform Lake Superior grid shape before using in filter
p1_lake_superior_box_sf_transf <- p1_lake_superior_box_sf %>%
st_transform(crs = st_crs(p1_lake_superior_watershed_sf))
# Filter to only subwatersheds within 5 miles of the AOI bbox
p1_lake_superior_watershed_sf %>%
st_filter(p1_lake_superior_box_sf_transf,
.predicate = st_is_within_distance,
dist = 1609*5)
}),
# Collapse subwatersheds into a single shape
tar_target(p2_lake_superior_watershed_dissolved,
p2_lake_superior_watershed_filt %>%
st_union() %>% st_as_sf()),
  # Create a grid of 10 km cells across the AOI watersheds (PRISM data come
  # at 4 km resolution, but that might be too fine to process for now)
tar_target(p2_lake_superior_watershed_grid_all,
p2_lake_superior_watershed_dissolved %>%
# Cellsize is in meters because of the projection we are in
st_make_grid(cellsize=10000) %>%
st_as_sf()),
  # Subset to grid cells that intersect the HUC watersheds and calculate
  # the fraction of each cell that overlaps the watershed polygon. That
  # fraction will be used to calculate contributing precip later.
tar_target(p2_lake_superior_watershed_grid_sf, {
huc10_sf_transf <- st_transform(p1_huc10_nwis_sites, crs=st_crs(p1_lake_superior_watershed_sf))
grids_over_hucs <- p2_lake_superior_watershed_grid_all %>%
st_filter(huc10_sf_transf, .predicate = st_intersects)
# For each HUC, identify the cells that intersect with them
# and create grid cell sf with the fractions and HUC as a column.
grids_over_hucs_info <- huc10_sf_transf %>%
split(.$huc10) %>%
purrr::map(~{
grid_cells <- grids_over_hucs %>% st_filter(.x, .predicate = st_intersects)
grid_cells_adjs <- st_intersection(grid_cells, .x)
grid_cell_frac <- units::drop_units(st_area(grid_cells_adjs)/st_area(grid_cells))
grid_cells %>% mutate(huc = .x$huc10, huc_frac = grid_cell_frac)
}) %>% bind_rows() %>% rename(geometry = x)
return(grids_over_hucs_info)
}),
  # Convert cell polygons to cell centroids (no additional filtering happens
  # here; the grid was already subset to cells intersecting the HUCs above)
  tar_target(p2_lake_superior_watershed_grid_centers_sf,
             # Get the center of each cell
             p2_lake_superior_watershed_grid_sf %>%
               st_centroid()),
# Convert cell centroids to CRS=4326 so that we can extract the matching
# PRISM data for each cell.
tar_target(p2_lake_superior_watershed_grid_centers_tbl,
p2_lake_superior_watershed_grid_centers_sf %>%
st_transform(crs=4326) %>%
st_coordinates() %>%
as_tibble() %>%
setNames(c('longitude', 'latitude')) %>%
mutate(cell_no = row_number()) %>%
# Add corresponding huc and huc_frac columns
bind_cols(st_drop_geometry(p2_lake_superior_watershed_grid_centers_sf))),
  # Get the PRISM data from the files for each of the lat/longs, variables,
  # and dates. Note that this is a lengthy step because of the `cross`
  # pattern: over 3 hrs with 20 workers, 2 variables, 20 date batches, and
  # 37 grid cells.
tar_target(p2_prism_data, {
    # Make this target dependent on the PRISM files so that
    # it will rebuild if they change.
p1_prism_files
get_prism_data_at_huc_centers(
huc_latlong_table = p2_lake_superior_watershed_grid_centers_tbl,
prism_var = p1_prism_vars,
prism_dates = p1_prism_download_batches$date,
prism_dir = p1_prism_dir)
},
pattern = cross(p2_lake_superior_watershed_grid_centers_tbl, p1_prism_vars,
p1_prism_download_batches),
# Define this target as one that can be done in parallel when `tar_make_clustermq(workers = X)`
# is called. The default for other targets in this pipeline is NOT to parallelize.
# Note that on Lindsay's computer, 20 workers seems to be the best option.
deployment = "worker",
             # Manage data storage on the parallel workers to speed up performance
storage = "worker", retrieval = "worker"),
# Summarize the data per HUC
tar_target(p2_prism_data_huc, summarize_meteo_data_by_huc(p2_prism_data))
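  # summarize_meteo_data_by_huc() presumably aggregates cell values to the
  # HUC scale using the huc_frac overlap weights computed earlier; a sketch
  # with assumed column names:
  # summarize_meteo_data_by_huc <- function(prism_data) {
  #   prism_data %>%
  #     group_by(huc, variable, date) %>%
  #     summarize(value_huc = sum(value * huc_frac) / sum(huc_frac),
  #               .groups = 'drop')
  # }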
# If you downloaded the CSV file of all pre-processed PRISM data, uncomment
# this target and comment out the one above instead. Make sure you already
# placed the file in `2_process/in/` before building the pipeline.
# tar_target(p2_prism_data_huc, read_csv('2_process/in/prism_data_huc.csv'))
)