external__prism_legacy_parquet_conversion

Originally written by Ran; this notebook will be deprecated once Joe's notebook replaces it. The goal is to convert the legacy SAS files to Parquet for more performant access.
Author

Ran Li

Published

April 5, 2024

## Dependencies
library(pacman) 
pacman::p_load(tidyverse, haven, arrow, purrr, cli, tictoc, glue)


## Local Context
local_context = lst(
  ## Identifiers
  model_id = 'external__prism_legacy_parquet_conversion',
  ## Paths
  path_prism = '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM',
  path_sam = file.path(path_prism, 'SAM'),
  path_legacy_sas_results = file.path(path_prism, 'TractAverages'),
  path_output_hdfs = file.path(path_sam, 'processed', 'hdfs_esri_reference_year')
)

The overall goal of this pipeline is to pre-process the raw legacy PRISM results, a data lake of .sas7bdat files, into an HDFS (a Hive-style partitioned Parquet dataset). This is mainly to make the results accessible to analysts on regular workstations, whether for EDA or QC. In our case, we need these legacy results to validate our new workflow.
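For orientation, here is a minimal toy sketch (temporary directory, made-up values) of the Hive-style directory layout that `arrow::write_dataset()` produces, which is what "HDFS" refers to in this notebook:

```{r}
## Toy demo of Hive-style partitioning with arrow; values are illustrative only
toy = tibble::tibble(
  GEOID10 = c('001', '002'),
  VALUE   = c(1.5, 2.5),
  MEASURE = 'tmean',
  YEAR    = 2019L
)
tmp_dir = file.path(tempdir(), 'hdfs_demo')
arrow::write_dataset(toy, tmp_dir, format = 'parquet',
                     partitioning = c('MEASURE', 'YEAR'))
list.files(tmp_dir, recursive = TRUE)
## e.g. "MEASURE=tmean/YEAR=2019/part-0.parquet"
```

Because the partition values are encoded in the directory names, readers can later filter on MEASURE and YEAR without touching non-matching files.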

Processing

Inventory raw files

We have a set of reference results stored as SAS files. Let's inventory what we are starting with.

raw_reference_files = local_context$path_legacy_sas_results %>% 
  list.files(pattern = '\\.sas7bdat$',
             recursive = TRUE,
             full.names = TRUE)  

## Preview
head(raw_reference_files)
[1] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2008.sas7bdat"
[2] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2009.sas7bdat"
[3] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2010.sas7bdat"
[4] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2011.sas7bdat"
[5] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2012.sas7bdat"
[6] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2013.sas7bdat"

So we have 60 .sas7bdat files, each around 1 GB in size. Let's convert them into an HDFS.
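As a quick sanity check on those numbers (a hedged sketch; the exact totals depend on what is currently on the share):

```{r}
## Tally count and total size of the inventoried SAS files
file_info = file.info(raw_reference_files)
length(raw_reference_files)        # expect 60 files
sum(file_info$size) / 1024^3       # total size in GB
```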

Store as HDFS

## Convert each to parquet and export
raw_reference_files |> 
  walk(~{
    
    { ## Setup
      ## Derive measure and year from the file name
      ## (e.g. ct10ppt2008 -> measure 'ppt', year '2008')
      in_path = .x
      in_file_name = basename(in_path) |> stringr::str_remove("\\.sas7bdat$")
      measure_tmp = in_file_name %>% 
        str_remove_all('ct10') %>% 
        str_remove_all("[0-9]")
      year_tmp = in_file_name %>% 
        str_remove_all('ct10') %>% 
        str_extract_all("[0-9]{4}") %>% 
        unlist()
      expect_out = file.path(
        local_context$path_output_hdfs,
        glue("MEASURE={measure_tmp}"),
        glue("YEAR={year_tmp}")
      )
      if (dir.exists(expect_out)){
        cli_alert("skipping: {in_file_name} as it already exists")
        return()
      }
    }
    
    { ## Import and process
      cli_alert("start processing: {in_file_name}")
      prism_reference = in_path |> 
        haven::read_sas() |> 
        mutate(
          MEASURE = measure_tmp, ## needed for the MEASURE partition below
          YEAR = lubridate::year(DATEVAL),
          MONTH = lubridate::month(DATEVAL),
          DAY = lubridate::day(DATEVAL)) |> 
        select(-DATETXT) 
      
      prism_reference_valid = prism_reference |> 
        filter(!is.na(YEAR)) 
      ## DANGER: `ct10tdmean2016` is missing dates for half of its rows!
      
      arrow::write_dataset(
        dataset = prism_reference_valid,
        path = local_context$path_output_hdfs, 
        basename_template = paste0(in_file_name, "_part-{i}.parquet"),
        format = "parquet", 
        partitioning = c("MEASURE", "YEAR"))
    }
    
  })
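After the walk finishes, two things are worth checking: that the partition directories were actually written, and how bad the missing-date issue flagged in the DANGER comment really is. A hedged sketch (re-reading `ct10tdmean2016` just to count missing dates, so the ~1 GB read takes a moment):

```{r}
## Verify the MEASURE=... partition directories exist
list.dirs(local_context$path_output_hdfs, recursive = FALSE)

## Quantify the DANGER: share of rows with missing DATEVAL in ct10tdmean2016
suspect_path = raw_reference_files |> keep(~ str_detect(.x, 'ct10tdmean2016'))
suspect = haven::read_sas(suspect_path[1])
mean(is.na(suspect$DATEVAL))
```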

Test out the HDFS of reference results

Let's use Arrow to access the results.

con = local_context$path_output_hdfs %>% 
  arrow::open_dataset()


df_tmean_2019_jan = con %>%
  filter(
    MEASURE == 'tmean',
    YEAR == 2019,
    MONTH == 1
  ) %>% 
  collect()

head(df_tmean_2019_jan)
GEOID10      DATEVAL     VALUE      MONTH  DAY  MEASURE  YEAR
56025000502  2019-01-01  -15.22001  1      1    tmean    2019
56025001000  2019-01-01  -16.40092  1      1    tmean    2019
56025000902  2019-01-01  -16.56511  1      1    tmean    2019
56025001401  2019-01-01  -18.68902  1      1    tmean    2019
56025000700  2019-01-01  -15.61124  1      1    tmean    2019
56025000300  2019-01-01  -15.21548  1      1    tmean    2019

That query took less than a second and returned the result above. Looks good to me.
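The timing claim is easy to verify with tictoc (already loaded in the dependencies); a minimal sketch:

```{r}
## Time the partition-pruned query; the MEASURE/YEAR filters map directly to
## Hive directories, so Arrow only scans the matching Parquet files
tic()
con %>% 
  filter(MEASURE == 'tmean', YEAR == 2019, MONTH == 1) %>% 
  collect()
toc()
```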

Access

```{r}

library(arrow)
library(dplyr)
con = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/SAM/processed/hdfs_esri_reference_year" %>% 
  arrow::open_dataset()
df1 = con %>%
  filter(
    MEASURE == 'tmean',
    YEAR == 2019,
    MONTH == 1
  ) %>% 
  collect()
```
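Arrow's dplyr backend can also evaluate simple grouped summaries before `collect()` (supported in recent arrow releases). A hedged sketch, assuming the column names shown in the preview above, computing the monthly mean temperature across all tracts in 2019:

```{r}
## Push the aggregation into Arrow; only the summary crosses into R memory
df_monthly_tmean = con %>%
  filter(MEASURE == 'tmean', YEAR == 2019) %>%
  group_by(MONTH) %>%
  summarise(mean_tmean = mean(VALUE, na.rm = TRUE)) %>%
  collect()
```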