Skip to content

Commit

Permalink
Fix more build warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev committed Nov 29, 2024
1 parent ba26e7e commit c1174a5
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 48 deletions.
3 changes: 1 addition & 2 deletions cpp/include/ucxx/request_tag_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,8 @@ class RequestTagMulti : public Request {
* may still verify each of the underlying requests individually.
*
* @param[in] status the status of the request being completed.
* @param[in] request the `ucxx::BufferRequest` object containing a single tag .
*/
void markCompleted(ucs_status_t status, RequestCallbackUserData request);
void markCompleted(ucs_status_t status);

/**
* @brief Callback to submit request to receive new header or frames.
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/delayed_submission.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ GenericDelayedSubmissionCollection::GenericDelayedSubmissionCollection(const std
{
}

void GenericDelayedSubmissionCollection::scheduleLog(ItemIdType id,
DelayedSubmissionCallbackType item)
void GenericDelayedSubmissionCollection::scheduleLog(ItemIdType id, DelayedSubmissionCallbackType)
{
ucxx_trace_req("Registered %s [%lu]", _name.c_str(), id);
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ std::shared_ptr<Request> Endpoint::close(const bool enablePythonFuture,
bool force = _endpointErrorHandling;

auto combineCallbacksFunction = [this, &callbackFunction, &callbackData](
ucs_status_t status, EndpointCloseCallbackUserData unused) {
ucs_status_t status,
EndpointCloseCallbackUserData /* callbackData */) {
_status = status;
if (callbackFunction) callbackFunction(status, callbackData);
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/internal/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void RecvAmMessage::setUcpRequest(void* request) { _request->_request = request;
void RecvAmMessage::callback(void* request, ucs_status_t status)
{
std::visit(data::dispatch{
[this, request, status](data::AmReceive amReceive) {
[this, request, status](data::AmReceive) {
_request->callback(request, status);
{
std::lock_guard<std::mutex> lock(_amData->_mutex);
Expand Down
14 changes: 12 additions & 2 deletions cpp/src/remote_key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ SerializedRemoteKey RemoteKey::serialize() const
{
std::stringstream ss;

if (_packedRemoteKeySize > std::numeric_limits<std::streamsize>::max())
// We should never have a remote key this big, but just in case.
throw std::overflow_error("Remote key is too large to deserialize");

ss.write(reinterpret_cast<char const*>(&_packedRemoteKeySize), sizeof(_packedRemoteKeySize));
ss.write(reinterpret_cast<char const*>(_packedRemoteKey), _packedRemoteKeySize);
ss.write(reinterpret_cast<char const*>(_packedRemoteKey),
static_cast<std::streamsize>(_packedRemoteKeySize));
ss.write(reinterpret_cast<char const*>(&_memoryBaseAddress), sizeof(_memoryBaseAddress));
ss.write(reinterpret_cast<char const*>(&_memorySize), sizeof(_memorySize));

Expand Down Expand Up @@ -133,7 +138,12 @@ void RemoteKey::deserialize(const SerializedRemoteKey& serializedRemoteKey)
_packedRemoteKeyVector = std::vector<char>(_packedRemoteKeySize);
_packedRemoteKey = _packedRemoteKeyVector.data();

ss.read(reinterpret_cast<char*>(_packedRemoteKey), _packedRemoteKeySize);
if (_packedRemoteKeySize > std::numeric_limits<std::streamsize>::max())
// We should never have a remote key this big, but just in case.
throw std::overflow_error("Remote key is too large to deserialize");

ss.read(reinterpret_cast<char*>(_packedRemoteKey),
static_cast<std::streamsize>(_packedRemoteKeySize));
ss.read(reinterpret_cast<char*>(&_memoryBaseAddress), sizeof(_memoryBaseAddress));
ss.read(reinterpret_cast<char*>(&_memorySize), sizeof(_memorySize));
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ RequestAm::RequestAm(std::shared_ptr<Component> endpointOrWorker,
callbackData)
{
std::visit(data::dispatch{
[this](data::AmSend amSend) {
[this](data::AmSend) {
if (_endpoint == nullptr)
throw ucxx::Error("An endpoint is required to send active messages");
},
[](data::AmReceive amReceive) {},
[](data::AmReceive) {},
},
requestData);
}
Expand Down Expand Up @@ -183,7 +183,7 @@ static void _amSendCallback(void* request, ucs_status_t status, void* user_data)

static void _recvCompletedCallback(void* request,
ucs_status_t status,
size_t length,
size_t /* length */,
void* user_data)
{
internal::RecvAmMessage* recvAmMessage = static_cast<internal::RecvAmMessage*>(user_data);
Expand Down Expand Up @@ -415,7 +415,7 @@ void RequestAm::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::AmSend amSend) {
[this](data::AmSend) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/request_mem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ RequestMem::RequestMem(std::shared_ptr<Endpoint> endpoint,
callbackData)
{
std::visit(data::dispatch{
[this](data::MemPut memPut) {
[this](data::MemPut) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to send memory messages.");
},
[this](data::MemGet memGet) {
[this](data::MemGet) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to receive memory messages.");
},
Expand Down Expand Up @@ -126,15 +126,15 @@ void RequestMem::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::MemPut memPut) {
[this](data::MemPut) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
return true;
}
return false;
},
[this](data::MemGet memGet) {
[this](data::MemGet) {
if (_worker->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be received");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down
19 changes: 11 additions & 8 deletions cpp/src/request_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ RequestStream::RequestStream(std::shared_ptr<Endpoint> endpoint,
: Request(endpoint, data::getRequestData(requestData), operationName, enablePythonFuture)
{
std::visit(data::dispatch{
[this](data::StreamSend streamSend) {
[this](data::StreamSend) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to send stream messages.");
},
[this](data::StreamReceive streamReceive) {
[this](data::StreamReceive) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to receive stream messages.");
},
Expand Down Expand Up @@ -96,15 +96,15 @@ void RequestStream::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::StreamSend streamSend) {
[this](data::StreamSend) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
return true;
}
return false;
},
[this](data::StreamReceive streamReceive) {
[this](data::StreamReceive) {
if (_worker->getHandle() == nullptr) {
ucxx_warn("Worker was closed before message could be received");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down Expand Up @@ -162,10 +162,13 @@ void RequestStream::callback(void* request, ucs_status_t status, size_t length)

if (status == UCS_ERR_MESSAGE_TRUNCATED) {
const char* fmt = "length mismatch: %llu (got) != %llu (expected)";
size_t len = std::snprintf(nullptr, 0, fmt, length, streamReceive._length);
_status_msg = std::string(len + 1, '\0'); // +1 for null terminator
std::snprintf(
_status_msg.data(), _status_msg.size(), fmt, length, streamReceive._length);
int charsLen = std::snprintf(nullptr, 0, fmt, length, streamReceive._length);
if (charsLen > 0) {
_status_msg = std::string(static_cast<size_t>(charsLen) + 1,
'\0'); // +1 for null terminator
std::snprintf(
_status_msg.data(), _status_msg.size(), fmt, length, streamReceive._length);
}
}

Request::callback(request, status);
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/request_tag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ RequestTag::RequestTag(std::shared_ptr<Component> endpointOrWorker,
callbackData)
{
std::visit(data::dispatch{
[this](data::TagSend tagSend) {
[this](data::TagSend) {
if (_endpoint == nullptr)
throw ucxx::Error("An endpoint is required to send tag messages");
},
[](data::TagReceive tagReceive) {},
[](data::TagReceive) {},
},
requestData);
}

void RequestTag::callback(void* request, ucs_status_t status, const ucp_tag_recv_info_t* info)
void RequestTag::callback(void* request, ucs_status_t status, const ucp_tag_recv_info_t* /* info */)
{
// TODO: Decide on behavior. See https://github.com/rapidsai/ucxx/issues/104 .
// if (status != UCS_ERR_CANCELED && info->length != _length) {
Expand Down Expand Up @@ -143,15 +143,15 @@ void RequestTag::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::TagSend tagSend) {
[this](data::TagSend) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
return true;
}
return false;
},
[this](data::TagReceive tagReceive) {
[this](data::TagReceive) {
if (_worker->getHandle() == nullptr) {
ucxx_warn("Worker was closed before message could be received");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down
24 changes: 12 additions & 12 deletions cpp/src/request_tag_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ void RequestTagMulti::recvFrames()
tagPair.first,
tagPair.second,
false,
[this](ucs_status_t status, RequestCallbackUserData arg) {
return this->markCompleted(status, arg);
[this](ucs_status_t status, RequestCallbackUserData /* callbackData */) {
return this->markCompleted(status);
},
bufferRequest);
bufferRequest->buffer = buf;
Expand All @@ -167,7 +167,7 @@ void RequestTagMulti::recvFrames()
_isFilled);
};

void RequestTagMulti::markCompleted(ucs_status_t status, RequestCallbackUserData request)
void RequestTagMulti::markCompleted(ucs_status_t status)
{
/**
* Prevent reference count to self from going to zero and thus cause self to be destroyed
Expand Down Expand Up @@ -256,7 +256,7 @@ void RequestTagMulti::recvHeader()
tagPair.first,
tagPair.second,
false,
[this](ucs_status_t status, RequestCallbackUserData arg) {
[this](ucs_status_t status, RequestCallbackUserData /* callbackData */) {
return this->recvCallback(status);
});

Expand Down Expand Up @@ -348,14 +348,14 @@ void RequestTagMulti::send()
for (size_t i = 0; i < _totalFrames; ++i) {
auto bufferRequest = std::make_shared<BufferRequest>();
_bufferRequests.push_back(bufferRequest);
bufferRequest->request =
_endpoint->tagSend(tagMultiSend._buffer[i],
tagMultiSend._length[i],
tagMultiSend._tag,
false,
[this](ucs_status_t status, RequestCallbackUserData arg) {
return this->markCompleted(status, arg);
});
bufferRequest->request = _endpoint->tagSend(
tagMultiSend._buffer[i],
tagMultiSend._length[i],
tagMultiSend._tag,
false,
[this](ucs_status_t status, RequestCallbackUserData /* callbackData */) {
return this->markCompleted(status);
});
}

_isFilled = true;
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/utils/file_descriptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ FILE* createTextFileDescriptor()

std::string decodeTextFileDescriptor(FILE* textFileDescriptor)
{
size_t size;

rewind(textFileDescriptor);
fseek(textFileDescriptor, 0, SEEK_END);
size = ftell(textFileDescriptor);
int64_t pos = ftell(textFileDescriptor);
if (pos == -1L) throw std::ios_base::failure("ftell() failed");
size_t size = static_cast<size_t>(pos);
rewind(textFileDescriptor);

std::string textString(size, '\0');
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ Worker::Worker(std::shared_ptr<Context> context,

static void _drainCallback(void* request,
ucs_status_t status,
const ucp_tag_recv_info_t* info,
void* arg)
const ucp_tag_recv_info_t* /* info */,
void* /* arg */)
{
*reinterpret_cast<ucs_status_t*>(request) = status;
}
Expand Down Expand Up @@ -426,7 +426,7 @@ void Worker::clearFuturesPool() { THROW_FUTURE_NOT_IMPLEMENTED(); }

std::shared_ptr<Future> Worker::getFuture() { THROW_FUTURE_NOT_IMPLEMENTED(); }

RequestNotifierWaitState Worker::waitRequestNotifier(uint64_t periodNs)
RequestNotifierWaitState Worker::waitRequestNotifier(uint64_t /* periodNs */)
{
THROW_FUTURE_NOT_IMPLEMENTED();
}
Expand Down Expand Up @@ -594,7 +594,7 @@ bool Worker::tagProbe(const Tag tag)
}

ucp_tag_recv_info_t info;
ucp_tag_message_h tag_message = ucp_tag_probe_nb(_handle, tag, -1, 0, &info);
ucp_tag_message_h tag_message = ucp_tag_probe_nb(_handle, tag, TagMaskFull, 0, &info);

return tag_message != NULL;
}
Expand Down

0 comments on commit c1174a5

Please sign in to comment.