From c32e097dd595e74f0f58d9fd7155e126dfd8c6b7 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 1 Jun 2023 15:16:37 -0500 Subject: [PATCH] Update JNI JSON reader column compatibility for Spark (#13477) This moves the logic to update the columns returned from the JSON reader to Java. It also updates the code to be able to deal with requested columns that were not in the data. It is not perfect because it will not work if the input file had no columns at all in it. ``` {} {} ``` But it fixes issues for a file that has valid columns in it, but none of them are the columns that we requested. This is a workaround for https://github.com/rapidsai/cudf/issues/13473, but is not perfect. Authors: - Robert (Bobby) Evans (https://github.com/revans2) Approvers: - Jason Lowe (https://github.com/jlowe) - MithunR (https://github.com/mythrocks) URL: https://github.com/rapidsai/cudf/pull/13477 --- java/src/main/java/ai/rapids/cudf/Schema.java | 13 +++- java/src/main/java/ai/rapids/cudf/Table.java | 62 ++++++++++++++++--- java/src/main/native/src/TableJni.cpp | 61 ++++-------------- 3 files changed, 77 insertions(+), 59 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/Schema.java b/java/src/main/java/ai/rapids/cudf/Schema.java index c90d27efa97..79e66cb608e 100644 --- a/java/src/main/java/ai/rapids/cudf/Schema.java +++ b/java/src/main/java/ai/rapids/cudf/Schema.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -75,6 +75,17 @@ int[] getTypeScales() { return ret; } + DType[] getTypes() { + if (types == null) { + return null; + } + DType[] ret = new DType[types.size()]; + for (int i = 0; i < types.size(); i++) { + ret[i] = types.get(i); + } + return ret; + } + public static class Builder { private final List names = new ArrayList<>(); private final List types = new ArrayList<>(); diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index b553cf9913b..aeee667c547 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -233,7 +233,10 @@ private static native long[] readCSV(String[] columnNames, byte comment, String[] nullValues, String[] trueValues, String[] falseValues) throws CudfException; - private static native long[] readJSON(String[] columnNames, + /** + * read JSON data and return a pointer to a TableWithMeta object. + */ + private static native long readJSON(String[] columnNames, int[] dTypeIds, int[] dTypeScales, String filePath, long address, long length, boolean dayFirst, boolean lines) throws CudfException; @@ -968,6 +971,42 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer) { return readJSON(schema, opts, buffer, 0, buffer.length); } + private static Table gatherJSONColumns(Schema schema, TableWithMeta twm) { + String[] neededColumns = schema.getColumnNames(); + if (neededColumns == null || neededColumns.length == 0) { + return twm.releaseTable(); + } else { + String[] foundNames = twm.getColumnNames(); + HashMap indices = new HashMap<>(); + for (int i = 0; i < foundNames.length; i++) { + indices.put(foundNames[i], i); + } + // We might need to rearrange the columns to match what we want. 
+ DType[] types = schema.getTypes(); + ColumnVector[] columns = new ColumnVector[neededColumns.length]; + try (Table tbl = twm.releaseTable()) { + for (int i = 0; i < columns.length; i++) { + String neededColumnName = neededColumns[i]; + Integer index = indices.get(neededColumnName); + if (index != null) { + columns[i] = tbl.getColumn(index).incRefCount(); + } else { + try (Scalar s = Scalar.fromNull(types[i])) { + columns[i] = ColumnVector.fromScalar(s, (int)tbl.getRowCount()); + } + } + } + return new Table(columns); + } finally { + for (ColumnVector c: columns) { + if (c != null) { + c.close(); + } + } + } + } + } + /** * Read a JSON file. * @param schema the schema of the file. You may use Schema.INFERRED to infer the schema. @@ -976,11 +1015,14 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer) { * @return the file parsed as a table on the GPU. */ public static Table readJSON(Schema schema, JSONOptions opts, File path) { - return new Table( - readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(), - path.getAbsolutePath(), - 0, 0, - opts.isDayFirst(), opts.isLines())); + try (TableWithMeta twm = new TableWithMeta( + readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(), + path.getAbsolutePath(), + 0, 0, + opts.isDayFirst(), opts.isLines()))) { + + return gatherJSONColumns(schema, twm); + } } /** @@ -1043,9 +1085,11 @@ public static Table readJSON(Schema schema, JSONOptions opts, HostMemoryBuffer b assert len > 0; assert len <= buffer.length - offset; assert offset >= 0 && offset < buffer.length; - return new Table(readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(), - null, buffer.getAddress() + offset, len, - opts.isDayFirst(), opts.isLines())); + try (TableWithMeta twm = new TableWithMeta(readJSON(schema.getColumnNames(), + schema.getTypeIds(), schema.getTypeScales(), null, + buffer.getAddress() + offset, len, opts.isDayFirst(), opts.isLines()))) { + return 
gatherJSONColumns(schema, twm); + } } /** diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index c21650bc202..9a59e9b4a82 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -1410,20 +1410,19 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_TableWithMeta_releaseTable(JNIE CATCH_STD(env, nullptr); } -JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON( +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readJSON( JNIEnv *env, jclass, jobjectArray col_names, jintArray j_types, jintArray j_scales, jstring inputfilepath, jlong buffer, jlong buffer_length, jboolean day_first, jboolean lines) { bool read_buffer = true; if (buffer == 0) { - JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", NULL); + JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", 0); read_buffer = false; } else if (inputfilepath != NULL) { JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", - "cannot pass in both a buffer and an inputfilepath", NULL); + "cannot pass in both a buffer and an inputfilepath", 0); } else if (buffer_length <= 0) { - JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", - NULL); + JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0); } try { @@ -1433,13 +1432,13 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON( cudf::jni::native_jintArray n_scales(env, j_scales); if (n_types.is_null() != n_scales.is_null()) { JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "types and scales must match null", - NULL); + 0); } std::vector data_types; if (!n_types.is_null()) { if (n_types.size() != n_scales.size()) { JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "types and scales must match size", - NULL); + 0); } data_types.reserve(n_types.size()); std::transform(n_types.begin(), n_types.end(), n_scales.begin(), @@ -1450,8 
+1449,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON( cudf::jni::native_jstring filename(env, inputfilepath); if (!read_buffer && filename.is_empty()) { - JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty", - NULL); + JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty", 0); } auto source = read_buffer ? cudf::io::source_info{reinterpret_cast(buffer), @@ -1465,7 +1463,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON( if (!n_col_names.is_null() && data_types.size() > 0) { if (n_col_names.size() != n_types.size()) { JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", - "types and column names must match size", NULL); + "types and column names must match size", 0); } std::map map; @@ -1481,47 +1479,12 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON( // should infer the types } - cudf::io::table_with_metadata result = cudf::io::read_json(opts.build()); - - // there is no need to re-order columns when inferring schema - if (result.metadata.schema_info.empty() || n_col_names.size() <= 0) { - return convert_table_for_return(env, result.tbl); - } else { - // json reader will not return the correct column order, - // so we need to re-order the column of table according to table meta. 
- - // turn name and its index in table into map - std::map m; - std::transform(result.metadata.schema_info.cbegin(), result.metadata.schema_info.cend(), - thrust::make_counting_iterator(0), std::inserter(m, m.end()), - [](auto const &column_info, auto const &index) { - return std::make_pair(column_info.name, index); - }); - - auto col_names_vec = n_col_names.as_cpp_vector(); - std::vector indices; - - bool match = true; - for (size_t i = 0; i < col_names_vec.size(); i++) { - if (m.find(col_names_vec[i]) == m.end()) { - match = false; - break; - } else { - indices.push_back(m.at(col_names_vec[i])); - } - } + auto result = + std::make_unique(cudf::io::read_json(opts.build())); - if (!match) { - // can't find some input column names in table meta, return what json reader reads. - return convert_table_for_return(env, result.tbl); - } else { - auto tbv = result.tbl->view().select(std::move(indices)); - auto table = std::make_unique(tbv); - return convert_table_for_return(env, table); - } - } + return reinterpret_cast(result.release()); } - CATCH_STD(env, NULL); + CATCH_STD(env, 0); } JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(