Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Standalone nodes to operate in an orchestrated environment #138

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 63 additions & 30 deletions src/JanusFtl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ JanusFtl::JanusFtl(

initOrchestratorConnection();

initServiceConnection();
initServiceConnections();

ftlServer = std::make_unique<FtlServer>(std::move(ingestControlListener),
std::move(mediaConnectionCreator),
Expand Down Expand Up @@ -271,8 +271,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error)

// If we're an Edge node and there are no more viewers for this channel, we can
// unsubscribe.
if ((configuration->GetNodeKind() == NodeKind::Edge) &&
(watchingStream->GetViewerCount() == 0))
if (orchestrationEnabled && (watchingStream->GetViewerCount() == 0))
{
orchestratorUnsubscribe = true;
}
Expand All @@ -287,7 +286,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error)
// If this was the last pending viewer for this channel, unsubscribe.
if (pendingViewerSessions[channelId].size() == 0)
{
if (configuration->GetNodeKind() == NodeKind::Edge)
if (orchestrationEnabled)
{
orchestratorUnsubscribe = true;
}
Expand All @@ -299,14 +298,16 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error)
if (orchestratorUnsubscribe)
{
// Remove temporary stream key
const auto& edgeServiceConnection =
std::dynamic_pointer_cast<EdgeNodeServiceConnection>(serviceConnection);
if (edgeServiceConnection == nullptr)
const auto& edgeService =
std::dynamic_pointer_cast<EdgeNodeServiceConnection>(edgeServiceConnection);
if (edgeService == nullptr)
{
throw std::runtime_error(
"Unexpected service connection type - expected EdgeNodeServiceConnection.");
}
edgeServiceConnection->ClearStreamKey(channelId);

edgeService->ClearStreamKey(channelId);
orchestratorRelayChannels.erase(channelId);

spdlog::info("Last viewer for channel {} has disconnected - unsubscribing...",
channelId);
Expand All @@ -332,7 +333,21 @@ json_t* JanusFtl::QuerySession(janus_plugin_session* handle)
#pragma region Private methods
Result<std::vector<std::byte>> JanusFtl::ftlServerRequestKey(ftl_channel_id_t channelId)
{
return serviceConnection->GetHmacKey(channelId);
std::unique_lock lock(streamDataMutex);
std::shared_ptr<ServiceConnection> connection = getServiceConnection(channelId, lock);
return connection->GetHmacKey(channelId);
}

std::shared_ptr<ServiceConnection> JanusFtl::getServiceConnection(ftl_channel_id_t channelId,
const std::unique_lock<std::shared_mutex>& streamDataLock
)
{
if (orchestratorRelayChannels.count(channelId) > 0) {
// This stream is coming from another ingest
return edgeServiceConnection;
} else {
return serviceConnection;
}
}

Result<FtlServer::StartedStreamInfo> JanusFtl::ftlServerStreamStarted(
Expand All @@ -342,7 +357,8 @@ Result<FtlServer::StartedStreamInfo> JanusFtl::ftlServerStreamStarted(
std::unique_lock lock(streamDataMutex);

// Attempt to start the stream on the service connection
Result<ftl_stream_id_t> startResult = serviceConnection->StartStream(channelId);
std::shared_ptr<ServiceConnection> connection = getServiceConnection(channelId, lock);
Result<ftl_stream_id_t> startResult = connection->StartStream(channelId);
if (startResult.IsError)
{
return Result<FtlServer::StartedStreamInfo>::Error(startResult.ErrorMessage);
Expand Down Expand Up @@ -376,7 +392,7 @@ Result<FtlServer::StartedStreamInfo> JanusFtl::ftlServerStreamStarted(
// TODO: Notify viewer sessions

// If we are configured as an Ingest node, notify the Orchestrator that a stream has started.
if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr))
if (orchestrationEnabled && orchestrationClient != nullptr && orchestratorRelayChannels.count(channelId) == 0)
{
spdlog::info("Publishing channel {} / stream {} to Orchestrator...", channelId,
streamId);
Expand Down Expand Up @@ -410,7 +426,7 @@ void JanusFtl::initVideoDecoders()

void JanusFtl::initOrchestratorConnection()
{
if (configuration->GetNodeKind() != NodeKind::Standalone)
if (configuration->GetOrchestratorHostname().empty() == false)
{
spdlog::info(
"Connecting to Orchestration service @ {}:{}...",
Expand Down Expand Up @@ -445,17 +461,24 @@ void JanusFtl::initOrchestratorConnection()
.RegionCode = configuration->GetOrchestratorRegionCode(),
.Hostname = configuration->GetMyHostname(),
});

orchestrationEnabled = true;
}
}

void JanusFtl::initServiceConnection()
void JanusFtl::initServiceConnections()
{
// If we are configured to be an edge node, we *must* use the EdgeNodeServiceConnection
if (configuration->GetNodeKind() == NodeKind::Edge)
// If we are configured to be an standalone or edge node,
// we need to setup an edge service connection
if (configuration->GetNodeKind() == NodeKind::Standalone ||
configuration->GetNodeKind() == NodeKind::Edge)
{
serviceConnection = std::make_shared<EdgeNodeServiceConnection>();
edgeServiceConnection = std::make_shared<EdgeNodeServiceConnection>();
edgeServiceConnection->Init();
}
else

// If we're not an edge, setup a service connection as well
if (configuration->GetNodeKind() != NodeKind::Edge)
{
switch (configuration->GetServiceConnectionKind())
{
Expand All @@ -482,9 +505,9 @@ void JanusFtl::initServiceConnection()
configuration->GetDummyPreviewImagePath());
break;
}
}

serviceConnection->Init();
serviceConnection->Init();
}
}

void JanusFtl::initServiceReportThread()
Expand Down Expand Up @@ -521,6 +544,7 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise)
ftlServer->GetAllStatsAndKeyframes();
std::unordered_map<ftl_channel_id_t, MediaMetadata> metadataByChannel;
std::unordered_map<ftl_channel_id_t, uint32_t> viewersByChannel;
std::unordered_map<ftl_channel_id_t, std::shared_ptr<ServiceConnection>> servicesByChannel;
std::unique_lock lock(streamDataMutex);
for (const auto& streamInfo : statsAndKeyframes)
{
Expand All @@ -531,6 +555,7 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise)
}
metadataByChannel.try_emplace(channelId, streams.at(channelId)->GetMetadata());
viewersByChannel.try_emplace(channelId, streams.at(channelId)->GetViewerCount());
servicesByChannel.try_emplace(channelId, getServiceConnection(channelId, lock));
}
lock.unlock();

