From 3e69851348b0122d8881a7886684741aa30d4ffb Mon Sep 17 00:00:00 2001 From: Tim Fuller Date: Mon, 27 Apr 2020 12:50:21 -0600 Subject: [PATCH] Implement hierarhial unpacking --- .../tpetra/core/src/Tpetra_CrsMatrix_def.hpp | 24 +- .../core/src/Tpetra_Details_Behavior.cpp | 42 +- .../core/src/Tpetra_Details_Behavior.hpp | 11 + .../src/Tpetra_Details_packCrsMatrix_def.hpp | 17 +- ..._Details_unpackCrsMatrixAndCombine_def.hpp | 505 +++++++++++++++--- .../test/CrsMatrix/CrsMatrix_PackUnpack.cpp | 9 + .../core/test/ImportExport/UnpackLongRows.cpp | 331 ++++++++++-- 7 files changed, 794 insertions(+), 145 deletions(-) diff --git a/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp b/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp index 6f00e9645b32..12b261b0552e 100644 --- a/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp +++ b/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp @@ -4747,9 +4747,7 @@ namespace Tpetra { // Read parameters from the input ParameterList. // { - Details::ProfilingRegion region( - "Tpetra::CrsMatrix::fillCompete", - "ParameterList"); + Details::ProfilingRegion region_fc("Tpetra::CrsMatrix::fillCompete", "ParameterList"); // If true, the caller promises that no process did nonlocal // changes since the last call to fillComplete. @@ -4798,9 +4796,7 @@ namespace Tpetra { } } if (this->isStaticGraph ()) { - Details::ProfilingRegion region( - "Tpetra::CrsMatrix::fillCompete", - "isStaticGraph"); + Details::ProfilingRegion region_isg("Tpetra::CrsMatrix::fillCompete", "isStaticGraph"); // FIXME (mfh 14 Nov 2016) In order to fix #843, I enable the // checks below only in debug mode. It would be nicer to do a // local check, then propagate the error state in a deferred @@ -4850,9 +4846,7 @@ namespace Tpetra { this->fillLocalMatrix (params); } else { - Details::ProfilingRegion region( - "Tpetra::CrsMatrix::fillCompete", - "isNotStaticGraph"); + Details::ProfilingRegion region_insg("Tpetra::CrsMatrix::fillCompete", "isNotStaticGraph"); // Set the graph's domain and range Maps. This will clear the // Import if the domain Map has changed (is a different // pointer), and the Export if the range Map has changed (is a @@ -4906,9 +4900,9 @@ namespace Tpetra { } { - Details::ProfilingRegion region( - "Tpetra::CrsMatrix::fillCompete", - "callComputeGlobalConstamnts"); + Details::ProfilingRegion region_ccgc( + "Tpetra::CrsMatrix::fillCompete", "callComputeGlobalConstamnts" + ); const bool callComputeGlobalConstants = params.get () == nullptr || params->get ("compute global constants", true); if (callComputeGlobalConstants) { @@ -4920,9 +4914,9 @@ namespace Tpetra { this->fillComplete_ = true; // Now we're fill complete! { - Details::ProfilingRegion region( - "Tpetra::CrsMatrix::fillCompete", - "checkInternalState"); + Details::ProfilingRegion region_cis( + "Tpetra::CrsMatrix::fillCompete", "checkInternalState" + ); this->checkInternalState (); } } diff --git a/packages/tpetra/core/src/Tpetra_Details_Behavior.cpp b/packages/tpetra/core/src/Tpetra_Details_Behavior.cpp index f1875e72a431..4e630610500d 100644 --- a/packages/tpetra/core/src/Tpetra_Details_Behavior.cpp +++ b/packages/tpetra/core/src/Tpetra_Details_Behavior.cpp @@ -189,12 +189,16 @@ namespace { // (anonymous) else { // This could throw invalid_argument or out_of_range. // Go ahead and let it do so. - const long long val = std::stoll(stringToUpper(varVal)); - TEUCHOS_TEST_FOR_EXCEPTION - (val < static_cast(0), std::out_of_range, - prefix << "Environment variable \"" - << environmentVariableName << "\" is supposed to be a size, " - "but it has a negative integer value " << val << "."); + long long val = std::stoll(stringToUpper(varVal)); + if (val < static_cast(0)) { + // If negative - user has requested threshold be lifted + return std::numeric_limits::max(); + } +// TEUCHOS_TEST_FOR_EXCEPTION +// (val < static_cast(0), std::out_of_range, +// prefix << "Environment variable \"" +// << environmentVariableName << "\" is supposed to be a size, " +// "but it has a negative integer value " << val << "."); if (sizeof(long long) > sizeof(size_t)) { // It's hard to test this code, but I want to try writing it // at least, in case we ever have to run on 32-bit machines or @@ -284,6 +288,10 @@ namespace { // (anonymous) #endif // TPETRA_ASSUME_CUDA_AWARE_MPI } + constexpr bool hierarchicalUnpackDefault () { + return true; + } + } // namespace (anonymous) bool Behavior::debug () @@ -459,6 +467,28 @@ bool Behavior::timing (const char name[]) envVarName, defaultValue); } + +void Behavior::enable_timing() { + BehaviorDetails::timingDisabled_ = false; +} + +void Behavior::disable_timing() { + BehaviorDetails::timingDisabled_ = true; +} + +bool Behavior::hierarchicalUnpack () +{ + constexpr char envVarName[] = "TPETRA_HIERARCHICAL_UNPACK"; + constexpr bool defaultValue = hierarchicalUnpackDefault(); + + static bool value_ = defaultValue; + static bool initialized_ = false; + return idempotentlyGetEnvironmentVariableAsBool (value_, + initialized_, + envVarName, + defaultValue); +} + } // namespace Details } // namespace Tpetra diff --git a/packages/tpetra/core/src/Tpetra_Details_Behavior.hpp b/packages/tpetra/core/src/Tpetra_Details_Behavior.hpp index 4cedfbdda0a5..665b544d4c7a 100644 --- a/packages/tpetra/core/src/Tpetra_Details_Behavior.hpp +++ b/packages/tpetra/core/src/Tpetra_Details_Behavior.hpp @@ -168,6 +168,12 @@ class Behavior { /// "CrsGraph::insertLocalIndices". static bool timing (const char name[]); + /// \brief Disable timing, programatically + static void disable_timing(); + + /// \brief Enable timing, programatically + static void enable_timing(); + /// \brief Whether to assume that MPI is CUDA aware. /// /// An MPI implementation is "CUDA aware" if it can accept CUDA @@ -214,6 +220,7 @@ class Behavior { /// separate question. static size_t longRowMinNumEntries (); +<<<<<<< HEAD /// \brief the threshold for transitioning from device to host /// /// If the number of elements in the multivector does not exceed this @@ -222,6 +229,10 @@ class Behavior { /// By default this is 10000, but may be altered by the environment /// variable TPETRA_VECTOR_DEVICE_THRESHOLD static size_t multivectorKernelLocationThreshold (); +======= + /// \brief Unpack rows of a matrix using hierarchical unpacking + static bool hierarchicalUnpack (); +>>>>>>> Implement hierarhial unpacking /// \brief Use Teuchos::Timer in Tpetra::ProfilingRegion /// diff --git a/packages/tpetra/core/src/Tpetra_Details_packCrsMatrix_def.hpp b/packages/tpetra/core/src/Tpetra_Details_packCrsMatrix_def.hpp index f1bcd1d18e8e..71477576e13c 100644 --- a/packages/tpetra/core/src/Tpetra_Details_packCrsMatrix_def.hpp +++ b/packages/tpetra/core/src/Tpetra_Details_packCrsMatrix_def.hpp @@ -935,15 +935,14 @@ packCrsMatrix (const CrsMatrix& sourceMatrix, template void -packCrsMatrixNew (const CrsMatrix& sourceMatrix, - Kokkos::DualView::buffer_device_type>& exports, - const Kokkos::DualView::buffer_device_type>& numPacketsPerLID, - const Kokkos::DualView::buffer_device_type>& exportLIDs, - size_t& constantNumPackets, - Distributor& distor) +packCrsMatrixNew( + const CrsMatrix& sourceMatrix, + Kokkos::DualView::buffer_device_type>& exports, + const Kokkos::DualView::buffer_device_type>& numPacketsPerLID, + const Kokkos::DualView::buffer_device_type>& exportLIDs, + size_t& constantNumPackets, + Distributor& distor +) { using device_type = typename CrsMatrix::device_type; using buffer_device_type = typename DistObject::buffer_device_type; diff --git a/packages/tpetra/core/src/Tpetra_Details_unpackCrsMatrixAndCombine_def.hpp b/packages/tpetra/core/src/Tpetra_Details_unpackCrsMatrixAndCombine_def.hpp index 1586001fa937..e5caa70fb86d 100644 --- a/packages/tpetra/core/src/Tpetra_Details_unpackCrsMatrixAndCombine_def.hpp +++ b/packages/tpetra/core/src/Tpetra_Details_unpackCrsMatrixAndCombine_def.hpp @@ -48,7 +48,6 @@ #include "Tpetra_Details_createMirrorView.hpp" #include "Tpetra_Details_OrdinalTraits.hpp" #include "Tpetra_Details_PackTraits.hpp" -#include "Tpetra_Details_Profiling.hpp" #include "Tpetra_CrsMatrix_decl.hpp" #include "Tpetra_Details_getEntryOnHost.hpp" #include "Kokkos_Core.hpp" @@ -108,7 +107,7 @@ unpackRow(const typename PackTraits::output_array_type& gids_out, const size_t offset, const size_t /* num_bytes */, const size_t num_ent, - const size_t num_bytes_per_value) + const size_t bytes_per_value) { if (num_ent == 0) { // Empty rows always take zero bytes, to ensure sparsity. @@ -129,7 +128,7 @@ unpackRow(const typename PackTraits::output_array_type& gids_out, size_t (0); const size_t vals_beg = gids_beg + gids_len + pids_len; - const size_t vals_len = num_ent * num_bytes_per_value; + const size_t vals_len = num_ent * bytes_per_value; const char* const num_ent_in = imports + num_ent_beg; const char* const gids_in = imports + gids_beg; @@ -173,6 +172,98 @@ unpackRow(const typename PackTraits::output_array_type& gids_out, return 0; // no errors } +/// \brief Unpack a single row, in parallel, of a CrsMatrix +/// +/// \tparam ST The type of the numerical entries of the matrix. +/// (You can use real-valued or complex-valued types here, unlike +/// in Epetra, where the scalar type is always \c double.) +/// \tparam LO The type of local indices. See the +/// documentation of Map for requirements. +/// \tparam GO The type of global indices. See the +/// documentation of Map for requirements. +template +KOKKOS_FUNCTION int +unpackRowP( + typename Kokkos::TeamPolicy::member_type team_member, + const LO lid_no, + const typename PackTraits::output_array_type& gids_out, + const typename PackTraits::output_array_type& pids_out, + const typename PackTraits::output_array_type& vals_out, + const char imports[], + const size_t offset, + const size_t /* num_bytes */, + const size_t num_entries_in_row, + const size_t bytes_per_value, + const LO batch_no, + const size_t batch_size, + const size_t num_entries_in_batch +) +{ + + if (num_entries_in_row == 0) { + // Empty rows always take zero bytes, to ensure sparsity. + return 0; + } + bool unpack_pids = pids_out.size() > 0; + + const size_t bytes_per_lid = PackTraits::packValueCount(LO(0)); + const size_t num_ent_start = offset; + const size_t num_ent_end = num_ent_start + bytes_per_lid; + + const size_t bytes_per_gid = PackTraits::packValueCount(GO(0)); + const size_t gids_start = num_ent_end; + const size_t gids_end = gids_start + num_entries_in_row * bytes_per_gid; + + const size_t bytes_per_pid = PackTraits::packValueCount(int(0)); + const size_t pids_start = gids_end; + const size_t pids_end = pids_start + (unpack_pids ? size_t(num_entries_in_row * bytes_per_pid) : size_t(0)); + + const size_t vals_start = pids_end; + // const size_t vals_end = vals_start + num_entries_in_row * bytes_per_value; + + const size_t shift = batch_no * batch_size; + const char* const num_ent_in = imports + num_ent_start; + const char* const gids_in = imports + gids_start + shift * bytes_per_gid; + const char* const pids_in = unpack_pids ? imports + pids_start + shift * bytes_per_pid : nullptr; + const char* const vals_in = imports + vals_start + shift * bytes_per_value; + + LO num_ent_out; + (void)PackTraits::unpackValue(num_ent_out, num_ent_in); + if (static_cast(num_ent_out) != num_entries_in_row) { + return 20; // error code + } + + Kokkos::parallel_for( + Kokkos::TeamThreadRange(team_member, num_entries_in_batch), + KOKKOS_LAMBDA(const LO& j) + { + size_t distance = 0; + + GO gid_out; + distance = j * bytes_per_gid; + (void) PackTraits::unpackValue(gid_out, gids_in + distance); + gids_out(j) = gid_out; + + if (unpack_pids) { + int pid_out; + distance = j * bytes_per_pid; + (void) PackTraits::unpackValue(pid_out, pids_in + distance); + pids_out(j) = pid_out; + } + + // assume that ST is default constructible + ST val_out; + distance = j * bytes_per_value; + (void) PackTraits::unpackValue(val_out, vals_in + distance); + vals_out(j) = val_out; + } + ); + + team_member.team_barrier(); + + return 0; // no errors +} + /// \brief Unpacks and combines a single row of the CrsMatrix. /// /// \tparam LocalMatrix KokkosSparse::CrsMatrix specialization. @@ -183,6 +274,9 @@ unpackRow(const typename PackTraits::output_array_type& gids_out, /// Data (bytes) describing the row of the CRS matrix are "unpacked" /// from a single (concatenated) (view of) char* directly into the /// row of the matrix. +struct FlatUnpackTag {}; +struct HierarchicalUnpackTag {}; + template struct UnpackCrsMatrixAndCombineFunctor { typedef LocalMatrix local_matrix_type; @@ -206,6 +300,7 @@ struct UnpackCrsMatrixAndCombineFunctor { typedef Kokkos::View vals_scratch_type; typedef Kokkos::pair value_type; + using member_type = typename Kokkos::TeamPolicy::member_type; static_assert (std::is_same::value, "LocalMap::local_ordinal_type and " @@ -216,11 +311,12 @@ struct UnpackCrsMatrixAndCombineFunctor { input_buffer_type imports; num_packets_per_lid_type num_packets_per_lid; import_lids_type import_lids; + Kokkos::View batch_info; offsets_type offsets; Tpetra::CombineMode combine_mode; - size_t max_num_ent; + size_t batch_size; bool unpack_pids; - size_t num_bytes_per_value; + size_t bytes_per_value; bool atomic; Kokkos::Experimental::UniqueToken tokens; lids_scratch_type lids_scratch; @@ -234,28 +330,30 @@ struct UnpackCrsMatrixAndCombineFunctor { const input_buffer_type& imports_in, const num_packets_per_lid_type& num_packets_per_lid_in, const import_lids_type& import_lids_in, + const Kokkos::View& batch_info_in, const offsets_type& offsets_in, const Tpetra::CombineMode combine_mode_in, - const size_t max_num_ent_in, + const size_t batch_size_in, const bool unpack_pids_in, - const size_t num_bytes_per_value_in, + const size_t bytes_per_value_in, const bool atomic_in) : local_matrix (local_matrix_in), local_col_map (local_col_map_in), imports (imports_in), num_packets_per_lid (num_packets_per_lid_in), import_lids (import_lids_in), + batch_info (batch_info_in), offsets (offsets_in), combine_mode (combine_mode_in), - max_num_ent (max_num_ent_in), + batch_size (batch_size_in), unpack_pids (unpack_pids_in), - num_bytes_per_value (num_bytes_per_value_in), + bytes_per_value (bytes_per_value_in), atomic (atomic_in), tokens (XS()), - lids_scratch (Kokkos::view_alloc("lids_scratch", Kokkos::WithoutInitializing), tokens.size() * max_num_ent), - gids_scratch (Kokkos::view_alloc("gids_scratch", Kokkos::WithoutInitializing), tokens.size() * max_num_ent), - pids_scratch (Kokkos::view_alloc("pids_scratch", Kokkos::WithoutInitializing), tokens.size() * max_num_ent), - vals_scratch (Kokkos::view_alloc("vals_scratch", Kokkos::WithoutInitializing), tokens.size() * max_num_ent) + lids_scratch (Kokkos::view_alloc("lids_scratch", Kokkos::WithoutInitializing), tokens.size() * batch_size), + gids_scratch (Kokkos::view_alloc("gids_scratch", Kokkos::WithoutInitializing), tokens.size() * batch_size), + pids_scratch (Kokkos::view_alloc("pids_scratch", Kokkos::WithoutInitializing), tokens.size() * batch_size), + vals_scratch (Kokkos::view_alloc("vals_scratch", Kokkos::WithoutInitializing), tokens.size() * batch_size) {} KOKKOS_INLINE_FUNCTION void init(value_type& dst) const @@ -284,8 +382,14 @@ struct UnpackCrsMatrixAndCombineFunctor { } } + KOKKOS_INLINE_FUNCTION void + join (const FlatUnpackTag&, volatile value_type& dst, const volatile value_type& src) const + { + join(dst, src); + } + KOKKOS_INLINE_FUNCTION - void operator()(const LO i, value_type& dst) const + void operator()(const FlatUnpackTag&, const LO i, value_type& dst) const { using Kokkos::View; using Kokkos::subview; @@ -341,7 +445,7 @@ struct UnpackCrsMatrixAndCombineFunctor { // is an integer in [0, tokens.size()). It is used to grab a unique (to // this thread) subview of the scratch arrays. const size_type token = tokens.acquire(); - const size_t a = static_cast(token) * max_num_ent; + const size_t a = static_cast(token) * batch_size; const size_t b = a + num_ent; lids_out_type lids_out = subview(lids_scratch, slice(a, b)); gids_out_type gids_out = subview(gids_scratch, slice(a, b)); @@ -352,7 +456,7 @@ struct UnpackCrsMatrixAndCombineFunctor { int unpack_err = unpackRow(gids_out, pids_out, vals_out, imports.data(), offset, num_bytes, - num_ent, num_bytes_per_value); + num_ent, bytes_per_value); if (unpack_err != 0) { dst = Kokkos::make_pair (unpack_err, i); // unpack error tokens.release (token); @@ -400,6 +504,170 @@ struct UnpackCrsMatrixAndCombineFunctor { tokens.release (token); } + + KOKKOS_INLINE_FUNCTION void + join( + const HierarchicalUnpackTag&, + volatile value_type& dst, + const volatile value_type& src + ) const + { + join(dst, src); + } + + KOKKOS_INLINE_FUNCTION + void operator()(const HierarchicalUnpackTag&, member_type team_member, value_type& dst) const + { + using Kokkos::View; + using Kokkos::subview; + using Kokkos::MemoryUnmanaged; + using size_type = typename XS::size_type; + using slice = Kokkos::pair; + + typedef View lids_out_type; + typedef View pids_out_type; + typedef View gids_out_type; + typedef View vals_out_type; + + const LO batch = team_member.league_rank(); + const LO lid_no = batch_info(batch, 0); + const LO batch_no = batch_info(batch, 1); + + const size_t num_bytes = num_packets_per_lid(lid_no); + + // Only unpack data if there is a nonzero number of bytes. + if (num_bytes == 0) + return; + + // there is actually something in the row + const LO import_lid = import_lids(lid_no); + const size_t buf_size = imports.size(); + const size_t offset = offsets(lid_no); + + // Get the number of entries to expect in the received data for this row. + LO num_ent_LO = 0; + const char* const in_buf = imports.data() + offset; + (void) PackTraits::unpackValue(num_ent_LO, in_buf); + const size_t num_entries_in_row = static_cast(num_ent_LO); + + // Count the number of bytes expected to unpack + size_t expected_num_bytes = 0; + { + expected_num_bytes += PackTraits::packValueCount(LO(0)); + expected_num_bytes += num_entries_in_row * PackTraits::packValueCount(GO(0)); + if (unpack_pids) { + expected_num_bytes += num_entries_in_row * PackTraits::packValueCount(int(0)); + } + expected_num_bytes += num_entries_in_row * PackTraits::packValueCount(ST()); + } + + if (expected_num_bytes > num_bytes) { + dst = Kokkos::make_pair(1, lid_no); // wrong number of bytes + return; + } + + if (offset > buf_size || offset + num_bytes > buf_size) { + dst = Kokkos::make_pair(2, lid_no); // out of bounds + return; + } + + // Determine the number of entries to unpack in this batch + size_t num_entries_in_batch = 0; + if (num_entries_in_row <= batch_size) + num_entries_in_batch = num_entries_in_row; + else if (num_entries_in_row >= (batch_no + 1) * batch_size) + num_entries_in_batch = batch_size; + else + num_entries_in_batch = num_entries_in_row - batch_no * batch_size; + + // Get subviews in to the scratch arrays. The token returned from acquire + // is an integer in [0, tokens.size()). It is used to grab a unique (to + // this thread) subview of the scratch arrays. + const size_type token = tokens.acquire(); + const size_t a = static_cast(token) * batch_size; + const size_t b = a + num_entries_in_batch; + lids_out_type lids_out = subview(lids_scratch, slice(a, b)); + gids_out_type gids_out = subview(gids_scratch, slice(a, b)); + pids_out_type pids_out = subview(pids_scratch, slice(a, (unpack_pids ? b : a))); + vals_out_type vals_out = subview(vals_scratch, slice(a, b)); + + // Unpack this row! + int unpack_err = unpackRowP( + team_member, + lid_no, + gids_out, + pids_out, + vals_out, + imports.data(), + offset, + num_bytes, + num_entries_in_row, + bytes_per_value, + batch_no, + batch_size, + num_entries_in_batch + ); + if (unpack_err != 0) { + dst = Kokkos::make_pair(unpack_err, lid_no); // unpack error + tokens.release(token); + return; + } + + // Column indices come in as global indices, in case the + // source object's column Map differs from the target object's + // (this's) column Map, and must be converted local index values + for (size_t i=0; i(lids_out.data()); + const ST* const vals_raw = const_cast(vals_out.data()); + LO num_modified = 0; + + constexpr bool matrix_has_sorted_rows = true; // see #6282 + if (combine_mode == ADD) { + // NOTE (mfh 20 Nov 2019) Must assume atomic is required, unless + // different threads don't touch the same row (i.e., no + // duplicates in incoming LIDs list). + const bool use_atomic_updates = atomic; + num_modified += local_matrix.sumIntoValues( + import_lid, + lids_raw, + num_entries_in_batch, + vals_raw, + matrix_has_sorted_rows, + use_atomic_updates + ); + } + else if (combine_mode == REPLACE) { + // NOTE (mfh 20 Nov 2019): It's never correct to use REPLACE + // combine mode with multiple incoming rows that touch the same + // target matrix entries, so we never need atomic updates. + const bool use_atomic_updates = false; + num_modified += local_matrix.replaceValues( + import_lid, + lids_raw, + num_entries_in_batch, + vals_raw, + matrix_has_sorted_rows, + use_atomic_updates + ); + } + else { + dst = Kokkos::make_pair(4, lid_no); // invalid combine mode + tokens.release(token); + return; + } + + tokens.release(token); + } }; struct MaxNumEntTag {}; @@ -530,6 +798,53 @@ compute_total_num_entries ( return tot_num_ent; } +template +KOKKOS_INLINE_FUNCTION +size_t +unpackRowCount(const char imports[], + const size_t offset, + const size_t num_bytes) +{ + using PT = PackTraits; + + LO num_ent_LO = 0; + if (num_bytes > 0) { + const size_t p_num_bytes = PT::packValueCount(num_ent_LO); + if (p_num_bytes > num_bytes) { + return OrdinalTraits::invalid(); + } + const char* const in_buf = imports + offset; + (void) PT::unpackValue(num_ent_LO, in_buf); + } + return static_cast(num_ent_LO); +} + +/// \brief Compute the index and batch number associated with each batch +/// +/// batch_info(i, 0) is the local index of the ith batch +/// batch_info(i, 1) is the local batch number of the ith batch +template +KOKKOS_INLINE_FUNCTION +bool +compute_batch_info( + const View1& batches_per_lid, + View2& batch_info +) +{ + using LO = typename View2::value_type; + size_t batch = 0; + for (size_t i=0; i(i); + batch_info(batch, 1) = batch_no; + batch++; + } + } + return batch == batch_info.extent(0); +} + /// \brief Perform the unpack operation for the matrix /// /// \tparam LocalMatrix the specialization of the KokkosSparse::CrsMatrix @@ -552,16 +867,6 @@ unpackAndCombineIntoCrsMatrix( using LO = typename LocalMap::local_ordinal_type; using DT = typename LocalMap::device_type; using XS = typename DT::execution_space; - using range_policy = - Kokkos::RangePolicy >; - using unpack_functor_type = - UnpackCrsMatrixAndCombineFunctor; - - ::Tpetra::Details::ProfilingRegion region_unpack_and_combine_into_crs_matrix( - "Tpetra::Details::UnpackAndCombineCrsMatrixImpl::unpackAndCombineIntoCrsMatrix", - "Import/Export" - ); const char prefix[] = "Tpetra::Details::UnpackAndCombineCrsMatrixImpl::" "unpackAndCombineIntoCrsMatrix: "; @@ -611,23 +916,89 @@ unpackAndCombineIntoCrsMatrix( size_t max_num_ent = compute_maximum_num_entries( num_packets_per_lid, offsets, imports); + const bool do_hierarchical_unpack = Tpetra::Details::Behavior::hierarchicalUnpack(); + //const bool do_hierarchical_unpack = false; + const size_t long_row_min_entries = + do_hierarchical_unpack ? + Tpetra::Details::Behavior::longRowMinNumEntries() : + std::numeric_limits::max(); + const size_t batch_size = std::min(long_row_min_entries, max_num_ent); + + // To achieve some balance amongst threads, unpack each row in equal size batches + size_t num_batches = 0; + Kokkos::View batch_info("", num_batches); + Kokkos::View batches_per_lid("", num_import_lids); + if (!do_hierarchical_unpack) + { + num_batches = num_import_lids; + } else + { + // Compute meta data that allows batch unpacking + Kokkos::parallel_reduce( + Kokkos::RangePolicy>(0, num_import_lids), + KOKKOS_LAMBDA(const size_t i, size_t& batches) + { + const size_t num_entries_in_row = unpackRowCount( + imports.data(), offsets(i), num_packets_per_lid(i) + ); + batches_per_lid(i) = + (num_entries_in_row <= batch_size) ? + 1 : + num_entries_in_row / batch_size + (num_entries_in_row % batch_size != 0); + batches += batches_per_lid(i); + }, + num_batches + ); + Kokkos::resize(batch_info, num_batches); + + Kokkos::HostSpace host_space; + auto batches_per_lid_h = Kokkos::create_mirror_view(host_space, batches_per_lid); + Kokkos::deep_copy(batches_per_lid_h, batches_per_lid); + + auto batch_info_h = Kokkos::create_mirror_view(host_space, batch_info); + + (void) compute_batch_info(batches_per_lid_h, batch_info_h); + Kokkos::deep_copy(batch_info, batch_info_h); + } + // FIXME (TJF SEP 2017) // The scalar type is not necessarily default constructible - size_t num_bytes_per_value = PackTraits::packValueCount(ST()); + size_t bytes_per_value = PackTraits::packValueCount(ST()); // Now do the actual unpack! const bool atomic = XS::concurrency() != 1; - unpack_functor_type f(local_matrix, local_map, - imports, num_packets_per_lid, import_lids, offsets, combine_mode, - max_num_ent, unpack_pids, num_bytes_per_value, atomic); + using functor = UnpackCrsMatrixAndCombineFunctor; + functor f( + local_matrix, + local_map, + imports, + num_packets_per_lid, + import_lids, + batch_info, + offsets, + combine_mode, + batch_size, + unpack_pids, + bytes_per_value, + atomic + ); - typename unpack_functor_type::value_type x; - Kokkos::parallel_reduce(range_policy(0, static_cast(num_import_lids)), f, x); - auto x_h = x.to_std_pair(); - TEUCHOS_TEST_FOR_EXCEPTION(x_h.first != 0, std::runtime_error, - prefix << "UnpackCrsMatrixAndCombineFunctor reported error code " - << x_h.first << " for the first bad row " << x_h.second); + typename functor::value_type x; + if (!do_hierarchical_unpack) { + using policy = Kokkos::RangePolicy, FlatUnpackTag>; + Kokkos::parallel_reduce(policy(0, static_cast(num_batches)), f, x); + } else { + using policy = Kokkos::TeamPolicy, HierarchicalUnpackTag>; + Kokkos::parallel_reduce(policy(static_cast(num_batches), Kokkos::AUTO), f, x); + } + auto x_h = x.to_std_pair(); + TEUCHOS_TEST_FOR_EXCEPTION( + x_h.first != 0, + std::runtime_error, + prefix << "UnpackCrsMatrixAndCombineFunctor reported error code " + << x_h.first << " for the first bad row " << x_h.second + ); } template @@ -688,27 +1059,6 @@ unpackAndCombineWithOwningPIDsCount( return count; } -template -KOKKOS_INLINE_FUNCTION -size_t -unpackRowCount(const char imports[], - const size_t offset, - const size_t num_bytes) -{ - using PT = PackTraits; - - LO num_ent_LO = 0; - if (num_bytes > 0) { - const size_t p_num_bytes = PT::packValueCount(num_ent_LO); - if (p_num_bytes > num_bytes) { - return OrdinalTraits::invalid(); - } - const char* const in_buf = imports + offset; - (void) PT::unpackValue(num_ent_LO, in_buf); - } - return static_cast(num_ent_LO); -} - /// \brief Setup row pointers for remotes template int @@ -875,7 +1225,7 @@ unpackAndCombineIntoCrsArrays2( const LocalMatrix& /* local_matrix */, const LocalMap /*& local_col_map*/, const int my_pid, - const size_t num_bytes_per_value) + const size_t bytes_per_value) { using Kokkos::View; using Kokkos::subview; @@ -927,7 +1277,7 @@ unpackAndCombineIntoCrsArrays2( k_error += unpackRow(gids_out, pids_out, vals_out, imports.data(), offset, num_bytes, - num_ent, num_bytes_per_value); + num_ent, bytes_per_value); // Correct target PIDs. for (size_t j = 0; j < static_cast(num_ent); ++j) { @@ -958,7 +1308,7 @@ unpackAndCombineIntoCrsArrays( const size_t tgt_num_rows, const size_t tgt_num_nonzeros, const int my_tgt_pid, - const size_t num_bytes_per_value) + const size_t bytes_per_value) { using Kokkos::View; using Kokkos::subview; @@ -1055,7 +1405,7 @@ unpackAndCombineIntoCrsArrays( int unpack_err = unpackAndCombineIntoCrsArrays2(tgt_colind, tgt_pids, tgt_vals, new_start_row, offsets, import_lids, imports, num_packets_per_lid, - local_matrix, local_col_map, my_pid, num_bytes_per_value); + local_matrix, local_col_map, my_pid, bytes_per_value); TEUCHOS_TEST_FOR_EXCEPTION( unpack_err != 0, std::logic_error, prefix << "unpack loop failed. This " "should never happen. Please report this bug to the Tpetra developers."); @@ -1177,11 +1527,6 @@ unpackCrsMatrixAndCombineNew( "crs_matrix_type::device_type and local_matrix_type::device_type " "must be the same."); - ::Tpetra::Details::ProfilingRegion region_unpack_crs_matrix_and_combine_new( - "Tpetra::Details::unpackAndCombineNew", - "Import/Export" - ); - if (numPacketsPerLID.need_sync_device()) { numPacketsPerLID.sync_device (); } @@ -1315,14 +1660,9 @@ unpackAndCombineWithOwningPIDsCount ( numPacketsPerLID.size (), true, "num_packets_per_lid"); - ::Tpetra::Details::ProfilingRegion unpack_and_combine_with_owning_pids_count( - "Tpetra::Details::unpackAndCombineWithOwningPIDsCount", - "Import/Export" - ); - size_t nonzeros = UnpackAndCombineCrsMatrixImpl::unpackAndCombineWithOwningPIDsCount( + return UnpackAndCombineCrsMatrixImpl::unpackAndCombineWithOwningPIDsCount( local_matrix, permute_from_lids_d, imports_d, num_packets_per_lid_d, numSameIDs); - return nonzeros; } /// \brief unpackAndCombineIntoCrsArrays @@ -1470,10 +1810,10 @@ unpackAndCombineIntoCrsArrays ( create_mirror_view_from_raw_host_array(outputDevice, TargetPids.getRawPtr(), TargetPids.size(), true, "tgt_pids"); - size_t num_bytes_per_value = 0; + size_t bytes_per_value = 0; if (PackTraits::compileTimeSize) { // assume that ST is default constructible - num_bytes_per_value = PackTraits::packValueCount(ST()); + bytes_per_value = PackTraits::packValueCount(ST()); } else { // Since the packed data come from the source matrix, we can use the source @@ -1484,18 +1824,18 @@ unpackAndCombineIntoCrsArrays ( // a Scalar value is. Of course, if no processes have any entries, then no // values should be packed (though this does assume that in our packing // scheme, rows with zero entries take zero bytes). - size_t num_bytes_per_value_l = 0; + size_t bytes_per_value_l = 0; if (local_matrix.values.extent(0) > 0) { const ST& val = local_matrix.values(0); - num_bytes_per_value_l = PackTraits::packValueCount(val); + bytes_per_value_l = PackTraits::packValueCount(val); } else { const ST& val = crs_vals_d(0); - num_bytes_per_value_l = PackTraits::packValueCount(val); + bytes_per_value_l = PackTraits::packValueCount(val); } Teuchos::reduceAll(*(sourceMatrix.getComm()), Teuchos::REDUCE_MAX, - num_bytes_per_value_l, - outArg(num_bytes_per_value)); + bytes_per_value_l, + outArg(bytes_per_value)); } #ifdef HAVE_TPETRA_INST_COMPLEX_DOUBLE @@ -1506,17 +1846,12 @@ unpackAndCombineIntoCrsArrays ( "never happen, since std::complex does not work in Kokkos::View objects."); #endif // HAVE_TPETRA_INST_COMPLEX_DOUBLE - ::Tpetra::Details::ProfilingRegion unpack_and_combine_into_crs_arrays( - "Tpetra::Details::unpackAndCombineIntoCrsArrays", - "Import/Export" - ); - UnpackAndCombineCrsMatrixImpl::unpackAndCombineIntoCrsArrays( local_matrix, local_col_map, import_lids_d, imports_d, num_packets_per_lid_d, permute_to_lids_d, permute_from_lids_d, crs_rowptr_d, crs_colind_d, crs_vals_d, src_pids_d, tgt_pids_d, numSameIDs, TargetNumRows, TargetNumNonzeros, MyTargetPID, - num_bytes_per_value); + bytes_per_value); // Copy outputs back to host typename decltype(crs_rowptr_d)::HostMirror crs_rowptr_h( diff --git a/packages/tpetra/core/test/CrsMatrix/CrsMatrix_PackUnpack.cpp b/packages/tpetra/core/test/CrsMatrix/CrsMatrix_PackUnpack.cpp index a9fde1aa4f66..7dd7a65952ac 100644 --- a/packages/tpetra/core/test/CrsMatrix/CrsMatrix_PackUnpack.cpp +++ b/packages/tpetra/core/test/CrsMatrix/CrsMatrix_PackUnpack.cpp @@ -273,6 +273,12 @@ TEUCHOS_UNIT_TEST_TEMPLATE_4_DECL(CrsMatrix, PackThenUnpackAndCombine, SC, LO, G << "B[" << i << "]=" << B_values[i] << "!\n"; ++curNumErrors; } + else + { + errStrm << "INFO: Proc " << world_rank << ", row " << lclRow + << ", A[" << i << "]=" << A_values[i] << ", and " + << "B[" << i << "]=" << B_values[i] << "!\n"; + } } lclNumErrors += curNumErrors; } @@ -349,6 +355,9 @@ TEUCHOS_UNIT_TEST_TEMPLATE_4_DECL(CrsMatrix, PackThenUnpackAndCombine, SC, LO, G ArrayView B_indices; ArrayView B_values; B->getLocalRowView(loc_row, B_indices, B_values); +// std::cout << "A_values: " << A_values << "\n"; +// std::cout << "B_values: " << B_values << "\n"; +// std::cout << std::flush; TEST_EQUALITY( A_indices.size (), B_indices.size () ); diff --git a/packages/tpetra/core/test/ImportExport/UnpackLongRows.cpp b/packages/tpetra/core/test/ImportExport/UnpackLongRows.cpp index 349423413a12..423184a872f2 100644 --- a/packages/tpetra/core/test/ImportExport/UnpackLongRows.cpp +++ b/packages/tpetra/core/test/ImportExport/UnpackLongRows.cpp @@ -40,6 +40,8 @@ */ #include +#include "Kokkos_Core.hpp" + #include "Tpetra_TestingUtilities.hpp" #include "TpetraCore_ETIHelperMacros.h" @@ -86,10 +88,12 @@ namespace { // anonymous // This test is constructed as such to isolate slow downs in export operations // experienced by Aria when constructing a linear system that has a Poisson // structure with additional dense rows. -template -Teuchos::RCP -generate_crs_matrix( +template +void +generate_graphs( Teuchos::RCP> const &comm, + Teuchos::RCP& owned, + Teuchos::RCP& shared, int const rows_per_rank, int const overlap, int const dense_rows @@ -98,12 +102,9 @@ generate_crs_matrix( using Teuchos::rcp; using Teuchos::RCP; - using Teuchos::Time; using Teuchos::Array; - using Teuchos::TimeMonitor; - using map_type = typename matrix_type::map_type; - using real = typename matrix_type::scalar_type; - using GO = typename matrix_type::global_ordinal_type; + using map_type = typename graph_type::map_type; + using GO = typename graph_type::global_ordinal_type; auto const rank = comm->getRank(); auto const procs = comm->getSize(); @@ -111,15 +112,10 @@ generate_crs_matrix( using gsize_t = Tpetra::global_size_t; const gsize_t global_rows = rows_per_rank * procs + dense_rows; - const real mone = static_cast(-1.0); - const real one = static_cast(1.0); - const real four = static_cast(4.0); - + INFO("GENERATING GRAPHS\n"); // one-to-one map for entries on my rank RCP owned_map; { - RCP