Skip to content

Commit

Permalink
📬 FtlServer event queue model (#102)
Browse files Browse the repository at this point in the history
This change introduces a single-thread event queue model for `FtlServer` in order to minimize issues with deadlocks and out-of-order processing for sequential events.

`FtlServer` now contains definitions for 11 events that can be processed:

```cpp
enum class FtlServerEventKind
{
    Unknown = 0,
    StopStream,                 // Request to stop a specific Channel / Stream ID
    NewControlConnection,       // ConnectionListener has produced a ConnectionTransport
    ControlConnectionClosed,    // FtlControlConnection has closed
    ControlRequestHmacKey,      // Control connection requests HMAC key
    ControlHmacKeyFound,        // HMAC key has been provided for a Control connection
    ControlRequestMediaPort,    // Control connection requests media port
    TerminateControlConnection, // Terminate and remove a Control connection
    StreamIdAssigned,           // StreamStartedCallback has returned a Stream ID
    StreamStarted,              // FtlStream has started successfully
    StreamStartFailed,          // FtlStream has failed to start
    StreamClosed,               // FtlStream has closed
};
```

Each event has an event payload that derives from `FtlServer::FtlServerEvent`.

When `FtlServer` is constructed, a jthread is initialized to process event queue events. When `FtlServer` is destructed, the `stop_token` is set on the jthread, stopping the event queue, and the jthread is joined.

Any public method exposed by `FtlServer` will enqueue an event in the queue to ensure events are processed in the intended order, and to avoid blocking the caller.

Similarly, any outgoing call will be processed on a new jthread via `dispatchAsyncCall` to avoid blocking the event queue. These threads will be tracked by `FtlServer` and joined on destruction of the class or when they finish their task. Return values will be enqueued as a task in the event queue for further processing.
  • Loading branch information
haydenmc authored Mar 10, 2021
1 parent 84455e4 commit c8666a6
Show file tree
Hide file tree
Showing 15 changed files with 798 additions and 265 deletions.
7 changes: 5 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
[submodule "cpp-httplib"]
path = cpp-httplib
path = vendor/cpp-httplib
url = https://github.com/yhirose/cpp-httplib
[submodule "janus-ftl-orchestrator"]
path = janus-ftl-orchestrator
path = vendor/janus-ftl-orchestrator
url = https://github.com/Glimesh/janus-ftl-orchestrator
[submodule "vendor/eventpp"]
path = vendor/eventpp
url = https://github.com/wqking/eventpp.git
5 changes: 3 additions & 2 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ deps = [
]

incdir = include_directories(
'./cpp-httplib',
'./janus-ftl-orchestrator/inc',
'./vendor/cpp-httplib',
'./vendor/janus-ftl-orchestrator/inc',
'./vendor/eventpp/include',
janusincludepath,
is_system: true,
)
Expand Down
2 changes: 1 addition & 1 deletion src/ConnectionListeners/ConnectionListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ class ConnectionListener
* has been established
*/
virtual void SetOnNewConnection(
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection) = 0;
std::function<void(ConnectionTransport*)> onNewConnection) = 0;
};
16 changes: 8 additions & 8 deletions src/ConnectionListeners/TcpConnectionListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ void TcpConnectionListener::Listen(std::promise<void>&& readyPromise)
sockaddr_in acceptAddress = { 0 };
socklen_t acceptLen = sizeof(acceptAddress);
getpeername(connectionHandle, reinterpret_cast<sockaddr*>(&acceptAddress), &acceptLen);
// Create a ConnectionTransport for this new connection
auto transport = std::make_unique<NetworkSocketConnectionTransport>(
NetworkSocketConnectionKind::Tcp,
connectionHandle,
acceptAddress);
if (onNewConnection)
{
onNewConnection(std::move(transport));
// Create a ConnectionTransport for this new connection
auto transport = new NetworkSocketConnectionTransport(
NetworkSocketConnectionKind::Tcp,
connectionHandle,
acceptAddress);
onNewConnection(transport);
}
else
{
Expand All @@ -126,11 +126,11 @@ void TcpConnectionListener::Listen(std::promise<void>&& readyPromise)

void TcpConnectionListener::StopListening()
{

// TODO
}

void TcpConnectionListener::SetOnNewConnection(
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection)
std::function<void(ConnectionTransport*)> onNewConnection)
{
this->onNewConnection = onNewConnection;
}
Expand Down
4 changes: 2 additions & 2 deletions src/ConnectionListeners/TcpConnectionListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class TcpConnectionListener : public ConnectionListener
void Listen(std::promise<void>&& readyPromise = std::promise<void>()) override;
void StopListening() override;
void SetOnNewConnection(
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection) override;
std::function<void(ConnectionTransport*)> onNewConnection) override;

