PRISM Zonal statistics - ESRI platform results

This notebook explores both Joe’s parquet conversions and the direct ArcGIS Pro parquet outputs

Authors

Ran Li (Maintainer)

Joe Simeone (Context Contributor)

Published

November 13, 2024

Code
## Dependencies
library(pacman) 
pacman::p_load(tidyverse, duckdb, arrow, purrr, cli, tictoc, glue, reactable)


## Python stuff
library(reticulate)
use_condaenv("r-reticulate")  


## Local Context
local_context = lst(  
  path_prism = '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1',
  path_prism_parquet = file.path(path_prism, 'ccuh_prism_zonal_stats_v1.parquet'),
  path_processing = file.path(path_prism, '_processing'),
  path_cache_qc = file.path(path_prism, 'cache', 'tract10_joe_subset_for_qc.parquet'),
  path_esri_tract10_parquets = file.path(path_processing, "esri_tract10_parquet"),
  path_esri_tract10_compiled = file.path(path_processing, "compiled_esri_tract10_prism_zonal_stats.parquet")
)

This notebook ingests the ESRI platform generated PRISM zonal statistics (Steve M. + Natalie R.), which were converted into a parquet data lake (Joe S.). We compile the results and run some basic validation before they are ingested into the CCUH system.

ESRI Parquet Outputs

Very exciting that we can streamline this directly from ESRI ArcGIS Pro, which can output partitioned parquet file systems. See this GitHub comment from Steve Melly:

I tried converting January 2020 county tmean directly from file geodatabase table to parquet in ArcGISPro saved to _HEAT_Data90mcell_test Can you check and see if there are any problems with this file?

Initial examination of the file looks promising: it looks like it was partitioned by year. Let’s try connecting to this file system (connect like you would connect to a database). Parquet files allow you to work efficiently either in memory (import everything very quickly) or, if your data is bigger than the available RAM, to access the data or do computations out of memory (like a database).

It all starts with a file somewhere

path_esri_parquet = '//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test'
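
Since the output looks partitioned by year, a quick way to confirm is to list the folder contents. Below is a minimal R sketch that assumes the same path as above; the exact file names may differ.

## Quick look at the partition layout (sketch; same path as above)
path_esri_parquet = '//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test'

## Expect hive-style folders such as year=2020janagp/ containing parquet files
list.files(path_esri_parquet, recursive = TRUE)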

Below, let’s try both approaches.

Reading parquet

Here we work the traditional way by reading the data into memory. Below is how to do it with polars.

import polars as pl
df = pl.read_parquet(path_esri_parquet)

print(df)
shape: (3_109, 6)
┌──────────┬─────────┬────────────┬─────────────────────┬──────────┬────────────┐
│ STCOFIPS ┆ MEASURE ┆ VALUE      ┆ DATEVALUE           ┆ OBJECTID ┆ year       │
│ ---      ┆ ---     ┆ ---        ┆ ---                 ┆ ---      ┆ ---        │
│ str      ┆ str     ┆ f64        ┆ datetime[ns]        ┆ i64      ┆ str        │
╞══════════╪═════════╪════════════╪═════════════════════╪══════════╪════════════╡
│ 01001    ┆ tmean   ┆ 6.896944   ┆ 2020-01-01 00:00:00 ┆ 1        ┆ 2020janagp │
│ 01003    ┆ tmean   ┆ 10.258131  ┆ 2020-01-01 00:00:00 ┆ 2        ┆ 2020janagp │
│ 01005    ┆ tmean   ┆ 7.992727   ┆ 2020-01-01 00:00:00 ┆ 3        ┆ 2020janagp │
│ 01007    ┆ tmean   ┆ 6.583633   ┆ 2020-01-01 00:00:00 ┆ 4        ┆ 2020janagp │
│ 01009    ┆ tmean   ┆ 5.037539   ┆ 2020-01-01 00:00:00 ┆ 5        ┆ 2020janagp │
│ …        ┆ …       ┆ …          ┆ …                   ┆ …        ┆ …          │
│ 56037    ┆ tmean   ┆ -12.122874 ┆ 2020-01-01 00:00:00 ┆ 3105     ┆ 2020janagp │
│ 56039    ┆ tmean   ┆ -13.429    ┆ 2020-01-01 00:00:00 ┆ 3106     ┆ 2020janagp │
│ 56041    ┆ tmean   ┆ -10.470041 ┆ 2020-01-01 00:00:00 ┆ 3107     ┆ 2020janagp │
│ 56043    ┆ tmean   ┆ -5.869372  ┆ 2020-01-01 00:00:00 ┆ 3108     ┆ 2020janagp │
│ 56045    ┆ tmean   ┆ -4.913032  ┆ 2020-01-01 00:00:00 ┆ 3109     ┆ 2020janagp │
└──────────┴─────────┴────────────┴─────────────────────┴──────────┴────────────┘

Importantly, in the above we are reading the entire dataset into memory, so when we print the result we see the data held in memory as a table. That works here, but if the data were, say, 500 GB, it would not.

Connect to parquet

Alternatively, we can leverage the big data features of parquet files, which allow them to work like a database. Many modern analytic workflows replace a database entirely with parquet-based file formats; for example, Azure’s default storage format, Delta tables, is just parquet files with some metadata attached as JSON. Importantly, this is very easy to do without any infrastructure costs and with minimal coding changes.

For example, here we ‘connect’ to the parquet file and print out the connection.

## Connect to parquet like a database
con =  pl.scan_parquet(path_esri_parquet) 
print(con)
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

Parquet SCAN [//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test\year=2020janagp\filedate.parquet]
PROJECT */6 COLUMNS
print(con.collect_schema())
Schema([('STCOFIPS', String), ('MEASURE', String), ('VALUE', Float64), ('DATEVALUE', Datetime(time_unit='ns', time_zone=None)), ('OBJECTID', Int64), ('year', String)])

Now what we see is not a table but rather a database-like connection from which we can run queries or data pulls. Importantly, note how the DATEVALUE type is maintained, since all of this data and metadata is stored within the parquet file.

Note that this is extremely performant: it is designed for massive data pulls and computations and is often orders of magnitude faster than traditional transactional databases.

For example, here we run a simple query that filters for a specific county.

## Query
query = (
  con
  .filter(pl.col('STCOFIPS') == '01007')
)

print(query)
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

FILTER [(col("STCOFIPS")) == (String(01007))] FROM
  Parquet SCAN [//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test\year=2020janagp\filedate.parquet]
  PROJECT */6 COLUMNS

When we print the query, we don’t see the results, just a representation of our request to the database.

Now let’s execute, or ‘collect’, our results.

## Execute Query
result = query.collect()

print(result)
shape: (1, 6)
┌──────────┬─────────┬──────────┬─────────────────────┬──────────┬────────────┐
│ STCOFIPS ┆ MEASURE ┆ VALUE    ┆ DATEVALUE           ┆ OBJECTID ┆ year       │
│ ---      ┆ ---     ┆ ---      ┆ ---                 ┆ ---      ┆ ---        │
│ str      ┆ str     ┆ f64      ┆ datetime[ns]        ┆ i64      ┆ str        │
╞══════════╪═════════╪══════════╪═════════════════════╪══════════╪════════════╡
│ 01007    ┆ tmean   ┆ 6.583633 ┆ 2020-01-01 00:00:00 ┆ 4        ┆ 2020janagp │
└──────────┴─────────┴──────────┴─────────────────────┴──────────┴────────────┘

Now when we print the result it is the table we requested.

Note that this workflow is built on Apache Arrow and has integrations with all major open source languages (R, Python, JavaScript, Julia and WebAssembly), so it is a foundation for interoperability within modern data science workflows.
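
For instance, the same lazy connect-then-collect pattern can be reproduced in R with the arrow package (loaded in the dependencies above). This is just a sketch that assumes the same path_esri_parquet folder and default hive partitioning.

## Connect to the same partitioned parquet folder from R (sketch)
path_esri_parquet = '//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test'
ds = arrow::open_dataset(path_esri_parquet)

## Lazy query; nothing is read into memory until collect()
ds %>% 
  filter(STCOFIPS == '01007') %>% 
  select(STCOFIPS, MEASURE, VALUE, DATEVALUE) %>% 
  collect()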

ESRI SAS to Parquet outputs

Let’s take a look at what we are starting with. We froze Joe’s original results from //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/joe_simeone_work/tract_parquet into CCUH long term storage for reproducibility, specifically here: //files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_deprecating/esri_tract10_parquet.

Inventory Joe Simeone Parquet Lake
df_joe = tibble(
  path = list.files(local_context$path_esri_tract10_parquets, full.names = T)
) %>% 
  mutate(
    file = basename(path),
    geo_level = str_sub(file, 1, 4),
    measure = file %>% str_remove_all("ct10|\\.parquet|\\d{4}"),
    year = file %>% str_extract_all("\\d{4}") %>% unlist()
  ) %>% 
  select(file, geo_level, measure, year, path)

df_joe %>% 
  reactable(searchable = T)

So we start with 60 parquet files, one for each combination of measure (5) and year (12).
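
As a quick sanity check on that count (a sketch that assumes the df_joe inventory built above), we can tabulate the files by measure and expect 12 per measure:

## Expect 5 measures x 12 years = 60 files
df_joe %>% 
  count(measure, name = 'n_files')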

Compile (intermediate)

Let’s compile them all together; the DuckDB parquet reader is probably the easiest way to do this.

query = glue::glue("
   COPY (
    SELECT * FROM read_parquet('{local_context$path_esri_tract10_parquets}/*.parquet', filename = true)
    ) TO '{local_context$path_esri_tract10_compiled}'  (FORMAT 'parquet')
") 
Execute compilation
if (!file.exists(local_context$path_esri_tract10_compiled)){ 
  con <- dbConnect(duckdb())
  result = dbExecute(con, query)
  dbDisconnect(con, shutdown = TRUE)
}

## Open the compiled parquet lazily (needed whether or not we just wrote it)
ds_esri_results = local_context$path_esri_tract10_compiled %>% open_dataset()

dim(ds_esri_results)
[1] 1583271090          8

This took some time (roughly 15 minutes), mostly spent writing the parquet output. The result is a 1.6 billion row table!
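
Because ds_esri_results is a lazy Arrow dataset connection rather than an in-memory table, we can still summarize it without pulling the 1.6 billion rows into RAM. For example, a minimal sketch counting rows per measure (column names assumed from the queries below):

## Count rows per measure out of memory; only the small summary is collected
ds_esri_results %>% 
  count(MEASURE) %>% 
  collect() %>% 
  arrange(MEASURE)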

Validations

Before we ingest into the CCUH system, let’s just run some basic tests; we’ll do the same checks for a few test tracts.

Define test tracts
df_tracts_test = tribble(
  ~geoid,         ~desc,                                                          ~state, ~state_fips,
  '42101000100',  '42=PA, 101=Philadelphia County/City - Center City area',      'PA',   '42',
  '42003020100',  '42=PA, 003=Allegheny County (Pittsburgh) - Downtown area',    'PA',   '42',
  '36061000100',  '36=NY, 061=New York County (Manhattan) - Financial District', 'NY',   '36'
)

Okay, let’s pull the data! For a table this big (> 1 billion rows) we recommend just using DuckDB.

Pull data for the test tracts
if (!file.exists(local_context$path_cache_qc)) {
  ## Database connection
  con <- dbConnect(duckdb())
  
  ## Write query
  tracts_string = paste(sprintf("'%s'", df_tracts_test$geoid), collapse = ", ")
  state_fip_string = paste(sprintf("'%s'", unique(df_tracts_test$state_fips)), collapse = ", ")
  query = glue::glue("
  SELECT 
    GEOID10, MEASURE, DATEVAL, VALUE 
  FROM '{local_context$path_esri_tract10_compiled}' 
  WHERE ST_FIPS IN ({state_fip_string})
    AND GEOID10 IN ({tracts_string})
")
  query
  
  ## Execute
  df_qc  = dbGetQuery(con, query) %>% as_tibble()
  
  ## Cache
  df_qc %>% arrow::write_parquet(local_context$path_cache_qc)
  
  ## Close the connection
  dbDisconnect(con, shutdown = TRUE)

} else {
  df_qc = arrow::read_parquet(local_context$path_cache_qc)
}

Test unique composite keys

Check composite key integrity for test tracts
df_invalid = df_qc %>% 
  add_count(GEOID10, MEASURE, DATEVAL) %>% 
  filter(n>1)
if (nrow(df_invalid) > 0) cli_abort("Composite key integrity failed")

Visual inspection

Visual inspection 1
## Subset
row = df_tracts_test %>% slice(1)
df = df_qc %>% 
  filter(GEOID10 == row$geoid)
 

## Plot
df %>% 
  ggplot(aes(x = DATEVAL, y = VALUE)) + 
  geom_line() + 
  facet_wrap(~MEASURE, scales = "free_y") + 
  labs(title = glue("Visual inspection 1: Looks okay"),
       subtitle = glue(" {row$geoid} ({row$desc})"))

Visual inspection 2
## Subset
row = df_tracts_test %>% slice(2)
df = df_qc %>% 
  filter(GEOID10 == row$geoid)
 

## Plot
df %>% 
  ggplot(aes(x = DATEVAL, y = VALUE)) + 
  geom_line() + 
  facet_wrap(~MEASURE, scales = "free_y") + 
  labs(title = glue("Visual inspection 2: Looks okay"),
       subtitle = glue(" {row$geoid} ({row$desc})"))

Visual inspection 3
## Subset
row = df_tracts_test %>% slice(3)
df = df_qc %>% 
  filter(GEOID10 == row$geoid)
 

## Plot
df %>% 
  ggplot(aes(x = DATEVAL, y = VALUE)) + 
  geom_line() + 
  facet_wrap(~MEASURE, scales = "free_y") + 
  labs(title = glue("Visual inspection 3: Looks okay"),
       subtitle = glue(" {row$geoid} ({row$desc})"))

Okay, looks good to me. Ready for ingestion.