---title: "PRISM - 1 - Inventory"author: - name: Ran Li (Maintainer) orcid: 0000-0002-4699-4755 - name: Joe Simeone (Context Contributor)date: last-modifiedformat: html: toc: true self-contained: true code-fold: true df-print: kable code-tools: true comments: hypothesis: theme: cleaneditor: sourceexecute: warning: false message: false eval: trueeditor_options: chunk_output_type: console---```{r}#| code-summary: 'Setup'#| code-fold: true## Dependenciespacman::p_load( tidyverse, assertr, arrow, tictoc, furrr, carrier, glue, exactextractr,duckdb, DBI, reactable, tidycensus, terra, sfarrow, leaflet, sf, haven, ggpubr, cli, duckdb )## Local Contextsource("PRISM - 0 - global context.R")local_context =c(global_context)```This notebook is meant to be point of access for the CCUH PRISM resource. It includes an inventory of available raw PRISM raster data, calculated zonal statistics, and code snippets for accessing the zonal statistics for those with access.# 1. PRISM Raster ## Inventory```{r}#| code-summary: "Inventory available PRISM Data"## Inventoryif (!file.exists(local_context$path_cache_df_prism_inventory)){ vec__cached_prism_rasters_local =list.files( local_context$path_cache_prism_geotiff , recursive = T,full.names = T) vec__cached_prism_rasters_server =list.files( local_context$path_server_prism_geotiff ,recursive = T,full.names = T) df_prism_inventory_all=tibble(r_path_cache =c( vec__cached_prism_rasters_local, vec__cached_prism_rasters_server ) ) |>mutate(server =str_detect(r_path_cache, 'files.drexel.edu/encrypted'),r_base_name =basename(r_path_cache),base_name_split_tmp =str_split(r_base_name, "_"),measure =map_chr(base_name_split_tmp, ~ .x[2]),datetxt =map_chr(base_name_split_tmp, ~ .x[5]) |>str_remove_all('.tif'),date =as.Date(datetxt, format ="%Y%m%d"),year = lubridate::year(date),month =month(as.Date(date)),day =day(as.Date(date)) ) |>select(-base_name_split_tmp) |>select(r_base_name, measure, datetxt,date,everything()) %>%filter(r_base_name !='Thumbs.db') %>%mutate(raster_size =file.size(r_path_cache)) df_prism_inventory_server = df_prism_inventory %>%filter(server) df_prism_inventory = df_prism_inventory_server %>%bind_rows(df_prism_inventory_all %>%filter(!r_base_name%in%df_prism_inventory_server$r_base_name)) %>%## Validationassert(not_na , measure, date) %>%assert_rows(col_concat, is_uniq, measure, date) %>%verify(## 365 days of data for each measure-year combinationcount(., measure, year, name ='n_file_per_year') %>%filter(n_file_per_year<365) %>%nrow() ==0 ) df_prism_inventory %>% arrow::write_parquet(local_context$path_cache_df_prism_inventory)}df_prism_inventory = local_context$path_cache_df_prism_inventory %>% arrow::read_parquet()## Summarizedf_prism_inventory %>%utility__summarize_prism_inventory() ```We have `r format(nrow(df_prism_inventory), big.mark = ",")` total PRISM files distributed by the measures and years above. If new PRISM is added please update the [PRISM Source Data Notebook](https://drexelccuh.quarto.pub/source__prism_raster_tif_albers_v1).# 2. Zonal Statistics Inventory## CompilationWe have PRISM results generated from CCUH processing stored as a hive partitioned dataset and results from the ESRI platform stored as a parquet file. Here we compile into a single file for more performant access and reuse. 
This compilation involves a few steps, each with its own code chunk below.

- **Compile CCUH** partitioned results into a single table
- **Harmonize ESRI** table into a CCUH-formatted single table
- **Merge** the two tables into a single table for access

### Harmonize CCUH and ESRI schema

```{r}
#| code-summary: "Evaluate existing schemas"

## Database connection
con = dbConnect(duckdb())

## CCUH Results
ds_ccuh = local_context$path_server_int_ccuh_data %>% open_dataset()
path_ccuh_partition = 'cache_sensitive/result/engine=exactextract/geo_level=tract10/measure=tmean/state=PA/year=2019/data.parquet'
dev_query_ccuh = glue("
  SELECT *
  FROM read_parquet(
    '{local_context$path_server_results}/*/*/*/*/*/*.parquet',
    hive_partitioning = true
  )
  WHERE year = 2019 AND state = 'PA'
  LIMIT 5
")
df_ccuh <- dbGetQuery(con, dev_query_ccuh) %>% as_tibble()

## ESRI Results
ds_esri = local_context$path_server_int_esri_data %>% open_dataset()
dev_query_esri = glue("
  SELECT *
  FROM '{ds_esri$files}'
  LIMIT 5
")
df_esri <- dbGetQuery(con, dev_query_esri) %>% as_tibble()

## Preview schemas
ds_ccuh
ds_esri
```

Okay, this is pretty simple, just some renames. Let's develop the logic here before we merge.

```{r}
#| code-summary: "Develop harmonization query in dplyr"

## Op. state_fip crosswalk
ds_xwalk_state_fip = local_context$path_server_xwalk_state %>% open_dataset()
xwalk_state_fip <- ds_xwalk_state_fip %>%
  collect() %>%
  select(state.census.geoid, state = state.abb)

## Dplyr
df_esri %>%
  rename(
    GEOID = GEOID10,
    date = DATEVAL,
    value = VALUE,
    measure = MEASURE,
    state.census.geoid = ST_FIPS,
    year = YEAR
  ) %>%
  left_join(xwalk_state_fip) %>%
  mutate(
    engine = "esri",             # Add missing engine field
    measure = tolower(measure),  # Convert measure to lowercase
    geo_level = 'tract10'
  ) %>%
  select(GEOID, date, value, engine, geo_level, measure, state, year)
```

Looks good, let's translate this into a DuckDB query.
```{r}#| code-summary: "Develop harmonization query in duckdb"con =dbConnect(duckdb())query_harmonization_dev =glue("COPY ( WITH xwalk_state AS ( SELECT \"state.census.geoid\" as ST_FIPS,\"state.abb\" as state FROM read_parquet('{ds_xwalk_state_fip$files}') ), ccuh_data AS ( SELECT * FROM read_parquet( '{local_context$path_server_results}/*/*/*/*/*/*.parquet', hive_partitioning = true ) WHERE year = 2019 AND state = 'PA' LIMIT 10 ), esri_data AS ( SELECT GEOID10 as GEOID, DATEVAL as date, VALUE as value, 'esri' as engine, 'tract10' as geo_level, LOWER(MEASURE) as measure, xwalk.state, YEAR as year FROM read_parquet('{ds_esri$files}') esri LEFT JOIN xwalk_state xwalk ON esri.ST_FIPS = xwalk.ST_FIPS WHERE year = 2019 AND state = 'PA' LIMIT 10 ) SELECT * FROM ccuh_data UNION ALL SELECT * FROM esri_data) TO 'cache/dev_harmonized.parquet' (FORMAT 'parquet')")dbExecute(con, query_harmonization_dev)```Okay Lets take a look```{r}#| code-summary: "Preview harmonized data""cache/dev_harmonized.parquet"%>%open_dataset() %>%collect()```Looks good.### MergeThis is the actual harmonization query to run without any limitations.```{r}#| code-summary: "Harmonize and merge CCUH and ESRI datasets"if (!file.exists(local_context$path_server_final_data)){ con =dbConnect(duckdb()) query_harmonization =glue("COPY ( WITH xwalk_state AS ( SELECT \"state.census.geoid\" as ST_FIPS,\"state.abb\" as state FROM read_parquet('{ds_xwalk_state_fip$files}') ), ccuh_data AS ( SELECT * FROM read_parquet( '{local_context$path_server_results}/*/*/*/*/*/*.parquet', hive_partitioning = true ) ), esri_data AS ( SELECT GEOID10 as GEOID, DATEVAL as date, VALUE as value, 'esri' as engine, 'tract10' as geo_level, LOWER(MEASURE) as measure, xwalk.state, YEAR as year FROM read_parquet('{ds_esri$files}') esri LEFT JOIN xwalk_state xwalk ON esri.ST_FIPS = xwalk.ST_FIPS ) SELECT * FROM ccuh_data UNION ALL SELECT * FROM esri_data) TO '{local_context$path_server_final_data}' (FORMAT 'parquet')")print(Sys.time())dbExecute(con, query_harmonization) ## ~2 hours for all PRISM compilation (mostly file writing time)print(Sys.time())}```## Inventory```{r}#| code-summary: "Inventory available Zonal Stats Data"if (!file.exists(local_context$path_cache_df_results_inventory)){ con =dbConnect(duckdb()) query_inventory =glue(" SELECT engine, measure, geo_level, year, state, COUNT(*) as n_data_points FROM read_parquet('{local_context$path_server_final_data}') GROUP BY engine, measure, geo_level, year, state ") df_results_inventory <-dbGetQuery(con, query_inventory) %>%as_tibble() df_results_inventory %>%write_parquet(local_context$path_cache_df_results_inventory)}# Inventory resultsdf_results_inventory = local_context$path_cache_df_results_inventory %>%read_parquet()```- **Unique Measures**: `r df_results_inventory %>% pull(measure) %>% unique() %>% length()`- **Total Data Points**: `r format(sum(df_results_inventory$n_data_points), big.mark = ",")`::: {.panel-tabset}## Data by Engine```{r}df_results_inventory %>%summarise(n_data_points =sum(n_data_points),.by =c('engine','geo_level'))```## Details by Engine```{r}df_results_inventory %>%# filter(engine == 'exactextract') %>% group_by(measure, engine, geo_level) %>%summarize(n_states =unique(state) %>%sort() %>%length(),n_years =unique(year) %>%sort() %>%length(),year_range =min(year) %>%paste0(" - ", max(year)) ) %>%arrange( measure) %>%reactable(searchable = T,defaultPageSize =10 )```:::# 3. 
# 3. Access

## Data

::: {.panel-tabset}

## R

```{r}
library(arrow)
library(tidyverse)

# Path to the frozen PRISM zonal stats parquet (local_context$path_server_final_data)
ds_prism = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet" |>
  arrow::open_dataset()

df = ds_prism |>
  filter(
    engine == 'exactextract',
    geo_level == 'tract10',
    state == 'PA',
    measure == 'tmean',
    year == 2020
  ) |>
  collect()
```

## Python

```{python}
import polars as pl

ds_prism = pl.scan_parquet(
    "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet"
)

df = (
    ds_prism
    .filter(
        (pl.col("engine") == "exactextract")
        & (pl.col("geo_level") == "tract10")
        & (pl.col("state") == "PA")
        & (pl.col("measure") == "tmean")
        & (pl.col("year") == 2020)
    )
    .collect()
)
```

## SQL (DuckDB-R)

```{r}
library(duckdb)
library(glue)

# Create connection
con <- dbConnect(duckdb())

# Define the path to the parquet file
prism_path = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet"

# Filter the data
query <- glue_sql("
  SELECT *
  FROM read_parquet({prism_path})
  WHERE engine = 'exactextract'
    AND geo_level = 'tract10'
    AND state = 'PA'
    AND measure = 'tmean'
    AND year = 2020
", .con = con)

# Execute query
df <- dbGetQuery(con, query)

# Print results
head(df)

# Close connection
dbDisconnect(con, shutdown = TRUE)
```

## SQL (DuckDB-Python)

```{python}
import duckdb

# Create connection
con = duckdb.connect()

# Define the path to the parquet file
prism_path = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet"

# Define the query
query = f"""
  SELECT *
  FROM '{prism_path}'
  WHERE engine = 'exactextract'
    AND geo_level = 'tract10'
    AND state = 'PA'
    AND measure = 'tmean'
    AND year = 2020
"""

# Execute query and get results as a DataFrame
df = con.execute(query).df()

# Print results
print(df.head())

# Close connection
con.close()
```

:::

## Metadata

### PRISM Table

```{r}
#| code-summary: 'PRISM Table metadata'
local_context$df_metadata %>%
  reactable(
    searchable = T,
    defaultPageSize = 99
  )
```

### PRISM Variables

```{r}
#| code-summary: 'PRISM measures metadata'
local_context$df_prism_measures %>%
  reactable(
    searchable = T,
    defaultPageSize = 99
  )
```

---

# Appendix

## Access Benchmarks

This will probably become its own notebook in the future, but for now, the main recommendation for query performance is to leverage [scan optimizations like Projection and Filter Pushdown](https://duckdb.org/2021/12/03/duck-arrow.html).
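In practice that means filtering and selecting only the columns you need before `collect()`, so the parquet scan reads only the matching row groups and columns instead of the whole file. A minimal sketch with arrow/dplyr, assuming the frozen parquet path from the access snippets above (the `df_tmean_pa` name is just for illustration; marked `eval: false` since timings depend on network storage):

```{r}
#| code-summary: "Pushdown sketch (illustrative)"
#| eval: false
library(arrow)
library(dplyr)

ds_prism = arrow::open_dataset(
  "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/prism_zonal_stats_v1.parquet"
)

df_tmean_pa = ds_prism |>
  # Filter pushdown: predicates are evaluated at the scan,
  # so non-matching row groups can be skipped via parquet statistics
  filter(state == 'PA', measure == 'tmean', year == 2020) |>
  # Projection pushdown: only these columns are read from the file
  select(GEOID, date, value) |>
  # Nothing is materialized in memory until collect()
  collect()
```

The same principle applies to the DuckDB snippets: a `WHERE` clause plus an explicit column list (rather than `SELECT *`) keeps the scan to the minimum data needed.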