Expand All @@ -556,7 +581,8 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise)
}

if ((viewersByChannel.count(channelId) <= 0) ||
(metadataByChannel.count(channelId) <= 0))
(metadataByChannel.count(channelId) <= 0) ||
(servicesByChannel.count(channelId) <= 0))
{
continue;
}
Expand Down Expand Up @@ -603,8 +629,11 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise)
.videoWidth = videoWidth,
.videoHeight = videoHeight,
};


const std::shared_ptr<ServiceConnection> connection = servicesByChannel.at(channelId);
Result<ServiceConnection::ServiceResponse> updateResult =
serviceConnection->UpdateStreamMetadata(streamId, metadata);
connection->UpdateStreamMetadata(streamId, metadata);
// Check if the request failed, or the service wants to end this stream
if (updateResult.IsError ||
(updateResult.Value == ServiceConnection::ServiceResponse::EndStream))
Expand Down Expand Up @@ -632,7 +661,7 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise)
{
std::vector<uint8_t> jpegBytes =
videoDecoders.at(keyframe.Codec)->GenerateJpegImage(keyframe.Packets);
serviceConnection->SendJpegPreviewImage(streamId, jpegBytes);
connection->SendJpegPreviewImage(streamId, jpegBytes);
}
catch (const PreviewGenerationFailedException& e)
{
Expand Down Expand Up @@ -677,7 +706,7 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId,
// TODO: Tell viewers stream is offline.

// If we are configured as an Ingest node, notify the Orchestrator that a stream has ended.
if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr))
if (orchestrationEnabled && (orchestrationClient != nullptr))
{
spdlog::info("Unpublishing channel {} / stream {} from Orchestrator",
stream->GetChannelId(), stream->GetStreamId());
Expand All @@ -694,7 +723,8 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId,
spdlog::info("Stream ended. Channel {} / stream {}",
stream->GetChannelId(), stream->GetStreamId());

serviceConnection->EndStream(streamId);
std::shared_ptr<ServiceConnection> connection = getServiceConnection(channelId, streamDataLock);
connection->EndStream(streamId);
streams.erase(channelId);
}

Expand Down Expand Up @@ -749,17 +779,20 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt

// If we're an Edge node and this is a first viewer for a given channel,
// request that this channel be relayed to us.
if ((configuration->GetNodeKind() == NodeKind::Edge) && (pendingViewers == 0))
if (orchestrationEnabled && (pendingViewers == 0))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking through a potential ambiguity in this codepath -

Consider a scenario where a standalone FTL instance receives a watch request for a channel that has not started streaming yet. The standalone FTL server will subscribe to the channel via orchestrator.

Afterwards, the streamer actually begins streaming, but connects to the same standalone FTL server.

I suspect the orchestrator will fulfill the standalone FTL server's subscription and notify the server to relay the stream to itself.

Since only one stream can exist per channel, the standalone server will probably terminate the stream (or maybe worse, haven't really traced it through all the way).

We'll probably need special behavior to either

1.) Enlighten the orchestrator so it can handle when a standalone server tries to publish a stream on the same channel that it previously has subscribed to

2.) Enlighten the standalone FTL server so it can handle "transitioning" between a pending orchestrator relay and a "direct" stream

Thoughts?

