Skip to content

Commit

Permalink
Some updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mhaseeb123 committed Aug 13, 2024
1 parent 089f909 commit fb742fb
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 25 deletions.
10 changes: 4 additions & 6 deletions cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -250,26 +250,24 @@ CUDF_KERNEL void __launch_bounds__(block_size)
[[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
auto const t = cg::this_thread_block().thread_rank();

storage_ref_type const storage_ref{chunk.dict_map_size, map_storage + chunk.dict_map_offset};

__shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;
using cuda::std::memory_order_relaxed;
if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
__syncthreads();

for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
if (t + i < chunk.dict_map_size) {
auto* slot = reinterpret_cast<slot_type*>(storage_ref.data() + chunk.dict_map_offset + t + i);
auto key = slot->first;
auto* slot = map_storage + chunk.dict_map_offset + t + i;
auto const key = slot->data()->first;
if (key != KEY_SENTINEL) {
auto loc = counter.fetch_add(1, memory_order_relaxed);
cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
printf("Writing %d at loc: %d\n", key, loc);
// printf("Writing %d at loc: %d\n", key, loc);
chunk.dict_data[loc] = key;
// If sorting dict page ever becomes a hard requirement, enable the following statement and
// add a dict sorting step before storing into the slot's second field.
// chunk.dict_data_idx[loc] = t + i;
slot->second = loc;
slot->data()->second = loc;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/parquet/parquet_gpu.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ using mapped_type = size_type;
auto constexpr cg_size = 1; ///< A CUDA Cooperative Group of 8 threads to handle each subset
auto constexpr window_size = 1; ///< Number of concurrent slots handled by each thread

auto constexpr KEY_SENTINEL = key_type{std::numeric_limits<key_type>::max()};
auto constexpr VALUE_SENTINEL = mapped_type{std::numeric_limits<mapped_type>::max()};
auto constexpr SCOPE = cuda::thread_scope_device;
auto constexpr KEY_SENTINEL = key_type{-1};
auto constexpr VALUE_SENTINEL = mapped_type{-1};
auto constexpr SCOPE = cuda::thread_scope_block;

using slot_type = cuco::pair<key_type, mapped_type>;

Expand Down
14 changes: 11 additions & 3 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1301,8 +1301,10 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
valid_chunk_sizes.emplace_back(static_cast<std::size_t>(0));
} else {
chunk.use_dictionary = true;
valid_chunk_sizes.emplace_back(static_cast<std::size_t>(
cuco::make_window_extent<cg_size, window_size>(static_cast<size_t>(chunk.num_values))));
valid_chunk_sizes.emplace_back(
static_cast<std::size_t>(cuco::make_window_extent<cg_size, window_size>(
// Multiplying by 1/0.7 = 1.43 to target a 70% occupancy factor.
static_cast<size_t>(chunk.num_values * 1.43))));
chunk.dict_map_size = valid_chunk_sizes.back();
}
});
Expand All @@ -1320,11 +1322,17 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
map_storage.initialize(cuco::pair{KEY_SENTINEL, VALUE_SENTINEL},
cuda::stream_ref{stream.value()});

std::cout << "Offsets: " << h_chunks.size() << std::endl;

std::for_each(thrust::make_counting_iterator(static_cast<std::size_t>(0)),
thrust::make_counting_iterator(h_chunks.size()),
[&](auto const idx) {
auto& chunk = h_chunks[idx];
if (chunk.use_dictionary) { chunk.dict_map_offset = map_offsets[idx]; }
if (chunk.use_dictionary) {
chunk.dict_map_offset = map_offsets[idx];
std::cout << "off: " << map_offsets[idx] << ", size: " << chunk.dict_map_size
<< std::endl;
}
});

chunks.host_to_device_async(stream);
Expand Down
25 changes: 12 additions & 13 deletions cpp/tests/io/parquet_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc
{
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution_d(0, 30);
auto sequence_d = cudf::detail::make_counting_transform_iterator(
[[maybe_unused]]auto sequence_d = cudf::detail::make_counting_transform_iterator(
0, [&](auto i) { return distribution_d(generator); });

std::uniform_int_distribution<int> distribution_s(0, 86400);
Expand All @@ -52,8 +52,8 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc

auto mask = cudf::detail::make_counting_transform_iterator(0, mask_op);

constexpr auto num_rows = 20;
// Durations longer than a day are not exactly valid, but cudf should be able to round trip
constexpr auto num_rows = 5650; // WORKS UNTIL 5649, fails beyond 5650
// Durations longer than a day are not exactly valid, but cudf should be able to round trip
auto durations_d = cudf::test::fixed_width_column_wrapper<cudf::duration_D, int64_t>(
sequence_d, sequence_d + num_rows, mask);
auto durations_s = cudf::test::fixed_width_column_wrapper<cudf::duration_s, int64_t>(
Expand All @@ -65,7 +65,7 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc
auto durations_ns = cudf::test::fixed_width_column_wrapper<cudf::duration_ns, int64_t>(
sequence, sequence + num_rows, mask);

auto expected = table_view{{durations_d}};
auto expected = table_view{{/*durations_d, */durations_s,/*durations_ms, durations_us, durations_ns*/}};

if (use_byte_stream_split) {
cudf::io::table_input_metadata expected_metadata(expected);
Expand All @@ -85,23 +85,22 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
.use_arrow_schema(arrow_schema);
auto result = cudf::io::read_parquet(in_opts);

/*
auto durations_d_got =
cudf::cast(result.tbl->view().column(0), cudf::data_type{cudf::type_id::DURATION_DAYS});
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_d, durations_d_got->view());
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_d, durations_d_got->view());*/

/*
if (arrow_schema) {
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, result.tbl->view().column(1));
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, result.tbl->view().column(0));
} else {
auto durations_s_got =
cudf::cast(result.tbl->view().column(1), cudf::data_type{cudf::type_id::DURATION_SECONDS});
cudf::cast(result.tbl->view().column(0), cudf::data_type{cudf::type_id::DURATION_SECONDS});
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, durations_s_got->view());
}
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ms, result.tbl->view().column(2));
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_us, result.tbl->view().column(3));
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ns, result.tbl->view().column(4));*/
/*
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ms, result.tbl->view().column(1));
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_us, result.tbl->view().column(2));
CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ns, result.tbl->view().column(3));*/
}

TEST_F(ParquetWriterTest, Durations)
Expand Down

0 comments on commit fb742fb

Please sign in to comment.