Whitespace normalization of nested column coerced as string column in JSONL inputs (#16759)

Addresses #15280 

Whitespace normalization is expected to remove unquoted whitespace characters from JSON lines inputs. However, when a JSON line is invalid because unquoted whitespace occurs between numbers or literals, the existing normalization is incorrect: it removes these invalidating whitespace characters and turns the invalid line into a valid one. For example, {"a": 12 34} is not valid JSON, but stripping the unquoted space makes it parse as {"a": 1234}.

This PR implements the normalization as a post-processing step applied only to nested columns forced as string columns.
Idea (a host-side sketch follows this list):
1. Create a single buffer by concatenating the rows of the string column, and build segment offsets and lengths arrays for the concatenated buffer.
2. Run a complementary whitespace-normalization FST, i.e. a NOP for non-whitespace and quoted whitespace characters, that outputs the indices of unquoted whitespace characters.
3. Update each segment length based on the number of output indices falling between its segment offsets.
4. Remove the characters at the output indices from the concatenated buffer.
5. Return the updated buffer, segment lengths, and updated segment offsets.
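
For illustration, below is a minimal host-side C++ sketch of these five steps. It is not the PR's implementation (the PR runs a finite-state transducer on the GPU, and the names below are hypothetical); it only mirrors the algorithm on the CPU, with simplified escape handling.

```cpp
// Hypothetical host-side sketch of the post-processing idea above.
// The real implementation runs an FST on the GPU; this CPU version only
// mirrors the five steps for clarity.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct normalized_rows {
  std::string buffer;                // concatenated rows, unquoted whitespace removed
  std::vector<std::size_t> offsets;  // updated segment offsets
  std::vector<std::size_t> lengths;  // updated segment lengths
};

normalized_rows normalize_unquoted_whitespace(std::vector<std::string> const& rows)
{
  // Step 1: concatenate rows and record segment offsets/lengths.
  std::string concat;
  std::vector<std::size_t> offsets, lengths;
  for (auto const& r : rows) {
    offsets.push_back(concat.size());
    lengths.push_back(r.size());
    concat += r;
  }

  // Step 2: scan the buffer and collect indices of *unquoted* spaces/tabs.
  // Quote state is tracked so whitespace inside string values is preserved
  // (escape handling is simplified for this sketch).
  std::vector<std::size_t> ws_indices;
  bool in_quotes = false;
  for (std::size_t i = 0; i < concat.size(); ++i) {
    char const c = concat[i];
    if (c == '"' && (i == 0 || concat[i - 1] != '\\')) { in_quotes = !in_quotes; }
    if (!in_quotes && (c == ' ' || c == '\t')) { ws_indices.push_back(i); }
  }

  // Step 3: shrink each segment's length by the number of removed characters
  // that fall inside that segment.
  for (std::size_t seg = 0; seg < offsets.size(); ++seg) {
    auto const begin    = offsets[seg];
    auto const end      = begin + lengths[seg];
    std::size_t removed = 0;
    for (auto idx : ws_indices) {
      if (idx >= begin && idx < end) { ++removed; }
    }
    lengths[seg] -= removed;
  }

  // Step 4: rebuild the buffer without the characters at the collected indices.
  std::string out;
  out.reserve(concat.size() - ws_indices.size());
  std::size_t next_ws = 0;
  for (std::size_t i = 0; i < concat.size(); ++i) {
    if (next_ws < ws_indices.size() && ws_indices[next_ws] == i) {
      ++next_ws;
      continue;
    }
    out.push_back(concat[i]);
  }

  // Step 5: recompute offsets from the updated lengths and return everything.
  std::size_t running = 0;
  for (std::size_t seg = 0; seg < offsets.size(); ++seg) {
    offsets[seg] = running;
    running += lengths[seg];
  }
  return {std::move(out), std::move(offsets), std::move(lengths)};
}

int main()
{
  auto const result = normalize_unquoted_whitespace(
    {R"({"a": {"b"   : "1 2"}})", R"({"a": {"c": [3, 4]}})"});
  std::cout << result.buffer << '\n';  // prints {"a":{"b":"1 2"}}{"a":{"c":[3,4]}}
}
```

Because the removal runs as a post-processing step on the already-extracted rows of the forced-as-string column, it can no longer turn a line that is invalid due to unquoted whitespace into a valid one.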

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Robert (Bobby) Evans (https://github.com/revans2)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #16759
shrshi authored Sep 19, 2024
1 parent a0c6fc8 commit 30e3946
Showing 8 changed files with 388 additions and 189 deletions.
16 changes: 13 additions & 3 deletions cpp/include/cudf/io/detail/json.hpp
@@ -69,11 +69,21 @@ void normalize_single_quotes(datasource::owning_buffer<rmm::device_buffer>& inda
* @brief Normalize unquoted whitespace (space and tab characters) using FST
*
* @param indata Input device buffer
* @param col_offsets Offsets to column contents in input buffer
* @param col_lengths Length of contents of each row in column
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*
* @returns Tuple of the normalized column, offsets to each row in column, and lengths of contents
* of each row
*/
void normalize_whitespace(datasource::owning_buffer<rmm::device_buffer>& indata,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::
tuple<rmm::device_uvector<char>, rmm::device_uvector<size_type>, rmm::device_uvector<size_type>>
normalize_whitespace(device_span<char const> d_input,
device_span<size_type const> col_offsets,
device_span<size_type const> col_lengths,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

} // namespace io::json::detail
} // namespace CUDF_EXPORT cudf
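
(Not part of the diff.) For reference, a minimal sketch of how a caller might use the new interface declared above; the wrapper name is hypothetical, and the inputs are assumed to already live in device memory.

```cpp
#include <cudf/io/detail/json.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/resource_ref.hpp>

#include <tuple>

// Hypothetical caller-side wrapper; it simply forwards to the new API and
// mirrors the call site added to cpp/src/io/json/json_column.cu below.
std::tuple<rmm::device_uvector<char>,
           rmm::device_uvector<cudf::size_type>,
           rmm::device_uvector<cudf::size_type>>
normalize_forced_string_rows(cudf::device_span<char const> d_input,
                             cudf::device_span<cudf::size_type const> col_offsets,
                             cudf::device_span<cudf::size_type const> col_lengths,
                             rmm::cuda_stream_view stream,
                             rmm::device_async_resource_ref mr)
{
  // Returns the normalized character buffer plus updated per-row offsets and lengths.
  return cudf::io::json::detail::normalize_whitespace(
    d_input, col_offsets, col_lengths, stream, mr);
}
```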
149 changes: 100 additions & 49 deletions cpp/src/io/json/json_column.cu
@@ -23,6 +23,7 @@
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/io/detail/json.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>
@@ -625,6 +626,8 @@ void make_device_json_column(device_span<SymbolT const> input,
auto ignore_vals = cudf::detail::make_host_vector<uint8_t>(num_columns, stream);
std::vector<uint8_t> is_mixed_type_column(num_columns, 0);
std::vector<uint8_t> is_pruned(num_columns, 0);
// for columns that are not mixed type but have been forced as string
std::vector<bool> forced_as_string_column(num_columns);
columns.try_emplace(parent_node_sentinel, std::ref(root));

std::function<void(NodeIndexT, device_json_column&)> remove_child_columns =
@@ -695,11 +698,14 @@
// Struct, List, String, Value
auto [name, parent_col_id] = name_and_parent_index(this_col_id);

// if parent is mixed type column or this column is pruned, ignore this column.
// if parent is mixed type column or this column is pruned or if parent
// has been forced as string, ignore this column.
if (parent_col_id != parent_node_sentinel &&
(is_mixed_type_column[parent_col_id] || is_pruned[this_col_id])) {
(is_mixed_type_column[parent_col_id] || is_pruned[this_col_id]) ||
forced_as_string_column[parent_col_id]) {
ignore_vals[this_col_id] = 1;
if (is_mixed_type_column[parent_col_id]) { is_mixed_type_column[this_col_id] = 1; }
if (forced_as_string_column[parent_col_id]) { forced_as_string_column[this_col_id] = true; }
continue;
}

@@ -765,22 +771,26 @@ void make_device_json_column(device_span<SymbolT const> input,
}

auto this_column_category = column_categories[this_col_id];
if (is_enabled_mixed_types_as_string) {
// get path of this column, check if it is a struct/list forced as string, and enforce it
auto const nt = tree_path.get_path(this_col_id);
std::optional<data_type> const user_dtype = get_path_data_type(nt, options);
if ((column_categories[this_col_id] == NC_STRUCT or
column_categories[this_col_id] == NC_LIST) and
user_dtype.has_value() and user_dtype.value().id() == type_id::STRING) {
is_mixed_type_column[this_col_id] = 1;
this_column_category = NC_STR;
}
// get path of this column, check if it is a struct/list forced as string, and enforce it
auto const nt = tree_path.get_path(this_col_id);
std::optional<data_type> const user_dtype = get_path_data_type(nt, options);
if ((column_categories[this_col_id] == NC_STRUCT or
column_categories[this_col_id] == NC_LIST) and
user_dtype.has_value() and user_dtype.value().id() == type_id::STRING) {
this_column_category = NC_STR;
}

CUDF_EXPECTS(parent_col.child_columns.count(name) == 0, "duplicate column name: " + name);
// move into parent
device_json_column col(stream, mr);
initialize_json_columns(this_col_id, col, this_column_category);
if ((column_categories[this_col_id] == NC_STRUCT or
column_categories[this_col_id] == NC_LIST) and
user_dtype.has_value() and user_dtype.value().id() == type_id::STRING) {
col.forced_as_string_column = true;
forced_as_string_column[this_col_id] = true;
}

auto inserted = parent_col.child_columns.try_emplace(name, std::move(col)).second;
CUDF_EXPECTS(inserted, "child column insertion failed, duplicate column name in the parent");
if (not replaced) parent_col.column_order.push_back(name);
@@ -802,12 +812,30 @@ void make_device_json_column(device_span<SymbolT const> input,
is_mixed_type_column[this_col_id] == 1)
column_categories[this_col_id] = NC_STR;
}
cudaMemcpyAsync(d_column_tree.node_categories.begin(),
column_categories.data(),
column_categories.size() * sizeof(column_categories[0]),
cudaMemcpyDefault,
stream.value());
cudf::detail::cuda_memcpy_async(d_column_tree.node_categories.begin(),
column_categories.data(),
column_categories.size() * sizeof(column_categories[0]),
cudf::detail::host_memory_kind::PAGEABLE,
stream);
}

// ignore all children of columns forced as string
for (auto const this_col_id : unique_col_ids) {
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id != parent_node_sentinel and forced_as_string_column[parent_col_id]) {
forced_as_string_column[this_col_id] = true;
ignore_vals[this_col_id] = 1;
}
// Convert only mixed type columns as string (so to copy), but not its children
if (parent_col_id != parent_node_sentinel and not forced_as_string_column[parent_col_id] and
forced_as_string_column[this_col_id])
column_categories[this_col_id] = NC_STR;
}
cudf::detail::cuda_memcpy_async(d_column_tree.node_categories.begin(),
column_categories.data(),
column_categories.size() * sizeof(column_categories[0]),
cudf::detail::host_memory_kind::PAGEABLE,
stream);

// restore unique_col_ids order
std::sort(h_range_col_id_it, h_range_col_id_it + num_columns, [](auto const& a, auto const& b) {
@@ -982,39 +1010,58 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
"string offset, string length mismatch");
rmm::device_uvector<char_length_pair_t> d_string_data(col_size, stream);
// TODO how about directly storing pair<char*, size_t> in json_column?
auto offset_length_it =
thrust::make_zip_iterator(json_col.string_offsets.begin(), json_col.string_lengths.begin());

data_type target_type{};
auto [result_bitmask, null_count] = make_validity(json_col);

if (schema.has_value()) {
data_type target_type{};
std::unique_ptr<column> col{};
if (options.normalize_whitespace && json_col.forced_as_string_column) {
CUDF_EXPECTS(prune_columns || options.mixed_types_as_string,
"Whitespace normalization of nested columns requested as string requires "
"either prune_columns or mixed_types_as_string to be enabled");
auto [normalized_d_input, col_offsets, col_lengths] =
cudf::io::json::detail::normalize_whitespace(
d_input, json_col.string_offsets, json_col.string_lengths, stream, mr);
auto offset_length_it = thrust::make_zip_iterator(col_offsets.begin(), col_lengths.begin());
target_type = data_type{type_id::STRING};
// Convert strings to the inferred data type
col = parse_data(normalized_d_input.data(),
offset_length_it,
col_size,
target_type,
std::move(result_bitmask),
null_count,
options.view(),
stream,
mr);
} else {
auto offset_length_it = thrust::make_zip_iterator(json_col.string_offsets.begin(),
json_col.string_lengths.begin());
if (schema.has_value()) {
#ifdef NJP_DEBUG_PRINT
std::cout << "-> explicit type: "
<< (schema.has_value() ? std::to_string(static_cast<int>(schema->type.id()))
: "n/a");
std::cout << "-> explicit type: "
<< (schema.has_value() ? std::to_string(static_cast<int>(schema->type.id()))
: "n/a");
#endif
target_type = schema.value().type;
} else if (json_col.forced_as_string_column) {
target_type = data_type{type_id::STRING};
}
// Infer column type, if we don't have an explicit type for it
else {
target_type = cudf::io::detail::infer_data_type(
options.json_view(), d_input, offset_length_it, col_size, stream);
target_type = schema.value().type;
}
// Infer column type, if we don't have an explicit type for it
else {
target_type = cudf::io::detail::infer_data_type(
options.json_view(), d_input, offset_length_it, col_size, stream);
}
// Convert strings to the inferred data type
col = parse_data(d_input.data(),
offset_length_it,
col_size,
target_type,
std::move(result_bitmask),
null_count,
options.view(),
stream,
mr);
}

auto [result_bitmask, null_count] = make_validity(json_col);
// Convert strings to the inferred data type
auto col = parse_data(d_input.data(),
offset_length_it,
col_size,
target_type,
std::move(result_bitmask),
null_count,
options.view(),
stream,
mr);

// Reset nullable if we do not have nulls
// This is to match the existing JSON reader's behaviour:
// - Non-string columns will always be returned as nullable
@@ -1120,11 +1167,15 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
const auto [tokens_gpu, token_indices_gpu] =
get_token_stream(d_input, options, stream, cudf::get_current_device_resource_ref());
// gpu tree generation
return get_tree_representation(tokens_gpu,
token_indices_gpu,
options.is_enabled_mixed_types_as_string(),
stream,
cudf::get_current_device_resource_ref());
// Note that to normalize whitespaces in nested columns coerced to be string, we need the column
// to either be of mixed type or we need to request the column to be returned as string by
// pruning it with the STRING dtype
return get_tree_representation(
tokens_gpu,
token_indices_gpu,
options.is_enabled_mixed_types_as_string() || options.is_enabled_prune_columns(),
stream,
cudf::get_current_device_resource_ref());
}(); // IILE used to free memory of token data.
#ifdef NJP_DEBUG_PRINT
auto h_input = cudf::detail::make_host_vector_async(d_input, stream);