This notebook ingests the PRISM zonal statistics generated on the ESRI platform (Steve M. + Natalie R.) and converted into a parquet data lake (Joe S.). It compiles the results and runs some basic validation before they are ingested into the CCUH system.
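Dependencies and the local paths (`local_context`) used throughout this notebook:

```{r}
## Dependencies
library(pacman)
pacman::p_load(tidyverse, duckdb, arrow, purrr, cli, tictoc, glue, reactable)

## Python stuff
library(reticulate)
use_condaenv("r-reticulate")

## Local Context
local_context = lst(
  path_prism = '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1',
  path_prism_parquet = file.path(path_prism, 'ccuh_prism_zonal_stats_v1.parquet'),
  path_processing = file.path(path_prism, '_processing'),
  path_cache_qc = file.path(path_prism, 'cache', 'tract10_joe_subset_for_qc.parquet'),
  path_esri_tract10_parquets = file.path(path_processing, "esri_tract10_parquet"),
  path_esri_tract10_compiled = file.path(path_processing, "compiled_esri_tract10_prism_zonal_stats.parquet")
)
```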
## ESRI Parquet Outputs
Very exciting that we can streamline this directly from ESRI ArcGIS Pro, which can output partitioned parquet file systems. See this GitHub comment from [Steve Melly](https://github.com/Drexel-UHC/ExtremeWeather_PEDSNET/issues/4#issuecomment-2471677710):
> I tried converting January 2020 county tmean directly from file geodatabase table to parquet in ArcGISPro saved to `\UHC\Schinasi_HEAT\PRISM_Data\Countyaveragesfromtiff90mcell\parquet_test`. Can you check and see if there are any problems with this file?
Initial examination of the file looks promising: it appears to be partitioned by year. Let's try connecting to this file system (connect like you would connect to a database). Parquet files let you work efficiently either in memory (import everything very quickly) or, if your data is bigger than available RAM, out of memory (like a database).
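It all starts with a file somewhere:

```{python}
#| code-fold: false
path_esri_parquet = '//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test'
```

Below, let's do both.

### Reading parquet

Here we work the traditional way by reading the data into memory. Below is how to do it with polars.

```{python}
#| code-fold: false
import polars as pl

df = pl.read_parquet(path_esri_parquet)
print(df)
```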
Importantly, in the above we are reading the entire dataset into memory, so when we print the result we see the data held in memory as a table. That works here, but if the data were 500GB this approach would not work.
### Connect to parquet

Alternatively, we can leverage the big-data features of parquet files, which allow them to work like a database. Many modern analytic workflows replace databases entirely with parquet-based file formats; [for example, Azure's default storage format, Delta tables, is a parquet file with some metadata attached as JSON](https://learn.microsoft.com/en-us/dynamics365/customer-insights/data/connect-delta-lake). Importantly, this is very easy to do without any infrastructure costs and with minimal coding changes.

For example, here we 'connect' to the parquet file and print it out.
```{python}
#| code-fold: false
## Connect to parquet like a database
con = pl.scan_parquet(path_esri_parquet)
print(con)
```
```
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

  Parquet SCAN [//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test\year=2020janagp\filedate.parquet]
  PROJECT */6 COLUMNS
```
Now what we see is not a table but rather a database-like connection against which we can run queries or data pulls. Importantly, note how the DATEVALUE column is maintained, since the data and its metadata are stored within the parquet file.
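You can also inspect the schema of this lazy connection without reading any rows; a small sketch using the `con` object above (`collect_schema()` is available in recent polars versions):

```{python}
#| code-fold: false
## Peek at the column names and types without loading any data
print(con.collect_schema())
```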
Note, this is extremely performant and designed for massive data pulls or computations; it is often orders of magnitude faster than traditional transactional databases.
For example, here we do a simple query that filters for a specific county.
```{python}
#| code-fold: false
## Query
query = (
    con
    .filter(pl.col('STCOFIPS') == '01007')
)
print(query)
```
```
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

  FILTER [(col("STCOFIPS")) == (String(01007))] FROM
    Parquet SCAN [//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/Countyaveragesfromtiff90mcell/parquet_test\year=2020janagp\filedate.parquet]
    PROJECT */6 COLUMNS
```
When we print the query, it isn't the results but just a representation of our request to the database.
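As the printed plan hints, you can also view the optimized plan, where the filter is pushed down into the parquet scan itself; a small sketch using the `query` object above:

```{python}
#| code-fold: false
## Show the optimized query plan (note the filter pushed into the scan)
print(query.explain(optimized=True))
```

Now let's execute, or 'collect', our results.

```{python}
#| code-fold: false
## Execute Query
result = query.collect()
print(result)
```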
Now when we print the result it is the table we requested.
Note that this workflow is built on [Apache Arrow](https://arrow.apache.org/) and has integrations with all major open-source languages (R, Python, JavaScript, Julia, and WebAssembly), so it is a foundation for interoperability within modern data science workflows.
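As a quick illustration of that interoperability (a sketch, assuming `pyarrow` is installed alongside polars), the collected result converts directly to a pyarrow Table, which any Arrow-aware tool can consume:

```{python}
#| code-fold: false
## polars stores data in Arrow memory, so converting to a pyarrow Table
## is cheap (mostly zero-copy)
arrow_table = result.to_arrow()
print(type(arrow_table))
print(arrow_table.schema)
```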
## ESRI SAS to Parquet outputs
Let's take a look at what we are starting with. We froze Joe's original results from `//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/joe_simeone_work/tract_parquet` into CCUH long-term storage for reproducibility, specifically here: `//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_deprecating/esri_tract10_parquet`.
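Let's inventory the parquet lake: list the files and parse the measure and year out of each filename.

```{r}
#| code-summary: 'Inventory Joe Simeone Parquet Lake'
df_joe = tibble(path = list.files(local_context$path_esri_tract10_parquets, full.names = T)) %>%
  mutate(
    file = basename(path),
    geo_level = str_sub(file, 1, 4),
    measure = file %>% str_remove_all("ct10|.parquet|\\d{4}"),
    year = file %>% str_extract_all("\\d{4}") %>% unlist()
  ) %>%
  select(file, geo_level, measure, year, path)

df_joe %>% reactable(searchable = T)
```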
So we start with 60 parquet files, one for each combination of measure (5) and year (12).
### Compile (intermediate)
Let's compile them all together; I think the duckdb parquet reader is probably the easiest tool for this.
```{r}
#| code-summary: 'Compilation Query'
#| code-fold: false
query = glue::glue("
  COPY (
    SELECT *
    FROM read_parquet('{local_context$path_esri_tract10_parquets}/*.parquet', filename = true)
  ) TO '{local_context$path_esri_tract10_compiled}' (FORMAT 'parquet')
")
```
```{r}
#| code-summary: 'Execute compilation'
## Compile once; afterwards just open the compiled parquet
if (!file.exists(local_context$path_esri_tract10_compiled)) {
  con <- dbConnect(duckdb())
  result = dbExecute(con, query)
}

ds_esri_results = local_context$path_esri_tract10_compiled %>% open_dataset()
dim(ds_esri_results)
```
```
[1] 1583271090          8
```
This took some time (roughly 15 minutes), mostly spent writing the parquet. The result is a 1.6 billion row table!
### Validations
Before we ingest into the CCUH system, let's run some basic tests; we'll do the same checks for a few tracts.
```{r}
#| code-summary: 'Define test tracts'
df_tracts_test = tribble(
  ~geoid,        ~desc,                                                         ~state, ~state_fips,
  '42101000100', '42=PA, 101=Philadelphia County/City - Center City area',      'PA',   '42',
  '42003020100', '42=PA, 003=Allegheny County (Pittsburgh) - Downtown area',    'PA',   '42',
  '36061000100', '36=NY, 061=New York County (Manhattan) - Financial District', 'NY',   '36'
)
```
Okay, let's pull the data! For a table this big (> 1 billion rows) we recommend just using DuckDB.
```{r}
#| code-summary: 'Pull data for the test tracts'
if (!file.exists(local_context$path_cache_qc)) {
  ## Database connection
  con <- dbConnect(duckdb())

  ## Write query
  tracts_string = paste(sprintf("'%s'", df_tracts_test$geoid), collapse = ", ")
  state_fip_string = paste(sprintf("'%s'", unique(df_tracts_test$state_fips)), collapse = ", ")
  query = glue::glue("
    SELECT GEOID10, MEASURE, DATEVAL, VALUE
    FROM '{local_context$path_esri_tract10_compiled}'
    WHERE ST_FIPS IN ({state_fip_string})
      AND GEOID10 IN ({tracts_string})
  ")

  ## Execute
  df_qc = dbGetQuery(con, query) %>% as_tibble()

  ## Cache
  df_qc %>% arrow::write_parquet(local_context$path_cache_qc)
} else {
  df_qc = arrow::read_parquet(local_context$path_cache_qc)
}
```
#### Test unique composite keys

```{r}
#| code-summary: "Check composite key integrity for test tracts"
df_invalid = df_qc %>%
  add_count(GEOID10, MEASURE, DATEVAL) %>%
  filter(n > 1)

if (nrow(df_invalid) > 0) cli_abort("Composite key integrity failed")
```

#### Visual inspection

```{r}
#| code-summary: 'Visual inspection 1'
## Subset
row = df_tracts_test %>% slice(1)
df = df_qc %>% filter(GEOID10 == row$geoid)

## Plot
df %>%
  ggplot(aes(x = DATEVAL, y = VALUE)) +
  geom_line() +
  facet_wrap(~MEASURE, scales = "free_y") +
  labs(
    title = glue("Visual inspection 1: Looks okay"),
    subtitle = glue("{row$geoid} ({row$desc})")
  )
```

```{r}
#| code-summary: 'Visual inspection 2'
## Subset
row = df_tracts_test %>% slice(2)
df = df_qc %>% filter(GEOID10 == row$geoid)

## Plot
df %>%
  ggplot(aes(x = DATEVAL, y = VALUE)) +
  geom_line() +
  facet_wrap(~MEASURE, scales = "free_y") +
  labs(
    title = glue("Visual inspection 2: Looks okay"),
    subtitle = glue("{row$geoid} ({row$desc})")
  )
```

```{r}
#| code-summary: 'Visual inspection 3'
## Subset
row = df_tracts_test %>% slice(3)
df = df_qc %>% filter(GEOID10 == row$geoid)

## Plot
df %>%
  ggplot(aes(x = DATEVAL, y = VALUE)) +
  geom_line() +
  facet_wrap(~MEASURE, scales = "free_y") +
  labs(
    title = glue("Visual inspection 3: Looks okay"),
    subtitle = glue("{row$geoid} ({row$desc})")
  )
```

Okay, looks good to me. Ready for ingestion.