Skip to content

Commit

Permalink
Rework manual Connection close in C++ (#2652)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Aug 21, 2024
1 parent 7dd4676 commit a6d4792
Show file tree
Hide file tree
Showing 66 changed files with 525 additions and 721 deletions.
47 changes: 22 additions & 25 deletions cpp/include/Ice/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,6 @@ namespace Ice
BasedOnProxy
};

/**
* Determines the behavior when manually closing a connection.
*/
enum class ConnectionClose : std::uint8_t
{
/**
* Close the connection immediately without sending a close connection protocol message to the peer and waiting
* for the peer to acknowledge it.
*/
Forcefully,
/**
* Close the connection by notifying the peer but do not wait for pending outgoing invocations to complete. On
* the server side, the connection will not be closed until all incoming invocations have completed.
*/
Gracefully,
/**
* Wait for all pending invocations to complete before closing the connection.
*/
GracefullyWithWait
};

/**
* A collection of HTTP headers.
*/
Expand Down Expand Up @@ -144,11 +123,29 @@ namespace Ice
virtual ~Connection() = 0;

/**
* Manually close the connection using the specified closure mode.
* @param mode Determines how the connection will be closed.
* @see ConnectionClose
* Aborts this connection.
*/
virtual void abort() noexcept = 0;

/**
* Starts a graceful closure of this connection once all outstanding invocations have completed.
* @param response A callback that the implementation calls when the connection is closed gracefully.
* @param exception A callback that the implementation calls when the connection closure failed. Its
* exception_ptr parameter is always non-null and describes the reason for the closure.
* @remarks The response and exception callbacks may be called synchronously (from the calling thread); in
* particular, this occurs when you call close on a connection that is already closed. The implementation always
* calls one of the two callbacks once; it never calls both.
* If closing the connection takes longer than the configured close timeout, the connection is aborted with a
* CloseTimeoutException.
*/
virtual void
close(std::function<void()> response, std::function<void(std::exception_ptr)> exception) noexcept = 0;

/**
* Starts a graceful closure of this connection once all outstanding invocations have completed.
* @return A future that completes then the connection is closed.
*/
virtual void close(ConnectionClose mode) noexcept = 0;
std::future<void> close();