private:
const int listenPort;
const int socketQueueLimit;
int listenSocketHandle = 0;
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection;
std::function<void(ConnectionTransport*)> onNewConnection;
};
183 changes: 106 additions & 77 deletions src/FtlControlConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,20 @@
#include "FtlControlConnection.h"

#include "ConnectionTransports/ConnectionTransport.h"
#include "FtlServer.h"
#include "Utilities/Util.h"

#include <algorithm>
#include <openssl/hmac.h>

#pragma region Constructor/Destructor
FtlControlConnection::FtlControlConnection(
std::unique_ptr<ConnectionTransport> transport,
RequestKeyCallback onRequestKey,
StartMediaPortCallback onStartMediaPort,
ConnectionClosedCallback onConnectionClosed)
FtlServer* ftlServer,
std::unique_ptr<ConnectionTransport> transport)
:
transport(std::move(transport)),
onRequestKey(onRequestKey),
onStartMediaPort(onStartMediaPort),
onConnectionClosed(onConnectionClosed)
{
ftlServer(ftlServer),
transport(std::move(transport))
{
// Bind to transport events
this->transport->SetOnBytesReceived(std::bind(
&FtlControlConnection::onTransportBytesReceived, this, std::placeholders::_1));
Expand All @@ -39,13 +36,88 @@ ftl_channel_id_t FtlControlConnection::GetChannelId()
return channelId;
}

void FtlControlConnection::SetOnConnectionClosed(ConnectionClosedCallback onConnectionClosed)
std::optional<sockaddr_in> FtlControlConnection::GetAddr()
{
this->onConnectionClosed = onConnectionClosed;
return transport->GetAddr();
}

void FtlControlConnection::SetFtlStream(FtlStream* ftlStream)
{
this->ftlStream = ftlStream;
}
#pragma endregion Getters/setters

#pragma region Public functions
void FtlControlConnection::ProvideHmacKey(const std::vector<std::byte>& hmacKey)
{
if (isAuthenticated)
{
spdlog::error("FtlControlConnection::ProvideHmacKey HMAC key was provided, but connection "
"was already authenticated!");
return;
}
else if (!hmacRequested)
{
spdlog::error("FtlControlConnection::ProvideHmacKey HMAC key was provided, but HMAC key "
"was never requested!");
return;
}

// Calculate
std::byte buffer[512];
uint32_t bufferLength;
HMAC(EVP_sha512(), reinterpret_cast<const unsigned char*>(hmacKey.data()), hmacKey.size(),
reinterpret_cast<const unsigned char*>(hmacPayload.data()), hmacPayload.size(),
reinterpret_cast<unsigned char*>(buffer), &bufferLength);

// Do the hashed values match?
bool match = true;
if (bufferLength != clientHmacHash.size())
{
match = false;
}
else
{
for (unsigned int i = 0; i < bufferLength; ++i)
{
if (clientHmacHash.at(i) != buffer[i])
{
match = false;
break;
}
}
}

if (match)
{
isAuthenticated = true;
writeToTransport(fmt::format("{}\n", FtlResponseCode::FTL_INGEST_RESP_OK));
std::string addrStr = transport->GetAddr().has_value() ?
Util::AddrToString(transport->GetAddr().value().sin_addr) : "UNKNOWN";
spdlog::info("{} authenticated as Channel {} successfully.", addrStr,
channelId);
}
else
{
spdlog::info("Client provided invalid HMAC hash for channel {}, disconnecting...",
channelId);
stopConnection();
return;
}
}

void FtlControlConnection::StartMediaPort(uint16_t mediaPort)
{
if (isStreaming)
{
spdlog::error("Channel {} has been assigned multiple media ports", channelId);
}
isStreaming = true;
spdlog::info("Assigned Channel {} media port {}", channelId, mediaPort);
writeToTransport(fmt::format("{} hi. Use UDP port {}\n", FtlResponseCode::FTL_INGEST_RESP_OK,
mediaPort));
}

Result<void> FtlControlConnection::StartAsync()
{
return transport->StartAsync();
Expand Down Expand Up @@ -109,9 +181,13 @@ void FtlControlConnection::onTransportBytesReceived(const std::vector<std::byte>

void FtlControlConnection::onTransportClosed()
{
if (onConnectionClosed)
if (ftlStream != nullptr)
{
ftlStream->ControlConnectionStopped(this);
}
else if (ftlServer != nullptr)
{
onConnectionClosed(*this);
ftlServer->ControlConnectionStopped(this);
}
}

Expand All @@ -136,9 +212,13 @@ void FtlControlConnection::stopConnection()

// Notify that we've stopped - we will not receive an OnConnectionClosed from the transport
// if we call Stop ourselves
if (onConnectionClosed)
if (ftlStream != nullptr)
{
ftlStream->ControlConnectionStopped(this);
}
else if (ftlServer != nullptr)
{
onConnectionClosed(*this);
ftlServer->ControlConnectionStopped(this);
}
}

Expand Down Expand Up @@ -202,61 +282,20 @@ void FtlControlConnection::processConnectCommand(const std::string& command)
stopConnection();
return;
}
std::vector<std::byte> hmacHash = Util::HexStringToByteArray(hmacHashStr);

// Try to fetch the key for this channel
Result<std::vector<std::byte>> keyResult = onRequestKey(requestedChannelId);
if (keyResult.IsError)
if (hmacRequested)
{
// Couldn't look up the key, so let's close the connection
spdlog::warn("Couldn't look up HMAC key for channel {}: {}", requestedChannelId,
keyResult.ErrorMessage);
spdlog::error("Control connection attempted multiple CONNECT handshakes");
stopConnection();
return;
}
std::vector<std::byte> key = keyResult.Value;

std::byte buffer[512];
uint32_t bufferLength;
HMAC(EVP_sha512(), reinterpret_cast<const unsigned char*>(key.data()), key.size(),
reinterpret_cast<const unsigned char*>(hmacPayload.data()), hmacPayload.size(),
reinterpret_cast<unsigned char*>(buffer), &bufferLength);
// Store the client's hash and requested channel ID
channelId = requestedChannelId;
clientHmacHash = Util::HexStringToByteArray(hmacHashStr);

// Do the hashed values match?
bool match = true;
if (bufferLength != hmacHash.size())
{
match = false;
}
else
{
for (unsigned int i = 0; i < bufferLength; ++i)
{
if (hmacHash.at(i) != buffer[i])
{
match = false;
break;
}
}
}

if (match)
{
isAuthenticated = true;
channelId = requestedChannelId;
writeToTransport(fmt::format("{}\n", FtlResponseCode::FTL_INGEST_RESP_OK));
std::string addrStr = transport->GetAddr().has_value() ?
Util::AddrToString(transport->GetAddr().value().sin_addr) : "UNKNOWN";
spdlog::info("{} authenticated as Channel {} successfully.", addrStr,
requestedChannelId);
}
else
{
spdlog::info("Client provided invalid HMAC hash for channel {}, disconnecting...",
requestedChannelId);
stopConnection();
return;
}
// Let the FtlServer know that we need an hmac key to calculate our own hash!
hmacRequested = true;
ftlServer->ControlConnectionRequestedHmacKey(this, requestedChannelId);
}
else
{
Expand Down Expand Up @@ -429,20 +468,10 @@ void FtlControlConnection::processDotCommand()
}

// HACK: We assume GetAddr() returns a value here.
Result<uint16_t> mediaPortResult = onStartMediaPort(*this, channelId, mediaMetadata,
// Tell the FtlServer we want a media port!
ftlServer->ControlConnectionRequestedMediaPort(this, channelId, mediaMetadata,
transport->GetAddr().value().sin_addr);
if (mediaPortResult.IsError)
{
spdlog::error("Could not assign media port for FTL connection: {}",
mediaPortResult.ErrorMessage);
stopConnection();
return;
}
uint16_t mediaPort = mediaPortResult.Value;
isStreaming = true;
spdlog::info("Assigned Channel {} media port {}", channelId, mediaPort);
writeToTransport(fmt::format("{} hi. Use UDP port {}\n", FtlResponseCode::FTL_INGEST_RESP_OK,
mediaPort));

}

void FtlControlConnection::processPingCommand()
Expand Down
Loading

0 comments on commit c8666a6

Please sign in to comment.