Skip to content

Commit

Permalink
Move Timer to IceInternal, add InlineTimerTask (#2923)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Oct 17, 2024
1 parent ff1bcfa commit b631a08
Show file tree
Hide file tree
Showing 39 changed files with 114 additions and 124 deletions.
2 changes: 1 addition & 1 deletion cpp/include/Ice/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ namespace Ice
friend ICE_API CommunicatorPtr initialize(StringSeq&, const InitializationData&, std::int32_t);
friend ICE_API CommunicatorPtr initialize(const InitializationData&, std::int32_t);
friend ICE_API IceInternal::InstancePtr IceInternal::getInstance(const Ice::CommunicatorPtr&);
friend ICE_API ::Ice::TimerPtr IceInternal::getInstanceTimer(const Ice::CommunicatorPtr&);
friend ICE_API IceInternal::TimerPtr IceInternal::getInstanceTimer(const Ice::CommunicatorPtr&);

const IceInternal::InstancePtr _instance;
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/Ice/Initialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ namespace IceInternal
// to be used by modules such as Freeze.
//
ICE_API InstancePtr getInstance(const Ice::CommunicatorPtr&);
ICE_API Ice::TimerPtr getInstanceTimer(const Ice::CommunicatorPtr&);
ICE_API TimerPtr getInstanceTimer(const Ice::CommunicatorPtr&);
}

#endif
2 changes: 1 addition & 1 deletion cpp/include/Ice/OutgoingAsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ namespace IceInternal
// correct notified of failures and make sure the retry task is
// correctly canceled when the invocation completes.
//
class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, public Ice::TimerTask
class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, public TimerTask
{
public:
virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0;
Expand Down
21 changes: 20 additions & 1 deletion cpp/include/Ice/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <stdexcept>
#include <thread>

namespace Ice
namespace IceInternal
{
class Timer;
using TimerPtr = std::shared_ptr<Timer>;
Expand All @@ -32,6 +32,18 @@ namespace Ice
};
using TimerTaskPtr = std::shared_ptr<TimerTask>;

// Adapts a function<void()> to a TimerTask.
class InlineTimerTask final : public TimerTask
{
public:
InlineTimerTask(std::function<void()> function) : _function(std::move(function)) {}

void runTimerTask() final { _function(); }

private:
std::function<void()> _function;
};

// The timer class is used to schedule tasks for one-time execution or repeated execution. Tasks are executed by a
// dedicated timer thread sequentially.
class ICE_API Timer
Expand All @@ -43,6 +55,13 @@ namespace Ice
// Destroy the timer and join the timer execution thread. Must not be called from a timer task.
void destroy();

// Schedule a function-task for execution after a given delay.
template<class Rep, class Period>
void schedule(std::function<void()> function, const std::chrono::duration<Rep, Period>& delay)
{
schedule(std::make_shared<InlineTimerTask>(std::move(function)), delay);
}

// Schedule task for execution after a given delay.
template<class Rep, class Period>
void schedule(const TimerTaskPtr& task, const std::chrono::duration<Rep, Period>& delay)
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/DataStorm/Instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Instance::Instance(const Ice::CommunicatorPtr& communicator) : _communicator(com

_executor = make_shared<CallbackExecutor>();
_connectionManager = make_shared<ConnectionManager>(_executor);
_timer = make_shared<Ice::Timer>();
_timer = make_shared<IceInternal::Timer>();
_traceLevels = make_shared<TraceLevels>(_communicator);
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/DataStorm/Instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace DataStormI
return _executor;
}

Ice::TimerPtr getTimer() const
IceInternal::TimerPtr getTimer() const
{
assert(_timer);
return _timer;
Expand Down Expand Up @@ -126,7 +126,7 @@ namespace DataStormI
std::optional<DataStormContract::LookupPrx> _lookup;
std::shared_ptr<TraceLevels> _traceLevels;
std::shared_ptr<CallbackExecutor> _executor;
Ice::TimerPtr _timer;
IceInternal::TimerPtr _timer;
std::chrono::milliseconds _retryDelay;
int _retryMultiplier;
int _retryCount;
Expand Down
16 changes: 7 additions & 9 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "Instance.h"
#include "NodeI.h"
#include "NodeSessionI.h"
#include "TimerTaskI.h"
#include "TopicFactoryI.h"
#include "TraceUtil.h"

Expand Down Expand Up @@ -423,15 +422,14 @@ NodeSessionManager::disconnected(optional<NodePrx> node, optional<LookupPrx> loo
else
{
instance->getTimer()->schedule(
make_shared<TimerTaskI>(
[=, this, self = shared_from_this()]
[=, this, self = shared_from_this()]
{
auto instance = _instance.lock();
if (instance)
{
auto instance = _instance.lock();
if (instance)
{
self->connect(lookup, _nodePrx);
}
}),
self->connect(lookup, _nodePrx);
}
},
instance->getRetryDelay(_retryCount++));
}
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "ConnectionManager.h"
#include "Instance.h"
#include "NodeI.h"
#include "TimerTaskI.h"
#include "TopicFactoryI.h"
#include "TopicI.h"
#include "TraceUtil.h"
Expand Down Expand Up @@ -693,7 +692,7 @@ SessionI::retry(optional<NodePrx> node, exception_ptr exception)
<< " (ms) for peer to reconnect";
}

_retryTask = make_shared<TimerTaskI>([self = shared_from_this()] { self->remove(); });
_retryTask = make_shared<IceInternal::InlineTimerTask>([self = shared_from_this()] { self->remove(); });
_instance->getTimer()->schedule(_retryTask, delay);
}
else
Expand Down Expand Up @@ -736,7 +735,8 @@ SessionI::retry(optional<NodePrx> node, exception_ptr exception)
return false;
}

