
All tests passing
mhaseeb123 committed Aug 13, 2024
1 parent ac826ff commit 393eaa6
Showing 2 changed files with 22 additions and 27 deletions.
27 changes: 7 additions & 20 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -40,14 +40,8 @@ struct equality_functor {
__device__ bool operator()(key_type const lhs_idx, key_type const rhs_idx) const
{
// We don't call this for nulls so this is fine
auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
auto const result = equal(col.element<T>(lhs_idx), col.element<T>(rhs_idx));
/*printf("col_type_id:%d, equality idx1:%d, idx2:%d, eq:%d\n",
col.type().id(),
lhs_idx,
rhs_idx,
result);*/
return result;
auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
return equal(col.element<T>(lhs_idx), col.element<T>(rhs_idx));
}
};

@@ -56,9 +50,7 @@ struct hash_functor {
column_device_view const& col;
__device__ auto operator()(key_type idx) const
{
auto const hashed = cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
// printf("hashing idx: %d = %d\n", idx, hashed);
return hashed; // cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
return cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
}
};
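
The two functors above key the per-chunk hash map by row index rather than by value: hash_functor hashes the column element behind an index, and equality_functor compares the elements behind two indices (nulls never reach it, per the comment). Below is a minimal host-side sketch of the same indices-as-keys pattern; it uses std::unordered_set and std::hash in place of column_device_view and MurmurHash3, and every name in it is illustrative rather than taken from the commit.

#include <cstddef>
#include <functional>
#include <unordered_set>
#include <vector>

// Hash a row index by hashing the value it refers to (stand-in for hash_functor).
struct index_hash {
  std::vector<int> const* values;
  std::size_t operator()(std::size_t idx) const { return std::hash<int>{}((*values)[idx]); }
};

// Two indices compare equal when the values behind them are equal (stand-in for equality_functor).
struct index_equal {
  std::vector<int> const* values;
  bool operator()(std::size_t lhs_idx, std::size_t rhs_idx) const
  {
    return (*values)[lhs_idx] == (*values)[rhs_idx];
  }
};

int main()
{
  std::vector<int> column{7, 3, 7, 9, 3};
  std::unordered_set<std::size_t, index_hash, index_equal> unique_rows{
    8, index_hash{&column}, index_equal{&column}};
  for (std::size_t i = 0; i < column.size(); ++i) { unique_rows.insert(i); }
  // unique_rows now holds one representative row index per distinct value (3 of them).
  return unique_rows.size() == 3 ? 0 : 1;
}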

@@ -91,12 +83,10 @@ struct map_insert_fn {
{},
storage_ref};

// Create another map with insert operator
auto map_insert_ref = hash_map_ref.with_operators(cuco::insert_and_find);
// Insert
auto [iter, found] = map_insert_ref.insert_and_find(cuco::pair{i, i});
// printf("Inserted k=%d, v=%d, unique=%d\n", iter->first, iter->second, found);
return found;
// Create another map ref with the insert operator
auto map_insert_ref = hash_map_ref.with_operators(cuco::insert);
// Insert into the hash map
return map_insert_ref.insert(cuco::pair{i, i});
} else {
CUDF_UNREACHABLE("Unsupported type to insert in map");
}
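
The change above swaps insert_and_find for plain insert: dictionary building only needs to know whether a row's value claimed a fresh slot, and that is exactly the bool insert returns, so the iterator that insert_and_find also produced went unused. A hedged host-side analogue of that "count only new insertions" logic, with std::unordered_map standing in for the device-side map ref:

#include <cstddef>
#include <iostream>
#include <unordered_map>
#include <vector>

int main()
{
  std::vector<int> column{7, 3, 7, 9, 3};
  std::unordered_map<int, std::size_t> map;  // value -> first row index that inserted it
  std::size_t num_unique = 0;
  for (std::size_t i = 0; i < column.size(); ++i) {
    // .second plays the role of the bool returned by the device-side insert:
    // it is true only when this call claimed a fresh slot for the value.
    bool const is_new = map.insert({column[i], i}).second;
    if (is_new) { ++num_unique; }
  }
  std::cout << "distinct values: " << num_unique << '\n';  // prints 3
  return 0;
}
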
@@ -143,7 +133,6 @@ struct map_find_fn {
"Unable to find value in map in dictionary index construction");

// Return a pair of the found key and value.
// printf("Find=%d, Found slot: k=%d, v=%d\n", i, found_slot->first, found_slot->second);
return {found_slot->first, found_slot->second};
} else {
CUDF_UNREACHABLE("Unsupported type to find in map");
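
map_find_fn is the read side of the same map: by the time it runs, every key has been inserted, so the lookup asserts success and returns the (key, value) pair, where the value is later overwritten with the dictionary index. A small host-side sketch of that contract, using std::unordered_map and assert rather than the cuco find ref:

#include <cassert>
#include <unordered_map>
#include <utility>

// Look up a key the populate step is guaranteed to have inserted and return the
// (key, value) pair; the value is what the collection step wrote into the slot.
std::pair<int, int> find_dictionary_entry(std::unordered_map<int, int> const& map, int key)
{
  auto const found_slot = map.find(key);
  assert(found_slot != map.end() && "Unable to find value in map");
  return {found_slot->first, found_slot->second};
}

int main()
{
  std::unordered_map<int, int> map{{7, 0}, {3, 1}, {9, 2}};
  auto const [key, dict_idx] = find_dictionary_entry(map, 9);
  return (key == 9 && dict_idx == 2) ? 0 : 1;
}
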
@@ -262,12 +251,10 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
// printf("Writing %d at loc: %d\n", key, loc);
chunk.dict_data[loc] = key;
// If sorting dict page ever becomes a hard requirement, enable the following statement and
// add a dict sorting step before storing into the slot's second field.
// chunk.dict_data_idx[loc] = t + i;
// printf("Replacing slot->data()->second: %d, %d\n", slot->data()->second, loc);
slot->data()->second = loc;
}
}
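
The kernel hunk above is the compaction step: every filled slot atomically claims the next position in chunk.dict_data via the counter, stores its key there, and writes that position back into the slot's value so later finds return the dictionary index. A self-contained CUDA sketch of the same idea, with a plain int2 array and atomicAdd standing in for the cuco slot storage and cuda::atomic counter (all names below are illustrative):

#include <cstdio>
#include <cuda_runtime.h>

constexpr int KEY_SENTINEL = -1;

// Each filled slot claims the next position in dict_data and records that
// position as its value, so later lookups yield the dictionary index.
__global__ void collect_entries(int2* slots, int num_slots, int* dict_data, int* counter)
{
  int const i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i >= num_slots) { return; }
  int const key = slots[i].x;
  if (key == KEY_SENTINEL) { return; }    // empty slot, nothing to emit
  int const loc = atomicAdd(counter, 1);  // position of this key in the dictionary
  dict_data[loc] = key;
  slots[i].y = loc;                       // mirror of slot->data()->second = loc
}

int main()
{
  int2 const h_slots[] = {{7, 7}, {KEY_SENTINEL, KEY_SENTINEL}, {3, 3}, {9, 9}};
  int2* d_slots{};
  int* d_dict{};
  int* d_counter{};
  cudaMalloc(&d_slots, sizeof(h_slots));
  cudaMalloc(&d_dict, 4 * sizeof(int));
  cudaMalloc(&d_counter, sizeof(int));
  cudaMemcpy(d_slots, h_slots, sizeof(h_slots), cudaMemcpyHostToDevice);
  cudaMemset(d_counter, 0, sizeof(int));

  collect_entries<<<1, 4>>>(d_slots, 4, d_dict, d_counter);

  int num_entries = 0;
  cudaMemcpy(&num_entries, d_counter, sizeof(int), cudaMemcpyDeviceToHost);
  std::printf("dictionary entries: %d\n", num_entries);  // prints 3
  cudaFree(d_slots);
  cudaFree(d_dict);
  cudaFree(d_counter);
  return 0;
}
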
22 changes: 15 additions & 7 deletions cpp/src/io/parquet/writer_impl.cu
@@ -1310,19 +1310,26 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
}
});

std::vector<std::size_t> map_offsets(valid_chunk_sizes.size() + 1, 0);
std::vector<std::size_t> map_offsets(valid_chunk_sizes.size(), 0);
std::exclusive_scan(valid_chunk_sizes.begin(),
valid_chunk_sizes.end(),
map_offsets.begin(),
static_cast<size_t>(0));
map_offsets.back() = map_offsets[valid_chunk_sizes.size() - 1] + valid_chunk_sizes.back();

// Create a single bulk storage used by all subsets
auto map_storage = storage_type{map_offsets.back()};
// Initializes the storage with the given sentinel
// Compute the total size of the bulk map storage
auto const map_storage_size = map_offsets.back() + valid_chunk_sizes.back();

// If no chunk needs to create a dictionary, exit early
if (map_storage_size == 0) { return {std::move(dict_data), std::move(dict_index)}; }

// Create a single bulk storage used by all sub-hashmaps
auto map_storage = storage_type{map_storage_size};

// Initialize the storage with the given sentinel pair; the early return above guarantees a non-zero size
map_storage.initialize(cuco::pair{KEY_SENTINEL, VALUE_SENTINEL},
cuda::stream_ref{stream.value()});

// Populate chunk dictionary offsets
std::for_each(
thrust::make_zip_iterator(thrust::make_tuple(h_chunks.begin(), map_offsets.begin())),
thrust::make_zip_iterator(thrust::make_tuple(h_chunks.end(), map_offsets.end())),
@@ -1331,10 +1338,11 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
if (chunk.use_dictionary) { chunk.dict_map_offset = thrust::get<1>(elem); }
});

// Copy the updated chunks to the device
chunks.host_to_device_async(stream);

// Populate the hash map for each chunk
populate_chunk_hash_maps(map_storage.data(), frags, stream);

// Copy the chunks back to the host and synchronize the stream
chunks.device_to_host_sync(stream);

// Make decision about which chunks have dictionary
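
On the writer side, the commit drops the extra offsets element and computes the total after the scan: an exclusive scan over valid_chunk_sizes yields each chunk's offset into one shared bulk storage, the total is the last offset plus the last chunk size, and a zero total now returns early instead of building and initializing empty storage. A small host-side sketch of that arithmetic, with a plain std::vector in place of the hostdevice containers and cuco storage_type:

#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

int main()
{
  // Hash-map capacity requested by each column chunk (0 = chunk skips the dictionary).
  std::vector<std::size_t> valid_chunk_sizes{128, 0, 256, 64};

  // Offset of each chunk's sub-map inside the single bulk storage.
  std::vector<std::size_t> map_offsets(valid_chunk_sizes.size(), 0);
  std::exclusive_scan(valid_chunk_sizes.begin(), valid_chunk_sizes.end(),
                      map_offsets.begin(), std::size_t{0});

  // Total storage = last offset + last size; zero means no chunk needs a dictionary.
  auto const map_storage_size = map_offsets.back() + valid_chunk_sizes.back();
  if (map_storage_size == 0) { return 0; }  // early exit, mirroring the commit

  for (std::size_t i = 0; i < map_offsets.size(); ++i) {
    std::cout << "chunk " << i << " -> offset " << map_offsets[i] << '\n';
  }
  std::cout << "total slots: " << map_storage_size << '\n';  // 128 + 0 + 256 + 64 = 448
  return 0;
}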
