Skip to content

Commit

Permalink
Merge pull request #458 from dash-project/feat-dash-asyncatomic
Browse files Browse the repository at this point in the history
Add support for async atomics and fix constness in GlobRefs
  • Loading branch information
devreal authored Mar 6, 2018
2 parents 2e59fa1 + 68bc30c commit 79b6886
Show file tree
Hide file tree
Showing 15 changed files with 1,195 additions and 202 deletions.
28 changes: 28 additions & 0 deletions dart-if/include/dash/dart/if/dart_communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,34 @@ dart_ret_t dart_accumulate(
dart_datatype_t dtype,
dart_operation_t op) DART_NOTHROW;


/**
* Perform an element-wise atomic update on the values pointed to by \c gptr
* by applying the operation \c op with the corresponding value in \c value
* on them.
*
* DART Equivalent to MPI_Accumulate. In contrast to \ref dart_accumulate, this
* function blocks until the local buffer can be reused.
*
* \param gptr A global pointer determining the target of the accumulate
* operation.
* \param values The local buffer holding the elements to accumulate.
* \param nelem The number of local elements to accumulate per unit.
* \param dtype The data type to use in the accumulate operation \c op.
* \param op The accumulation operation to perform.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
* \threadsafe_data{team}
* \ingroup DartCommunication
*/
dart_ret_t dart_accumulate_blocking_local(
dart_gptr_t gptr,
const void * values,
size_t nelem,
dart_datatype_t dtype,
dart_operation_t op) DART_NOTHROW;