_retryTask = make_shared<TimerTaskI>([node, self = shared_from_this()] { self->reconnect(node); });
_retryTask =
make_shared<IceInternal::InlineTimerTask>([node, self = shared_from_this()] { self->reconnect(node); });
_instance->getTimer()->schedule(_retryTask, delay);
}
return true;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/DataStorm/SessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ namespace DataStormI
bool _destroyed;
int _sessionInstanceId;
int _retryCount;
Ice::TimerTaskPtr _retryTask;
IceInternal::TimerTaskPtr _retryTask;

std::map<std::int64_t, TopicSubscribers> _topics;
std::unique_lock<std::mutex>* _topicLock;
Expand Down
27 changes: 0 additions & 27 deletions cpp/src/DataStorm/TimerTaskI.h

This file was deleted.

2 changes: 1 addition & 1 deletion cpp/src/Ice/ConnectionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace
return nullptr;
}

class StartAcceptor : public Ice::TimerTask, public std::enable_shared_from_this<StartAcceptor>
class StartAcceptor : public TimerTask, public std::enable_shared_from_this<StartAcceptor>
{
public:
StartAcceptor(const IncomingConnectionFactoryPtr& factory, const InstancePtr& instance)
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/Ice/ConnectionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ using namespace IceInternal;

namespace
{
class ConnectTimerTask final : public Ice::TimerTask
class ConnectTimerTask final : public TimerTask
{
public:
ConnectTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {}
Expand All @@ -55,7 +55,7 @@ namespace
const weak_ptr<Ice::ConnectionI> _connection;
};

class CloseTimerTask final : public Ice::TimerTask
class CloseTimerTask final : public TimerTask
{
public:
CloseTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {}
Expand All @@ -72,7 +72,7 @@ namespace
const weak_ptr<Ice::ConnectionI> _connection;
};

class InactivityTimerTask final : public Ice::TimerTask
class InactivityTimerTask final : public TimerTask
{
public:
InactivityTimerTask(const Ice::ConnectionIPtr& connection) : _connection(connection) {}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/Ice/ConnectionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,13 @@ namespace Ice
const IceInternal::TraceLevelsPtr _traceLevels;
const IceInternal::ThreadPoolPtr _threadPool;

const Ice::TimerPtr _timer;
const IceInternal::TimerPtr _timer;

const std::chrono::seconds _connectTimeout;
const std::chrono::seconds _closeTimeout;
const std::chrono::seconds _inactivityTimeout;

Ice::TimerTaskPtr _inactivityTimerTask;
IceInternal::TimerTaskPtr _inactivityTimerTask;
bool _inactivityTimerTaskScheduled;

std::function<void(ConnectionIPtr)> _connectionStartCompleted;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/Ice/IdleTimeoutTransceiverDecorator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using namespace IceInternal;

namespace
{
class HeartbeatTimerTask final : public Ice::TimerTask
class HeartbeatTimerTask final : public TimerTask
{
public:
HeartbeatTimerTask(const ConnectionIPtr& connection) : _connection(connection) {}
Expand All @@ -31,7 +31,7 @@ namespace
const weak_ptr<ConnectionI> _connection;
};

class IdleCheckTimerTask final : public Ice::TimerTask
class IdleCheckTimerTask final : public TimerTask
{
public:
IdleCheckTimerTask(const ConnectionIPtr& connection, const chrono::seconds& idleTimeout)
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/Ice/IdleTimeoutTransceiverDecorator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace IceInternal
IdleTimeoutTransceiverDecorator(
const TransceiverPtr& decoratee,
const std::chrono::seconds& idleTimeout,
const Ice::TimerPtr& timer)
const IceInternal::TimerPtr& timer)
: _decoratee(decoratee),
_idleTimeout(idleTimeout),
_timer(timer)
Expand Down Expand Up @@ -71,13 +71,13 @@ namespace IceInternal
private:
const TransceiverPtr _decoratee;
const std::chrono::seconds _idleTimeout;
const Ice::TimerPtr _timer;
const IceInternal::TimerPtr _timer;

bool _idleCheckEnabled = false;

// Set by decoratorInit
Ice::TimerTaskPtr _heartbeatTimerTask;
Ice::TimerTaskPtr _idleCheckTimerTask;
IceInternal::TimerTaskPtr _heartbeatTimerTask;
IceInternal::TimerTaskPtr _idleCheckTimerTask;
};
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/Ice/Initialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ IceInternal::getInstance(const CommunicatorPtr& communicator)
return communicator->_instance;
}

Ice::TimerPtr
IceInternal::TimerPtr
IceInternal::getInstanceTimer(const CommunicatorPtr& communicator)
{
return communicator->_instance->timer();
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/Ice/Instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,15 @@ namespace IceInternal // Required because ObserverUpdaterI is a friend of Instan
//
// Timer specialization which supports the thread observer
//
class ThreadObserverTimer final : public Ice::Timer
class ThreadObserverTimer final : public IceInternal::Timer
{
public:
ThreadObserverTimer() : _hasObserver(false) {}

void updateObserver(const Ice::Instrumentation::CommunicatorObserverPtr&);

private:
void runTimerTask(const Ice::TimerTaskPtr&) final;
void runTimerTask(const TimerTaskPtr&) final;

std::mutex _mutex;
std::atomic<bool> _hasObserver;
Expand All @@ -210,7 +210,7 @@ ThreadObserverTimer::updateObserver(const Ice::Instrumentation::CommunicatorObse
}

void
ThreadObserverTimer::runTimerTask(const Ice::TimerTaskPtr& task)
ThreadObserverTimer::runTimerTask(const TimerTaskPtr& task)
{
if (_hasObserver)
{
Expand Down Expand Up @@ -443,7 +443,7 @@ IceInternal::Instance::retryQueue()
return _retryQueue;
}

Ice::TimerPtr
IceInternal::TimerPtr
IceInternal::Instance::timer()
{
lock_guard lock(_mutex);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/Ice/Instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ namespace IceInternal
EndpointHostResolverPtr endpointHostResolver();
RetryQueuePtr retryQueue();
const std::vector<int>& retryIntervals() const { return _retryIntervals; }
Ice::TimerPtr timer();
IceInternal::TimerPtr timer();
EndpointFactoryManagerPtr endpointFactoryManager() const;
Ice::PluginManagerPtr pluginManager() const;
size_t messageSizeMax() const { return _messageSizeMax; }
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/Ice/RetryQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace IceInternal
using OutgoingAsyncBasePtr = std::shared_ptr<OutgoingAsyncBase>;
using ProxyOutgoingAsyncBasePtr = std::shared_ptr<ProxyOutgoingAsyncBase>;

class RetryTask : public Ice::TimerTask, public CancellationHandler, public std::enable_shared_from_this<RetryTask>
class RetryTask : public TimerTask, public CancellationHandler, public std::enable_shared_from_this<RetryTask>
{
public:
RetryTask(const InstancePtr&, const RetryQueuePtr&, const ProxyOutgoingAsyncBasePtr&);
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/IceDiscovery/LookupI.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace IceDiscovery
{
class LookupI;

class Request : public Ice::TimerTask
class Request : public IceInternal::TimerTask
{
public:
Request(const LookupIPtr&, int);
Expand Down Expand Up @@ -137,7 +137,7 @@ namespace IceDiscovery
void objectRequestTimedOut(const ObjectRequestPtr&);
void objectRequestException(const ObjectRequestPtr&, std::exception_ptr);

const Ice::TimerPtr& timer() { return _timer; }
const IceInternal::TimerPtr& timer() { return _timer; }

int latencyMultiplier() { return _latencyMultiplier; }

Expand All @@ -150,7 +150,7 @@ namespace IceDiscovery
const int _latencyMultiplier;
const std::string _domainId;

Ice::TimerPtr _timer;
IceInternal::TimerPtr _timer;
bool _warnOnce;

std::map<Ice::Identity, ObjectRequestPtr> _objectRequests;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/IceGrid/Allocatable.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace IceGrid
class Allocatable;
class SessionI;

class AllocationRequest : public Ice::TimerTask, public std::enable_shared_from_this<AllocationRequest>
class AllocationRequest : public IceInternal::TimerTask, public std::enable_shared_from_this<AllocationRequest>
{
public:
virtual ~AllocationRequest() = default;
Expand Down
Loading

0 comments on commit b631a08

Please sign in to comment.