The second component - the zonal boundaries parameter space - is operationalized in each individual processing notebook, depending on which geographic boundaries that notebook calculates zonal statistics for. Here that means all census tracts in the USA using 2010 Census tract delineations (tract10). Let's first operationalize the Albers-reprojected boundaries that the zonal statistics will be computed from.
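In brief, the zones here are the Albers-reprojected CONUS boundary shapefile(s), and the processing template crosses the PRISM raster inventory with those zones and derives one output partition per day-measure-zone unit. A minimal sketch of that step (not evaluated here), assuming `df_prism_inventory` from the inventory notebook and the shapefile layout under `context$path_server_zone`:

```{r}
#| eval: false
# Sketch only: list the Albers-reprojected CONUS zone shapefile(s)
df_zones = tibble(
  z_path_cache = list.files(
    context$path_server_zone,
    pattern    = glue('{context$geo_level}_albers.shp'),
    recursive  = TRUE,
    full.names = TRUE
  )
) |>
  mutate(state = str_remove_all(str_extract(basename(z_path_cache), "_([A-Z]+)_"), "_")) |>
  filter(state == 'CONUS')

# ... then cross the PRISM raster inventory with the zones and derive the
# hive-style output partition for each day-measure-zone combination
template_raw = df_prism_inventory |>
  crossing(df_zones) |>
  mutate(
    geo_level = context$geo_level,
    engine    = context$engine,
    out_partition = file.path(
      glue("engine={engine}"), glue("geo_level={geo_level}"), glue("measure={measure}"),
      glue("state={state}"), glue("year={year}"), glue("month={month}"), glue("day={day}")
    ),
    out_local      = file.path("cache_sensitive", "hdfs_daily", out_partition),
    out_local_file = file.path(out_local, "data.parquet")
  )
```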
That's our template for processing in this geographic scope.
1.3 Remove already-processed work
Before we start processing, let's make sure the results are not already available. We first take stock of what's in the local cache (assuming the local cache reflects the server results), then subset our template to the combinations that have not been processed yet.
Curate local results
Let's first inventory the local results and remove any suspiciously small files (< 0.1 MB) that sometimes occur when a write is interrupted.
```{r}
## Initial local results
df_local_results = template_raw |>
  mutate(out_exists = file.exists(out_local_file)) %>%
  select(r_base_name, measure, date, out_exists, out_local_file, out_local) %>%
  filter(out_exists) %>%
  mutate(size = file.size(out_local_file) / 10^6)

## Remove files less than 0.1 MB
df_local_results %>%
  filter(size < 0.1) %>%
  pull(out_local) %>%
  walk(~ {
    unlink(.x, recursive = TRUE, force = TRUE)
  })

## Refresh local results
df_local_results = template_raw |>
  mutate(out_local_file = file.path(out_local, 'data.parquet')) %>%
  mutate(out_exists = file.exists(out_local_file)) %>%
  select(r_base_name, measure, date, out_exists, out_local_file, out_local) %>%
  filter(out_exists) %>%
  mutate(size = file.size(out_local_file) / 10^6)

## Distribution of file size
df_local_results %>%
  ggplot(aes(x = measure, y = size)) +
  geom_boxplot() +
  labs(
    title = 'Distribution of individual day-conus-measure parquet file',
    subtitle = 'Most* individual parquet files are ~ 1MB',
    caption = '*ppt data is smaller because many 0 values',
    y = 'Size (MB)'
  )
```
Looks good. We removed the occasional placeholder parquet files (< 0.1 MB) that occur when write processes are interrupted.
Now let's filter out the already-processed results to determine what still needs to be done.
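In short, we keep only the template rows whose local output file does not exist yet, and then drop any date-measure combinations that the compiled v1 (ESRI) results already cover. A minimal sketch (not evaluated here), assuming `template_raw` carries `out_local_file`, `measure`, and `date` columns:

```{r}
#| eval: false
# Sketch: keep template rows whose local output does not exist yet ...
template = template_raw |>
  mutate(out_exists = file.exists(out_local_file)) |>
  filter(!out_exists) |>
  # ... and drop the date-measure combinations already covered by the compiled v1 (ESRI) results
  anti_join(
    dbGetQuery(con_prod, "
      SELECT DISTINCT MEASURE AS measure, DATEVAL AS date
      FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet'
    "),
    by = c("measure", "date")
  )
```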
From a quick dev run we estimate that processing all tracts, all years, and all measures would take around 6 days in total. But we only need to process the results that are not done yet, which is roughly half of that, so about 3 days of tract10 processing for the new data. Starting today, the ETA would be Friday.
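For reference, the arithmetic behind this estimate, using the dev-run throughput (roughly 150 seconds for 14 day-measure-CONUS units) and the roughly 48,573 day-measure-CONUS combinations in the full template:

```{r}
# Back-of-the-envelope runtime estimate from the dev-run throughput
sec_per_unit <- 150 / 14                       # ~10.7 seconds per day-measure-CONUS unit
n_units      <- 48573                          # day-measure-CONUS combinations in the template
total_days   <- sec_per_unit * n_units / 3600 / 24
total_days                                     # ~6 days; the remaining (unprocessed) work is about half
```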
4. Production

Here we have results in two formats: the v1 processing and the v2 (robust-to-error) processing. Let's compile these into a single dataset.
ESRI
Let's first standardize and then load our ESRI results into DuckDB.
```{r}
## Add state crosswalk to database
dbExecute(con_prod, "
  CREATE TABLE xwalk_state AS
  SELECT state_abb as state, state_census_geoid as state_geoid
  FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/xwalk_state_v1/xwalk_state_v1.parquet'
")
dbGetQuery(con_prod, "SELECT * FROM xwalk_state")

## Load ESRI results into database
dbGetQuery(con_prod, "SHOW TABLES")
dfa <- dbGetQuery(con_prod, glue("
  SELECT
    GEOID10 AS geoid,
    DATEVAL AS date,
    'tract10' AS geo,
    x.state as state,
    YEAR as year,
    MEASURE AS measure,
    VALUE AS value
  FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet' p
  LEFT JOIN xwalk_state x ON p.ST_FIPS = x.state_geoid
  LIMIT 1
"))
dbExecute(con_prod, "DROP TABLE IF EXISTS tract10_results_esri")
dbExecute(con_prod, "
  CREATE TABLE tract10_results_esri AS
  SELECT
    GEOID10 AS geoid,
    DATEVAL AS date,
    'tract10' AS geo,
    x.state as state,
    YEAR as year,
    MEASURE AS measure,
    VALUE AS value
  FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet' p
  LEFT JOIN xwalk_state x ON p.ST_FIPS = x.state_geoid
")
```
Preview
```{r}
df1 <- dbGetQuery(con_prod, glue("
  SELECT *
  FROM tract10_results_esri
  LIMIT 10
")) %>% as_tibble()
df1
```
| geoid | date | geo | state | year | measure | value |
|---|---|---|---|---|---|---|
| 01121010900 | 2008-07-27 | tract10 | AL | 2008 | ppt | 0.0143709 |
| 01121010700 | 2008-07-27 | tract10 | AL | 2008 | ppt | 0.3526481 |
| 01121011100 | 2008-07-27 | tract10 | AL | 2008 | ppt | 12.9364470 |
| 01121011900 | 2008-07-27 | tract10 | AL | 2008 | ppt | 9.2121465 |
| 01073011904 | 2008-07-27 | tract10 | AL | 2008 | ppt | 23.4812120 |
| 01073012703 | 2008-07-27 | tract10 | AL | 2008 | ppt | 29.6871533 |
| 01073011301 | 2008-07-27 | tract10 | AL | 2008 | ppt | 19.3599464 |
| 01073014410 | 2008-07-27 | tract10 | AL | 2008 | ppt | 17.7761800 |
| 01073012912 | 2008-07-27 | tract10 | AL | 2008 | ppt | 22.3617439 |
| 01073013400 | 2008-07-27 | tract10 | AL | 2008 | ppt | 17.6457378 |
Looks good. But the full ESRI result set is a staggering 1.6 billion rows!
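A quick way to confirm that scale directly against the table we just created:

```{r}
# Total rows in the compiled ESRI results
dbGetQuery(con_prod, "SELECT COUNT(*) AS n_rows FROM tract10_results_esri")
```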
xwalk_tract10_land
For this particular tract10 dataset we also want to subset to tracts with Land_area > 0, which is the input used for the ESRI processing. Let's get the list of all GEOIDs present in the ESRI results (i.e., tracts with Land_area > 0).
Operationalize xwalk_tract10_land
```{r}
if (!'xwalk_tract10_land' %in% dbGetQuery(con_prod, "SHOW TABLES")$name) {

  ## Get tracts with land
  df_tract10_w_land = dbGetQuery(con_prod, glue("
    SELECT DISTINCT geoid
    FROM tract10_results_esri
  ")) %>% as_tibble()

  ## Operationalize all tracts + land crosswalk
  api <- "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/tiger10_boundaries__v1.1/tiger_us_boundaries_v1.parquet" %>%
    open_dataset()
  xwalk_tract10_land = api %>%
    filter(vintage == 'tract10') %>%
    select(geoid) %>%
    distinct() %>%
    collect() %>%
    mutate(has_land = geoid %in% df_tract10_w_land$geoid) %>%
    arrange(has_land)
  xwalk_tract10_land %>% arrow::write_parquet("cache/xwalk_tract10_land.parquet")

  ## Add to database
  dbExecute(con_prod, "
    CREATE TABLE xwalk_tract10_land AS
    SELECT * FROM 'cache/xwalk_tract10_land.parquet'
  ")
}
```
Okay, we added xwalk_tract10_land to our database. Let's preview it.
```{r}
dbGetQuery(con_prod, "
  SELECT *
  FROM xwalk_tract10_land
  LIMIT 10
")
```
| geoid | has_land |
|---|---|
| 01003990000 | FALSE |
| 01097990000 | FALSE |
| 02013000100 | FALSE |
| 02016000200 | FALSE |
| 02016000100 | FALSE |
| 02240000100 | FALSE |
| 02240000400 | FALSE |
| 02270000100 | FALSE |
| 02070000200 | FALSE |
| 02070000100 | FALSE |
Okay, looks good.
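To see how many tracts the `has_land` flag actually excludes, here is a quick tally of the crosswalk (a supplementary check, not part of the original pipeline):

```{r}
# Tally land vs. water-only tract10 geoids in the crosswalk
dbGetQuery(con_prod, "
  SELECT has_land, COUNT(*) AS n_tracts
  FROM xwalk_tract10_land
  GROUP BY has_land
")
```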
ExactExtract
Now let's bring in our ExactExtract results, link them to has_land, and keep only the tracts that have land.
```{r}
dfa = dbGetQuery(con_prod, "
  SELECT
    exact_results.geoid,
    exact_results.date,
    exact_results.geo_level as geo,
    exact_results.state_abb as state,
    exact_results.year,
    exact_results.measure,
    exact_results.value,
    xwalk.has_land
  FROM read_parquet(
    'cache_sensitive/hdfs_daily/engine=exactextract/geo_level=tract10/*/*/*/*/*/*.parquet',
    hive_partitioning = true
  ) exact_results
  LEFT JOIN xwalk_tract10_land xwalk ON exact_results.geoid = xwalk.geoid
  WHERE xwalk.has_land = TRUE
  LIMIT 1
")
dbGetQuery(con_prod, "SHOW TABLES")
dbExecute(con_prod, "DROP TABLE IF EXISTS tract10_results_exact")
dbExecute(con_prod, "
  CREATE TABLE tract10_results_exact AS
  SELECT
    exact_results.geoid,
    exact_results.date,
    exact_results.geo_level as geo,
    exact_results.state_abb as state,
    exact_results.year,
    exact_results.measure,
    exact_results.value,
    xwalk.has_land
  FROM read_parquet(
    'cache_sensitive/hdfs_daily/engine=exactextract/geo_level=tract10/*/*/*/*/*/*.parquet',
    hive_partitioning = true
  ) exact_results
  LEFT JOIN xwalk_tract10_land xwalk ON exact_results.geoid = xwalk.geoid
  WHERE xwalk.has_land = TRUE
")
```
ExactExtract Validations
Let's do two validations: 1) verify there are no water-only tracts, and 2) check that 1/20/2023 is in the data, ahead of the systematic checks in section 5.
Verify no water-only tracts
```{r}
## Get xwalk_tract10_land
xwalk_tract10_land = dbGetQuery(con_prod, glue("
  SELECT *
  FROM xwalk_tract10_land
")) %>% as_tibble()

## Get all exactextract tracts
df_tracts_exact = dbGetQuery(con_prod, glue("
  SELECT DISTINCT geoid, has_land
  FROM tract10_results_exact
")) %>% as_tibble()

df_tracts_exact %>%
  verify(all(has_land) == T) %>%
  count(has_land, name = 'n_tract10')
```
| has_land | n_tract10 |
|---|---|
| TRUE | 72246 |
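As a supplementary cross-check (not part of the original run), the distinct land-tract counts from the two engines can also be compared side by side; both should come out to 72,246:

```{r}
# Compare distinct tract counts between the ESRI and ExactExtract result tables
dbGetQuery(con_prod, "
  SELECT
    (SELECT COUNT(DISTINCT geoid) FROM tract10_results_esri)  AS n_tract10_esri,
    (SELECT COUNT(DISTINCT geoid) FROM tract10_results_exact) AS n_tract10_exact
")
```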
Looks good; this is consistent with the ESRI results (72,246 tracts, all with land). Now let's check the previously missing date: 2023 was missing a single day, 1/20/2023.
Verify 2023-01-20 data exists
```{r}
## Get all data for 1/20/2023
df_data_2023 = dbGetQuery(con_prod, glue("
  SELECT *
  FROM tract10_results_exact
  WHERE date = '2023-01-20'
")) %>% as_tibble()

df_data_2023 %>% count(date, measure, name = 'n_tracts')
```
| date | measure | n_tracts |
|---|---|---|
| 2023-01-20 | ppt | 72246 |
| 2023-01-20 | tdmean | 72246 |
| 2023-01-20 | tmax | 72246 |
| 2023-01-20 | tmean | 72246 |
| 2023-01-20 | tmin | 72246 |
| 2023-01-20 | vpdmax | 72246 |
| 2023-01-20 | vpdmin | 72246 |
It seems the 1/20/2023 data exists. Looks okay for now; we will do a comprehensive completeness test in section 5.

Compile

Now let's compile the ESRI and ExactExtract results into a single table.
```{r}
dbGetQuery(con_prod, "SHOW TABLES")
dbExecute(con_prod, "DROP TABLE IF EXISTS tract10")
dbExecute(con_prod, "
  CREATE TABLE tract10 AS (
    (SELECT * FROM tract10_results_esri)
    UNION ALL
    (SELECT * EXCLUDE (has_land) FROM tract10_results_exact)
  )
")
```
Preview
```{r}
dfa = dbGetQuery(con_prod, "SELECT * FROM tract10 LIMIT 1000") %>% as_tibble()
```
EDA
```{r}
#| code-summary: 'EDA'
#| eval: false

## Query
qc_query = glue("
  SELECT *
  FROM tract10
  WHERE state = 'DC' AND year = 2010;
")
df_qc <- dbGetQuery(con_prod, qc_query) |> as_tibble()

## EDA
df_qc |>
  ggplot(aes(x = value)) +
  geom_histogram(bins = 50, fill = "steelblue", color = "white") +
  facet_wrap(~measure, scales = "free") +  # Create separate plots for each measure
  theme_minimal() +
  labs(
    title = "Distribution of Values by Measure",
    x = "Value",
    y = "Count"
  ) +
  theme(
    plot.title = element_text(hjust = 0.5, size = 14),
    strip.text = element_text(size = 10, face = "bold"),
    axis.title = element_text(size = 10)
  )
```
5. Validation
Let’s take a look at our results.
Completeness
Check completeness across tracts, measures, and dates: the right number of rows.
```{r}
## Query
df_qc2 = dbGetQuery(con_prod, "
  SELECT geoid, measure,
    COUNT(DISTINCT date) as n_dates,
    COUNT(*) as n_rows
  FROM tract10
  WHERE state IN ('PA','FL')
  GROUP BY geoid, measure
  ORDER BY geoid, measure
") %>% as_tibble()

## Validations
df_valid = df_qc2 %>%
  verify(n_dates == 6939) %>%
  verify(n_rows == n_dates)

## Preview subset
df_valid %>% sample_n(5)
```
| geoid | measure | n_dates | n_rows |
|---|---|---|---|
| 42133010510 | tdmean | 6939 | 6939 |
| 12086001701 | tmin | 6939 | 6939 |
| 42101980900 | tdmean | 6939 | 6939 |
| 12099005926 | vpdmax | 6939 | 6939 |
| 12099005942 | tmean | 6939 | 6939 |
We do two checks here for a subset of the data (PA and FL):

- We validate completeness by verifying that each tract10/measure combination has 6,939 distinct days (all days in the covered period).
- We validate uniqueness by verifying that each tract10/measure combination has 6,939 rows, i.e., each tract10/measure/day has exactly one row.
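Both checks lean on `assertr::verify()`, which returns the data unchanged when every row satisfies the predicate and throws an error otherwise, so a failed check halts the notebook. A toy illustration with made-up data:

```{r}
#| eval: false
library(assertr)
library(tibble)

# Passes: every row satisfies the predicate, so the tibble comes back unchanged
tibble(n_dates = c(6939, 6939)) |> verify(n_dates == 6939)

# Fails: verify() throws an error reporting how many rows violate the predicate
tibble(n_dates = c(6939, 100)) |> verify(n_dates == 6939)
```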
Missing values
Now let's check that there are no missing values.
```{r}
df_qc3 = dbGetQuery(con_prod, "
  SELECT *
  FROM tract10
  WHERE ISNAN(value)
") %>% as_tibble()

## Validate
valid = df_qc3 %>% verify(nrow(.) == 0)
```
No missing values!
Garbage values
Garbage-coded values can appear when processing throws an error. Let's check for any such values.
```{r}
df_qc4 = dbGetQuery(con_prod, "
  SELECT *
  FROM tract10
  WHERE value > 1000
") %>% as_tibble()

## Validate
valid = df_qc4 %>% verify(nrow(.) == 0)
```
No garbage values! Let's also plot a couple of example tracts to make sure the series look reasonable.
Get data for example tracts
```{r}
df_tracts_fips = tribble(
  ~city,          ~name,               ~geoid,
  "Philadelphia", "Center City tract", "42101000100",
  "Miami",        "Tract in Miami",    "12086000100"
) |>
  mutate(i = row_number())
```
Inspect: overlap of tracts in Philadelphia
```{r}
## Query
row = df_tracts_fips |> slice(1)
dfa = dbGetQuery(con_prod, glue("
  SELECT *
  FROM tract10
  WHERE state IN ('PA') AND geoid = '42101000100'
")) |> as_tibble()

## Plot
inspect_tract_data(dfa)
```
Inspect: overlap of tracts in Miami
```{r}
## Query
row = df_tracts_fips |> slice(2)
dfb = dbGetQuery(con_prod, glue("
  SELECT *
  FROM tract10
  WHERE state IN ('FL') AND geoid = '12011030703'
")) |> as_tibble()

## Plot
inspect_tract_data(dfb)
```
Export
Now that things look good, let's export to the server so everyone can use this.
```{r}
if (!file.exists('cache_sensitive/clean/tract10.parquet')) {
  dbExecute(con_prod, glue_sql("
    COPY (
      SELECT *
      FROM tract10
    ) TO 'cache_sensitive/clean/tract10.parquet' (FORMAT 'parquet')
  ", .con = con_prod))
}
```
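The copy from the local clean cache to the shared server location is not shown in this chunk; a minimal sketch, assuming the `path_server_tract10` destination defined in the setup context and the `fs` package loaded there:

```{r}
#| eval: false
# Sketch: push the locally written parquet to the server freeze location
if (!file.exists(context$path_server_tract10)) {
  fs::file_copy(
    path     = 'cache_sensitive/clean/tract10.parquet',
    new_path = context$path_server_tract10
  )
}
```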
6. Access

The exported parquet can be queried directly with DuckDB SQL:

```{r}
library(duckdb)
library(glue)

# Create connection
con <- dbConnect(duckdb())

# Define the path to the parquet file
prism_path = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH_read_only_access/PRISM/prism_zonal_stats_v1/clean/tract10.parquet"

# Filter the data
query <- glue_sql("
  SELECT *
  FROM {`prism_path`}
  WHERE state = 'PA'
    AND measure = 'tmean'
    AND year = 2020
", .con = con)

# Execute query
df <- dbGetQuery(con, query)

# Print results
head(df)

# Close connection
dbDisconnect(con, shutdown = TRUE)
```
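For most downstream users, reading the exported file with `arrow` and dplyr verbs is a convenient alternative to SQL; a sketch assuming the same read-only server path:

```{r}
#| eval: false
library(arrow)
library(dplyr)

# Open the exported parquet lazily and filter before collecting into memory
ds_prism <- open_dataset("//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH_read_only_access/PRISM/prism_zonal_stats_v1/clean/tract10.parquet")

df <- ds_prism |>
  filter(state == 'PA', measure == 'tmean', year == 2020) |>
  collect()

head(df)
```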
---title: "PRISM zonal statistics - tract10"author: - name: Ran Li (Maintainer) orcid: 0000-0002-4699-4755date: last-modifiedformat: html: toc: true self-contained: true code-fold: true df-print: kable code-tools: true comments: hypothesis: theme: cleaneditor: sourceexecute: warning: false message: false eval: trueeditor_options: chunk_output_type: console---```{r}#| code-summary: 'Setup'#| code-fold: true## Dependenciespacman::p_load( tidyverse, assertr, arrow, tictoc, furrr, carrier, glue, exactextractr, terra, sfarrow, leaflet, sf, haven, ggpubr, cli, duckdb, reactable, fs )## Local Contextsource("PRISM - 0 - global context.R")context =c( global_context,lst(geo_level ='tract10',engine ='exactextract',path_prism ='//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1',path_prism_parquet =file.path(path_prism, 'ccuh_prism_zonal_stats_v1.parquet'),path_processing =file.path(path_prism, '_processing'),path_esri_tract10_compiled =file.path(path_processing, "compiled_esri_tract10_prism_zonal_stats.parquet"),path_server_tract10 =file.path( global_context$path_server_clean, "tract10.parquet") ))## Duckdbcon =dbConnect(duckdb::duckdb())con_prod <-dbConnect(duckdb::duckdb(), dbdir ="cache_sensitive/prism_zonal_stats_v1")```This notebook focuses on calculating spatial statistics for All USA tract10 boundaries with PRISM data.# 1. TemplatesPRISM processing scans across a parameters space between available PRISM inventory (measure, year, day, paths) and the zones (geo_level, state).## 1.1 RasterThe first component - the raster data parameter space - is operationalize in the [PRISM inventory notebook](https://drexelccuh.quarto.pub/prism---1---inventory/).```{r}#| code-summary: 'PRISM Raster template'df_prism_inventory = context$path_cache_df_prism_inventory |> arrow::read_parquet() |>mutate(r_path_bil =file.path('//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data', measure, year,str_replace(r_base_name, '.tif','.bil') ))df_prism_inventory |>utility__summarize_prism_inventory() ```## 1.2 ZoneThe second component - the zonal boundaries parameter space - is operationalize in each individual processing notebook - dependent on what geographic boundaries the notebook is intended to calculate zonal statistics for all counties in USA with 2010 census county delinations. 
Lets first operatiaonlize albers reprojection to do the zonal statistics from.```{r}#| code-summary: "Get the zones"df_zones =tibble(z_path_cache =list.files(context$path_server_zone, recursive =TRUE,full.names =TRUE,pattern =glue('{context$geo_level}_albers.shp'))) |>mutate(state = stringr::str_extract(basename(z_path_cache), "_([A-Z]+)_")) |># Remove the underscoresmutate(state = stringr::str_remove_all(state, "_")) |>filter(state =='CONUS') |>filter(!str_detect(z_path_cache,'.sr.lock'))head(df_zones)```We just append to the PRISM inventory other aspects:- state: the state of the boundary- geo_level: the geographic level of the boundary- z_path_cache: the path to the boundary shapefile- template_key: a unique key for the template rows (just row_number())```{r}#| code-summary: 'Add Zonal parameter space'## Add zonal parameter spacetemplate_raw = df_prism_inventory |>mutate(state =list(df_zones$state) ) |>unnest(state) |>left_join(df_zones, by ="state") |>mutate(geo_level = context$geo_level,engine = context$engine,template_key =row_number(),out_partition = base::file.path( glue::glue("engine={engine}"), glue::glue("geo_level={geo_level}"), glue::glue("measure={measure}"), glue::glue("state={state}"), glue::glue("year={year}") , glue::glue("month={month}") , glue::glue("day={day}") ),out_local = base::file.path("cache_sensitive",'hdfs_daily', out_partition ),out_local_file =file.path(out_local, 'data.parquet'),out_server = base::file.path( context$path_server_hdfs_daily, out_partition ) )## Previewglimpse(template_raw)```Thats our template for processing in this geographic scope.## 1.3 Remove already doneBefore we start processing lets just make sure that the results are not already available. Lets first get whats in the local cache (we assume the local cache reflects the server results). Then lets update our template subset to those that are not processed yet.### Curate local resultslets first inventory local results and make sure there ar eno small file sizes < 1MB that sometimes occur if a processing was interupted.```{r}## Initial local resultsdf_local_results = template_raw |>mutate(out_exists =file.exists(out_local_file) ) %>%select(r_base_name, measure, date, out_exists, out_local_file, out_local) %>%filter(out_exists) %>%mutate(size =file.size(out_local_file)/10^6)## Remove files less than 0.1 MBdf_local_results %>%filter(size <0.1) %>%pull(out_local) %>%walk(~{unlink(.x, recursive =TRUE, force =TRUE) })## Refresh local resultsdf_local_results = template_raw |>mutate(out_local_file =file.path(out_local, 'data.parquet')) %>%mutate(out_exists =file.exists(out_local_file) ) %>%select(r_base_name, measure, date, out_exists, out_local_file, out_local) %>%filter(out_exists) %>%mutate(size =file.size(out_local_file)/10^6)## Distirbution of file sizedf_local_results %>%ggplot(aes(x = measure, y = size)) +geom_boxplot() +labs(title ='Distribution of individual day-conus-measure parquet file',subtitle ='Most* individual parquet files are ~ 1MB',caption ='*ppt data is smaller because many 0 values',y ='Size (MB)' )```Looks good. 
we remove occasional placeholder parquet files (<0.1MB) that occur when write processess are interupted.Now lets filter any already processed results to determine what we still need to do.```{r}#| code-summary: 'Check what is already processed and remove'## Remove partitioned resultstic()template_remove_partitioned_results = template_raw |>mutate(out_exists =file.exists(out_local_file) ) %>%filter(!out_exists)toc()## Filter out ESRI Result combinationsif (!file.exists("cache/df_tract10_esri_date_measure.parquet")){ df_tract10_esri_date_measure <-dbGetQuery(con_prod,glue(" SELECT DISTINCT MEASURE as measure, DATEVAL as date FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet' ")) df_tract10_esri_date_measure %>%write_parquet("cache/df_tract10_esri_date_measure.parquet")} else { df_tract10_esri_date_measure =read_parquet("cache/df_tract10_esri_date_measure.parquet")}template = template_remove_partitioned_results |>anti_join(df_tract10_esri_date_measure)## Summary if (nrow(template) ==0) { cli::cli_alert_info("All done!")} else { template |>utility__summarize_prism_inventory()}```# 2. Processing functionThis is an engine agnostic template for how this funciton is layed out. Note its a [crated function](https://github.com/r-lib/carrier) to enable parallel computing.```{r}#| code-summary: 'Processing function'geo_level_tmp = context$geo_levelmeasure_tmp ='tmax'state_tmp ='CONUS'year_tmp ='2020'month_tmp ='1'day_tmp ='12'datetxt_tmp ='20200112'dev = Fengine = context$enginecrate_calc_prism_zonal_exactextract =crate(template_raw = template_raw,template = template,function(geo_level_tmp, measure_tmp, state_tmp, year_tmp, month_tmp, day_tmp, datetxt_tmp,dev = F, engine ='exactextract'){ { ###### Setup ###### path_cache_folder ='cache_sensitive/hdfs_daily'if (dev == T) path_cache_folder ='cache_sensitive/result_dev' out_dir = base::file.path( path_cache_folder, glue::glue("engine={engine}"), glue::glue("geo_level={geo_level_tmp}"), glue::glue("measure={measure_tmp}"), glue::glue("state={state_tmp}"), glue::glue("year={year_tmp}") , glue::glue("month={month_tmp}") , glue::glue("day={day_tmp}") )|>as.character() cli::cli_alert_info("Started: {measure_tmp} {state_tmp} {year_tmp} {geo_level_tmp} {datetxt_tmp}") template_tmp = templateif (dev == T) template_tmp = template_raw row = template_tmp |> dplyr::filter( geo_level == geo_level_tmp, measure == measure_tmp, year == year_tmp, state == state_tmp, datetxt == datetxt_tmp ) out_file =file.path(out_dir,'data.parquet') } { ###### Input Validations ######if (file.exists(out_file)){ cli::cli_alert_info("Already processed: {geo_level_tmp} {measure_tmp} {state_tmp} {year_tmp} {datetxt_tmp}")return("Already processed") }## Iterate over state,yearif(nrow(row) ==0){ cli::cli_alert_info("No data: {geo_level_tmp} {measure_tmp} {state_tmp} {year_tmp} {datetxt_tmp}")return("No template rows found") }if (dev == T) print(row) } { ###### Computation ######## Imports r = terra::rast(x = row$r_path_cache) z = sf::st_read(row$z_path_cache, quiet = T)## Try .tif raster calculation value_results <-tryCatch({ exactextractr::exact_extract(r, z, 'mean', progress =FALSE) }, error =function(e) {# If there's an error, try the .bil raster r_bil <- terra::rast(x = row$r_path_bil) exactextractr::exact_extract(r_bil, z, 'mean', progress =FALSE) })## If .tif results have NA values then use .bil raster for calculationif (any(is.nan(value_results))){ r = terra::rast(x = 
row$r_path_bil) value_results = exactextractr::exact_extract( r, z, 'mean', progress =FALSE ) }## Produce results df_results_state_year = terra::as.data.frame(z) |> dplyr::select(state_abb, geoid) |> dplyr::mutate(date = row$date,value = value_results ) |> dplyr::select(geoid, date, state_abb, value) if (dev == T) print("DONE!!!!") } { ###### Exports ######if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T) df_results_state_year |> arrow::write_parquet(out_file) cli::cli_alert_success("Finished: {measure_tmp} {state_tmp} {geo_level_tmp} {datetxt_tmp}")return() } })rm(geo_level_tmp, measure_tmp, state_tmp, month_tmp, day_tmp, datetxt_tmp, year_tmp, dev, engine)```# 3. Parallel Calls## DevDo a development run to make sure parameters and structure is correct.```{r}#| code-summary: 'Dev checks for structure'#| eval: false## Input templateinput_dev = template |> filter(!server) |> select( geo_level, measure, state, year, month, day, datetxt) |> distinct() |> mutate(dev = T) |> group_by(measure) |> sample_n(2) |> ungroup()## Configure parallelplan(multisession, workers = 5)opts <- furrr_options(globals = FALSE, seed = TRUE)## Parallel runtic()furrr::future_pwalk( input_dev, crate_calc_prism_zonal_exactextract, .options = opts)toc()```Runs well!Quick look at results```{r}ds ="cache_sensitive/result_dev/"|> arrow::open_dataset() |>filter( engine == context$engine, geo_level == context$geo_level ) dfa = ds |>select(measure, geo_level, geoid, date, value) |>collect()```Lets do a calcualtion to see how long it would take to process all years, measures for the CONUS zones. In the dev run above we took:- work: 14 measure-CONUS-day processing units- time: 150 seconds$$\begin{align*}\frac{150 \text{ seconds}}{14 \text{ day-measure-CONUS}} =\frac{10.71 \text{ seconds}}{1 \text{ day-measure-CONUS}} =\frac{0.17857 \text{ minutes}}{1 \text{ day-measure-CONUS}} = \frac{0.0029762 \text{ hours}}{1 \text{ day-measure-CONUS}}\end{align*}$$Let see how many day-meausure-CONUS combinations we have```{r}template_raw |>count(date, measure, state) |>reactable()```There are 48,573 day-measure combinations.$$\begin{align*}\frac{0.0029762 \text{ hours}}{1 \text{ day-measure-CONUS}}\times {48,573 \text{ day-measure-CONUS}} = \mathbf{\underline{144 \text{ hours}}}= \mathbf{\underline{6 \text{ days}}}\end{align*}$$So we estimate that the whole processing for all counties, all years, all measures would take around 1 day to process all tract measures. completely.But we only need to process undone results which is around half of that so 3 days for tract10 processing of new data. Start today so ETA will be Friday?## Full- 150 rows for 30 minutes approximately- 300 rows for 1 hr- 1800 rows for 6 hr- 3600 for 12 hours- 3600 for 12 hours- 5400 for 30 hours- 7200 for 24 hours```{r}#| code-summary: 'Full Run'#| eval: false## Input templateinput = template |> filter(!file.exists(out_local)) |> select( geo_level, measure, state, year, month, day, datetxt) |> distinct() %>% slice(1:3600)## Configure parallelplan(multisession, workers = 5)opts <- furrr_options(globals = FALSE, seed = TRUE)## Parallel runtic()furrr::future_pwalk( input, crate_calc_prism_zonal_exactextract, .options = opts)toc()```# 4. Production## ImportHere we have several two formats the v1 processinga nd the v2 (robust to error) processing results. 
Lets compile these into a single dataset.#### ESRI Lets first standardize then load our ESRI results into duckdb.```{r}## Add state crosswalk to databasedbExecute(con_prod, " CREATE TABLE xwalk_state AS SELECT state_abb as state, state_census_geoid as state_geoid FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/xwalk_state_v1/xwalk_state_v1.parquet' ")dbGetQuery(con_prod,"SELECT * FROM xwalk_state")## Load ESRI results into databasedbGetQuery(con_prod, "SHOW TABLES")dbGetQuery(con_prod,"SELECT * FROM xwalk_state")dfa <- dbGetQuery(con_prod,glue(" SELECT GEOID10 AS geoid, DATEVAL AS date, 'tract10' AS geo, x.state as state, YEAR as year, MEASURE AS measure, VALUE AS value FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet' p LEFT JOIN xwalk_state x ON p.ST_FIPS = x.state_geoid LIMIT 1"))dbExecute(con_prod, "DROP TABLE IF EXISTS tract10_results_esri")dbExecute(con_prod, " CREATE TABLE tract10_results_esri AS SELECT GEOID10 AS geoid, DATEVAL AS date, 'tract10' AS geo, x.state as state, YEAR as year, MEASURE AS measure, VALUE AS value FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet' p LEFT JOIN xwalk_state x ON p.ST_FIPS = x.state_geoid")```Preview```{r}df1 <-dbGetQuery(con_prod,glue(" SELECT * FROM tract10_results_esri LIMIT 10 ")) %>%as_tibble()df1```Looks good. But the whole thing from ESRI is a staggering 1.6 billion rows!### xwalk_tract10_landAlso for this particular tract10 dataset we want to subset to tracts that have more than Land_area > 0 (which is the input for the ESRI processing). Lets get a list of all GEOID in the ESRI (tracts with Land_area > 0)```{r}#| code-summary: "Operationalize xwalk_tract10_land"if (!'xwalk_tract10_land'%in%dbGetQuery(con_prod, "SHOW TABLES")$name){## Get tracts with land df_tract10_w_land =dbGetQuery(con_prod,glue(" SELECT DISTINCT geoid FROM tract10_results_esri ")) %>%as_tibble()## Opertaionlize all tracts + land crosswalk api <-"//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/tiger10_boundaries__v1.1/tiger_us_boundaries_v1.parquet"%>%open_dataset() xwalk_tract10_land = api %>%filter(vintage =='tract10') %>%select(geoid) %>%distinct() %>%collect() %>%mutate(has_land = geoid %in% df_tract10_w_land$geoid) %>%arrange(has_land) xwalk_tract10_land %>% arrow::write_parquet("cache/xwalk_tract10_land.parquet")## Add to databasedbExecute(con_prod, " CREATE TABLE xwalk_tract10_land AS SELECT * FROM 'cache/xwalk_tract10_land.parquet' ")}```Okay we added `xwalk_tract10_land` to our database. Lets preview that```{r}dbGetQuery(con_prod, " SELECT * FROM xwalk_tract10_land LIMIT 10")```Okay. looks good. ### ExactExtract Now lets bring in out ExactExtract results, link to has_land and filter for those tracts that only have land. 
```{r}dfa = dbGetQuery(con_prod, " SELECT exact_results.geoid, exact_results.date, exact_results.geo_level as geo, exact_results.state_abb as state, exact_results.year, exact_results.measure, exact_results.value, xwalk.has_land FROM read_parquet( 'cache_sensitive/hdfs_daily/engine=exactextract/geo_level=tract10/*/*/*/*/*/*.parquet', hive_partitioning = true ) exact_results LEFT JOIN xwalk_tract10_land xwalk ON exact_results.geoid = xwalk.geoid WHERE xwalk.has_land = TRUE LIMIT 1")dbGetQuery(con_prod, "SHOW TABLES")dbExecute(con_prod, "DROP TABLE IF EXISTS tract10_results_exact")dbExecute(con_prod, " CREATE TABLE tract10_results_exact AS SELECT exact_results.geoid, exact_results.date, exact_results.geo_level as geo, exact_results.state_abb as state, exact_results.year, exact_results.measure, exact_results.value, xwalk.has_land FROM read_parquet( 'cache_sensitive/hdfs_daily/engine=exactextract/geo_level=tract10/*/*/*/*/*/*.parquet', hive_partitioning = true ) exact_results LEFT JOIN xwalk_tract10_land xwalk ON exact_results.geoid = xwalk.geoid WHERE xwalk.has_land = TRUE")```### ExactExtract ValidationsLet's do two validations: 1) Verify no water only tracts 2) check that 1/20/2023 is in the data - prior to systematic checks```{r}#| code-summary: "Verify no water only tracts"## Get xwalk_tract10_landxwalk_tract10_land =dbGetQuery(con_prod,glue(" SELECT * FROM xwalk_tract10_land ")) %>%as_tibble()## Get all exactextract tractsdf_tracts_exact =dbGetQuery(con_prod,glue(" SELECT DISTINCT geoid, has_land FROM tract10_results_exact ")) %>%as_tibble()df_tracts_exact %>%verify(all(has_land) == T) %>%count(has_land, name ='n_tract10')```looks good its consistent with ESRI results (72,246 tracts all with land). Now let's do a check for the missing date - 2023 was missing a single day 1/20/2023.```{r}#| code-summary: "Verify 2023-01-20 data exists"## Get all data for 1/20/2023df_data_2023 =dbGetQuery(con_prod, glue(" SELECT * FROM tract10_results_exact WHERE date = '2023-01-20'")) %>%as_tibble()df_data_2023 %>%count(date, measure, name ='n_tracts')```It seems like 1/20/23 data exists. Looks okay for now. Lets do a comprehensive completeness test later on in section 5.Preview```{r}df2 =dbGetQuery( con_prod, " SELECT * FROM tract10_results_exact LIMIT 1000 ") %>%as_tibble()# # QC# dfa = df2 %>% # add_count(measure, geoid, date) %>% # arrange(measure,geoid, date)# dfa %>% filter(n>1) %>% distinct()head(df2) %>%reactable()```- 1,932,148,804 rows ## CompileNow lets compile into a single parquet file.```{r}dbGetQuery(con_prod, "SHOW TABLES")dbExecute(con_prod, "DROP TABLE IF EXISTS tract10")dbExecute(con_prod, " CREATE TABLE tract10 AS ( (SELECT * FROM tract10_results_esri) UNION ALL (SELECT * EXCLUDE (has_land) FROM tract10_results_exact) )")```Preview```{r}dfa =dbGetQuery(con_prod, "SELECT * FROM tract10 LIMIT 1000") %>%as_tibble()```## EDA```{r}#| code-summary: 'EDA'#| eval: false## Queryqc_query = glue(" SELECT * FROM tract10 WHERE state = 'DC' AND year = 2010;") df_qc <- dbGetQuery(con_prod, qc_query) |> as_tibble()## EDAdf_qc |> ggplot(aes(x = value)) + geom_histogram(bins = 50, fill = "steelblue", color = "white") + facet_wrap(~measure, scales = "free") + # Create separate plots for each measure theme_minimal() + labs( title = "Distribution of Values by Measure", x = "Value", y = "Count" ) + theme( plot.title = element_text(hjust = 0.5, size = 14), strip.text = element_text(size = 10, face = "bold"), axis.title = element_text(size = 10) )```# 5. 
Validation Let's take a look at our results.## Completeness Check for completeness of number of counties, measures, dates -right number of rows.```{r}#| code-summary: "Check for completeness"#| code-fold: false## Querydf_qc2 =dbGetQuery( con_prod,"SELECT geoid, measure, COUNT(DISTINCT date) as n_dates, COUNT(*) as n_rows FROM tract10 WHERE state IN ('PA','FL') GROUP BY geoid, measure ORDER BY geoid, measure") %>%as_tibble() ## Validationsdf_valid = df_qc2 %>%verify(n_dates ==6939) %>%verify(n_rows == n_dates)## Preview subsetdf_valid %>%sample_n(5)```We do two checks here for a subset of the data (PA and FL):- We validated the completeness of the data by verifying that each tract10/measure combinations should have 6939 days (all days in 200)- We validate uniquness of the data by verifying each tract10/measure combination should have 6939 rows (all tract10/measure/days only have one row/data-point)## Missing valuesNow lets check there are no missing values.```{r}#| code-fold: falsedf_qc3 =dbGetQuery( con_prod,"SELECT * FROM tract10 WHERE ISNAN(value)") %>%as_tibble() ## Validatevalid = df_qc3 %>%verify(nrow(.) ==0)```No missing values!## Garbage valuesThere are some garbage coded values for when processing throws an error. Lets check the distribution of that. ```{r}#| code-fold: falsedf_qc4 =dbGetQuery( con_prod,"SELECT * FROM tract10 WHERE value > 1000") %>%as_tibble() ## Validatevalid = df_qc4 %>%verify(nrow(.) ==0)```No garbage values!## Visual inspection```{r}#| code-summary: "Define visual inspection function"#| code-fold: true inspect_tract_data =function(df1){ df1 |>group_by(date, measure) |>summarise(mean_value =mean(value, na.rm =TRUE),.groups ="drop") |>mutate(year =year(date)) |>ggplot(aes(x = date, y = mean_value, color = measure)) +geom_line() +facet_wrap(~ measure, scales ="free_y", ncol =2) +theme_minimal() +labs(title =glue("Visual inspection {row$i}: Looks okay"),subtitle =glue(" {row$geoid} ({row$city} - {row$name})"),x ="Date",y ="Value",color ="Measure") +theme(legend.position ="bottom",strip.text =element_text(size =10, face ="bold"),axis.text.x =element_text(angle =45, hjust =1),panel.spacing =unit(1, "lines")) +scale_x_date(date_breaks ="2 years", date_labels ="%Y") +scale_color_viridis_d(end =0.9)}```Lets plot a few things to make sure things look okay focusing on 4 tracts.```{r}#| code-summary: 'Get data for 4 counties'df_tracts_fips =tribble(~city, ~name, ~geoid,"Philadelphia", "Center City tract", "42101000100", "Miami", "Tract in Miami", "12086000100", ) |>mutate(i =row_number())``````{r}#| code-summary: 'Inspect: overlap of tracts in Philadelphia'## Queryrow = df_tracts_fips |>slice(1)dfa =dbGetQuery(con_prod, glue(" SELECT * FROM tract10 WHERE state IN ('PA') AND geoid = 42101000100")) |>as_tibble()## Plotinspect_tract_data(dfa)``````{r}#| code-summary: 'Inspect: overlap of tracts in Miami'## Queryrow = df_tracts_fips |>slice(2)dfb =dbGetQuery(con_prod, glue(" SELECT * FROM tract10 WHERE state IN ('FL') AND geoid = 12011030703")) |>as_tibble()## Plotinspect_tract_data(dfb)```## ExportNow that things look good let's expert to the server so everyone can use this. ```{r}if (!file.exists('cache_sensitive/clean/tract10.parquet')) {dbExecute(con_prod, glue_sql(" COPY ( SELECT * FROM tract10 ) TO 'cache_sensitive/clean/tract10.parquet' (FORMAT 'parquet') ", .con = con_prod))}```This local result was copied over to the server!# 6. 
Access## Data::: {.panel-tabset}## R ```{r}library(arrow)library(tidyverse)ds_prism = "`r context$path_server_tract10`" |> arrow::open_dataset() df = ds_prism |> filter( state == 'PA', measure == 'tmean', year == 2020 ) |>collect()```## Python```{python}import polars as plds_prism = pl.scan_parquet("//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH_read_only_access/PRISM/prism_zonal_stats_v1/clean/tract10.parquet")df = (ds_prism .filter( (pl.col("state") == "PA") & (pl.col("measure") == "tmean") & (pl.col("year") == 2020) ) .collect())```## SQL ```{r}library(duckdb)library(glue)# Create connection and read parquet datasetcon <- dbConnect(duckdb())# Define the path to the parquet fileprism_path = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH_read_only_access/PRISM/prism_zonal_stats_v1/clean/tract10.parquet"# Filter the dataquery <- glue_sql(" SELECT * FROM {`prism_path`} WHERE engine = 'exactextract' AND geo_level = 'tract10' AND state = 'PA' AND measure = 'tmean' AND year = 2020", .con = con)# Execute querydf <- dbGetQuery(con, query)# Print resultshead(df)# Close connectiondbDisconnect(con, shutdown = TRUE)```:::## Metadata### PRISM Table```{r}#| code-summary: 'PRISM Table metadata'context$df_metadata %>%reactable(searchable = T,defaultPageSize =99 )```### PRISM Variables```{r}#| code-summary: 'PRISM measures metadata'context$df_prism_measures %>%reactable(searchable = T,defaultPageSize =99 )```