PRISM Raster Data for CCUH

source__prism_raster_tif_albers_v1
Authors

Ran Li (Maintainer)

Joe Simeone (Context Contributor)

Published

October 25, 2024

Loading of Albers (NAD83) projected PRISM raster data as GeoTIFF for reuse in DBT. Note that since this isn't parquet, it is included here for provenance documentation only. The actual data will (as of now) be cached locally for computational efficiency, as well as on the encrypted server for long-term secure storage. The overall goal of this pipeline is to pre-process raw PRISM data for our zonal statistics calculation. This involves:

- inventorying the raw PRISM .bil files available on the server,
- reprojecting each raster to Albers (EPSG:5070) and exporting it as GeoTIFF,
- QC-ing the cached results, and
- syncing a subset of the GeoTIFFs to the encrypted server.

Note we are being minimalist, so we process only what is required for manuscripts. We can update this accordingly, so please see the limits defined in the local context object below.

## Dependencies
library(pacman) 
## Load (installing if necessary) all package dependencies
pacman::p_load(tidyverse, exactextractr, terra, glue, sfarrow, leaflet, furrr, sf, purrr, cli, tictoc, assertr, reactable)


## Local Context
local_context = lst(
  ## Identifiers
  model_id = "source__prism_raster_tif_albers",
  model_version = 'v1',
  model_instance = paste(model_id, model_version, sep = '_'), # "source__prism_raster_tif_albers_v1"
  ## Paths
  path_raw_prism = "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data",
  path_server_prism_geotiff = "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/CCUH processing/raw/raster_geotiff/",
  path_encrypted = '//files.drexel.edu/encrypted/SOPH/UHC/CCUH/ccuh-encrypted',
  path_cache_local = 'cache_sensitive/',
  path_cache_raw = file.path(path_cache_local, 'PRISM_data'),
  path_cache_result = file.path(path_cache_local, 'raster')
)

Note that this process could run faster if we do our computations locally; therefore we keep a local cache folder, which includes:

- `PRISM_data/` (`path_cache_raw`): locally cached copies of the raw PRISM files, and
- `raster/` (`path_cache_result`): the processed GeoTIFF results.

Inventory available raw PRISM data from Leah's server
if (!file.exists("cache/df_raw_prism.parquet")){
  df_raw_prism = tibble(
    upstream_block = list.files(
      local_context$path_raw_prism, 
      pattern = '\\.bil$',
      recursive = T,
      full.names = T)
  ) %>% 
    mutate(
      file_name = upstream_block %>% basename() %>% str_remove('\\.bil$'),
      measure = case_when(
        str_detect(file_name, "ppt_") ~ "ppt",
        str_detect(file_name, "tdmean_") ~ "tdmean",
        str_detect(file_name, "tmax_") ~ "tmax",
        str_detect(file_name, "tmean_") ~ "tmean",
        str_detect(file_name, "tmin_") ~ "tmin",
        str_detect(file_name, "vpdmax_") ~ "vpdmax",
        str_detect(file_name, "vpdmin_") ~ "vpdmin",
        TRUE ~ "Unknown"
      ),
      date_string = str_sub(file_name, -8),
      date = ymd(date_string),
      year = year(date),
      month = month(date),
      day = day(date)
    )  %>% 
    select(file_name, everything()) 
  df_raw_prism %>% arrow::write_parquet("cache/df_raw_prism.parquet")
}

df_prism_raw = arrow::read_parquet("cache/df_raw_prism.parquet") 
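
Because `measure` falls back to "Unknown" and `ymd()` quietly returns NA on unparseable names, a quick sanity check on the inventory can catch malformed file names early. A minimal sketch using the already-loaded assertr:

Sanity-check inventory
## Sketch: abort if any file name yielded an unknown measure or an unparseable date
df_prism_raw %>% 
  assert(in_set("ppt", "tdmean", "tmax", "tmean", "tmin", "vpdmax", "vpdmin"), measure) %>% 
  assert(not_na, date) %>% 
  invisible()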
  

## Evaluate
df_prism_raw %>% 
  add_count(measure, name = 'n_days') %>% 
  group_by(measure) %>% 
  summarize(
    n_years = unique(year) %>% sort() %>% length(),
    year_range = min(year) %>% paste0(" - ", max(year)),
    n_days = unique(n_days)
  ) %>% 
  arrange(desc(n_days), measure) 
| measure | n_years | year_range  | n_days |
|---------|---------|-------------|--------|
| ppt     | 19      | 2005 - 2023 | 6940   |
| tdmean  | 19      | 2005 - 2023 | 6939   |
| tmax    | 19      | 2005 - 2023 | 6939   |
| tmean   | 19      | 2005 - 2023 | 6939   |
| tmin    | 19      | 2005 - 2023 | 6939   |
| vpdmax  | 19      | 2005 - 2023 | 6939   |
| vpdmin  | 19      | 2005 - 2023 | 6939   |

## Processed

Before we start processing, let's take a look at what has already been processed in our caches.

Inventory server cached results
df_prism_geotiff_server = tibble(
  downstream_cache = list.files(
    local_context$path_server_prism_geotiff  , 
    pattern = '\\.tif$',
    recursive =  T,
    full.names = T)
) %>% 
  mutate(
    file_name = downstream_cache %>% basename() %>% str_remove('\\.tif$'),
    downstream_cache_file_size = file.size(downstream_cache),
    measure = case_when(
        str_detect(file_name, "ppt_") ~ "ppt",
        str_detect(file_name, "tdmean_") ~ "tdmean",
        str_detect(file_name, "tmax_") ~ "tmax",
        str_detect(file_name, "tmean_") ~ "tmean",
        str_detect(file_name, "tmin_") ~ "tmin",
        str_detect(file_name, "vpdmax_") ~ "vpdmax",
        str_detect(file_name, "vpdmin_") ~ "vpdmin",
        TRUE ~ "Unknown"
      ),
      date_string = str_sub(file_name, -8),
      date = ymd(date_string),
      year = year(date),
      month = month(date),
      day = day(date),
    done = T
    ) %>% 
  select(file_name, everything()) 
Inventory local geotiff
## Remove invalid .tif rasters (anything under ~1 MB is likely a truncated write)
list.files(
  local_context$path_cache_result, 
  pattern = '\\.tif$',
  recursive =  T,
  full.names = T) %>% 
  walk(~{
    if (file.size(.x) < 1000000) {
      cli_alert_warning("Removing invalid .tif file: {.x}")
      file.remove(.x)
    }
  })

## Inventory cached results
df_prism_geotiff_local = tibble(
  downstream_cache = list.files(
    local_context$path_cache_result, 
    pattern = '\\.tif$',
    recursive =  T,
    full.names = T)
) %>% 
  mutate(
    file_name = downstream_cache %>% basename() %>% str_remove('\\.tif$'),
    downstream_cache_file_size = file.size(downstream_cache),
    measure = case_when(
        str_detect(file_name, "ppt_") ~ "ppt",
        str_detect(file_name, "tdmean_") ~ "tdmean",
        str_detect(file_name, "tmax_") ~ "tmax",
        str_detect(file_name, "tmean_") ~ "tmean",
        str_detect(file_name, "tmin_") ~ "tmin",
        str_detect(file_name, "vpdmax_") ~ "vpdmax",
        str_detect(file_name, "vpdmin_") ~ "vpdmin",
        TRUE ~ "Unknown"
      ),
      date_string = str_sub(file_name, -8),
      date = ymd(date_string),
      year = year(date),
      month = month(date),
      day = day(date),
    done = T
    ) %>% 
  select(file_name, everything()) 
Summarize local + server geotiffs
df_prism_processed = bind_rows(
  df_prism_geotiff_local,
  df_prism_geotiff_server
)

df_prism_processed %>% 
  add_count(measure, name = 'n_days') %>% 
  group_by(measure) %>% 
  summarize(
    n_years = unique(year) %>% sort() %>% length(),
    year_range = min(year) %>% paste0(" - ", max(year)),
    n_days = unique(n_days)
  ) %>% 
  arrange(desc(n_days), measure) 
| measure | n_years | year_range  | n_days |
|---------|---------|-------------|--------|
| ppt     | 19      | 2005 - 2023 | 7305   |
| tdmean  | 19      | 2005 - 2023 | 7305   |
| tmax    | 19      | 2005 - 2023 | 7305   |
| tmean   | 19      | 2005 - 2023 | 7305   |
| tmin    | 19      | 2005 - 2023 | 7305   |
| vpdmax  | 19      | 2005 - 2023 | 7305   |
| vpdmin  | 19      | 2005 - 2023 | 7305   |

Looks good.

QC: raster file size stable
#' Here we want to make sure that the raster file sizes don't change over time (a small bug was noticed in processing). 


## Get most previous log
db = arrow::open_dataset("cache_sensitive/raster_log")
df_log_previous = db %>% 
  distinct() %>% 
  collect() %>% 
  filter(log_index == max(log_index)) %>% 
  select(file_name, 
         prev_size = downstream_cache_file_size)
  
## Check difference in sizes
df_qc_tif_size = df_prism_processed %>% 
  select(file_name, 
         size = downstream_cache_file_size) %>% 
  left_join(df_log_previous, by = "file_name") %>% 
  mutate(same_size = size == prev_size)

## Files with no previous log entry get same_size = NA and are dropped
## by filter(), so only genuine size changes trigger the abort.
df_invalid = df_qc_tif_size %>% 
  filter(!same_size)
if (nrow(df_invalid) > 0) cli_abort("Some .tif files have changed size")
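
The write side of `raster_log` is not shown in this section; here is a hypothetical sketch of how the current inventory could be appended as the next log entry (assuming the schema used above: file_name, downstream_cache_file_size, log_index), so future runs have a baseline:

Append current sizes to log
## Sketch: find the latest log index, then write the current inventory
## as a new partition of the raster_log dataset
max_log_index = db %>% 
  summarize(log_index = max(log_index)) %>% 
  collect() %>% 
  pull(log_index)

df_prism_processed %>% 
  select(file_name, downstream_cache_file_size) %>% 
  distinct() %>% 
  mutate(log_index = max_log_index + 1) %>% 
  arrow::write_dataset("cache_sensitive/raster_log", partitioning = "log_index")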

## Remaining

Finally, let's return a list of files that still need processing, subsetting for the limits we are interested in.

Files to process
df_prism_to_process = df_prism_raw %>% 
  left_join(
    df_prism_processed %>% 
      select(file_name, done) %>% 
      distinct(),
    by = "file_name") %>% 
  mutate(
    upstream_cache = upstream_block %>% 
      str_remove_all(local_context$path_raw_prism) %>% 
      paste0(
        local_context$path_cache_raw, 
        .),
    upstream_cache_status = file.exists(upstream_cache),
    downstream_cache = upstream_block %>% 
      str_remove_all(local_context$path_raw_prism) %>% 
      paste0(local_context$path_cache_result, .) %>% 
      str_replace_all(".bil$", ".tif")
    ) %>% 
  filter(is.na(done))

df_prism_to_process %>% 
  add_count(measure, name = 'n_days') %>% 
  group_by(measure) %>% 
  summarize(
    n_years = unique(year) %>% sort() %>% length(),
    years = unique(year) %>% paste(collapse = ","),
    n_days = unique(n_days)
  ) %>% 
  arrange(desc(n_days), measure) 
| measure | n_years | years | n_days |
|---------|---------|-------|--------|

(No rows: nothing currently remains to process.)

## Process from .bil to .tif

### Cache upstream files

Note that if you are trying to convert a large number of files, it is best to cache them locally before processing; for a few files this isn't really worth it. So let's set up a guardrail: if more than 10 files are queued, abort with a reminder to download the files locally first (a caching sketch follows the guardrail below).

Guardrail
if (nrow(df_prism_to_process) > 10){
  cli_abort("You are trying to process more than 10 files, consider downloading the files locally")
}
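
A hypothetical sketch of the local caching step the guardrail asks for. It assumes the usual PRISM sidecar files (.hdr, .prj, .stx, .xml) sit next to each .bil, since a .bil raster is unreadable without at least its .hdr companion:

Cache upstream files locally
## Sketch: copy each raw .bil plus its sidecar files into the local cache
df_prism_to_process %>% 
  filter(!upstream_cache_status) %>% 
  select(upstream_block, upstream_cache) %>% 
  purrr::pwalk(function(upstream_block, upstream_cache){
    dir.create(dirname(upstream_cache), recursive = TRUE, showWarnings = FALSE)
    ## Grab the .bil and any companion files sharing its stem
    stem = basename(upstream_block) %>% str_remove("\\.bil$")
    sidecars = list.files(
      dirname(upstream_block),
      pattern = paste0("^", stem, "\\."),
      full.names = TRUE)
    file.copy(sidecars, dirname(upstream_cache))
    cli_alert_success("Cached locally: {basename(upstream_block)}")
  })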

### Processing function

Now set up the function for conversion.

Pre-process: Function
## Test parameters (one tmean row for interactive development)
row = df_prism_to_process %>% 
  filter(measure == "tmean") %>%
  slice(1)

## Function To convert .bil to .tif
crated_convert_prism_to_tif = function(
    file_name,
    upstream_block, 
    upstream_cache,
    downstream_cache){
  
  ## Stop if done
  if ( file.exists(downstream_cache)) return("Already done")
  
  ## Import upstream raster (prefer the local cache when present)
  if (file.exists(upstream_cache)){
    prism_rast = terra::rast(upstream_cache)
  } else {
    prism_rast = terra::rast(upstream_block)
  }

  ## Transform to Albers USA
  prism_rast_albers = prism_rast %>% terra::project("EPSG:5070")
  
  ## Export to the downstream cache path, so the done-check above and
  ## the downstream inventory agree on where outputs live
  out_dir = dirname(downstream_cache)
  if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T)

  prism_rast_albers %>% terra::writeRaster(filename = downstream_cache, overwrite = TRUE) 
  
  cli::cli_alert_success("Finished processing and export: {file_name}")

}

crated_convert_prism_to_tif__possibly <- possibly(
  crated_convert_prism_to_tif, 
  otherwise = NULL, 
  quiet = FALSE
  )
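
The test `row` defined above can double as a smoke test before the batch runs; a sketch:

Smoke test on one row
## Sketch: convert a single file to confirm the function works end to end
if (nrow(row) == 1){
  row %>% 
    select(file_name, upstream_block, upstream_cache, downstream_cache) %>% 
    purrr::pmap(crated_convert_prism_to_tif__possibly)
}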

### Processing

Parallel runs: convert .bil to .tif
## Small batches (1-10 files): run sequentially straight off the server
if (between(nrow(df_prism_to_process), 1, 10)){
  purrr::pmap(
    df_prism_to_process %>% 
      select(file_name, upstream_block, 
             upstream_cache, downstream_cache), 
    crated_convert_prism_to_tif__possibly
  )
}

## Large batches: process the locally cached files in parallel
if (nrow(df_prism_to_process) > 10){
  ## Configure parallel backend
  plan(multisession, workers = 6)
  opts <- furrr_options(globals = TRUE, seed = TRUE)
  
  ## Run
  tic()
  df_prism_to_process %>%
    filter(upstream_cache_status == TRUE) %>% 
    select(file_name, upstream_block, 
           upstream_cache, downstream_cache) %>%
    furrr::future_pmap(
      .f = crated_convert_prism_to_tif__possibly,
      .options = opts
    )
  toc()
} 
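
After a multisession run it can help to release the workers; an optional housekeeping step (not in the original):

Reset parallel plan
## Return to sequential execution and free the multisession workers
plan(sequential)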

## Sync to server

For now, due to size limitations (4.75 TB), we can only upload a subset of the geotiff files. We chose to prioritize the 2020 data for the county10 ESRI validations and benchmarks.

Get 2020 geotiff files
## Inventory 2020 geotiff files
df_2020_geotiff = tibble(
  path = list.files(
    local_context$path_cache_result, 
    pattern = '\\.tif$',
    recursive =  T,
    full.names = T) %>% 
    keep(~str_detect(.x, "_2020"))
) %>% 
  mutate(
    file = basename(path)
  ) %>% 
  mutate(measure = str_extract(file, "ppt|tmean|tmax|tmin|tdmean|vpdmax|vpdmin"),
         date = str_extract(file, "\\d{8}")) %>% 
  assert_rows(col_concat, is_uniq, measure, date) %>% 
  select(file, measure, date, path) 

## Preview
df_2020_geotiff %>% 
  reactable(searchable = T, defaultPageSize = 10)

So we have 2,562 geotiff files. Let's now copy these into Leah's folder at //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/CCUH processing/raw/raster_geotiff/.

Copy 2020 geotiff files to server
df_2020_geotiff %>% 
  select(file, path) %>% 
  purrr::pwalk(function(file, path){
    out_path = file.path(local_context$path_server_prism_geotiff, file)
    if (!file.exists(out_path)) {
      file.copy(path, out_path)
      cli_alert_success("Copied to server: {file}")
    } else {
      cli_alert("Already exists on server: {file}")
    }
  })
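
As a follow-up (a sketch, not part of the original pipeline), we can verify the copies landed on the server with matching file sizes:

Verify server copies
## Sketch: compare local vs server file sizes for the 2020 subset
df_2020_geotiff %>% 
  mutate(
    server_path = file.path(local_context$path_server_prism_geotiff, file),
    size_match = file.size(path) == file.size(server_path)) %>% 
  count(size_match)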