From 4900506d9b2c00813f2c58435efce94b35aeebf3 Mon Sep 17 00:00:00 2001
From: GeorgeC
Date: Wed, 4 Dec 2024 06:48:09 -0500
Subject: [PATCH] Add CSV loader for phenotype data

Introduce the CSVLoaderNewSearch class, which loads phenotype data from
CSV files. Extend the LoadingStore class to write column metadata to a
CSV file consumed by the data dictionary importer. Together these
changes improve data processing and metadata management in the ETL
workflow.
---
 .../etl/phenotype/CSVLoaderNewSearch.java     | 123 ++++++++++++++++++
 .../hpds/etl/phenotype/LoadingStore.java      |  62 ++++++++-
 2 files changed, 183 insertions(+), 2 deletions(-)
 create mode 100644 etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoaderNewSearch.java

diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoaderNewSearch.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoaderNewSearch.java
new file mode 100644
index 00000000..575bf154
--- /dev/null
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/CSVLoaderNewSearch.java
@@ -0,0 +1,123 @@
+package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype;
+
+import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto;
+import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Date;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class CSVLoaderNewSearch {
+
+    private static final LoadingStore store = new LoadingStore();
+
+    private static final Logger log = LoggerFactory.getLogger(CSVLoaderNewSearch.class);
+
+    private static final int PATIENT_NUM = 0;
+
+    private static final int CONCEPT_PATH = 1;
+
+    private static final int NUMERIC_VALUE = 2;
+
+    private static final int TEXT_VALUE = 3;
+
+    private static final int DATETIME = 4;
+
+    private static boolean DO_VARNAME_ROLLUP = false;
+
+    private static final String HPDS_DIRECTORY = "/opt/local/hpds/";
+
+    public static void main(String[] args) throws IOException {
+        if (args.length > 0) {
+            if (args[0].equalsIgnoreCase("NO_ROLLUP")) {
+                log.info("NO_ROLLUP SET.");
+                DO_VARNAME_ROLLUP = false;
+            }
+        }
+        store.allObservationsStore = new RandomAccessFile(HPDS_DIRECTORY + "allObservationsStore.javabin", "rw");
+        initialLoad();
+        store.saveStore(HPDS_DIRECTORY);
+        store.dumpStatsAndColumnMeta(HPDS_DIRECTORY);
+    }
+
+    private static void initialLoad() throws IOException {
+        Crypto.loadDefaultKey();
+        Reader in = new FileReader(HPDS_DIRECTORY + "allConcepts.csv");
+        Iterable<CSVRecord> records = CSVFormat.DEFAULT.withSkipHeaderRecord().withFirstRecordAsHeader().parse(new BufferedReader(in, 1024 * 1024));
+
+        final PhenoCube[] currentConcept = new PhenoCube[1];
+        for (CSVRecord record : records) {
+            processRecord(currentConcept, record);
+        }
+    }
+
+    private static void processRecord(final PhenoCube[] currentConcept, CSVRecord record) {
+        if (record.size() < 4) {
+            log.info("Record number {} has fewer fields than expected, so we are skipping it.", record.getRecordNumber());
+            return;
+        }
+
+        String conceptPath = getSanitizedConceptPath(record);
+        String numericValue = record.get(NUMERIC_VALUE);
+        boolean isAlpha = (numericValue == null || numericValue.isEmpty());
+        String value = isAlpha ? record.get(TEXT_VALUE) : numericValue;
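+        // Reuse the cube from the previous row when the concept path repeats;
+        // otherwise getPhenoCube fetches it from the store or creates a new one.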
+        currentConcept[0] = getPhenoCube(currentConcept[0], conceptPath, isAlpha);
+
+        if (value != null && !value.trim().isEmpty() &&
+                ((isAlpha && currentConcept[0].vType == String.class) || (!isAlpha && currentConcept[0].vType == Double.class))) {
+            value = value.trim();
+            currentConcept[0].setColumnWidth(isAlpha ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes().length) : Double.BYTES);
+            int patientId = Integer.parseInt(record.get(PATIENT_NUM));
+            Date date = null;
+            if (record.size() > 4 && record.get(DATETIME) != null && !record.get(DATETIME).isEmpty()) {
+                date = new Date(Long.parseLong(record.get(DATETIME)));
+            }
+            currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date);
+            store.allIds.add(patientId);
+        }
+    }
+
+    private static PhenoCube getPhenoCube(PhenoCube currentConcept, String conceptPath, boolean isAlpha) {
+        if (currentConcept == null || !currentConcept.name.equals(conceptPath)) {
+            currentConcept = store.store.getIfPresent(conceptPath);
+            if (currentConcept == null) {
+                currentConcept = new PhenoCube(conceptPath, isAlpha ? String.class : Double.class);
+                store.store.put(conceptPath, currentConcept);
+            }
+        }
+
+        return currentConcept;
+    }
+
+    private static String getSanitizedConceptPath(CSVRecord record) {
+        String conceptPathFromRow = record.get(CONCEPT_PATH);
+        String[] segments = conceptPathFromRow.split("\\\\");
+        for (int x = 0; x < segments.length; x++) {
+            segments[x] = segments[x].trim();
+        }
+        conceptPathFromRow = String.join("\\", segments) + "\\";
+        conceptPathFromRow = conceptPathFromRow.replaceAll("\\ufffd", "");
+        String textValueFromRow = record.get(TEXT_VALUE) == null ? null : record.get(TEXT_VALUE).trim();
+        if (textValueFromRow != null) {
+            textValueFromRow = textValueFromRow.replaceAll("\\ufffd", "");
+        }
+        String conceptPath;
+
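+        // With rollup enabled, a concept path whose final segment merely
+        // repeats the row's text value is collapsed into its parent path.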
+        if (DO_VARNAME_ROLLUP) {
+            conceptPath = conceptPathFromRow.endsWith("\\" + textValueFromRow + "\\") ? conceptPathFromRow.replaceAll("\\\\[^\\\\]*\\\\$", "\\\\") : conceptPathFromRow;
+        } else {
+            conceptPath = conceptPathFromRow;
+        }
+        return conceptPath;
+    }
+
+
+}
diff --git a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java
index 2a514261..91c8d605 100644
--- a/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java
+++ b/etl/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/etl/phenotype/LoadingStore.java
@@ -1,11 +1,17 @@
 package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype;
 
 import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,12 +112,12 @@ public PhenoCube load(String key) throws Exception {
 
     public TreeSet allIds = new TreeSet();
 
-    public void saveStore(String hpdsDirectory) throws FileNotFoundException, IOException {
+    public void saveStore(String hpdsDirectory) throws IOException {
         System.out.println("Invalidating store");
         store.invalidateAll();
         store.cleanUp();
         System.out.println("Writing metadata");
-        ObjectOutputStream metaOut = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(new File(hpdsDirectory + "columnMeta.javabin"))));
+        ObjectOutputStream metaOut = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(hpdsDirectory + "columnMeta.javabin")));
         metaOut.writeObject(metadataMap);
         metaOut.writeObject(allIds);
         metaOut.flush();
@@ -149,5 +155,57 @@ public void dumpStats() {
         }
     }
 
+    /**
+     * Writes the column metadata out to a CSV file (columnMeta.csv) that is
+     * consumed by the data dictionary importer.
+     */
+    public void dumpStatsAndColumnMeta(String hpdsDirectory) {
+        try (ObjectInputStream objectInputStream =
+                new ObjectInputStream(new GZIPInputStream(new FileInputStream(hpdsDirectory + "columnMeta.javabin")))) {
+            TreeMap<String, ColumnMeta> metastore = (TreeMap<String, ColumnMeta>) objectInputStream.readObject();
+            try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(hpdsDirectory + "columnMeta.csv"), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
+                CSVPrinter printer = new CSVPrinter(writer, CSVFormat.DEFAULT);
+                for (String key : metastore.keySet()) {
+                    ColumnMeta columnMeta = metastore.get(key);
+                    Object[] columnMetaOut = new Object[11];
+
+                    StringBuilder listQuoted = new StringBuilder();
+                    AtomicInteger x = new AtomicInteger(1);
+
+                    if (columnMeta.getCategoryValues() != null) {
+                        if (!columnMeta.getCategoryValues().isEmpty()) {
+                            columnMeta.getCategoryValues().forEach(string -> {
+                                listQuoted.append(string);
+                                if (x.get() != columnMeta.getCategoryValues().size()) listQuoted.append("µ");
+                                x.incrementAndGet();
+                            });
+                        }
+                    }
+
+                    columnMetaOut[0] = columnMeta.getName();
+                    columnMetaOut[1] = String.valueOf(columnMeta.getWidthInBytes());
+                    columnMetaOut[2] = String.valueOf(columnMeta.getColumnOffset());
+                    columnMetaOut[3] = String.valueOf(columnMeta.isCategorical());
+                    // Category values are emitted as a single µ-delimited field.
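+                    // The data dictionary importer splits this field on 'µ'
+                    // to recover the individual category values.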
+                    columnMetaOut[4] = listQuoted.toString();
+                    columnMetaOut[5] = String.valueOf(columnMeta.getMin());
+                    columnMetaOut[6] = String.valueOf(columnMeta.getMax());
+                    columnMetaOut[7] = String.valueOf(columnMeta.getAllObservationsOffset());
+                    columnMetaOut[8] = String.valueOf(columnMeta.getAllObservationsLength());
+                    columnMetaOut[9] = String.valueOf(columnMeta.getObservationCount());
+                    columnMetaOut[10] = String.valueOf(columnMeta.getPatientCount());
+
+                    printer.printRecord(columnMetaOut);
+                }
+
+                writer.flush();
+            }
+
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException("Could not write column metadata", e);
+        }
+    }
 
 }
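
Note: the columnMeta.csv written by dumpStatsAndColumnMeta is consumed by the
data dictionary importer, which is not part of this patch. As a rough sketch
of the consumer side, the file could be read back with commons-csv as below.
The ColumnMetaCsvReader class name and the printed output are illustrative
assumptions; only the column order and the 'µ' delimiter mirror the writer.

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    import java.io.Reader;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.List;

    public class ColumnMetaCsvReader {
        public static void main(String[] args) throws Exception {
            try (Reader in = Files.newBufferedReader(Paths.get("/opt/local/hpds/columnMeta.csv"));
                 CSVParser parser = CSVParser.parse(in, CSVFormat.DEFAULT)) {
                for (CSVRecord record : parser) {
                    String conceptPath = record.get(0);
                    boolean categorical = Boolean.parseBoolean(record.get(3));
                    // Field 4 holds the µ-delimited category values; numeric
                    // concepts leave it empty and use the min/max fields instead.
                    List<String> categories = categorical && !record.get(4).isEmpty()
                            ? Arrays.asList(record.get(4).split("µ"))
                            : List.of();
                    System.out.printf("%s categorical=%b values=%s%n", conceptPath, categorical, categories);
                }
            }
        }
    }

Each record corresponds to one ColumnMeta entry, emitted in the key order of
the TreeMap, so the output is sorted by concept path.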