Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add hard-coded deadline for reflection stream. #150

Merged
merged 8 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions src/libCli/Call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,9 @@ namespace cli
}
}

//before calling the RPC, close the DescDb connection with a timeout.
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbWithDeadline(serverAddress, deadline);
if (not dbDescStatus.ok())
{
std::cerr << "Failed to close reflection stream ;( Status code: " << std::to_string(dbDescStatus.error_code()) << " " << cli::getGrpcStatusCodeAsString(dbDescStatus.error_code()) << ", error message: " << dbDescStatus.error_message() << std::endl;
if(dbDescStatus.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED)
{
std::cerr << "Note: You can increase the deadline by setting the --rpcTimeoutMilliseconds option to a number or 'None'." << std::endl;
}
return -1;
}
//before calling the RPC, close the DescDb connection with a default timeout. We still continue with rpc call
//but remove the cache file if the stream was not closed gracefully so it fetches the reflection data again next time.
ConnectionManager::getInstance().closeDescDbStream(serverAddress);

grpc::testing::CliCall call(channel, methodStr, clientMetadata, deadline);

Expand Down
7 changes: 3 additions & 4 deletions src/libCli/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,16 @@ namespace cli
return m_connections[f_serverAddress].descPool;
}

grpc::Status ConnectionManager::closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status ConnectionManager::closeDescDbStream(std::string f_serverAddress)
{
if (m_connections[f_serverAddress].descDbProxy == nullptr)
{
std::cerr << "Error: Unable to close DescDb connection!" << std::endl;
return grpc::Status( grpc::StatusCode::ABORTED, "descDbProxy has not been initialized.");
}

//if proxy exists close the stream with a deadline.
grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream(deadline);
//if proxy exists close the stream.
grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream();

//delete the proxy, findChannelByAddress() protects from accessing uninitialzed DbProxy.
m_connections[f_serverAddress].descDbProxy.reset();
Expand Down
6 changes: 2 additions & 4 deletions src/libCli/libCli/ConnectionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ namespace cli
/// @returns the gRpc DescriptorPool of the corresponding server address.
std::shared_ptr<grpc::protobuf::DescriptorPool> getDescPool(std::string f_serverAddress, ArgParse::ParsedElement &f_parseTree);

/// @brief closes the DescDb stream with a given deadline.
/// @brief closes the DescDb stream with a default deadline of 20 seconds.
/// @param f_serverAddress server addresss to lookup the assigned DescDbProxy.
/// @param deadline optional dealine for closing the stream.
/// @return returns grpc::StatusCode::ABORTED status if no DescDb proxy is attached to the server address,
/// otherwise grpc status as a result of stream closure.
grpc::Status closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream(std::string f_serverAddress);

private:
ConnectionManager() {}
Expand Down
27 changes: 22 additions & 5 deletions src/libLocalDescriptorCache/DescDbProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,21 +372,29 @@ void DescDbProxy::getDescriptors(const std::string &f_hostAddress)
}
}

grpc::Status DescDbProxy::closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status DescDbProxy::closeDescDbStream()
{
::grpc::Status status;
if ( m_reflectionDescDb == nullptr )
{
return grpc::Status::OK;
return status;
}
return m_reflectionDescDb->closeStreamWithDeadline(deadline);
status = m_reflectionDescDb->closeDescDbStream();
if(not status.ok())
{
//failure to close stream leads to invalid cache,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good thought :)

//removing it here so it will be written again next time.
std::string cacheFilePath = prepareCacheFile();
std::filesystem::remove(cacheFilePath);
}
return status;
}

DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel,
ArgParse::ParsedElement &parseTree)
{
m_channel = channel;
m_parseTree = parseTree;
m_disableCache = disableCache;
if(disableCache)
{
// Get Desc directly via reflection and without touching localDB
Expand All @@ -410,4 +418,13 @@ DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std:
}
}

DescDbProxy::~DescDbProxy(){}
DescDbProxy::~DescDbProxy()
{
//This call is a noop if desc db stream is already closed.
//There are two scenarios when we close the stream here.
//i) Stream failed to close within a timeout.
// Our rpc may still succeed but this invalidates the cache so we remove the cache file,
// to repopulate desc db on next rpc.
//íi) Stream successfully closed, leave the cache as it is.
closeDescDbStream();
}
5 changes: 2 additions & 3 deletions src/libLocalDescriptorCache/DescDbProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{
/// @param hostAdress Address to the current host
void getDescriptors(const std::string &hostAddress);

/// @brief close the DescDb stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the DescDb stream.
/// @brief close the DescDb stream with a default deadline.
/// @return return grpc status as a result of call the finish() on the DescDb stream.
grpc::Status closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream();

DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel, ArgParse::ParsedElement &parseTree);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
// Provide a list of full names of registered services
bool GetServices(std::vector<grpc::string>* output);

/// @brief close the reflection stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the reflection stream.
/// @brief close the reflection stream with a default deadline.
/// @return return grpc status as a result of call the finish() on the reflection stream.
grpc::Status closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream();

private:
typedef ClientReaderWriter<
Expand Down
14 changes: 9 additions & 5 deletions third_party/gRPC_utils/proto_reflection_descriptor_database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using grpc::reflection::v1alpha::ServerReflection;
using grpc::reflection::v1alpha::ServerReflectionRequest;
using grpc::reflection::v1alpha::ServerReflectionResponse;

const uint8_t g_timeoutGrpcMainStreamSeconds = 20; //using default gwhisper timeout of 20 seconds.
Copy link
Author

@abb3r abb3r Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

increased since we already have default rpc timeout of 30secs anyway, is that too big? we saw that 10secs easily times out on slower systems.

namespace grpc {

ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
Expand Down Expand Up @@ -300,6 +301,9 @@ void ProtoReflectionDescriptorDatabase::AddFileFromResponse(
const std::shared_ptr<ProtoReflectionDescriptorDatabase::ClientStream>
ProtoReflectionDescriptorDatabase::GetStream() {
if (!stream_) {
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(g_timeoutGrpcMainStreamSeconds);
ctx_.set_deadline(deadline);
stream_ = stub_->ServerReflectionInfo(&ctx_);
}
return stream_;
Expand All @@ -317,16 +321,13 @@ bool ProtoReflectionDescriptorDatabase::DoOneRequest(
return success;
}

grpc::Status ProtoReflectionDescriptorDatabase::closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status ProtoReflectionDescriptorDatabase::closeDescDbStream()
{
stream_mutex_.lock();
if( deadline != std::nullopt )
{
ctx_.set_deadline(deadline.value());
}

auto status = closeStream();
stream_.reset();

stream_mutex_.unlock();
return status;
}
Expand All @@ -342,6 +343,9 @@ grpc::Status ProtoReflectionDescriptorDatabase::closeStream()
fprintf(stderr,
"Reflection request not implemented; "
"is the ServerReflection service enabled?\n");
} else if (status.error_code() == StatusCode::DEADLINE_EXCEEDED) {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Grpc Server failed to close the stream within %d seconds.\n", g_timeoutGrpcMainStreamSeconds);
} else {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Error code: %d, message: %s, "
Expand Down
Loading