{
// Generate a new stream key for incoming relay of this channel
const auto& edgeServiceConnection =
std::dynamic_pointer_cast<EdgeNodeServiceConnection>(serviceConnection);
if (edgeServiceConnection == nullptr)
const auto& edgeService =
std::dynamic_pointer_cast<EdgeNodeServiceConnection>(edgeServiceConnection);
if (edgeService == nullptr)
{
throw std::runtime_error(
"Unexpected service connection type - expected EdgeNodeServiceConnection.");
}
std::vector<std::byte> streamKey = edgeServiceConnection->ProvisionStreamKey(channelId);

orchestratorRelayChannels.insert(channelId);

std::vector<std::byte> streamKey = edgeService->ProvisionStreamKey(channelId);

// Subscribe for relay of this stream
spdlog::info("First viewer for channel {} - subscribing...",
Expand Down Expand Up @@ -1002,4 +1035,4 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl
return ConnectionResult { .IsSuccess = true };
}
}
#pragma endregion Private methods
#pragma endregion Private methods
9 changes: 8 additions & 1 deletion src/JanusFtl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "JanusSession.h"
#include "JanusStream.h"
#include "ServiceConnections/ServiceConnection.h"
#include "ServiceConnections/EdgeNodeServiceConnection.h"
#include "Utilities/FtlTypes.h"
#include "Utilities/JanssonPtr.h"
#include "Utilities/Result.h"
Expand Down Expand Up @@ -96,8 +97,10 @@ class JanusFtl
janus_callbacks* janusCore;
std::unique_ptr<FtlServer> ftlServer;
std::unique_ptr<Configuration> configuration;
bool orchestrationEnabled = false;
std::shared_ptr<FtlConnection> orchestrationClient;
std::shared_ptr<ServiceConnection> serviceConnection;
std::shared_ptr<EdgeNodeServiceConnection> edgeServiceConnection;
std::unordered_map<VideoCodecKind, std::unique_ptr<VideoDecoder>> videoDecoders;
uint32_t maxAllowedBitsPerSecond = 0;
uint32_t rollingSizeAvgMs = 2000;
Expand All @@ -114,6 +117,7 @@ class JanusFtl
std::unordered_map<ftl_channel_id_t, std::shared_ptr<JanusStream>> streams;
std::unordered_map<janus_plugin_session*, ActiveSession> sessions;
std::unordered_map<ftl_channel_id_t, std::unordered_set<JanusSession*>> pendingViewerSessions;
std::unordered_set<ftl_channel_id_t> orchestratorRelayChannels;

/* Private methods */
// FtlServer Callbacks
Expand All @@ -124,7 +128,7 @@ class JanusFtl
// Initialization
void initVideoDecoders();
void initOrchestratorConnection();
void initServiceConnection();
void initServiceConnections();
void initServiceReportThread();
// Service report thread body
void serviceReportThreadBody(std::promise<void>&& threadEndedPromise);
Expand All @@ -146,4 +150,7 @@ class JanusFtl
ConnectionResult onOrchestratorIntro(ConnectionIntroPayload payload);
ConnectionResult onOrchestratorOutro(ConnectionOutroPayload payload);
ConnectionResult onOrchestratorStreamRelay(ConnectionRelayPayload payload);
// Helper functions
std::shared_ptr<ServiceConnection> getServiceConnection(ftl_channel_id_t channelId,
const std::unique_lock<std::shared_mutex>& streamDataLock);
};
3 changes: 3 additions & 0 deletions src/ServiceConnections/EdgeNodeServiceConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ EdgeNodeServiceConnection::EdgeNodeServiceConnection()
#pragma region Public methods
std::vector<std::byte> EdgeNodeServiceConnection::ProvisionStreamKey(ftl_channel_id_t channelId)
{
spdlog::debug(
"EdgeNodeServiceConnection::ProvisionStreamKey called...");
// If a stream key already exists for the given channel, just return the existing one
if (streamKeys.count(channelId) > 0)
{
Expand All @@ -41,6 +43,7 @@ void EdgeNodeServiceConnection::Init()

Result<std::vector<std::byte>> EdgeNodeServiceConnection::GetHmacKey(ftl_channel_id_t channelId)
{

if (streamKeys.count(channelId) > 0)
{
const auto& key = streamKeys[channelId];
Expand Down
2 changes: 2 additions & 0 deletions src/ServiceConnections/GlimeshServiceConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void GlimeshServiceConnection::Init()

Result<std::vector<std::byte>> GlimeshServiceConnection::GetHmacKey(uint32_t channelId)
{
spdlog::debug("GlimeshServiceConnection::GetHmacKey...");
std::stringstream query;
query << "query { channel(id: \"" << channelId << "\") { hmacKey } }";

Expand Down Expand Up @@ -68,6 +69,7 @@ Result<std::vector<std::byte>> GlimeshServiceConnection::GetHmacKey(uint32_t cha

Result<ftl_stream_id_t> GlimeshServiceConnection::StartStream(ftl_channel_id_t channelId)
{
spdlog::debug("GlimeshServiceConnection::StartStream...");
std::stringstream query;
query << "mutation { startStream(channelId: " << channelId << ") { id } }";

Expand Down