/**
* Perform an element-wise atomic update on the value of type \c dtype pointed
* to by \c gptr by applying the operation \c op with \c value on it and
Expand Down
115 changes: 103 additions & 12 deletions dart-impl/mpi/src/dart_communication.c
Original file line number Diff line number Diff line change
Expand Up @@ -589,16 +589,13 @@ dart_ret_t dart_accumulate(
dart_datatype_t dtype,
dart_operation_t op)
{
MPI_Datatype mpi_dtype;
MPI_Op mpi_op;
dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid);
uint64_t offset = gptr.addr_or_offs.offset;
int16_t seg_id = gptr.segid;
dart_team_t teamid = gptr.teamid;

CHECK_IS_BASICTYPE(dtype);
mpi_dtype = dart__mpi__datatype_struct(dtype)->basic.mpi_type;
mpi_op = dart__mpi__op(op);
MPI_Op mpi_op = dart__mpi__op(op);


dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid);
Expand Down Expand Up @@ -651,24 +648,118 @@ dart_ret_t dart_accumulate(
DART_LOG_TRACE("dart_accumulate: MPI_Accumulate (src %p, size %zu)",
src_ptr, remainder);

CHECK_MPI_RET(
MPI_Accumulate(
MPI_Datatype mpi_dtype = dart__mpi__datatype_struct(dtype)->basic.mpi_type;
CHECK_MPI_RET(
MPI_Accumulate(
src_ptr,
remainder,
mpi_dtype,
team_unit_id.id,
offset,
remainder,
mpi_dtype,
mpi_op,
win),
"MPI_Accumulate");
}

DART_LOG_DEBUG("dart_accumulate > finished");
return DART_OK;
}


dart_ret_t dart_accumulate_blocking_local(
dart_gptr_t gptr,
const void * values,
size_t nelem,
dart_datatype_t dtype,
dart_operation_t op)
{
dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid);
uint64_t offset = gptr.addr_or_offs.offset;
int16_t seg_id = gptr.segid;
dart_team_t teamid = gptr.teamid;

CHECK_IS_BASICTYPE(dtype);
MPI_Op mpi_op = dart__mpi__op(op);

dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid);
if (dart__unlikely(team_data == NULL)) {
DART_LOG_ERROR("dart_accumulate ! failed: Unknown team %i!", teamid);
return DART_ERR_INVAL;
}

CHECK_UNITID_RANGE(team_unit_id, team_data);

DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%ld op:%d unit:%d",
nelem, dtype, op, team_unit_id.id);

dart_segment_info_t *seginfo = dart_segment_get_info(
&(team_data->segdata), seg_id);
if (dart__unlikely(seginfo == NULL)) {
DART_LOG_ERROR("dart_accumulate ! "
"Unknown segment %i on team %i", seg_id, teamid);
return DART_ERR_INVAL;
}

MPI_Win win = seginfo->win;
offset += dart_segment_disp(seginfo, team_unit_id);

// chunk up the put
const size_t nchunks = nelem / MAX_CONTIG_ELEMENTS;
const size_t remainder = nelem % MAX_CONTIG_ELEMENTS;
const char * src_ptr = (const char*) values;

MPI_Request reqs[2];
int num_reqs = 0;

if (nchunks > 0) {
DART_LOG_TRACE("dart_accumulate: MPI_Raccumulate (src %p, size %zu)",
src_ptr, nchunks * MAX_CONTIG_ELEMENTS);
CHECK_MPI_RET(
MPI_Raccumulate(
src_ptr,
remainder,
mpi_dtype,
nchunks,
dart__mpi__datatype_maxtype(dtype),
team_unit_id.id,
offset,
remainder,
mpi_dtype,
nchunks,
dart__mpi__datatype_maxtype(dtype),
mpi_op,
win),
"MPI_Accumulate");
win,
&reqs[num_reqs++]),
"MPI_Accumulate");
offset += nchunks * MAX_CONTIG_ELEMENTS;
src_ptr += nchunks * MAX_CONTIG_ELEMENTS;
}

if (remainder > 0) {
DART_LOG_TRACE("dart_accumulate: MPI_Raccumulate (src %p, size %zu)",
src_ptr, remainder);

MPI_Datatype mpi_dtype = dart__mpi__datatype_struct(dtype)->basic.mpi_type;
CHECK_MPI_RET(
MPI_Raccumulate(
src_ptr,
remainder,
mpi_dtype,
team_unit_id.id,
offset,
remainder,
mpi_dtype,
mpi_op,
win,
&reqs[num_reqs++]),
"MPI_Accumulate");
}

MPI_Waitall(num_reqs, reqs, MPI_STATUSES_IGNORE);

DART_LOG_DEBUG("dart_accumulate > finished");
return DART_OK;
}


dart_ret_t dart_fetch_and_op(
dart_gptr_t gptr,
const void * value,
Expand Down
10 changes: 5 additions & 5 deletions dash/include/dash/Array.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ class AsyncArrayRef
typedef T * pointer;
typedef const T * const_pointer;

typedef GlobAsyncRef< T> async_reference;
typedef GlobAsyncRef<const T> const_async_reference;
typedef GlobAsyncRef<T> async_reference;
typedef typename GlobAsyncRef<T>::const_type const_async_reference;

public:
typedef std::integral_constant<dim_t, 1>
Expand Down Expand Up @@ -363,7 +363,7 @@ class AsyncArrayRef
* Subscript operator, access to local array element at given position.
*/
constexpr const_async_reference operator[](const size_type n) const {
return async_reference(
return const_async_reference(
(*(_array->begin() + n)).dart_gptr());
}

Expand Down Expand Up @@ -660,8 +660,8 @@ class Array
typedef std::reverse_iterator< iterator> reverse_iterator;
typedef std::reverse_iterator<const_iterator> const_reverse_iterator;

typedef GlobRef< value_type> reference;
typedef GlobRef<const value_type> const_reference;
typedef GlobRef<value_type> reference;
typedef typename GlobRef<value_type>::const_type const_reference;

typedef GlobIter< value_type, PatternType> pointer;
typedef GlobIter<const value_type, PatternType> const_pointer;
Expand Down
3 changes: 2 additions & 1 deletion dash/include/dash/Atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace dash {
* dash::atomic::load(array.lbegin()) // not allowed
* \endcode
* \endnote
*
*
* \code
* dash::Array<dash::Atomic<int>> array(100);
* // supported as Atomic<value_t>(value_t T) is available
Expand Down Expand Up @@ -115,6 +115,7 @@ std::ostream & operator<<(

#include <dash/atomic/Type_traits.h>
#include <dash/atomic/GlobAtomicRef.h>
#include <dash/atomic/GlobAtomicAsyncRef.h>
#include <dash/atomic/Operation.h>

#endif // DASH__ATOMIC_H__INCLUDED
Expand Down
Loading

0 comments on commit 79b6886

Please sign in to comment.