Skip to content

Commit

Permalink
lci pp: fix messages larger than INT_MAX
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed Nov 8, 2024
1 parent 6e66cd5 commit 83686d0
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 36 deletions.
18 changes: 8 additions & 10 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
if (tchunk_size <= max_header_size - current_header_size)
{
current_header_size += tchunk_size;
}
Expand Down Expand Up @@ -118,12 +117,11 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
set<pos_numbytes_tchunk>(static_cast<value_type>(tchunk_size));
if (tchunk_size <= int(max_header_size - current_header_size))
if (tchunk_size <= max_header_size - current_header_size)
{
data_[pos_piggy_back_flag_tchunk] = 1;
std::memcpy(&data_[current_header_size],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand All @@ -178,8 +178,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_.resize(num_zero_copy_chunks);
for (int j = 0; j < num_zero_copy_chunks; ++j)
{
std::size_t chunk_size =
buffer.transmission_chunks_[j].second;
size_t chunk_size = buffer.transmission_chunks_[j].second;
HPX_ASSERT(iovec.lbuffers[i].length == chunk_size);
buffer.chunks_[j] = serialization::create_pointer_chunk(
iovec.lbuffers[i].address, chunk_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
rcvd_chunks,
locked
};
LCI_comp_t unified_recv(void* address, int length);
LCI_comp_t unified_recv(void* address, size_t length);
return_t receive_transmission_chunks();
return_t receive_data();
return_t receive_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
locked,
};
return_t send_header();
return_t unified_followup_send(void* address, int length);
return_t unified_followup_send(void* address, size_t length);
return_t send_transmission_chunks();
return_t send_data();
return_t send_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ namespace hpx::parcelset::policies::lci {
std::vector<
typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
iovec.lbuffers[i].address = tchunks.data();
iovec.lbuffers[i].length = tchunks_length;
if (config_t::reg_mem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand Down Expand Up @@ -135,11 +135,11 @@ namespace hpx::parcelset::policies::lci {
}

LCI_comp_t receiver_connection_sendrecv::unified_recv(
void* address, int length)
void* address, size_t length)
{
LCI_comp_t completion =
device_p->completion_manager_p->recv_followup->alloc_completion();
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t mbuffer;
mbuffer.address = address;
Expand Down Expand Up @@ -197,8 +197,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_tchunks)
{
auto& tchunks = buffer.transmission_chunks_;
int tchunk_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunk_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length);
state.store(next_state, std::memory_order_release);
Expand All @@ -221,8 +221,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_data)
{
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(
buffer.data_.data(), static_cast<int>(buffer.data_.size()));
LCI_comp_t completion =
unified_recv(buffer.data_.data(), buffer.data_.size());
state.store(next_state, std::memory_order_release);
return {false, completion};
}
Expand Down Expand Up @@ -316,8 +316,7 @@ namespace hpx::parcelset::policies::lci {
HPX_UNUSED(chunk_size);

state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand All @@ -344,8 +343,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_[idx] =
serialization::create_pointer_chunk(chunk.data(), chunk.size());
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ namespace hpx::parcelset::policies::lci {
}

sender_connection_sendrecv::return_t
sender_connection_sendrecv::unified_followup_send(void* address, int length)
sender_connection_sendrecv::unified_followup_send(
void* address, size_t length)
{
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t buffer;
buffer.address = address;
Expand Down Expand Up @@ -323,7 +324,7 @@ namespace hpx::parcelset::policies::lci {

std::vector<typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_size = (int) tchunks.size() *
size_t tchunks_size = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
auto ret = unified_followup_send(tchunks.data(), tchunks_size);
Expand Down Expand Up @@ -389,9 +390,8 @@ namespace hpx::parcelset::policies::lci {
{
state.store(
connection_state::locked, std::memory_order_relaxed);
auto ret =
unified_followup_send(const_cast<void*>(chunk.data_.cpos_),
static_cast<int>(chunk.size_));
auto ret = unified_followup_send(
const_cast<void*>(chunk.data_.cpos_), chunk.size_);
if (ret.status == return_status_t::done)
{
++send_chunks_idx;
Expand Down

0 comments on commit 83686d0

Please sign in to comment.