From fb742fbbdbdcfc40540fb3e24d68f739d0f0a994 Mon Sep 17 00:00:00 2001
From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com>
Date: Tue, 13 Aug 2024 19:00:16 +0000
Subject: [PATCH] Some updates

---
 cpp/src/io/parquet/chunk_dict.cu     | 10 ++++------
 cpp/src/io/parquet/parquet_gpu.cuh   |  6 +++---
 cpp/src/io/parquet/writer_impl.cu    | 14 +++++++++++---
 cpp/tests/io/parquet_writer_test.cpp | 25 ++++++++++++-------------
 4 files changed, 30 insertions(+), 25 deletions(-)

diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu
index bd5f27ce9f6..9b333e350db 100644
--- a/cpp/src/io/parquet/chunk_dict.cu
+++ b/cpp/src/io/parquet/chunk_dict.cu
@@ -250,8 +250,6 @@ CUDF_KERNEL void __launch_bounds__(block_size)
   [[maybe_unused]] auto const tile = cg::tiled_partition<cg_size>(cg::this_thread_block());
   auto const t                     = cg::this_thread_block().thread_rank();
 
-  storage_ref_type const storage_ref{chunk.dict_map_size, map_storage + chunk.dict_map_offset};
-
   __shared__ cuda::atomic<size_type, SCOPE> counter;
   using cuda::std::memory_order_relaxed;
   if (t == 0) { new (&counter) cuda::atomic<size_type, SCOPE>{0}; }
@@ -259,17 +257,17 @@
   for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
     if (t + i < chunk.dict_map_size) {
-      auto* slot = reinterpret_cast<slot_type*>(storage_ref.data() + chunk.dict_map_offset + t + i);
-      auto key   = slot->first;
+      auto* slot     = map_storage + chunk.dict_map_offset + t + i;
+      auto const key = slot->data()->first;
       if (key != KEY_SENTINEL) {
         auto loc = counter.fetch_add(1, memory_order_relaxed);
         cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
-        printf("Writing %d at loc: %d\n", key, loc);
+        // printf("Writing %d at loc: %d\n", key, loc);
         chunk.dict_data[loc] = key;
         // If sorting dict page ever becomes a hard requirement, enable the following statement and
         // add a dict sorting step before storing into the slot's second field.
         // chunk.dict_data_idx[loc] = t + i;
-        slot->second = loc;
+        slot->data()->second = loc;
       }
     }
   }
 
diff --git a/cpp/src/io/parquet/parquet_gpu.cuh b/cpp/src/io/parquet/parquet_gpu.cuh
index a97cbd02681..315a9cf4449 100644
--- a/cpp/src/io/parquet/parquet_gpu.cuh
+++ b/cpp/src/io/parquet/parquet_gpu.cuh
@@ -37,9 +37,9 @@ using mapped_type = size_type;
 auto constexpr cg_size     = 1;  ///< A CUDA Cooperative Group of 8 threads to handle each subset
 auto constexpr window_size = 1;  ///< Number of concurrent slots handled by each thread
 
-auto constexpr KEY_SENTINEL   = key_type{std::numeric_limits<key_type>::max()};
-auto constexpr VALUE_SENTINEL = mapped_type{std::numeric_limits<mapped_type>::max()};
-auto constexpr SCOPE          = cuda::thread_scope_device;
+auto constexpr KEY_SENTINEL   = key_type{-1};
+auto constexpr VALUE_SENTINEL = mapped_type{-1};
+auto constexpr SCOPE          = cuda::thread_scope_block;
 
 using slot_type = cuco::pair<key_type, mapped_type>;
 
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 8c30b8f5f79..8f674385f30 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1301,8 +1301,10 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
       valid_chunk_sizes.emplace_back(static_cast<size_type>(0));
     } else {
       chunk.use_dictionary = true;
-      valid_chunk_sizes.emplace_back(static_cast<size_type>(
-        cuco::make_window_extent<cg_size, window_size>(static_cast<size_type>(chunk.num_values))));
+      valid_chunk_sizes.emplace_back(
+        static_cast<size_type>(cuco::make_window_extent<cg_size, window_size>(
+          // Multiplying by 1/0.7 (~1.43) to target a 70% occupancy factor.
+          static_cast<size_type>(chunk.num_values * 1.43))));
       chunk.dict_map_size = valid_chunk_sizes.back();
     }
   });
@@ -1320,11 +1322,17 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
   map_storage.initialize(cuco::pair{KEY_SENTINEL, VALUE_SENTINEL},
                          cuda::stream_ref{stream.value()});
 
+  std::cout << "Offsets: " << h_chunks.size() << std::endl;
+
   std::for_each(thrust::make_counting_iterator(static_cast<size_t>(0)),
                 thrust::make_counting_iterator(h_chunks.size()),
                 [&](auto const idx) {
                   auto& chunk = h_chunks[idx];
-                  if (chunk.use_dictionary) { chunk.dict_map_offset = map_offsets[idx]; }
+                  if (chunk.use_dictionary) {
+                    chunk.dict_map_offset = map_offsets[idx];
+                    std::cout << "off: " << map_offsets[idx] << ", size: " << chunk.dict_map_size
+                              << std::endl;
+                  }
                 });
 
   chunks.host_to_device_async(stream);
diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp
index 5e39f9c9cfb..9f171d23e7d 100644
--- a/cpp/tests/io/parquet_writer_test.cpp
+++ b/cpp/tests/io/parquet_writer_test.cpp
@@ -39,7 +39,7 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc
 {
   std::default_random_engine generator;
   std::uniform_int_distribution<int> distribution_d(0, 30);
-  auto sequence_d = cudf::detail::make_counting_transform_iterator(
+  [[maybe_unused]] auto sequence_d = cudf::detail::make_counting_transform_iterator(
     0, [&](auto i) { return distribution_d(generator); });
 
   std::uniform_int_distribution<int> distribution_s(0, 86400);
@@ -52,8 +52,8 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc
   auto mask = cudf::detail::make_counting_transform_iterator(0, mask_op);
 
-  constexpr auto num_rows = 20;
-  // Durations longer than a day are not exactly valid, but cudf should be able to round trip
+  constexpr auto num_rows = 5650;  // Works up to 5649 rows; fails at 5650 and beyond.
+  // Durations longer than a day are not exactly valid, but cudf should be able to round trip
   auto durations_d = cudf::test::fixed_width_column_wrapper<cudf::duration_D, int64_t>(
     sequence_d, sequence_d + num_rows, mask);
 
   auto durations_s = cudf::test::fixed_width_column_wrapper<cudf::duration_s, int64_t>(
@@ -65,7 +65,7 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc
   auto durations_ns = cudf::test::fixed_width_column_wrapper<cudf::duration_ns, int64_t>(
     sequence, sequence + num_rows, mask);
 
-  auto expected = table_view{{durations_d}};
+  auto expected = table_view{{/*durations_d, */ durations_s, /*durations_ms, durations_us, durations_ns*/}};
 
   if (use_byte_stream_split) {
     cudf::io::table_input_metadata expected_metadata(expected);
@@ -85,23 +85,22 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_sc
     cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
       .use_arrow_schema(arrow_schema);
   auto result = cudf::io::read_parquet(in_opts);
-
+/*
   auto durations_d_got =
     cudf::cast(result.tbl->view().column(0), cudf::data_type{cudf::type_id::DURATION_DAYS});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_d, durations_d_got->view());
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_d, durations_d_got->view());*/
 
-  /*
   if (arrow_schema) {
-    CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, result.tbl->view().column(1));
+    CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, result.tbl->view().column(0));
   } else {
     auto durations_s_got =
-      cudf::cast(result.tbl->view().column(1), cudf::data_type{cudf::type_id::DURATION_SECONDS});
+      cudf::cast(result.tbl->view().column(0), cudf::data_type{cudf::type_id::DURATION_SECONDS});
     CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, durations_s_got->view());
   }
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ms, result.tbl->view().column(2));
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_us, result.tbl->view().column(3));
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ns, result.tbl->view().column(4));*/
+/*
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ms, result.tbl->view().column(1));
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_us, result.tbl->view().column(2));
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ns, result.tbl->view().column(3));*/
 }
 
 TEST_F(ParquetWriterTest, Durations)
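
Note (not part of the patch): the sizing change in writer_impl.cu multiplies chunk.num_values by 1/0.7 (about 1.43) before asking cuco::make_window_extent for an extent, so that once all of a chunk's values have been inserted the per-chunk map runs at roughly 70% occupancy. Below is a minimal, standalone sketch of that arithmetic; requested_map_capacity is a hypothetical helper written only for illustration and does not exist in cudf or cuco.

#include <cstddef>
#include <iostream>

// Hypothetical helper mirroring the occupancy math above: inflate the number of
// keys by 1/occupancy (1/0.7 ~= 1.43, hard-coded as 1.43 in the patch) so the
// map stays about 70% full once every key is inserted. The patch forwards a
// value computed this way to cuco::make_window_extent, which may round it up
// further to a valid extent.
std::size_t requested_map_capacity(std::size_t num_values, double target_occupancy = 0.7)
{
  return static_cast<std::size_t>(static_cast<double>(num_values) / target_occupancy);
}

int main()
{
  // A chunk with 100000 values requests roughly 142857 slots.
  std::cout << requested_map_capacity(100000) << '\n';
  return 0;
}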