Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smartlink crash fix #1819

Merged
Merged
169 changes: 139 additions & 30 deletions Source/websocket/JSONRPCLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,14 +460,69 @@ namespace Thunder {
using PendingMap = std::unordered_map<uint32_t, Entry>;
using InvokeFunction = Core::JSONRPC::InvokeFunction;

class Handler {
public:
using HandlerMap = std::unordered_map<string, InvokeFunction>;

public:
Handler():_adminLock(),_invokeMap(){}
Handler(const Handler&) = delete;
Handler(Handler&&) = delete;
Handler& operator=(const Handler&) = delete;
Karthick-Somasundaresan marked this conversation as resolved.
Show resolved Hide resolved
Handler& operator=(Handler&&) = delete;
~Handler() = default;
void Register(const string& methodName, const InvokeFunction& lambda)
{
_adminLock.Lock();
auto retval = _invokeMap.emplace(std::piecewise_construct,
std::make_tuple(methodName),
std::make_tuple(lambda));

if ( retval.second == false ) {
retval.first->second = lambda;
}
_adminLock.Unlock();
}
void Unregister(const string& methodName)
{
_adminLock.Lock();
HandlerMap::iterator index = _invokeMap.find(methodName);

ASSERT((index != _invokeMap.end()) && _T("Do not unregister methods that are not registered!!!"));

if (index != _invokeMap.end()) {
_invokeMap.erase(index);
}
_adminLock.Unlock();
}

uint32_t Invoke(const Core::JSONRPC::Context& context, const string& method, const string& parameters, string& response)
{
uint32_t result = Core::ERROR_UNKNOWN_METHOD;

response.clear();

_adminLock.Lock();
HandlerMap::iterator index = _invokeMap.find(Core::JSONRPC::Message::Method(method));
if (index != _invokeMap.end()) {
result = index->second(context, method, parameters, response);
}
_adminLock.Unlock();
return (result);
}
private:
mutable Core::CriticalSection _adminLock;
HandlerMap _invokeMap;
};

protected:
static constexpr uint32_t DefaultWaitTime = 10000;

LinkType(const string& callsign, const string connectingCallsign, const TCHAR* localCallsign, const string& query)
: _adminLock()
, _connectId(RemoteNodeId())
, _channel(CommunicationChannel::Instance(_connectId, string("/jsonrpc/") + connectingCallsign, query))
, _handler({ DetermineVersion(callsign) })
, _handler()
, _callsign(callsign.empty() ? string() : Core::JSONRPC::Message::Callsign(callsign + '.'))
, _localSpace()
, _pendingQueue()
Expand Down Expand Up @@ -524,10 +579,7 @@ namespace Thunder {
{
return (_callsign);
}
Core::JSONRPC::Handler::EventIterator Events() const
{
return (_handler.Events());
}

template <typename INBOUND, typename METHOD>
void Assign(const string& eventName, const METHOD& method)
{
Expand Down Expand Up @@ -1035,6 +1087,7 @@ namespace Thunder {
std::forward_as_tuple(waitTime, response));
ASSERT(newElement.second == true);


if (newElement.second == true) {
uint64_t expiry = newElement.first->second.Expiry();
_adminLock.Unlock();
Expand All @@ -1053,7 +1106,6 @@ namespace Thunder {

_adminLock.Unlock();
}

return (result);
}
uint32_t Inbound(const Core::ProxyType<Core::JSONRPC::Message>& inbound)
Expand All @@ -1062,6 +1114,7 @@ namespace Thunder {

ASSERT(inbound.IsValid() == true);


if ((inbound->Id.IsSet() == true) && (inbound->Result.IsSet() || inbound->Error.IsSet())) {
// Looks like this is a response..
ASSERT(inbound->Parameters.IsSet() == false);
Expand Down Expand Up @@ -1153,7 +1206,7 @@ namespace Thunder {
Core::CriticalSection _adminLock;
Core::NodeId _connectId;
Core::ProxyType< CommunicationChannel > _channel;
Core::JSONRPC::Handler _handler;
Handler _handler;
string _callsign;
string _localSpace;
PendingMap _pendingQueue;
Expand All @@ -1174,6 +1227,21 @@ namespace Thunder {
class Connection : public LinkType<INTERFACE> {
private:
using Base = LinkType<INTERFACE>;
class EventSubscriber: public Core::Thread {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a thread here ? This will always (per connection) require an 8MB callstack. I thnk you can "reuse" the ResouceMonitor Thread (communication thread) as this is probably just to request a registration ? Or if we need a thread, could we do something to limit it to 1 thread for all Conenction types per process ?

public:
EventSubscriber () = delete;
EventSubscriber(const EventSubscriber&) = delete;
EventSubscriber& operator=(const EventSubscriber&) = delete;
EventSubscriber(Connection& parent):Thread(Thunder::Core::Thread::DefaultStackSize(), _T("SmartLinkTypeEventSubscriber")), _parent(parent){}
~EventSubscriber() = default;
uint32_t Worker() override
{
_parent.SubscribeEvents();
return Core::infinite;
}
private:
Connection& _parent;
};
public:
static constexpr uint32_t DefaultWaitTime = Base::DefaultWaitTime;
private:
Expand Down Expand Up @@ -1253,6 +1321,9 @@ namespace Thunder {
: Base(callsign, string(), localCallsign, query)
, _monitor(string(), false)
, _parent(parent)
, _adminLock()
, _subscriptions()
, _eventSubscriber(*this)
, _state(UNKNOWN)
{
_monitor.template Assign<Statechange>(_T("statechange"), &Connection::state_change, this);
Expand All @@ -1261,30 +1332,77 @@ namespace Thunder {
~Connection() override
{
_monitor.Revoke(_T("statechange"));
_eventSubscriber.Stop();
_eventSubscriber.Wait(Core::Thread::STOPPED, Core::infinite);
}

public:
bool IsActivated()
{
return (_state == ACTIVATED);
}
void SubscribeEvents() {
_adminLock.Lock();
for (const string& iter: _subscriptions) {
SendSubscribeRequest(iter);
}
_adminLock.Unlock();
_eventSubscriber.Block();
_state = state::ACTIVATED;
_parent.StateChange();
}

template <typename INBOUND, typename METHOD>
uint32_t Subscribe(const uint32_t waitTime, const string& eventName, const METHOD& method)
{
auto result = Base::template Subscribe<INBOUND, METHOD>(waitTime, eventName, method);
if (result == Core::ERROR_NONE) {
_adminLock.Lock();
_subscriptions.insert(string(eventName));
_adminLock.Unlock();
}
return result;
}
template <typename INBOUND, typename METHOD, typename REALOBJECT>
uint32_t Subscribe(const uint32_t waitTime, const string& eventName, const METHOD& method, REALOBJECT* objectPtr)
{
auto result = Base::template Subscribe<INBOUND, METHOD, REALOBJECT>(waitTime, eventName, method, objectPtr);
if (result == Core::ERROR_NONE) {
_adminLock.Lock();
_subscriptions.insert(string(eventName));
_adminLock.Unlock();
}
return result;
}
void Unsubscribe(const uint32_t waitTime, const string& eventName)
{
_adminLock.Lock();
ASSERT(_subscriptions.find(eventName) != _subscriptions.end());
auto iter = _subscriptions.erase(eventName);
_adminLock.Unlock();
return Base::Unsubscribe(waitTime, eventName);
}


private:
uint32_t SendSubscribeRequest(const string& eventName) {
uint32_t retVal = Core::ERROR_UNAVAILABLE;
Core::JSONRPC::Message response;
const string parameters("{ \"event\": \"" + eventName + "\", \"id\": \"" + Base::Namespace() + "\"}");
auto result = Base::template Invoke<string>(DefaultWaitTime, "register", parameters, response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oke, synchronous Invoke, so communication thread is not an option :-) Maybe than 1 thread for all processes that have this ?

if (result == Core::ERROR_NONE && response.Error.IsSet() != true) {
retVal = Core::ERROR_NONE;
}
return retVal;

}
void SetState(const JSONRPC::JSONPluginState value)
{
if (value == JSONRPC::JSONPluginState::ACTIVATED) {
if ((_state != ACTIVATED) && (_state != LOADING)) {
_state = state::LOADING;
auto index(Base::Events());
while (index.Next() == true) {
_events.push_back(index.Event());
}
next_event(Core::JSON::String(), nullptr);
}
else if (_state == LOADING) {
_state = state::ACTIVATED;
_parent.StateChange();

_eventSubscriber.Run();

}
}
else if (value == JSONRPC::JSONPluginState::DEACTIVATED) {
Expand Down Expand Up @@ -1315,18 +1433,7 @@ namespace Thunder {
_monitor.template Dispatch<void>(DefaultWaitTime, method, &Connection::monitor_response, this);
}
}
void next_event(const Core::JSON::String& /* parameters */, const Core::JSONRPC::Error* /* result */)
{
// See if there are events pending for registration...
if (_events.empty() == false) {
const string parameters("{ \"event\": \"" + _events.front() + "\", \"id\": \"" + Base::Namespace() + "\"}");
_events.pop_front();
LinkType<INTERFACE>::Dispatch(DefaultWaitTime, _T("register"), parameters, &Connection::next_event, this);
}
else {
SetState(JSONRPC::JSONPluginState::ACTIVATED);
}
}


void Opened() override
{
Expand All @@ -1339,7 +1446,9 @@ namespace Thunder {
private:
LinkType<INTERFACE> _monitor;
SmartLinkType<INTERFACE>& _parent;
std::list<string> _events;
Core::CriticalSection _adminLock;
std::unordered_set<string> _subscriptions;
EventSubscriber _eventSubscriber;
state _state;
};

Expand Down
Loading