PRISM - 1 - Inventory

Authors

Ran Li (Maintainer)

Joe Simeone (Context Contributor)

Published

October 28, 2024

Setup
## Dependencies
pacman::p_load(
  tidyverse, assertr, arrow,
  tictoc, furrr, carrier, glue, 
  exactextractr, duckdb, DBI, reactable, tidycensus,
  terra, sfarrow, leaflet, sf, haven, ggpubr, cli )

## Local Context
source("PRISM - 0 - global context.R")
local_context = c(global_context)

This notebook is the point of access for the CCUH PRISM resource. It includes an inventory of the available raw PRISM raster data, an inventory of the calculated zonal statistics, and code snippets for querying the zonal statistics for those with access.

1. PRISM Raster

Inventory

Inventory available PRISM Data
## Inventory
if (!file.exists(local_context$path_cache_df_prism_inventory)){
  vec__cached_prism_rasters_local  = list.files(
    local_context$path_cache_prism_geotiff , 
    recursive =  T,
    full.names = T)
  
  vec__cached_prism_rasters_server  = list.files(
    local_context$path_server_prism_geotiff ,
    recursive =  T,
    full.names = T)
  
  df_prism_inventory_all = tibble(
    r_path_cache = c(
      vec__cached_prism_rasters_local, vec__cached_prism_rasters_server
      )
  ) |> 
    mutate(
      server = str_detect(r_path_cache, 'files.drexel.edu/encrypted'),
      r_base_name = basename(r_path_cache),
      base_name_split_tmp =  str_split(r_base_name, "_"),
      measure = map_chr(base_name_split_tmp, ~ .x[2]),
      datetxt = map_chr(base_name_split_tmp, ~ .x[5]) |>
        str_remove_all('.tif'),
      date = as.Date(datetxt, format = "%Y%m%d"),
      year = lubridate::year(date),
      month = month(as.Date(date)),
      day = day(as.Date(date))
    ) |> 
    select(-base_name_split_tmp) |> 
    select(r_base_name, measure, datetxt,date,everything()) %>% 
    filter(r_base_name != 'Thumbs.db') %>% 
    mutate(raster_size = file.size(r_path_cache))
    
    
  ## Prefer server copies when a raster exists in both locations
  df_prism_inventory_server = df_prism_inventory_all %>% 
    filter(server) 
  
  df_prism_inventory = df_prism_inventory_server %>% 
    bind_rows(df_prism_inventory_all %>%
                filter(!r_base_name %in% df_prism_inventory_server$r_base_name)) %>% 
    ## Validation
    assert(not_na, measure, date) %>%
    assert_rows(col_concat, is_uniq, measure, date) %>% 
    verify(
      ## At least 365 days of data for each measure-year combination
      count(., measure, year, name = 'n_file_per_year') %>% 
        filter(n_file_per_year < 365) %>%
        nrow() == 0
    )
  
  df_prism_inventory %>%
    arrow::write_parquet(local_context$path_cache_df_prism_inventory)
}
df_prism_inventory = local_context$path_cache_df_prism_inventory %>% 
  arrow::read_parquet()

## Summarize
df_prism_inventory %>% 
  utility__summarize_prism_inventory() 
measure n_years year_range n_days
ppt 19 2005 - 2023 6939
tdmean 19 2005 - 2023 6939
tmax 19 2005 - 2023 6939
tmean 19 2005 - 2023 6939
tmin 19 2005 - 2023 6939
vpdmax 19 2005 - 2023 6939
vpdmin 19 2005 - 2023 6939

We have 48,573 total PRISM files distributed across the measures and years above. If new PRISM data are added, please update the PRISM Source Data Notebook.
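As a quick sanity check, that total can be recomputed directly from the cached inventory. A minimal sketch (assumes df_prism_inventory has been read in above; 7 measures x 6,939 days = 48,573 files):

```{r}
## Recount files per measure and overall
df_prism_inventory %>% 
  count(measure, name = "n_files")

nrow(df_prism_inventory)
```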

2. Zonal Statistics Inventory

Compilation

We have PRISM results generated from CCUH processing stored as a hive-partitioned dataset, and results from the ESRI platform stored as a single parquet file. Here we compile them into one file for more performant access and reuse. This compilation involves a few steps, each with its own code chunk below.

  • Compile the CCUH partitioned results into a single table
  • Harmonize the ESRI table to the CCUH schema
  • Merge the two tables into a single table for access

Harmonize CCUH and ESRI schema

Evaluate existing schemas
## Database connection
con = dbConnect(duckdb())

## CCUH Results
ds_ccuh = local_context$path_server_int_ccuh_data %>% open_dataset()
path_ccuh_partition = 'cache_sensitive/result/engine=exactextract/geo_level=tract10/measure=tmean/state=PA/year=2019/data.parquet'
dev_query_ccuh = glue("
  SELECT *
  FROM read_parquet(
        '{local_context$path_server_results}/*/*/*/*/*/*.parquet',
        hive_partitioning = true
        )
  WHERE year = 2019 AND state = 'PA'      
  LIMIT 5
") 
df_ccuh <- dbGetQuery(con,  dev_query_ccuh) %>% as_tibble()


## ESRI Results
ds_esri = local_context$path_server_int_esri_data %>% open_dataset()
dev_query_esri = glue("
  SELECT *
  FROM '{ds_esri$files}'
  LIMIT 5
")
df_esri <- dbGetQuery(con,  dev_query_esri) %>% as_tibble()

## Preview schemas
ds_ccuh
FileSystemDataset with 1 Parquet file
GEOID: string
date: date32[day]
value: double
engine: string
geo_level: string
measure: string
state: string
year: int64
Evaluate existing schemas
ds_esri
FileSystemDataset with 1 Parquet file
GEOID10: string
MEASURE: string
DATETXT: string
DATEVAL: date32[day]
VALUE: double
YEAR: double
ST_FIPS: string
filename: string

Okay, this is pretty simple, just some renames. Let's develop the logic here before we merge.

Develop harmonization query in dplyr
## Op. state_fip crosswalk
ds_xwalk_state_fip = local_context$path_server_xwalk_state %>% open_dataset()
xwalk_state_fip <- ds_xwalk_state_fip %>%
  collect() %>% 
  select(state.census.geoid, state = state.abb) 


## Dplyr
df_esri %>%
  rename(
    GEOID = GEOID10,
    date = DATEVAL,
    value = VALUE,
    measure = MEASURE,
    state.census.geoid = ST_FIPS,
    year = YEAR
  ) %>%
  left_join(xwalk_state_fip, by = "state.census.geoid") %>% 
  mutate(
    engine = "esri",  # Adding missing engine field
    measure = tolower(measure),  # Converting measure to lowercase
    geo_level = 'tract10'
  ) %>%
  select(GEOID, date, value, engine, geo_level, measure, state, year)
GEOID date value engine geo_level measure state year
01005950300 2008-01-01 0 esri tract10 ppt AL 2008
01005950900 2008-01-01 0 esri tract10 ppt AL 2008
01005950800 2008-01-01 0 esri tract10 ppt AL 2008
01005950700 2008-01-01 0 esri tract10 ppt AL 2008
01005950600 2008-01-01 0 esri tract10 ppt AL 2008

Looks good, let's translate this into a DuckDB query.

Develop harmonization query in duckdb
con = dbConnect(duckdb())

query_harmonization_dev = glue("
COPY (
  WITH xwalk_state AS (
    SELECT 
      \"state.census.geoid\" as ST_FIPS,
      \"state.abb\" as state
    FROM read_parquet('{ds_xwalk_state_fip$files}')
  ),
  ccuh_data AS (
    SELECT *
    FROM read_parquet(
      '{local_context$path_server_results}/*/*/*/*/*/*.parquet',
      hive_partitioning = true
      )
    WHERE year = 2019 AND state = 'PA'
    LIMIT 10
  ),
  esri_data AS (
    SELECT 
      GEOID10 as GEOID,
      DATEVAL as date,
      VALUE as value,
      'esri' as engine,
      'tract10' as geo_level,
      LOWER(MEASURE) as measure,
      xwalk.state,
      YEAR as year
    FROM read_parquet('{ds_esri$files}') esri
    LEFT JOIN xwalk_state xwalk ON esri.ST_FIPS = xwalk.ST_FIPS
    WHERE year = 2019 AND state = 'PA'
    LIMIT 10
  )
  SELECT * FROM ccuh_data
  UNION ALL
  SELECT * FROM esri_data
) TO 'cache/dev_harmonized.parquet' 
(FORMAT 'parquet')
")

dbExecute(con, query_harmonization_dev)
[1] 20

Okay, let's take a look.

Preview harmonized data
"cache/dev_harmonized.parquet" %>% 
  open_dataset() %>% 
  collect()
GEOID date value engine geo_level measure state year
42021 2019-01-01 18.47894 exactextract county10 ppt PA 2019
42003 2019-01-01 17.96466 exactextract county10 ppt PA 2019
42001 2019-01-01 21.02010 exactextract county10 ppt PA 2019
42061 2019-01-01 19.67658 exactextract county10 ppt PA 2019
42039 2019-01-01 24.65426 exactextract county10 ppt PA 2019
42045 2019-01-01 22.67616 exactextract county10 ppt PA 2019
42035 2019-01-01 25.88171 exactextract county10 ppt PA 2019
42073 2019-01-01 21.70659 exactextract county10 ppt PA 2019
42093 2019-01-01 22.07783 exactextract county10 ppt PA 2019
42117 2019-01-01 14.70781 exactextract county10 ppt PA 2019
42001030101 2019-01-01 21.61870 esri tract10 ppt PA 2019
42001030102 2019-01-01 21.79928 esri tract10 ppt PA 2019
42001030200 2019-01-01 21.05810 esri tract10 ppt PA 2019
42001030300 2019-01-01 20.22635 esri tract10 ppt PA 2019
42001030400 2019-01-01 20.15579 esri tract10 ppt PA 2019
42001030500 2019-01-01 20.29276 esri tract10 ppt PA 2019
42001030600 2019-01-01 21.34399 esri tract10 ppt PA 2019
42001030700 2019-01-01 21.76708 esri tract10 ppt PA 2019
42001030800 2019-01-01 22.15399 esri tract10 ppt PA 2019
42001030900 2019-01-01 22.36329 esri tract10 ppt PA 2019

Looks good.

Merge

This is the actual harmonization query, run over the full data without the development LIMIT clauses.

Harmonize and merge CCUH and ESRI datasets
if (!file.exists(local_context$path_server_final_data)){
  
  con = dbConnect(duckdb())
  
  
  query_harmonization = glue("
COPY (
  WITH xwalk_state AS (
    SELECT 
      \"state.census.geoid\" as ST_FIPS,
      \"state.abb\" as state
    FROM read_parquet('{ds_xwalk_state_fip$files}')
  ),
  ccuh_data AS (
    SELECT *
    FROM read_parquet(
      '{local_context$path_server_results}/*/*/*/*/*/*.parquet',
      hive_partitioning = true
      )
  ),
  esri_data AS (
    SELECT 
      GEOID10 as GEOID,
      DATEVAL as date,
      VALUE as value,
      'esri' as engine,
      'tract10' as geo_level,
      LOWER(MEASURE) as measure,
      xwalk.state,
      YEAR as year
    FROM read_parquet('{ds_esri$files}') esri
    LEFT JOIN xwalk_state xwalk ON esri.ST_FIPS = xwalk.ST_FIPS
  )
  SELECT * FROM ccuh_data
  UNION ALL
  SELECT * FROM esri_data
) TO '{local_context$path_server_final_data}' (FORMAT 'parquet')
")
  
  print(Sys.time())
  dbExecute(con, query_harmonization) ## ~2 hours for all PRISM compilation (mostly file writing time)
  print(Sys.time())
  
}
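Once the merge completes, it is worth a quick spot check that the output file is readable and carries the harmonized schema before building the inventory below. A minimal sketch (reads only the file metadata, not the data itself):

```{r}
## Spot check the merged parquet: it should expose the harmonized columns
## GEOID, date, value, engine, geo_level, measure, state, year
local_context$path_server_final_data %>% 
  arrow::open_dataset() %>% 
  names()
```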

Inventory

Inventory available Zonal Stats Data
if (!file.exists(local_context$path_cache_df_results_inventory)){
  
  con = dbConnect(duckdb())
  
  query_inventory = glue("
   SELECT 
     engine,
     measure, 
     geo_level,
     year,
     state,
     COUNT(*) as n_data_points
   FROM read_parquet('{local_context$path_server_final_data}')
   GROUP BY 
     engine,
     measure,
     geo_level, 
     year,
     state
  ")
  
  df_results_inventory <- dbGetQuery(con, query_inventory) %>% as_tibble()
  
  df_results_inventory %>%
    write_parquet(local_context$path_cache_df_results_inventory)
}

# Inventory results
df_results_inventory = local_context$path_cache_df_results_inventory %>%
  read_parquet()
  • Unique Measures: 7
  • Total Data Points: 2,156,659,034
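The headline figures above can be recomputed directly from the inventory table. A minimal sketch (assumes df_results_inventory has been read in above):

```{r}
## Recompute unique measures and total data points from the inventory
df_results_inventory %>% 
  summarise(
    n_measures = n_distinct(measure),
    n_data_points = sum(n_data_points)
  )
```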
Data points by engine and geo level
df_results_inventory %>% 
  summarise(n_data_points = sum(n_data_points),
            .by = c('engine','geo_level'))
engine geo_level n_data_points
exactextract county10 152616338
exactextract county_subdivisions10 105297192
exactextract tract10 131641944
terra county10 1590580
terra county_subdivisions10 62070010
terra tract10 76395320
terra zcta10 42684520
esri tract10 1583271090
terra DSPHNBHD 1092040
Coverage by measure, engine, and geo level
df_results_inventory %>% 
  # filter(engine == 'exactextract') %>% 
  group_by(measure, engine, geo_level) %>% 
  summarize(
    n_states = unique(state) %>% sort() %>% length(),
    n_years = unique(year) %>% sort() %>% length(),
    year_range = min(year) %>% paste0(" - ", max(year))
  ) %>% 
  arrange( measure) %>% 
  reactable(
    searchable = T,
    defaultPageSize = 10
  )

3. Access

Data

```{r}
library(arrow)
library(tidyverse)

ds_prism = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet" |> 
  arrow::open_dataset() 
  
df = ds_prism |>
  filter(
    engine == 'exactextract',
    geo_level == 'tract10',
    state == 'PA',
    measure == 'tmean',
    year == 2020
  ) |>
  collect()
```
```{python}
import polars as pl

ds_prism = pl.scan_parquet("//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet")

df = (ds_prism
    .filter(
        (pl.col("engine") == "exactextract") &
        (pl.col("geo_level") == "tract10") &
        (pl.col("state") == "PA") &
        (pl.col("measure") == "tmean") &
        (pl.col("year") == 2020)
    )
    .collect()
)

```
```{r}
library(duckdb)
library(glue)

# Create connection and read parquet dataset
con <- dbConnect(duckdb())


# Define the path to the parquet file
prism_path = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet"

# Filter the data
query <- glue_sql("
  SELECT *
  FROM read_parquet({prism_path})
  WHERE engine = 'exactextract'
    AND geo_level = 'tract10'
    AND state = 'PA'
    AND measure = 'tmean'
    AND year = 2020
", .con = con)


# Execute query
df <- dbGetQuery(con, query)

# Print results
head(df)

# Close connection
dbDisconnect(con, shutdown = TRUE)
```
```{python}
import duckdb

# Create connection
con = duckdb.connect()

# Define the path to the parquet file
prism_path = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet"

# Define and execute query
query = f"""
    SELECT *
    FROM '{prism_path}'
    WHERE engine = 'exactextract'
        AND geo_level = 'tract10'
        AND state = 'PA'
        AND measure = 'tmean'
        AND year = 2020
"""

# Execute query and get results as a DataFrame
df = con.execute(query).df()

# Print results
print(df.head())

# Close connection
con.close()
```

Metadata

PRISM Table

PRISM Table metadata
local_context$df_metadata %>% 
  reactable(
    searchable = T,
    defaultPageSize = 99
  )

PRISM Variables

PRISM measures metadata
local_context$df_prism_measures %>% 
  reactable(
    searchable = T,
    defaultPageSize = 99
  )

Appendix

Access Benchmarks

This will probably become its own notebook in the future, but here are some recommendations for query performance that leverage scan optimizations such as projection and filter pushdown.
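For example, with the arrow access pattern shown above, keeping filter() and a select() of only the needed columns before collect() lets the scan skip whole row groups and columns. A hedged sketch (same freeze path as in the Access section; the selected columns are illustrative):

```{r}
library(arrow)
library(tidyverse)

ds_prism = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet" |> 
  arrow::open_dataset()

df_tmean_pa = ds_prism |>
  filter(
    engine == 'exactextract',   # filter pushdown: evaluated during the scan
    geo_level == 'tract10',
    state == 'PA',
    measure == 'tmean',
    year == 2020
  ) |>
  select(GEOID, date, value) |>  # projection pushdown: only these columns are read
  collect()
```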