## Dependencies
```r
library(pacman)
pacman::p_load(tidyverse, exactextractr, terra, glue, sfarrow, leaflet, furrr, sf, purrr, cli, tictoc, assertr, reactable)
```
## Local Context
```r
local_context = lst(
  ## Identifiers
  model_id = "source__prism_raster_tif_albers",
  model_version = 'v1',
  model_instance = paste(model_id, model_version, sep = '_'),

  ## Paths
  path_raw_prism = "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data",
  path_server_prism_geotiff = "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/CCUH processing/raw/raster_geotiff/",
  path_encrypted = '//files.drexel.edu/encrypted/SOPH/UHC/CCUH/ccuh-encrypted',
  path_cache_local = 'cache_sensitive/',
  path_cache_raw = file.path(path_cache_local, 'PRISM_data'),
  path_cache_result = file.path(path_cache_local, 'raster')
)
```
## PRISM Raster Data for CCUH
Loading of Albers (NAD83) projected PRISM raster data as GeoTIFF for reuse in DBT. Note that since this isn't parquet it will just be for provenance documentation. The actual data will (as of now) be cached locally for computational efficiency as well as on the encrypted server for long-term secure storage. The overall goal of this pipeline is to pre-process raw PRISM data for our zonal statistics calculation, which involves (sketched briefly just after this list):

- converting raw PRISM data from `.bil` to the more performant `.tif` format
- standardizing the CRS to USA Albers, NAD83 (EPSG:5070)
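A minimal illustration of those two steps using `terra`; the file names here are hypothetical, and the full cached implementation (with guardrails) is defined later in this document:

```r
# Hypothetical file names for illustration; real paths come from the inventory tables below
r = terra::rast("tmean_20200101.bil")          # read a raw PRISM .bil raster
r_albers = terra::project(r, "EPSG:5070")      # reproject to USA Albers (NAD83)
terra::writeRaster(r_albers, "tmean_20200101.tif", overwrite = TRUE)
```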
Note we are being minimalist, so we process only what is required for manuscripts. We can update this accordingly; see the limits defined in the `local_context` object.

Note that this process runs faster if we do our computations locally, so we keep a local cache folder (created below if missing) which includes:

- `PRISM_data`: a local cache of the raw PRISM data (`.bil` format)
- `raster`: the products (Albers-reprojected GeoTIFF files)
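A small convenience sketch: create those cache folders up front, using only the paths already defined in `local_context`:

```r
# Create the local cache folders if they don't already exist (paths from local_context)
dir.create(local_context$path_cache_raw, recursive = TRUE, showWarnings = FALSE)
dir.create(local_context$path_cache_result, recursive = TRUE, showWarnings = FALSE)
```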
Inventory Available Raw PRISM data from Leah Server
```r
if (!file.exists("cache/df_raw_prism.parquet")){
  df_raw_prism = tibble(
    upstream_block = list.files(
      local_context$path_raw_prism,
      pattern = '.bil$',
      recursive = T,
      full.names = T)
  ) %>%
    mutate(
      file_name = upstream_block %>% basename() %>% str_remove('.bil'),
      measure = case_when(
        str_detect(file_name, "ppt_") ~ "ppt",
        str_detect(file_name, "tdmean_") ~ "tdmean",
        str_detect(file_name, "tmax_") ~ "tmax",
        str_detect(file_name, "tmean_") ~ "tmean",
        str_detect(file_name, "tmin_") ~ "tmin",
        str_detect(file_name, "vpdmax_") ~ "vpdmax",
        str_detect(file_name, "vpdmin_") ~ "vpdmin",
        TRUE ~ "Unknown"
      ),
      date_string = str_sub(file_name, -8),
      date = ymd(date_string),
      year = year(date),
      month = month(date),
      day = day(date)
    ) %>%
    select(file_name, everything())

  df_raw_prism %>% arrow::write_parquet("cache/df_raw_prism.parquet")
}

df_prism_raw = arrow::read_parquet("cache/df_raw_prism.parquet")
```
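As a quick illustration of the name parsing above (the file name is hypothetical, assuming the raw files follow a `<measure>_<YYYYMMDD>.bil` pattern):

```r
# Hypothetical file name, for illustration only
file_name = "tmean_20200101"
str_sub(file_name, -8)        # "20200101": the last 8 characters are the date string
ymd(str_sub(file_name, -8))   # parses to the Date 2020-01-01
```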
## Evaluate
```r
df_prism_raw %>%
  add_count(measure, name = 'n_days') %>%
  group_by(measure) %>%
  summarize(
    n_years = unique(year) %>% sort() %>% length(),
    year_range = min(year) %>% paste0(" - ", max(year)),
    n_days = unique(n_days)
  ) %>%
  arrange(desc(n_days), measure)
```
| measure | n_years | year_range  | n_days |
|---------|---------|-------------|--------|
| ppt     | 19      | 2005 - 2023 | 6940   |
| tdmean  | 19      | 2005 - 2023 | 6939   |
| tmax    | 19      | 2005 - 2023 | 6939   |
| tmean   | 19      | 2005 - 2023 | 6939   |
| tmin    | 19      | 2005 - 2023 | 6939   |
| vpdmax  | 19      | 2005 - 2023 | 6939   |
| vpdmin  | 19      | 2005 - 2023 | 6939   |
## Processed

Before we start processing, let's take a look at what has already been processed in our cache.
Inventory server cached results
```r
df_prism_geotiff_server = tibble(
  downstream_cache = list.files(
    local_context$path_server_prism_geotiff,
    pattern = '.tif$',
    recursive = T,
    full.names = T)
) %>%
  mutate(
    file_name = downstream_cache %>% basename() %>% str_remove('.tif'),
    downstream_cache_file_size = file.size(downstream_cache),
    measure = case_when(
      str_detect(file_name, "ppt_") ~ "ppt",
      str_detect(file_name, "tdmean_") ~ "tdmean",
      str_detect(file_name, "tmax_") ~ "tmax",
      str_detect(file_name, "tmean_") ~ "tmean",
      str_detect(file_name, "tmin_") ~ "tmin",
      str_detect(file_name, "vpdmax_") ~ "vpdmax",
      str_detect(file_name, "vpdmin_") ~ "vpdmin",
      TRUE ~ "Unknown"
    ),
    date_string = str_sub(file_name, -8),
    date = ymd(date_string),
    year = year(date),
    month = month(date),
    day = day(date),
    done = T
  ) %>%
  select(file_name, everything())
```
Inventory local geotiff
```r
## Remove invalid .tif rasters
list.files(
  local_context$path_cache_result,
  pattern = '.tif$',
  recursive = T,
  full.names = T) %>%
  walk(~{
    if (file.size(.x) < 1000000) {
      cli_alert_warning("Removing invalid .tif file: {.x}")
      file.remove(.x)
    }
  })

## Inventory cached results
df_prism_geotiff_local = tibble(
  downstream_cache = list.files(
    local_context$path_cache_result,
    pattern = '.tif$',
    recursive = T,
    full.names = T)
) %>%
  mutate(
    file_name = downstream_cache %>% basename() %>% str_remove('.tif'),
    downstream_cache_file_size = file.size(downstream_cache),
    measure = case_when(
      str_detect(file_name, "ppt_") ~ "ppt",
      str_detect(file_name, "tdmean_") ~ "tdmean",
      str_detect(file_name, "tmax_") ~ "tmax",
      str_detect(file_name, "tmean_") ~ "tmean",
      str_detect(file_name, "tmin_") ~ "tmin",
      str_detect(file_name, "vpdmax_") ~ "vpdmax",
      str_detect(file_name, "vpdmin_") ~ "vpdmin",
      TRUE ~ "Unknown"
    ),
    date_string = str_sub(file_name, -8),
    date = ymd(date_string),
    year = year(date),
    month = month(date),
    day = day(date),
    done = T
  ) %>%
  select(file_name, everything())
```
Summarize local + server geo_tiff
```r
df_prism_processed = bind_rows(
  df_prism_geotiff_local,
  df_prism_geotiff_server
)

df_prism_processed %>%
  add_count(measure, name = 'n_days') %>%
  group_by(measure) %>%
  summarize(
    n_years = unique(year) %>% sort() %>% length(),
    year_range = min(year) %>% paste0(" - ", max(year)),
    n_days = unique(n_days)
  ) %>%
  arrange(desc(n_days), measure)
```
| measure | n_years | year_range  | n_days |
|---------|---------|-------------|--------|
| ppt     | 19      | 2005 - 2023 | 7305   |
| tdmean  | 19      | 2005 - 2023 | 7305   |
| tmax    | 19      | 2005 - 2023 | 7305   |
| tmean   | 19      | 2005 - 2023 | 7305   |
| tmin    | 19      | 2005 - 2023 | 7305   |
| vpdmax  | 19      | 2005 - 2023 | 7305   |
| vpdmin  | 19      | 2005 - 2023 | 7305   |
Looks good.
QC: raster file size stable
```r
#' Here we want to make sure that the raster file sizes don't change over time.
#' This guards against a small bug noticed in processing.

## Get the most recent log
db = arrow::open_dataset("cache_sensitive/raster_log")
df_log_previous = db %>%
  distinct() %>%
  collect() %>%
  filter(log_index == max(log_index)) %>%
  select(file_name,
         prev_size = downstream_cache_file_size)

## Check difference in sizes
df_qc_tif_size = df_prism_processed %>%
  select(file_name,
         size = downstream_cache_file_size) %>%
  left_join(df_log_previous, by = "file_name") %>%
  mutate(same_size = size == prev_size)

df_invalid = df_qc_tif_size %>%
  filter(!same_size)

if (nrow(df_invalid) > 0) cli_abort("Some .tif files have changed size")
```
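For context, the log read above has to be refreshed after a successful run so the next QC pass has something current to compare against. A minimal sketch of how a new size snapshot might be appended is below; the `write_dataset` call and the `log_index` partitioning are assumptions, since the actual logging step is not shown in this document:

```r
# Sketch (assumption): append a new size snapshot to the raster_log dataset
prev_max_index = db %>% distinct(log_index) %>% collect() %>% pull(log_index) %>% max()

df_prism_processed %>%
  distinct(file_name, downstream_cache_file_size) %>%
  mutate(log_index = prev_max_index + 1) %>%
  arrow::write_dataset("cache_sensitive/raster_log", partitioning = "log_index")
```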
## Remaining

Finally, let's return the list of files that still need processing, subsetting for the limits we are interested in.
Files to process
```r
df_prism_to_process = df_prism_raw %>%
  left_join(df_prism_processed %>%
              select(file_name, done) %>%
              distinct()) %>%
  mutate(
    upstream_cache = upstream_block %>%
      str_remove_all(local_context$path_raw_prism) %>%
      paste0(local_context$path_cache_raw, .),
    upstream_cache_status = file.exists(upstream_cache),
    downstream_cache = upstream_block %>%
      str_remove_all(local_context$path_raw_prism) %>%
      paste0(local_context$path_cache_result, .) %>%
      str_replace_all(".bil$", ".tif")
  ) %>%
  filter(is.na(done))

df_prism_to_process %>%
  add_count(measure, name = 'n_days') %>%
  group_by(measure) %>%
  summarize(
    n_years = unique(year) %>% sort() %>% length(),
    years = unique(year) %>% paste(collapse = ","),
    n_days = unique(n_days)
  ) %>%
  arrange(desc(n_days), measure)
```
| measure | n_years | years | n_days |
|---------|---------|-------|--------|
## Process from .bil to .tif

Cache upstream files

Note that if you are trying to convert a large number of files, it is best to cache the files locally before processing; for just a few files this isn't really worth it. So let's set up a guardrail: if we are trying to process more than 10 files, abort and prompt the user to download (cache) the raw files locally first (see the sketch after the guardrail).
```r
if (nrow(df_prism_to_process) > 10){
  cli_abort("You are trying to process more than 10 files, consider downloading the files locally")
}
```
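If that guardrail trips, the raw files can be cached locally before re-running. A minimal sketch is below; it assumes (as is typical for `.bil` rasters) that each file ships with sidecar files sharing the same stem (`.hdr`, `.prj`, `.stx`, ...), so everything with that stem gets copied:

```r
# Sketch (assumptions noted above): copy raw .bil files plus their sidecars into the local cache
df_prism_to_process %>%
  filter(!upstream_cache_status) %>%
  select(upstream_block, upstream_cache) %>%
  pwalk(function(upstream_block, upstream_cache){
    dir.create(dirname(upstream_cache), recursive = TRUE, showWarnings = FALSE)
    stem = tools::file_path_sans_ext(basename(upstream_block))
    sidecars = list.files(dirname(upstream_block), full.names = TRUE) %>%
      keep(~ startsWith(basename(.x), stem))
    file.copy(sidecars, dirname(upstream_cache), overwrite = FALSE)
  })
```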
Processing function
Now set up the function for the conversion.
Pre-process: Function
```r
## Test parameters
row = df_prism_to_process %>%
  filter(measure == "tmean") %>%
  slice(1)

## Function to convert .bil to .tif
crated_convert_prism_to_tif = function(
    file_name,
    upstream_block,
    upstream_cache,
    downstream_cache){

  ## Stop if done
  if (file.exists(downstream_cache)) return("Already done")

  ## Import upstream raster
  if (file.exists(upstream_cache)){
    prism_rast = terra::rast(upstream_cache)
  } else {
    prism_rast = terra::rast(upstream_block)
  }

  ## Transform to Albers USA
  prism_rast_albers = prism_rast %>% terra::project("EPSG:5070")

  ## Export
  out_dir = file.path('cache_sensitive', 'raster', file_name)
  if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T)
  out_path = file.path(out_dir, paste0(file_name, '.tif'))

  prism_rast_albers |> terra::writeRaster(filename = out_path, overwrite = TRUE)

  cli::cli_alert_success("Finished processing and export: {file_name}")
}

crated_convert_prism_to_tif__possibly <- possibly(
  crated_convert_prism_to_tif,
  otherwise = NULL,
  quiet = FALSE
)
```
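As a quick smoke test, the single `row` of test parameters selected above can be pushed through the wrapped function before the batch runs (a sketch; the original only defines `row` without showing the call):

```r
# Sketch: run the converter on the single test row defined above
row %>%
  select(file_name, upstream_block, upstream_cache, downstream_cache) %>%
  purrr::pmap(crated_convert_prism_to_tif__possibly)
```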
Processing
Parallel runs: convert .bil to .tif
```r
if (between(nrow(df_prism_to_process), 1, 10)){
  purrr::pmap(
    df_prism_to_process %>%
      select(file_name, upstream_block,
             upstream_cache, downstream_cache),
    crated_convert_prism_to_tif__possibly
  )
}

if (nrow(df_prism_to_process) > 10){
  ## Configure parallel
  plan(multisession, workers = 6)
  opts <- furrr_options(globals = TRUE, seed = TRUE)

  ## Run
  tic()
  df_prism_to_process %>%
    filter(upstream_cache_status == TRUE) %>%
    select(file_name, upstream_block,
           upstream_cache, downstream_cache) %>%
    furrr::future_pmap(
      .f = crated_convert_prism_to_tif__possibly,
      .options = opts
    )
  toc()
}
```
## Sync to server

For now, due to size limitations (4.75 TB), we can only upload a subset of the GeoTIFF files. We chose to prioritize the 2020 data for the county10 ESRI validations and benchmarks.
Get 2020 geotiff files
```r
## Inventory 2020 geotiff files
df_2020_geotiff = tibble(
  path = list.files(
    local_context$path_cache_result,
    pattern = '.tif$',
    recursive = T,
    full.names = T) %>%
    keep(~str_detect(.x, "_2020"))
) %>%
  mutate(
    file = basename(path)
  ) %>%
  mutate(
    measure = str_extract(file, "ppt|tmean|tmax|tmin|tdmean|vpdmax|vpdmin"),
    date = str_extract(file, "\\d{8}")
  ) %>%
  assert_rows(col_concat, is_uniq, measure, date) %>%
  select(file, measure, date, path)

## Preview
df_2020_geotiff %>%
  reactable(searchable = T, defaultPageSize = 10)
```
So we have 2,562 GeoTIFF files. Let's now copy these into Leah's folder at //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/CCUH processing/raw/raster_geotiff/.
Copy 2020 geotiff files to server
```r
df_2020_geotiff %>%
  group_by(row_number()) %>%
  group_walk(~{
    file = .$file
    path = .$path
    out_path = file.path(local_context$path_server_prism_geotiff, file)
    if (!file.exists(out_path)) {
      file.copy(path, out_path)
      cli_alert_success("Copied to server: {file}")
    } else {
      cli_alert("Already exists on server: {file}")
    }
  })
```
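Finally, a small verification sketch confirming that every 2020 file we inventoried now exists on the server:

```r
# Sketch: check that each expected .tif is present in the server folder
df_2020_geotiff %>%
  mutate(out_path = file.path(local_context$path_server_prism_geotiff, file),
         on_server = file.exists(out_path)) %>%
  count(on_server)
```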