## Dependencies
library(pacman)
pacman::p_load(tidyverse, haven, arrow, purrr, cli, tictoc, glue)

## Local Context
local_context = lst(
  ## Identifiers
  model_id = 'external__prism_legacy_parquet_conversion',

  ## Paths
  path_prism = '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM',
  path_sam = file.path(path_prism, 'SAM'),
  path_legacy_sas_results = file.path(path_prism, 'TractAverages'),
  path_output_hdfs = file.path(path_sam, 'processed', 'hdfs_esri_reference_year')
)
The overall goal of this pipeline is to pre-process the raw legacy PRISM results, which sit as a data lake of .sas7bdat files, into an HDFS (a Hive-style partitioned parquet dataset). Mainly this is to make the results accessible to analysts with regular workstations, whether for EDA or QC. In our case we need these legacy results to validate our new workflow.
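Concretely, the output layout is one directory per measure and year under the output path, which is what lets arrow prune partitions at query time. A minimal sketch of one leaf of that layout (the ppt/2008 values are illustrative, taken from the first file in the inventory below):

```{r}
## Illustrative only: one leaf of the Hive-style layout written by this pipeline,
## e.g. .../hdfs_esri_reference_year/MEASURE=ppt/YEAR=2008/ct10ppt2008_part-0.parquet
file.path(local_context$path_output_hdfs, 'MEASURE=ppt', 'YEAR=2008')
```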
Processing
Inventory raw files
We have a bunch of reference results stored as SAS files. Let's inventory what we are starting with.
raw_reference_files = local_context$path_legacy_sas_results %>%
  list.files(pattern = '\\.sas7bdat$',
             recursive = TRUE,
             full.names = TRUE)

## Preview
head(raw_reference_files)
[1] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2008.sas7bdat"
[2] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2009.sas7bdat"
[3] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2010.sas7bdat"
[4] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2011.sas7bdat"
[5] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2012.sas7bdat"
[6] "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/TractAverages/ct10ppt2013.sas7bdat"
So we have 60 .sas7bdat files, each around 1 GB in size. Let's convert these into an HDFS.
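If you want to check that claim directly from the inventory, a quick sketch (file.size() returns bytes, so we scale to GB):

```{r}
## Sanity check the inventory: how many files, and roughly how big
length(raw_reference_files)                       # expect 60
summary(file.size(raw_reference_files) / 1024^3)  # per-file size in GB
```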
Store as HDFS
## Convert each SAS file to parquet and export into the partitioned dataset
raw_reference_files |>
  walk(~ {
    { ## Setup: derive measure/year from the file name and skip if already exported
      in_path = .x
      in_file_name = basename(in_path) |> stringr::str_remove("\\.sas7bdat")
      measure_tmp = in_file_name %>%
        str_remove_all('ct10') %>%
        str_remove_all("[0-9]")
      year_tmp = in_file_name %>%
        str_remove_all('ct10') %>%
        str_extract_all("[0-9][0-9][0-9][0-9]") %>%
        unlist()
      expect_out = file.path(
        local_context$path_output_hdfs,
        glue("MEASURE={measure_tmp}"),
        glue("YEAR={year_tmp}")
      )
      if (dir.exists(expect_out)) {
        cli_alert("skipping: {in_file_name} as it already exists")
        return()
      }
    }

    { ## Import and process
      cli_alert("start processing: {in_file_name}")
      prism_reference = in_path |>
        haven::read_sas() |>
        mutate(
          MEASURE = measure_tmp,
          YEAR = lubridate::year(DATEVAL),
          MONTH = lubridate::month(DATEVAL),
          DAY = lubridate::day(DATEVAL)) |>
        select(-DATETXT)

      ## DANGER: `ct10tdmean2016` is missing dates for half of its rows!
      prism_reference_valid = prism_reference |>
        filter(!is.na(YEAR))

      arrow::write_dataset(
        dataset = prism_reference_valid,
        path = local_context$path_output_hdfs,
        basename_template = paste0(in_file_name, "_part-{i}.parquet"),
        format = "parquet",
        partitioning = c("MEASURE", "YEAR"))
    }
  })
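As a quick sanity check on the file-name parsing above, here is how one legacy name from the inventory decomposes into the measure and year used for partitioning (illustrative only; `example_name` is not part of the pipeline):

```{r}
## Illustrative: 'ct10ppt2008' -> measure 'ppt', year '2008'
example_name = 'ct10ppt2008'
example_name %>% str_remove_all('ct10') %>% str_remove_all('[0-9]')   # "ppt"
example_name %>% str_remove_all('ct10') %>% str_extract('[0-9]{4}')   # "2008"
```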
Test out HDFS of reference results
Let's use Arrow to access the results.
con = local_context$path_output_hdfs %>%
  arrow::open_dataset()

df_tmean_2019_jan = con %>%
  filter(
    MEASURE == 'tmean',
    YEAR == 2019,
    MONTH == 1
  ) %>%
  collect()

head(df_tmean_2019_jan)
| tract GEOID | DATEVAL    | value     | MONTH | DAY | MEASURE | YEAR |
|-------------|------------|-----------|-------|-----|---------|------|
| 56025000502 | 2019-01-01 | -15.22001 | 1     | 1   | tmean   | 2019 |
| 56025001000 | 2019-01-01 | -16.40092 | 1     | 1   | tmean   | 2019 |
| 56025000902 | 2019-01-01 | -16.56511 | 1     | 1   | tmean   | 2019 |
| 56025001401 | 2019-01-01 | -18.68902 | 1     | 1   | tmean   | 2019 |
| 56025000700 | 2019-01-01 | -15.61124 | 1     | 1   | tmean   | 2019 |
| 56025000300 | 2019-01-01 | -15.21548 | 1     | 1   | tmean   | 2019 |
That query took less than a second, and now we have the result shown above. Looks good to me.
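Since tictoc is already loaded in the dependencies, the timing is easy to reproduce; a minimal sketch wrapping the same query:

```{r}
## Reproduce the timing claim for the partition-pruned query
tic('tmean, January 2019')
df_tmean_2019_jan = con %>%
  filter(MEASURE == 'tmean', YEAR == 2019, MONTH == 1) %>%
  collect()
toc()
```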
Access
```{r}
library(arrow)
library(dplyr)
con = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/PRISM/SAM/processed/hdfs_esri_reference_year" %>%
arrow::open_dataset()
df1 = con %>%
filter(
MEASURE == 'tmean',
YEAR == 2019,
MONTH == 1
) %>%
collect()
```
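Before filtering, it can help to see which measure/year combinations actually exist; the same connection can summarise the partitions without pulling the underlying rows to the workstation. A small sketch, assuming your arrow version supports these dplyr verbs on datasets (current releases do):

```{r}
## List which MEASURE/YEAR combinations are available, with row counts
con %>%
  group_by(MEASURE, YEAR) %>%
  summarise(n_rows = n()) %>%
  collect() %>%
  arrange(MEASURE, YEAR)
```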