/**
* Create a special proxy that always uses this connection. This can be used for callbacks from a server to a
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/Glacier2/RoutingTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Glacier2::RoutingTable::add(const ObjectProxySeq& unfiltered, const Current& cur

if (!_verifier->verify(*prx))
{
current.con->close(ConnectionClose::Forcefully);
current.con->abort();
throw ObjectNotExistException(__FILE__, __LINE__);
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/Glacier2/SessionRouterI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace Glacier2

// Initiate a graceful closure of the connection. Only initiate and graceful because the ultimate caller
// can be the Glacier2 client calling us over _connection.
_connection->close(ConnectionClose::Gracefully);
_connection->close(nullptr, nullptr);
}

private:
Expand Down Expand Up @@ -842,7 +842,7 @@ SessionRouterI::getRouterImpl(const ConnectionPtr& connection, const Ice::Identi
out << "rejecting request, no session is associated with the connection.\n";
out << "identity: " << identityToString(id);
}
connection->close(ConnectionClose::Forcefully);
connection->abort();
throw ObjectNotExistException(__FILE__, __LINE__);
}
return nullptr;
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/Ice/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,14 @@ Ice::Connection::flushBatchRequestsAsync(CompressBatch compress)
[promise](bool) { promise->set_value(); });
return promise->get_future();
}

future<void>
Ice::Connection::close()
{
auto sharedPromise = make_shared<promise<void>>();
close(
[sharedPromise]() { sharedPromise->set_value(); },
[sharedPromise](exception_ptr closeException) { sharedPromise->set_exception(closeException); });

return sharedPromise->get_future();
}
184 changes: 143 additions & 41 deletions cpp/src/Ice/ConnectionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,43 +468,76 @@ Ice::ConnectionI::destroy(DestructionReason reason)
}

void
Ice::ConnectionI::close(ConnectionClose mode) noexcept
Ice::ConnectionI::abort() noexcept
{
std::unique_lock lock(_mutex);
std::lock_guard lock(_mutex);
setState(
StateClosed,
make_exception_ptr(
ConnectionAbortedException{__FILE__, __LINE__, "connection aborted by the application", true}));
}

if (mode == ConnectionClose::Forcefully)
{
setState(
StateClosed,
make_exception_ptr(
ConnectionAbortedException{__FILE__, __LINE__, "connection aborted by the application", true}));
}
else if (mode == ConnectionClose::Gracefully)
{
setState(
StateClosing,
make_exception_ptr(ConnectionClosedException{
__FILE__,
__LINE__,
"connection closed gracefully by the application",
true}));
}
else
void
Ice::ConnectionI::close(function<void()> response, function<void(std::exception_ptr)> exception) noexcept
{
std::exception_ptr closeException = nullptr;
{
assert(mode == ConnectionClose::GracefullyWithWait);
std::lock_guard lock(_mutex);
if (_state >= StateClosed)
{
closeException = _exception;
assert(closeException);
}
else
{
if (response || exception)
{
_onClosedList.push_back(make_pair(std::move(response), std::move(exception)));
}

//
// Wait until all outstanding requests have been completed.
//
_conditionVariable.wait(lock, [this] { return _asyncRequests.empty(); });
if (_state < StateClosing)
{
if (_asyncRequests.empty())
{
doApplicationClose();
}
else
{
// We'll close the connection when we get the last reply message.
_closeRequested = true;
scheduleCloseTimerTask(); // we don't want to wait forever
}
}
// else nothing to do
}
}

setState(
StateClosing,
make_exception_ptr(ConnectionClosedException{
__FILE__,
__LINE__,
"connection closed gracefully by the application",
true}));
if (closeException) // already closed
{
try
{
rethrow_exception(closeException);
}
catch (const ConnectionClosedException&)
{
response();
}
catch (const CloseConnectionException&)
{
response();
}
catch (const CommunicatorDestroyedException&)
{
response();
}
catch (const ObjectAdapterDeactivatedException&)
{
response();
}
catch (...)
{
exception(closeException);
}
}
}

Expand Down Expand Up @@ -1688,6 +1721,54 @@ Ice::ConnectionI::finish(bool close)
_readStream.clear();
_readStream.b.clear();

if (!_onClosedList.empty())
{
bool success;
try
{
rethrow_exception(_exception);
}
catch (const ConnectionClosedException&)
{
success = true;
}
catch (const CloseConnectionException&)
{
success = true;
}
catch (const CommunicatorDestroyedException&)
{
success = true;
}
catch (const ObjectAdapterDeactivatedException&)
{
success = true;
}
catch (...)
{
success = false;
}

for (auto& pair : _onClosedList)
{
if (success)
{
if (pair.first)
{
pair.first();
}
}
else
{
if (pair.second)
{
pair.second(_exception);
}
}
}
_onClosedList.clear(); // break potential cycles
}

if (_closeCallback)
{
closeCallback(_closeCallback);
Expand Down Expand Up @@ -2177,10 +2258,7 @@ Ice::ConnectionI::initiateShutdown()
os.write(static_cast<uint8_t>(1)); // compression status: compression supported but not used.
os.write(headerSize); // Message size.

if (_closeTimeout > chrono::seconds::zero())
{
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout);
}
scheduleCloseTimerTask();

OutgoingMessage message(&os, false);
if (sendMessage(message) & AsyncStatusSent)
Expand Down Expand Up @@ -3098,10 +3176,7 @@ Ice::ConnectionI::parseMessage(int32_t& upcallCount, function<bool(InputStream&)
SocketOperation op = _transceiver->closing(false, _exception);
if (op)
{
if (_closeTimeout > chrono::seconds::zero())
{
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout);
}
scheduleCloseTimerTask();
return op;
}
setState(StateClosed);
Expand Down Expand Up @@ -3245,7 +3320,10 @@ Ice::ConnectionI::parseMessage(int32_t& upcallCount, function<bool(InputStream&)
};
++upcallCount;
}
_conditionVariable.notify_all(); // Notify threads blocked in close(false)
if (_closeRequested && _state < StateClosing && _asyncRequests.empty())
{
doApplicationClose();
}
}

break;
Expand Down Expand Up @@ -3440,3 +3518,27 @@ ConnectionI::cancelInactivityTimerTask()
_timer->cancel(_inactivityTimerTask);
}
}

void
ConnectionI::scheduleCloseTimerTask()
{
// Called with the ConnectionI mutex locked.

if (_closeTimeout > chrono::seconds::zero())
{
// We schedule a new task every time this function is called.
_timer->schedule(make_shared<CloseTimerTask>(shared_from_this()), _closeTimeout);
}
}

void
ConnectionI::doApplicationClose() noexcept
{
// Called with the ConnectionI mutex locked.
assert(_state < StateClosing);

setState(
StateClosing,
make_exception_ptr(
ConnectionClosedException{__FILE__, __LINE__, "connection closed gracefully by the application", true}));
}
13 changes: 12 additions & 1 deletion cpp/src/Ice/ConnectionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <chrono>
#include <condition_variable>
#include <deque>
#include <list>
#include <mutex>

#ifndef ICE_HAS_BZIP2
Expand Down Expand Up @@ -140,7 +141,9 @@ namespace Ice
void activate();
void hold();
void destroy(DestructionReason);
void close(ConnectionClose) noexcept final; // From Connection.

void abort() noexcept final;
void close(std::function<void()> response, std::function<void(std::exception_ptr)> exception) noexcept final;

bool isActiveOrHolding() const;
bool isFinished() const;
Expand Down Expand Up @@ -322,6 +325,9 @@ namespace Ice
void scheduleInactivityTimerTask();
void cancelInactivityTimerTask();

void scheduleCloseTimerTask();
void doApplicationClose() noexcept;

Ice::CommunicatorPtr _communicator;
const IceInternal::InstancePtr _instance;
const IceInternal::TransceiverPtr _transceiver;
Expand Down Expand Up @@ -397,7 +403,12 @@ namespace Ice
bool _initialized;
bool _validated;

// When true, the application called close and Connection must close the connection when it receives the reply
// for the last outstanding invocation.
bool _closeRequested = false;

CloseCallback _closeCallback;
std::list<std::pair<std::function<void()>, std::function<void(std::exception_ptr)>>> _onClosedList;

mutable std::mutex _mutex;
mutable std::condition_variable _conditionVariable;
Expand Down
Loading

0 comments on commit a6d4792

Please sign in to comment.