PRISM zonal statistics - tract10

Author

Ran Li (Maintainer)

Published

January 17, 2025

Setup
## Dependencies
pacman::p_load(
  tidyverse, assertr, arrow,
  tictoc, furrr, carrier, glue, 
  exactextractr,
  terra, sfarrow, leaflet, sf, haven, ggpubr, cli, duckdb, reactable, fs )

## Local Context
source("PRISM - 0 - global context.R")
context = c(
  global_context,
  lst(
    geo_level = 'tract10',
    engine = 'exactextract',
    path_prism = '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1',
    path_prism_parquet = file.path(path_prism, 'ccuh_prism_zonal_stats_v1.parquet'),
    path_processing = file.path(path_prism, '_processing'),
    path_esri_tract10_compiled = file.path(path_processing, "compiled_esri_tract10_prism_zonal_stats.parquet"),
    path_server_tract10 = file.path(
      global_context$path_server_clean,  "tract10.parquet") 
  )
)

## Duckdb
con = dbConnect(duckdb::duckdb())
con_prod <- dbConnect(duckdb::duckdb(), dbdir = "cache_sensitive/prism_zonal_stats_v1")

This notebook focuses on calculating zonal statistics for all USA tract10 boundaries with PRISM data.

1. Templates

PRISM processing scans across a parameter space formed by crossing the available PRISM inventory (measure, year, day, paths) with the zones (geo_level, state).
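
As a purely illustrative sketch (the toy tibbles below are hypothetical; the real tables are built in 1.1 and 1.2), the template is essentially a cross of the two parameter sets:

```{r}
#| code-summary: 'Conceptual sketch of the parameter space (illustrative only)'
#| eval: false

## Toy stand-ins for the raster inventory and the zone boundaries
toy_inventory = tibble(
  measure = c('ppt', 'tmax'),
  date    = as.Date(c('2020-01-01', '2020-01-01'))
)
toy_zones = tibble(geo_level = 'tract10', state = 'CONUS')

## Every raster (measure-day) is paired with every zone set (geo_level-state)
crossing(toy_inventory, toy_zones)
```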

1.1 Raster

The first component - the raster data parameter space - is operationalized in the PRISM inventory notebook.

PRISM Raster template
df_prism_inventory = context$path_cache_df_prism_inventory |> 
  arrow::read_parquet() |> 
  mutate(r_path_bil = file.path(
    '//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data',
    measure,
    year,
    str_replace(r_base_name, '.tif','.bil')
  ))

df_prism_inventory |> 
  utility__summarize_prism_inventory() 
measure n_years year_range n_days
ppt 19 2005 - 2023 6939
tdmean 19 2005 - 2023 6939
tmax 19 2005 - 2023 6939
tmean 19 2005 - 2023 6939
tmin 19 2005 - 2023 6939
vpdmax 19 2005 - 2023 6939
vpdmin 19 2005 - 2023 6939

1.2 Zone

The second component - the zonal boundaries parameter space - is operationalized in each individual processing notebook, depending on which geographic boundaries that notebook calculates zonal statistics for; here it is all tracts in the USA with 2010 census tract delineations. Let's first operationalize the Albers-reprojected boundaries to run the zonal statistics against.

Get the zones
df_zones = tibble(
  z_path_cache = list.files(context$path_server_zone, 
                          recursive = TRUE,
                          full.names = TRUE,
                          pattern = glue('{context$geo_level}_albers.shp'))
) |> 
  mutate(state = stringr::str_extract(basename(z_path_cache), "_([A-Z]+)_")) |>
  # Remove the underscores
  mutate(state = stringr::str_remove_all(state, "_")) |>
  filter(state == 'CONUS') |> 
  filter(!str_detect(z_path_cache,'.sr.lock'))

head(df_zones)
z_path_cache state
//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/tiger10_boundaries__v1.1/lake_albers_shp/tl_CONUS_tract10_albers/tl_CONUS_tract10_albers.shp CONUS

We then append a few additional attributes to the PRISM inventory:

  • state: the state of the boundary
  • geo_level: the geographic level of the boundary
  • z_path_cache: the path to the boundary shapefile
  • template_key: a unique key for the template rows (just row_number())
Add Zonal parameter space
## Add zonal parameter space
template_raw = df_prism_inventory |> 
  mutate(
    state = list(df_zones$state)
  ) |> 
  unnest(state) |> 
  left_join(df_zones, by = "state") |> 
  mutate(
    geo_level = context$geo_level,
    engine = context$engine,
    template_key = row_number(),
    out_partition = base::file.path(
      glue::glue("engine={engine}"),
      glue::glue("geo_level={geo_level}"),
      glue::glue("measure={measure}"),
      glue::glue("state={state}"),
      glue::glue("year={year}") ,
      glue::glue("month={month}") ,
      glue::glue("day={day}")   
    ),
    out_local = base::file.path(
      "cache_sensitive",
      'hdfs_daily',
      out_partition
    ),
    out_local_file = file.path(out_local, 'data.parquet'),
    out_server = base::file.path(
      context$path_server_hdfs_daily,
      out_partition
    )
  )

## Preview
glimpse(template_raw)
Rows: 48,573
Columns: 20
$ r_base_name    <chr> "prism_ppt_us_30s_20200101.tif", "prism_ppt_us_30s_2020…
$ measure        <chr> "ppt", "ppt", "ppt", "ppt", "ppt", "ppt", "ppt", "ppt",…
$ datetxt        <chr> "20200101", "20200102", "20200103", "20200104", "202001…
$ date           <date> 2020-01-01, 2020-01-02, 2020-01-03, 2020-01-04, 2020-0…
$ r_path_cache   <chr> "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/CC…
$ server         <lgl> TRUE, TRUE, TRUE, TRUE, TRUE, TRUE, TRUE, TRUE, TRUE, T…
$ year           <dbl> 2020, 2020, 2020, 2020, 2020, 2020, 2020, 2020, 2020, 2…
$ month          <dbl> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1…
$ day            <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, …
$ raster_size    <dbl> 19548982, 27923632, 28350200, 26485044, 20795666, 14635…
$ r_path_bil     <chr> "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PR…
$ state          <chr> "CONUS", "CONUS", "CONUS", "CONUS", "CONUS", "CONUS", "…
$ z_path_cache   <chr> "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/C…
$ geo_level      <chr> "tract10", "tract10", "tract10", "tract10", "tract10", …
$ engine         <chr> "exactextract", "exactextract", "exactextract", "exacte…
$ template_key   <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, …
$ out_partition  <chr> "engine=exactextract/geo_level=tract10/measure=ppt/stat…
$ out_local      <chr> "cache_sensitive/hdfs_daily/engine=exactextract/geo_lev…
$ out_local_file <chr> "cache_sensitive/hdfs_daily/engine=exactextract/geo_lev…
$ out_server     <chr> "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/C…

That's our template for processing in this geographic scope.

1.3 Remove already done

Before we start processing, let's make sure the results are not already available. First we inventory what is in the local cache (we assume the local cache reflects the server results), then we subset our template to the combinations that have not been processed yet.
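
If we want to spot check that assumption, one option is to compare a sample of outputs in both locations (a sketch; it assumes the server partition folders mirror the local layout, with a data.parquet inside each out_server folder):

```{r}
#| code-summary: 'Optional spot check: local cache vs. server'
#| eval: false

## Spot check the assumption that the local cache mirrors the server by
## comparing whether a sample of outputs exists in both places
template_raw |>
  slice_sample(n = 100) |>
  mutate(
    local_done  = file.exists(out_local_file),
    server_done = file.exists(file.path(out_server, 'data.parquet'))
  ) |>
  count(local_done, server_done)
```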

Curate local results

Let's first inventory the local results and make sure there are no suspiciously small files (< 0.1 MB) that sometimes occur when a write was interrupted.

Code
## Initial local results
df_local_results = template_raw |> 
  mutate(out_exists = file.exists(out_local_file) ) %>% 
  select(r_base_name, measure, date, out_exists, out_local_file, out_local) %>% 
  filter(out_exists) %>% 
  mutate(size = file.size(out_local_file)/10^6)

## Remove files less than 0.1 MB
df_local_results %>% 
  filter(size < 0.1) %>% 
  pull(out_local) %>% 
  walk(~{
      unlink(.x, recursive = TRUE, force = TRUE)
  })

## Refresh local results
df_local_results = template_raw |> 
  mutate(out_local_file = file.path(out_local, 'data.parquet')) %>% 
  mutate(out_exists = file.exists(out_local_file) ) %>% 
  select(r_base_name, measure, date, out_exists, out_local_file, out_local) %>% 
  filter(out_exists) %>% 
  mutate(size = file.size(out_local_file)/10^6)

## Distribution of file size
df_local_results %>% 
  ggplot(aes(x = measure, y = size)) +
  geom_boxplot() + 
  labs(
    title = 'Distribution of individual day-conus-measure parquet file',
    subtitle = 'Most* individual parquet files are ~ 1MB',
    caption = '*ppt data is smaller because many 0 values',
    y = 'Size (MB)'
  )

Looks good. We remove the occasional placeholder parquet files (< 0.1 MB) that occur when write processes are interrupted.

Now let's filter out any already-processed results to determine what we still need to do.

Check what is already processed and remove
## Remove partitioned results
tic()
template_remove_partitioned_results = template_raw |> 
  mutate(out_exists = file.exists(out_local_file) ) %>% 
  filter(!out_exists)
toc()
3.17 sec elapsed
Check what is already processed and remove
## Filter out ESRI Result combinations
if (!file.exists("cache/df_tract10_esri_date_measure.parquet")){
  df_tract10_esri_date_measure  <- dbGetQuery(con_prod,glue("
    SELECT DISTINCT
      MEASURE as measure, DATEVAL as date
    FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet'  
 "))
  df_tract10_esri_date_measure  %>% write_parquet("cache/df_tract10_esri_date_measure.parquet")
} else {
  df_tract10_esri_date_measure = read_parquet("cache/df_tract10_esri_date_measure.parquet")
}
template = template_remove_partitioned_results |>
  anti_join(df_tract10_esri_date_measure)
  

## Summary 
if (nrow(template) == 0) {
  cli::cli_alert_info("All done!")
} else {
  template |> 
  utility__summarize_prism_inventory()
}

2. Processing function

This is an engine-agnostic template for how this function is laid out. Note it is a crated function (via {carrier}) so it can be shipped to parallel workers.
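
To illustrate what "crated" means: a crated function captures only the objects explicitly passed to carrier::crate(), so furrr workers receive a small, self-contained closure instead of the whole global environment. A minimal sketch (the lookup object below is purely illustrative):

```{r}
#| code-summary: 'Minimal crate() sketch (illustrative only)'
#| eval: false

## Only `lookup` travels with the crate; nothing else from the global
## environment is captured
lookup = c(ppt = 'precipitation', tmax = 'maximum temperature')

crate_label_measure = carrier::crate(
  lookup = lookup,
  function(measure_tmp) {
    unname(lookup[measure_tmp])
  }
)

crate_label_measure('tmax')
#> [1] "maximum temperature"
```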

Processing function
geo_level_tmp = context$geo_level
measure_tmp = 'tmax'
state_tmp = 'CONUS'
year_tmp =  '2020'
month_tmp = '1'
day_tmp = '12'
datetxt_tmp = '20200112'
dev = F
engine = context$engine

crate_calc_prism_zonal_exactextract = crate(
  template_raw = template_raw,
  template = template,
  function(geo_level_tmp, measure_tmp, state_tmp, 
           year_tmp, month_tmp, day_tmp, datetxt_tmp,
           dev = F, engine = 'exactextract'){
    
    { ###### Setup ######
      path_cache_folder = 'cache_sensitive/hdfs_daily'
      if (dev == T) path_cache_folder = 'cache_sensitive/result_dev'
      out_dir = base::file.path(
        path_cache_folder,
        glue::glue("engine={engine}"),
        glue::glue("geo_level={geo_level_tmp}"),
        glue::glue("measure={measure_tmp}"),
        glue::glue("state={state_tmp}"),
        glue::glue("year={year_tmp}") ,
        glue::glue("month={month_tmp}")  ,
        glue::glue("day={day_tmp}")   
      )|>
        as.character()
      cli::cli_alert_info("Started: {measure_tmp} {state_tmp} {year_tmp} {geo_level_tmp} {datetxt_tmp}")

      template_tmp = template
      if (dev == T) template_tmp = template_raw
    
      row = template_tmp |>
        dplyr::filter(
          geo_level ==  geo_level_tmp,
          measure ==  measure_tmp,
          year == year_tmp,
          state == state_tmp,
          datetxt == datetxt_tmp
        )
      out_file = file.path(out_dir,'data.parquet')
  
    }
    
    { ###### Input Validations ######
      if (file.exists(out_file)){
        cli::cli_alert_info("Already processed:  {geo_level_tmp} {measure_tmp} {state_tmp} {year_tmp} {datetxt_tmp}")
        return("Already processed")
      }
      
      ## Iterate over state,year
      if(nrow(row) == 0){
        cli::cli_alert_info("No data: {geo_level_tmp} {measure_tmp} {state_tmp} {year_tmp} {datetxt_tmp}")
        return("No template rows found")
      }
      
      if (dev == T) print(row)
    }
   
   
    { ###### Computation ######
      
      ## Imports
      r = terra::rast(x = row$r_path_cache)
      z = sf::st_read(row$z_path_cache, quiet = T)

      ## Try .tif raster calculation
      value_results <- tryCatch({
        exactextractr::exact_extract(r, z, 'mean', progress = FALSE)
      }, error = function(e) {
        # If there's an error, try the .bil raster
        r_bil <- terra::rast(x = row$r_path_bil)
        exactextractr::exact_extract(r_bil, z, 'mean', progress = FALSE)
      })
      
      ## If .tif results have NA values then use .bil raster for calculation
      if (any(is.nan(value_results))){
        r = terra::rast(x = row$r_path_bil)
        value_results = exactextractr::exact_extract(
          r, z, 'mean', 
          progress = FALSE )
      }
      
      ## Produce results
      df_results_state_year = terra::as.data.frame(z) |> 
        dplyr::select(state_abb, geoid) |>
        dplyr::mutate(
          date = row$date,
          value = value_results )  |>
        dplyr::select(geoid, date, state_abb, value) 
      
      if (dev == T) print("DONE!!!!")
    }
    
    { ###### Exports ######
      if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T)
      df_results_state_year |> arrow::write_parquet(out_file)
      cli::cli_alert_success("Finished: {measure_tmp} {state_tmp}  {geo_level_tmp} {datetxt_tmp}")
      return()
      
    }
    
  })
 
rm(geo_level_tmp, measure_tmp, state_tmp, 
   month_tmp, day_tmp, datetxt_tmp, year_tmp, dev, engine)

3. Parallel Calls

Dev

Do a development run to make sure the parameters and structure are correct.

```{r}
#| code-summary: 'Dev checks for structure'
#| eval: false

## Input template
input_dev = template |> 
  filter(!server) |> 
  select(
    geo_level, measure, state, 
    year, month, day, datetxt) |> 
  distinct() |> 
  mutate(dev = T) |> 
  group_by(measure) |> 
  sample_n(2) |> 
  ungroup()


## Configure parallel
plan(multisession, workers = 5)
opts <- furrr_options(globals = FALSE, seed = TRUE)

## Parallel run
tic()
furrr::future_pwalk(
  input_dev, 
  crate_calc_prism_zonal_exactextract,
  .options = opts
)
toc()

```

Runs well!

Quick look at results

Code
ds = "cache_sensitive/result_dev/" |>
  arrow::open_dataset() |> 
  filter(
    engine == context$engine, 
    geo_level == context$geo_level
    ) 

dfa = ds |>
  select(measure, geo_level, geoid, date, value) |> 
  collect()

Let's do a calculation to see how long it would take to process all years and measures for the CONUS zones. The dev run above took:

  • work: 14 measure-CONUS-day processing units
  • time: 150 seconds

\[ \begin{align*} \frac{150 \text{ seconds}}{14 \text{ day-measure-CONUS}} = \frac{10.71 \text{ seconds}}{1 \text{ day-measure-CONUS}} = \frac{0.17857 \text{ minutes}}{1 \text{ day-measure-CONUS}} = \frac{0.0029762 \text{ hours}}{1 \text{ day-measure-CONUS}} \end{align*} \]

Let's see how many day-measure-CONUS combinations we have.

Code
template_raw |> 
  count(date, measure, state) |> 
  reactable()

There are 48,573 day-measure combinations.

\[ \begin{align*} \frac{0.0029762 \text{ hours}}{1 \text{ day-measure-CONUS}} \times {48,573 \text{ day-measure-CONUS}} = \mathbf{\underline{144 \text{ hours}}}= \mathbf{\underline{6 \text{ days}}} \end{align*} \]

So we estimate that processing all tracts, all years, and all measures completely would take around 6 days.

But we only need to process the results that are not yet done, which is around half of that, so roughly 3 days for tract10 processing of new data. Start today, so the ETA would be Friday?
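
The same estimate expressed in code (a rough back-of-the-envelope sketch; the 150-second / 14-unit rate comes from the dev run above):

```{r}
#| code-summary: 'Back-of-the-envelope runtime estimate'
#| eval: false

## Rate observed in the dev run: 150 seconds for 14 day-measure-CONUS units
secs_per_unit = 150 / 14

n_units_total = template_raw |> distinct(date, measure, state) |> nrow()  # 48,573
n_units_todo  = template     |> distinct(date, measure, state) |> nrow()  # roughly half

c(
  total_days     = n_units_total * secs_per_unit / 3600 / 24,  # ~6 days
  remaining_days = n_units_todo  * secs_per_unit / 3600 / 24   # ~3 days
)
```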

Full

  • 150 rows for approximately 30 minutes
  • 300 rows for 1 hr
  • 1800 rows for 6 hr
  • 3600 rows for 12 hr
  • 5400 rows for 18 hr
  • 7200 rows for 24 hr
```{r}
#| code-summary: 'Full Run'
#| eval: false

## Input template
input = template |> 
  filter(!file.exists(out_local)) |> 
  select(
    geo_level, measure, state, 
    year, month, day, datetxt) |> 
  distinct() %>% 
  slice(1:3600)
 
## Configure parallel
plan(multisession, workers = 5)
opts <- furrr_options(globals = FALSE, seed = TRUE)

## Parallel run
tic()
furrr::future_pwalk(
  input, 
  crate_calc_prism_zonal_exactextract,
  .options = opts
)
toc()

```

4. Production

Import

Here we have results in two formats: the v1 processing and the v2 (robust-to-error) processing. Let's compile these into a single dataset.

ESRI

Let's first standardize and then load our ESRI results into DuckDB.

```{r}

## Add state crosswalk to database
dbExecute(con_prod, "
  CREATE TABLE xwalk_state AS
  SELECT 
    state_abb as state,
    state_census_geoid as state_geoid
  FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/xwalk_state_v1/xwalk_state_v1.parquet'  
")

dbGetQuery(con_prod,"SELECT * FROM xwalk_state")
 

## Load ESRI results into database
dbGetQuery(con_prod, "SHOW TABLES")

dbGetQuery(con_prod,"SELECT * FROM xwalk_state")

dfa <- dbGetQuery(con_prod,glue("
    SELECT 
      GEOID10 AS geoid,
      DATEVAL AS date,
      'tract10' AS geo,
      x.state as state,
      YEAR as year,
      MEASURE AS measure, 
      VALUE AS value
    FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet' p
    LEFT JOIN xwalk_state x ON p.ST_FIPS = x.state_geoid
    LIMIT 1
"))
 
dbExecute(con_prod, "DROP TABLE IF EXISTS tract10_results_esri")

dbExecute(con_prod, "
  CREATE TABLE tract10_results_esri AS
    SELECT 
      GEOID10 AS geoid,
      DATEVAL AS date,
      'tract10' AS geo,
      x.state as state,
      YEAR as year,
      MEASURE AS measure, 
      VALUE AS value
    FROM '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/prism_zonal_stats_v1/_processing/compiled_esri_tract10_prism_zonal_stats.parquet' p
    LEFT JOIN xwalk_state x ON p.ST_FIPS = x.state_geoid
")



```

Preview

Code
df1 <- dbGetQuery(con_prod,glue("
    SELECT * FROM tract10_results_esri
    LIMIT 10
  ")) %>% 
  as_tibble()
df1
geoid date geo state year measure value
01121010900 2008-07-27 tract10 AL 2008 ppt 0.0143709
01121010700 2008-07-27 tract10 AL 2008 ppt 0.3526481
01121011100 2008-07-27 tract10 AL 2008 ppt 12.9364470
01121011900 2008-07-27 tract10 AL 2008 ppt 9.2121465
01073011904 2008-07-27 tract10 AL 2008 ppt 23.4812120
01073012703 2008-07-27 tract10 AL 2008 ppt 29.6871533
01073011301 2008-07-27 tract10 AL 2008 ppt 19.3599464
01073014410 2008-07-27 tract10 AL 2008 ppt 17.7761800
01073012912 2008-07-27 tract10 AL 2008 ppt 22.3617439
01073013400 2008-07-27 tract10 AL 2008 ppt 17.6457378

Looks good. But the whole thing from ESRI is a staggering 1.6 billion rows!
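
If needed, the row count can be confirmed directly against the DuckDB table (a quick sketch):

```{r}
#| eval: false
## Confirm the size of the compiled ESRI table
dbGetQuery(con_prod, "SELECT COUNT(*) AS n_rows FROM tract10_results_esri")
```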

xwalk_tract10_land

Also, for this particular tract10 dataset we want to subset to tracts with Land_area > 0 (which is the input for the ESRI processing). Let's get a list of all GEOIDs present in the ESRI results (tracts with Land_area > 0).

Operationalize xwalk_tract10_land
if (!'xwalk_tract10_land' %in% dbGetQuery(con_prod, "SHOW TABLES")$name){
  
  ## Get tracts with land
  df_tract10_w_land =  dbGetQuery(con_prod,glue("
    SELECT DISTINCT geoid FROM tract10_results_esri
  ")) %>% 
    as_tibble()

  ## Operationalize all tracts + land crosswalk
  api <- "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH/ccuh-server/freeze/tiger10_boundaries__v1.1/tiger_us_boundaries_v1.parquet" %>% 
    open_dataset()
  xwalk_tract10_land = api %>% 
    filter(vintage  == 'tract10') %>% 
    select(geoid) %>% 
    distinct() %>% 
    collect() %>% 
    mutate(has_land = geoid %in% df_tract10_w_land$geoid) %>% 
    arrange(has_land)
  xwalk_tract10_land %>%
    arrow::write_parquet("cache/xwalk_tract10_land.parquet")
  
  ## Add to database
  dbExecute(con_prod, "
  CREATE TABLE xwalk_tract10_land AS
  SELECT *
  FROM 'cache/xwalk_tract10_land.parquet'  
")
  
}

Okay, we added xwalk_tract10_land to our database. Let's preview it.

Code
dbGetQuery(con_prod, "
  SELECT *
  FROM xwalk_tract10_land
  LIMIT 10
")
geoid has_land
01003990000 FALSE
01097990000 FALSE
02013000100 FALSE
02016000200 FALSE
02016000100 FALSE
02240000100 FALSE
02240000400 FALSE
02270000100 FALSE
02070000200 FALSE
02070000100 FALSE

Okay, looks good.

ExactExtract

Now let's bring in our ExactExtract results, link them to has_land, and keep only the tracts that have land.

```{r}

dfa = dbGetQuery(con_prod, "
  SELECT 
    exact_results.geoid,
    exact_results.date,
    exact_results.geo_level as geo,
    exact_results.state_abb as state,
    exact_results.year, 
    exact_results.measure, 
    exact_results.value,
    xwalk.has_land
  FROM read_parquet(
    'cache_sensitive/hdfs_daily/engine=exactextract/geo_level=tract10/*/*/*/*/*/*.parquet',
    hive_partitioning = true
  ) exact_results
  LEFT JOIN xwalk_tract10_land xwalk 
    ON exact_results.geoid = xwalk.geoid
  WHERE xwalk.has_land = TRUE
  LIMIT 1
")

dbGetQuery(con_prod, "SHOW TABLES")

dbExecute(con_prod, "DROP TABLE IF EXISTS tract10_results_exact")

dbExecute(con_prod, "
  CREATE TABLE tract10_results_exact AS
    SELECT 
    exact_results.geoid,
    exact_results.date,
    exact_results.geo_level as geo,
    exact_results.state_abb as state,
    exact_results.year, 
    exact_results.measure, 
    exact_results.value,
    xwalk.has_land
  FROM read_parquet(
    'cache_sensitive/hdfs_daily/engine=exactextract/geo_level=tract10/*/*/*/*/*/*.parquet',
    hive_partitioning = true
  ) exact_results
  LEFT JOIN xwalk_tract10_land xwalk 
    ON exact_results.geoid = xwalk.geoid
  WHERE xwalk.has_land = TRUE
")



```

ExactExtract Validations

Let's do two validations: 1) verify there are no water-only tracts; 2) check that 1/20/2023 is in the data - prior to the systematic checks in Section 5.

Verify no water only tracts
## Get xwalk_tract10_land
xwalk_tract10_land =  dbGetQuery(con_prod,glue("
    SELECT * FROM xwalk_tract10_land
  ")) %>% 
  as_tibble()

## Get all exactextract tracts
df_tracts_exact =  dbGetQuery(con_prod,glue("
    SELECT DISTINCT geoid, has_land FROM tract10_results_exact
  ")) %>% 
  as_tibble()

df_tracts_exact %>% 
  verify(all(has_land) == T) %>% 
  count(has_land, name = 'n_tract10')
has_land n_tract10
TRUE 72246

Looks good; it's consistent with the ESRI results (72,246 tracts, all with land). Now let's check the previously missing date - 2023 was missing a single day, 1/20/2023.

Verify 2023-01-20 data exists
## Get all data for 1/20/2023
df_data_2023 = dbGetQuery(con_prod, glue("
    SELECT  * 
    FROM tract10_results_exact
    WHERE date = '2023-01-20'
")) %>% 
  as_tibble()


df_data_2023 %>% 
  count(date, measure, name = 'n_tracts')
date measure n_tracts
2023-01-20 ppt 72246
2023-01-20 tdmean 72246
2023-01-20 tmax 72246
2023-01-20 tmean 72246
2023-01-20 tmin 72246
2023-01-20 vpdmax 72246
2023-01-20 vpdmin 72246

It seems the 1/20/2023 data exists. Looks okay for now; we do a comprehensive completeness test in Section 5.

Preview

Code
df2 =  dbGetQuery(
  con_prod, 
  "
  SELECT * 
  FROM tract10_results_exact
  LIMIT 1000
  ") %>% 
  as_tibble()

# # QC
# dfa = df2 %>% 
#   add_count(measure, geoid, date) %>% 
#   arrange(measure,geoid, date)
# dfa %>% filter(n>1) %>% distinct()


head(df2) %>% 
  reactable()
  • 1,932,148,804 rows

Compile

Now let's compile everything into a single tract10 table.

```{r}

dbGetQuery(con_prod, "SHOW TABLES")

dbExecute(con_prod, "DROP TABLE IF EXISTS tract10")

dbExecute(con_prod, "
  CREATE TABLE tract10 AS (
    (SELECT * FROM tract10_results_esri)
    UNION ALL 
    (SELECT * EXCLUDE (has_land) FROM tract10_results_exact)
  )
")

```

Preview

Code
dfa = dbGetQuery(con_prod, "SELECT * FROM tract10 LIMIT 1000") %>% 
  as_tibble()

EDA

```{r}
#| code-summary: 'EDA'
#| eval: false

## Query
qc_query = glue("
  SELECT *
  FROM tract10
  WHERE state = 'DC' AND year = 2010;
") 
df_qc <- dbGetQuery(con_prod,  qc_query) |> as_tibble()

## EDA
df_qc |>
  ggplot(aes(x = value)) +
  geom_histogram(bins = 50, fill = "steelblue", color = "white") +
  facet_wrap(~measure, scales = "free") +  # Create separate plots for each measure
  theme_minimal() +
  labs(
    title = "Distribution of Values by Measure",
    x = "Value",
    y = "Count"
  ) +
  theme(
    plot.title = element_text(hjust = 0.5, size = 14),
    strip.text = element_text(size = 10, face = "bold"),
    axis.title = element_text(size = 10)
  )
  
```

5. Validation

Let’s take a look at our results.

Completeness

Check completeness of the number of tracts, measures, and dates - the right number of rows.

## Query
df_qc2 = dbGetQuery(
  con_prod,
  "SELECT 
    geoid,
    measure,
    COUNT(DISTINCT date) as n_dates,
    COUNT(*) as n_rows
   FROM tract10
   WHERE state IN ('PA','FL') 
   GROUP BY geoid, measure
   ORDER BY geoid, measure"
) %>%
  as_tibble() 

## Validations
df_valid = df_qc2 %>%
  verify(n_dates == 6939) %>%
  verify(n_rows == n_dates)

## Preview subset
df_valid %>%
  sample_n(5)
geoid measure n_dates n_rows
42133010510 tdmean 6939 6939
12086001701 tmin 6939 6939
42101980900 tdmean 6939 6939
12099005926 vpdmax 6939 6939
12099005942 tmean 6939 6939

We do two checks here for a subset of the data (PA and FL):

  • We validate the completeness of the data by verifying that each tract10/measure combination has 6939 days (all days from 2005 to 2023; see the arithmetic check below)
  • We validate the uniqueness of the data by verifying that each tract10/measure combination has 6939 rows (each tract10/measure/day has exactly one row/data point)
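
As a quick sanity check on the 6939 figure, it matches the number of calendar days from 2005-01-01 through 2023-12-31, consistent with the PRISM inventory summary in Section 1.1:

```{r}
#| eval: false
## Number of calendar days in the 2005-2023 inventory window
as.integer(as.Date("2023-12-31") - as.Date("2005-01-01")) + 1
#> [1] 6939
```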

Missing values

Now let's check that there are no missing values.

df_qc3 = dbGetQuery(
  con_prod,
  "SELECT *
   FROM tract10
   WHERE ISNAN(value)"
) %>%
  as_tibble() 


## Validate
valid = df_qc3 %>% 
  verify(nrow(.) == 0)

No missing values!

Garbage values

Garbage-coded values can appear when processing throws an error. Let's check for any.

df_qc4 = dbGetQuery(
  con_prod,
  "SELECT *
   FROM tract10
   WHERE value > 1000"
) %>%
  as_tibble() 


## Validate
valid = df_qc4 %>% 
  verify(nrow(.) == 0)

No garbage values!

Visual inspection

Define visual inspection function
inspect_tract_data = function(df1){
  df1 |>
    group_by(date, measure) |>
    summarise(mean_value = mean(value, na.rm = TRUE),
              .groups = "drop") |>
    mutate(year = year(date)) |>
    ggplot(aes(x = date, y = mean_value, color = measure)) +
    geom_line() +
    facet_wrap(~ measure, scales = "free_y", ncol = 2) +
    theme_minimal()  + 
    labs(title = glue("Visual inspection {row$i}: Looks okay"),
         subtitle = glue(" {row$geoid} ({row$city} - {row$name})"),
         x = "Date",
         y = "Value",
         color = "Measure") +
    theme(legend.position = "bottom",
          strip.text = element_text(size = 10, face = "bold"),
          axis.text.x = element_text(angle = 45, hjust = 1),
          panel.spacing = unit(1, "lines")) +
    scale_x_date(date_breaks = "2 years", date_labels = "%Y") +
    scale_color_viridis_d(end = 0.9)
}

Let's plot a few things to make sure the results look okay, focusing on a couple of tracts.

Get data for 2 tracts
df_tracts_fips = tribble(
 ~city,         ~name,           ~geoid,
 "Philadelphia", "Center City tract",    "42101000100", 
 "Miami", "Tract in Miami",    "12086000100", 
) |> 
  mutate(i = row_number())
Inspect: tract in Philadelphia
## Query
row = df_tracts_fips |> slice(1)
dfa = dbGetQuery(con_prod, glue("
  SELECT * FROM tract10
  WHERE state IN ('PA') AND geoid = '{row$geoid}'
")) |> as_tibble()

## Plot
inspect_tract_data(dfa)

Inspect: tract in Miami
## Query
row = df_tracts_fips |> slice(2)
dfb = dbGetQuery(con_prod, glue("
  SELECT * FROM tract10
  WHERE state IN ('FL') AND geoid = '{row$geoid}'
")) |> as_tibble()

## Plot
inspect_tract_data(dfb)

Export

Now that things look good, let's export to the server so everyone can use this.

Code
if (!file.exists('cache_sensitive/clean/tract10.parquet')) {

  dbExecute(con_prod, glue_sql("
    COPY (
      SELECT *
      FROM tract10
    )
    TO 'cache_sensitive/clean/tract10.parquet' (FORMAT 'parquet')
  ", .con = con_prod))
  
}

This local result was copied over to the server!
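
A minimal sketch of that copy step, assuming the destination is the context$path_server_tract10 path defined in Setup:

```{r}
#| eval: false
## Copy the exported parquet from the local cache to the server freeze folder
fs::file_copy(
  path      = "cache_sensitive/clean/tract10.parquet",
  new_path  = context$path_server_tract10,
  overwrite = TRUE
)
```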

6. Access

Data

```{r}
library(arrow)
library(tidyverse)

ds_prism = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH_read_only_access/PRISM/prism_zonal_stats_v1/clean/tract10.parquet" |> 
  arrow::open_dataset() 
  
df = ds_prism |>
  filter(
    state == 'PA',
    measure == 'tmean',
    year == 2020
  ) |>
collect()
```
```{python}
import polars as pl

ds_prism = pl.scan_parquet("//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH_read_only_access/PRISM/prism_zonal_stats_v1/clean/tract10.parquet")

df = (ds_prism
    .filter(
        (pl.col("state") == "PA") &
        (pl.col("measure") == "tmean") &
        (pl.col("year") == 2020)
    )
    .collect()
)

```
```{r}
library(duckdb)
library(glue)

# Create connection and read parquet dataset
con <- dbConnect(duckdb())


# Define the path to the parquet file
prism_path = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/CCUH_read_only_access/PRISM/prism_zonal_stats_v1/clean/tract10.parquet"

# Filter the data
query <- glue_sql("
  SELECT *
  FROM {`prism_path`}
  WHERE state = 'PA'
    AND measure = 'tmean'
    AND year = 2020
", .con = con)


# Execute query
df <- dbGetQuery(con, query)

# Print results
head(df)

# Close connection
dbDisconnect(con, shutdown = TRUE)
```

Metadata

PRISM Table

PRISM Table metadata
context$df_metadata %>% 
  reactable(
    searchable = T,
    defaultPageSize = 99
  )

PRISM Variables

PRISM measures metadata
context$df_prism_measures %>% 
  reactable(
    searchable = T,
    defaultPageSize = 99
  )