From 223113875a75e940ec57fb1449e1dbb4f9887c79 Mon Sep 17 00:00:00 2001 From: Daniel Stiner Date: Mon, 15 Mar 2021 12:41:47 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=A6=20Hand=20RTP=20packets=20directly?= =?UTF-8?q?=20to=20JanusStream=20(#106)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Going back and doing ebefe001a1f6a42cac2c1c8f7cf59db18fe50897 the right way this time. This shortens the path for RTP packets to only require a single lock at the stream level. --- meson.build | 3 +- src/FtlControlConnection.h | 1 - src/FtlServer.cpp | 29 +++--- src/FtlServer.h | 13 ++- src/FtlStream.cpp | 2 +- src/FtlStream.h | 3 +- src/JanusFtl.cpp | 207 +++++++++++++------------------------ src/JanusFtl.h | 26 +---- src/JanusStream.cpp | 125 ++++++++++++++++++++++ src/JanusStream.h | 59 +++++++++++ src/RtpPacketSink.h | 19 ++++ 11 files changed, 306 insertions(+), 181 deletions(-) create mode 100644 src/JanusStream.cpp create mode 100644 src/JanusStream.h create mode 100644 src/RtpPacketSink.h diff --git a/meson.build b/meson.build index 02fb0ed..d2b3fba 100644 --- a/meson.build +++ b/meson.build @@ -47,8 +47,9 @@ sources = files([ 'src/FtlControlConnection.cpp', 'src/FtlServer.cpp', 'src/FtlStream.cpp', - 'src/JanusSession.cpp', 'src/JanusFtl.cpp', + 'src/JanusSession.cpp', + 'src/JanusStream.cpp', # Library entrypoint 'src/entrypoint.cpp', ]) diff --git a/src/FtlControlConnection.h b/src/FtlControlConnection.h index daef5b7..04f1d7f 100644 --- a/src/FtlControlConnection.h +++ b/src/FtlControlConnection.h @@ -80,7 +80,6 @@ class FtlControlConnection FtlStream* ftlStream = nullptr; bool hmacRequested = false; bool isAuthenticated = false; - bool mediaPortRequested = false; bool isStreaming = false; ftl_channel_id_t channelId = 0; std::vector hmacPayload; diff --git a/src/FtlServer.cpp b/src/FtlServer.cpp index d9365c3..52c239e 100644 --- a/src/FtlServer.cpp +++ b/src/FtlServer.cpp @@ -20,7 +20,6 @@ FtlServer::FtlServer( RequestKeyCallback onRequestKey, StreamStartedCallback onStreamStarted, StreamEndedCallback onStreamEnded, - RtpPacketCallback onRtpPacket, uint16_t minMediaPort, uint16_t maxMediaPort) : @@ -29,7 +28,6 @@ FtlServer::FtlServer( onRequestKey(onRequestKey), onStreamStarted(onStreamStarted), onStreamEnded(onStreamEnded), - onRtpPacket(onRtpPacket), minMediaPort(minMediaPort), maxMediaPort(maxMediaPort), eventQueueThread(std::jthread(&FtlServer::eventQueueThreadBody, this)) @@ -325,12 +323,6 @@ void FtlServer::onStreamClosed(FtlStream* stream) })); } -void FtlServer::onStreamRtpPacket(ftl_channel_id_t channelId, ftl_stream_id_t streamId, - const std::vector& packet) -{ - onRtpPacket(channelId, streamId, packet); -} - void FtlServer::eventStopStream(std::shared_ptr event) { spdlog::debug("FtlServer::eventStopStream processing StopStream event..."); @@ -462,7 +454,7 @@ void FtlServer::eventTerminateControlConnection( std::move(pendingControlConnections.at(event->Connection).first); pendingControlConnections.erase(event->Connection); dispatchAsyncCall( - [this, event, control = std::move(control)]() mutable + [event, control = std::move(control)]() mutable { control->Stop(event->ResponseCode); }); @@ -480,13 +472,13 @@ void FtlServer::eventControlRequestMediaPort( [this, event]() { // Attempt to start stream - Result streamIdResult = + Result streamStartResult = onStreamStarted(event->ChannelId, event->Metadata); - if (streamIdResult.IsError) + if (streamStartResult.IsError) { spdlog::debug("FtlServer::eventControlRequestMediaPort processing thread " "error starting stream - queueing TerminateControlConnection event: {}", - streamIdResult.ErrorMessage); + streamStartResult.ErrorMessage); eventQueue.enqueue(FtlServerEventKind::TerminateControlConnection, std::shared_ptr( new FtlServerTerminateControlConnectionEvent @@ -500,7 +492,8 @@ void FtlServer::eventControlRequestMediaPort( { spdlog::debug("FtlServer::eventControlRequestMediaPort processing thread " "successfully received Stream ID - queueing StreamIdAssigned event..."); - ftl_stream_id_t streamId = streamIdResult.Value; + ftl_stream_id_t streamId = streamStartResult.Value.StreamId; + std::shared_ptr packetSink = streamStartResult.Value.PacketSink; eventQueue.enqueue(FtlServerEventKind::StreamIdAssigned, std::shared_ptr( new FtlServerStreamIdAssignedEvent @@ -510,6 +503,7 @@ void FtlServer::eventControlRequestMediaPort( .StreamId = streamId, .Metadata = event->Metadata, .TargetAddr = event->TargetAddr, + .PacketSink = packetSink, })); } }); @@ -554,18 +548,21 @@ void FtlServer::eventStreamIdAssigned(std::shared_ptr rtpPacketSink = event->PacketSink; // Attempt to fire up the new FtlStream. Queue a new event when we're done. dispatchAsyncCall( - [this, event, control = std::move(control), mediaPort]() mutable + [this, event, control = std::move(control), mediaPort, rtpPacketSink]() mutable { std::unique_ptr mediaTransport = mediaConnectionCreator->CreateConnection(mediaPort, event->TargetAddr); auto stream = std::make_shared( std::move(control), std::move(mediaTransport), event->Metadata, event->StreamId, std::bind(&FtlServer::onStreamClosed, this, std::placeholders::_1), - std::bind(&FtlServer::onStreamRtpPacket, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + [rtpPacketSink](const std::vector packet) + { + rtpPacketSink->SendRtpPacket(packet); + }); Result streamStartResult = stream->StartAsync(mediaPort); if (streamStartResult.IsError) diff --git a/src/FtlServer.h b/src/FtlServer.h index fba1ab1..6cb0915 100644 --- a/src/FtlServer.h +++ b/src/FtlServer.h @@ -9,6 +9,7 @@ #include "FtlControlConnection.h" #include "FtlStream.h" +#include "RtpPacketSink.h" #include "Utilities/FtlTypes.h" #include "Utilities/Result.h" @@ -37,12 +38,17 @@ class ConnectionTransport; class FtlServer { public: + /* Public types */ + struct StartedStreamInfo { + ftl_stream_id_t StreamId; + std::shared_ptr PacketSink; + }; + /* Callback types */ using RequestKeyCallback = std::function>(ftl_channel_id_t)>; using StreamStartedCallback = - std::function(ftl_channel_id_t, MediaMetadata)>; + std::function(ftl_channel_id_t, MediaMetadata)>; using StreamEndedCallback = std::function; - using RtpPacketCallback = FtlStream::RtpPacketCallback; /* Constructor/Destructor */ FtlServer( @@ -51,7 +57,6 @@ class FtlServer RequestKeyCallback onRequestKey, StreamStartedCallback onStreamStarted, StreamEndedCallback onStreamEnded, - RtpPacketCallback onRtpPacket, uint16_t minMediaPort = DEFAULT_MEDIA_MIN_PORT, uint16_t maxMediaPort = DEFAULT_MEDIA_MAX_PORT); ~FtlServer() = default; @@ -173,6 +178,7 @@ class FtlServer ftl_stream_id_t StreamId; MediaMetadata Metadata; in_addr TargetAddr; + std::shared_ptr PacketSink; }; struct FtlServerStreamStartedEvent : public FtlServerEvent { @@ -211,7 +217,6 @@ class FtlServer const RequestKeyCallback onRequestKey; const StreamStartedCallback onStreamStarted; const StreamEndedCallback onStreamEnded; - const RtpPacketCallback onRtpPacket; // Media ports const uint16_t minMediaPort; const uint16_t maxMediaPort; diff --git a/src/FtlStream.cpp b/src/FtlStream.cpp index 1e51a16..3444138 100644 --- a/src/FtlStream.cpp +++ b/src/FtlStream.cpp @@ -580,7 +580,7 @@ void FtlStream::processAudioVideoRtpPacket(const std::vector& rtpPack if (onRtpPacket) { dataLock.unlock(); // Unlock while we call out - onRtpPacket(GetChannelId(), streamId, rtpPacket); + onRtpPacket(rtpPacket); dataLock.lock(); } } diff --git a/src/FtlStream.h b/src/FtlStream.h index f76e66b..ccec29b 100644 --- a/src/FtlStream.h +++ b/src/FtlStream.h @@ -33,8 +33,7 @@ class FtlStream public: /* Public types */ using ClosedCallback = std::function; - using RtpPacketCallback = std::function&)>; + using RtpPacketCallback = std::function&)>; struct FtlStreamStats { time_t StartTime; diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index 30ff282..a57fa92 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -43,9 +43,7 @@ JanusFtl::JanusFtl( std::bind(&JanusFtl::ftlServerStreamStarted, this, std::placeholders::_1, std::placeholders::_2), std::bind(&JanusFtl::ftlServerStreamEnded, this, std::placeholders::_1, - std::placeholders::_2), - std::bind(&JanusFtl::ftlServerRtpPacket, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + std::placeholders::_2)); } #pragma endregion @@ -268,13 +266,13 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) // If session is watching an active stream, remove it if (streams.count(channelId) > 0) { - ActiveStream& watchingStream = streams[channelId]; - watchingStream.ViewerSessions.erase(session.Session.get()); + std::shared_ptr& watchingStream = streams[channelId]; + watchingStream->RemoveViewerSession(session.Session.get()); // If we're an Edge node and there are no more viewers for this channel, we can // unsubscribe. if ((configuration->GetNodeKind() == NodeKind::Edge) && - (watchingStream.ViewerSessions.size() == 0)) + (watchingStream->GetViewerCount() == 0)) { orchestratorUnsubscribe = true; } @@ -337,7 +335,8 @@ Result> JanusFtl::ftlServerRequestKey(ftl_channel_id_t ch return serviceConnection->GetHmacKey(channelId); } -Result JanusFtl::ftlServerStreamStarted(ftl_channel_id_t channelId, +Result JanusFtl::ftlServerStreamStarted( + ftl_channel_id_t channelId, MediaMetadata mediaMetadata) { std::unique_lock lock(streamDataMutex); @@ -346,37 +345,31 @@ Result JanusFtl::ftlServerStreamStarted(ftl_channel_id_t channe Result startResult = serviceConnection->StartStream(channelId); if (startResult.IsError) { - return startResult; + return Result::Error(startResult.ErrorMessage); } ftl_stream_id_t streamId = startResult.Value; // Stop any existing streams on this channel if (streams.count(channelId) > 0) { - const ActiveStream& activeStream = streams[channelId]; + const auto& stream = streams.at(channelId); spdlog::info("Existing Stream {} exists for Channel {} - stopping...", - activeStream.StreamId, channelId); - ftlServer->StopStream(activeStream.ChannelId, activeStream.StreamId); - endStream(activeStream.ChannelId, activeStream.StreamId, lock); + stream->GetStreamId(), channelId); + ftlServer->StopStream(stream->GetChannelId(), stream->GetStreamId()); + endStream(stream->GetChannelId(), stream->GetStreamId(), lock); } // Insert new stream - streams.insert_or_assign(channelId, ActiveStream - { - .ChannelId = channelId, - .StreamId = streamId, - .Metadata = mediaMetadata, - .ViewerSessions = {}, - }); + auto stream = std::make_shared(channelId, streamId, mediaMetadata); + streams[channelId] = stream; // Move any pending viewer sessions over if (pendingViewerSessions.count(channelId) > 0) { for (const auto& pendingSession : pendingViewerSessions[channelId]) { - streams[channelId].ViewerSessions.insert(pendingSession); - sendJsep(sessions[pendingSession->GetJanusPluginSessionHandle()], streams[channelId], - nullptr); + stream->AddViewerSession(pendingSession); + sendJsep(sessions[pendingSession->GetJanusPluginSessionHandle()], *stream, nullptr); } pendingViewerSessions.erase(channelId); } @@ -397,7 +390,10 @@ Result JanusFtl::ftlServerStreamStarted(ftl_channel_id_t channe spdlog::info("Registered new stream: Channel {} / Stream {}.", channelId, streamId); - return Result::Success(streamId); + return Result::Success(FtlServer::StartedStreamInfo { + .StreamId = streamId, + .PacketSink = stream, + }); } void JanusFtl::ftlServerStreamEnded(ftl_channel_id_t channelId, ftl_stream_id_t streamId) @@ -406,36 +402,6 @@ void JanusFtl::ftlServerStreamEnded(ftl_channel_id_t channelId, ftl_stream_id_t endStream(channelId, streamId, lock); } -void JanusFtl::ftlServerRtpPacket(ftl_channel_id_t channelId, ftl_stream_id_t streamId, - const std::vector& packetData) -{ - std::shared_lock lock(streamDataMutex); - if (streams.count(channelId) <= 0) - { - spdlog::error("Packet received for unexpected channel {}", channelId); - return; - } - const ActiveStream& stream = streams[channelId]; - if (stream.StreamId != streamId) - { - spdlog::error("Packet received for channel {} had an unexpected stream ID: {}, expected {}", - channelId, streamId, stream.StreamId); - return; - } - for (const auto& session : stream.ViewerSessions) - { - session->SendRtpPacket(packetData, stream.Metadata); - } - - if (relayClients.count(channelId) > 0) - { - for (const auto& relay : relayClients.at(channelId)) - { - relay.Client->RelayPacket(packetData); - } - } -} - void JanusFtl::initPreviewGenerators() { // H264 @@ -563,8 +529,8 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) { continue; } - metadataByChannel.try_emplace(channelId, streams.at(channelId).Metadata); - viewersByChannel.try_emplace(channelId, streams.at(channelId).ViewerSessions.size()); + metadataByChannel.try_emplace(channelId, streams.at(channelId)->GetMetadata()); + viewersByChannel.try_emplace(channelId, streams.at(channelId)->GetViewerCount()); } lock.unlock(); @@ -673,50 +639,36 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, streamId); return; } - const ActiveStream& activeStream = streams[channelId]; - if (activeStream.StreamId != streamId) + const std::shared_ptr& stream = streams.at(channelId); + if (stream->GetStreamId() != streamId) { spdlog::error("Stream ended from channel {} had unexpected stream id {}, expected {}", - channelId, streamId, activeStream.StreamId); + channelId, streamId, stream->GetStreamId()); return; } // Reset any existing viewers to a pending state - if (pendingViewerSessions.count(channelId) == 0) - { - pendingViewerSessions.insert_or_assign(channelId, std::unordered_set()); - } - pendingViewerSessions[channelId].insert(activeStream.ViewerSessions.begin(), - activeStream.ViewerSessions.end()); + auto viewerSessions = stream->RemoveAllViewerSessions(); + pendingViewerSessions[channelId].insert(viewerSessions.begin(), viewerSessions.end()); // TODO: Tell viewers stream is offline. // If we are configured as an Ingest node, notify the Orchestrator that a stream has ended. if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr)) { spdlog::info("Unpublishing channel {} / stream {} from Orchestrator", - activeStream.ChannelId, activeStream.StreamId); + stream->GetChannelId(), stream->GetStreamId()); orchestrationClient->SendStreamPublish(ConnectionPublishPayload { .IsPublish = false, - .ChannelId = activeStream.ChannelId, - .StreamId = activeStream.StreamId, + .ChannelId = stream->GetChannelId(), + .StreamId = stream->GetStreamId(), }); } - // If relays exist for this stream, stop them - if (relayClients.count(channelId) > 0) - { - for (const auto& relay : relayClients.at(channelId)) - { - spdlog::info("Stopping relay for channel {} / stream {} -> {}...", activeStream.ChannelId, - activeStream.StreamId, relay.TargetHostname); - relay.Client->Stop(); - } - relayClients.erase(channelId); - } + stream->StopRelays(); - spdlog::info("Stream ended. Channel {} / stream {}", activeStream.ChannelId, - activeStream.StreamId); + spdlog::info("Stream ended. Channel {} / stream {}", + stream->GetChannelId(), stream->GetStreamId()); serviceConnection->EndStream(streamId); streams.erase(channelId); @@ -822,17 +774,17 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt } // Otherwise, we've got a live stream! - ActiveStream& stream = streams[channelId]; + auto& stream = streams.at(channelId); // TODO allow user to request ICE restart (new offer) // TODO if they're already watching, handle it // Set this session as a viewer - stream.ViewerSessions.insert(session.Session.get()); + stream->AddViewerSession(session.Session.get()); // Send the JSEP to initiate the media connection - sendJsep(session, stream, transaction); + sendJsep(session, *stream, transaction); return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL, NULL); } @@ -843,7 +795,7 @@ janus_plugin_result* JanusFtl::handleStartMessage(ActiveSession& session, JsonPt return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL, NULL); } -int JanusFtl::sendJsep(const ActiveSession& session, const ActiveStream& stream, char* transaction) +int JanusFtl::sendJsep(const ActiveSession& session, const JanusStream& stream, char* transaction) { // Prepare JSEP payload std::string sdpOffer = generateSdpOffer(session, stream); @@ -865,7 +817,7 @@ int JanusFtl::sendJsep(const ActiveSession& session, const ActiveStream& stream, jsepPtr.get()); } -std::string JanusFtl::generateSdpOffer(const ActiveSession& session, const ActiveStream& stream) +std::string JanusFtl::generateSdpOffer(const ActiveSession& session, const JanusStream& stream) { // https://tools.ietf.org/html/rfc4566 @@ -875,14 +827,14 @@ std::string JanusFtl::generateSdpOffer(const ActiveSession& session, const Activ offerStream << "v=0\r\n" << "o=- " << session.Session->GetSdpSessionId() << " " << session.Session->GetSdpVersion() << " IN IP4 127.0.0.1\r\n" << - "s=Channel " << stream.ChannelId << "\r\n"; + "s=Channel " << stream.GetChannelId() << "\r\n"; // Audio media description - if (stream.Metadata.HasAudio) + if (stream.GetMetadata().HasAudio) { - std::string audioPayloadType = std::to_string(stream.Metadata.AudioPayloadType); + std::string audioPayloadType = std::to_string(stream.GetMetadata().AudioPayloadType); std::string audioCodec = - SupportedAudioCodecs::AudioCodecString(stream.Metadata.AudioCodec); + SupportedAudioCodecs::AudioCodecString(stream.GetMetadata().AudioCodec); offerStream << "m=audio 1 RTP/SAVPF " << audioPayloadType << "\r\n" << "c=IN IP4 1.1.1.1\r\n" << @@ -892,11 +844,11 @@ std::string JanusFtl::generateSdpOffer(const ActiveSession& session, const Activ } // Video media description - if (stream.Metadata.HasVideo) + if (stream.GetMetadata().HasVideo) { - std::string videoPayloadType = std::to_string(stream.Metadata.VideoPayloadType); + std::string videoPayloadType = std::to_string(stream.GetMetadata().VideoPayloadType); std::string videoCodec = - SupportedVideoCodecs::VideoCodecString(stream.Metadata.VideoCodec); + SupportedVideoCodecs::VideoCodecString(stream.GetMetadata().VideoCodec); offerStream << "m=video 1 RTP/SAVPF " << videoPayloadType << "\r\n" << "c=IN IP4 1.1.1.1\r\n" << @@ -946,7 +898,7 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl payload.TargetHostname); // Do we have an active stream? - if (streams.count(payload.ChannelId) <= 0) + if (!streams.contains(payload.ChannelId)) { spdlog::error("Orchestrator requested a relay for channel that is not streaming." "Target hostname: {}, Channel ID: {}", payload.TargetHostname, payload.ChannelId); @@ -955,7 +907,7 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl .IsSuccess = false, }; } - ActiveStream& activeStream = streams[payload.ChannelId]; + auto& stream = streams.at(payload.ChannelId); // Start the relay now! auto relayClient = std::make_unique(payload.TargetHostname, payload.ChannelId, @@ -964,18 +916,18 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl { .VendorName = "janus-ftl-plugin", .VendorVersion = "0.0.0", // TODO: Versioning - .HasVideo = activeStream.Metadata.HasVideo, + .HasVideo = stream->GetMetadata().HasVideo, .VideoCodec = SupportedVideoCodecs::VideoCodecString( - activeStream.Metadata.VideoCodec), - .VideoHeight = activeStream.Metadata.VideoHeight, - .VideoWidth = activeStream.Metadata.VideoWidth, - .VideoPayloadType = activeStream.Metadata.VideoPayloadType, - .VideoIngestSsrc = activeStream.Metadata.VideoSsrc, - .HasAudio = activeStream.Metadata.HasAudio, + stream->GetMetadata().VideoCodec), + .VideoHeight = stream->GetMetadata().VideoHeight, + .VideoWidth = stream->GetMetadata().VideoWidth, + .VideoPayloadType = stream->GetMetadata().VideoPayloadType, + .VideoIngestSsrc = stream->GetMetadata().VideoSsrc, + .HasAudio = stream->GetMetadata().HasAudio, .AudioCodec = SupportedAudioCodecs::AudioCodecString( - activeStream.Metadata.AudioCodec), - .AudioPayloadType = activeStream.Metadata.AudioPayloadType, - .AudioIngestSsrc = activeStream.Metadata.AudioSsrc, + stream->GetMetadata().AudioCodec), + .AudioPayloadType = stream->GetMetadata().AudioPayloadType, + .AudioIngestSsrc = stream->GetMetadata().AudioSsrc, }); if (connectResult.IsError) { @@ -987,11 +939,7 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl }; } - relayClients[payload.ChannelId].push_back(ActiveRelay { - .ChannelId = payload.ChannelId, - .TargetHostname = payload.TargetHostname, - .Client = std::move(relayClient), - }); + stream->AddRelayClient(payload.TargetHostname, std::move(relayClient)); return ConnectionResult { @@ -1004,39 +952,28 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl "Channel {}, Stream {}, Target: {}", payload.ChannelId, payload.StreamId, payload.TargetHostname); - // Remove and stop matching relays - int numRelaysRemoved = 0; - if (relayClients.count(payload.ChannelId) > 0) + + // Do we have an active stream? + if (!streams.contains(payload.ChannelId)) { - for (auto it = relayClients.at(payload.ChannelId).begin(); - it != relayClients.at(payload.ChannelId).end();) - { - ActiveRelay& relay = *it; - if ((relay.ChannelId == payload.ChannelId) && - (relay.TargetHostname == payload.TargetHostname)) - { - relay.Client->Stop(); - it = relayClients.at(payload.ChannelId).erase(it); - ++numRelaysRemoved; - } - else - { - ++it; - } - } + spdlog::warn("Orchestrator requested to stop a relay for channel that is not streaming." + "Target hostname: {}, Channel ID: {}", payload.TargetHostname, payload.ChannelId); + return ConnectionResult { .IsSuccess = true }; } - - if (numRelaysRemoved == 0) + auto& stream = streams.at(payload.ChannelId); + if (stream->GetStreamId() != payload.StreamId) + { + spdlog::warn("Orchestrator requested to stop a relay for a stream that no longer exists: " + "Channel {}, Stream {}", payload.ChannelId, payload.StreamId); + return ConnectionResult { .IsSuccess = true }; + } + if (!stream->StopRelay(payload.TargetHostname)) { spdlog::warn("Orchestrator requested to stop non-existant relay: " "Channel {}, Stream {}, Target: {}", payload.ChannelId, payload.StreamId, payload.TargetHostname); } - - return ConnectionResult - { - .IsSuccess = true, - }; + return ConnectionResult { .IsSuccess = true }; } } #pragma endregion Private methods diff --git a/src/JanusFtl.h b/src/JanusFtl.h index eabdae1..4ec8299 100644 --- a/src/JanusFtl.h +++ b/src/JanusFtl.h @@ -14,6 +14,7 @@ #include "FtlClient.h" #include "FtlServer.h" #include "JanusSession.h" +#include "JanusStream.h" #include "PreviewGenerators/PreviewGenerator.h" #include "ServiceConnections/ServiceConnection.h" #include "Utilities/FtlTypes.h" @@ -86,25 +87,11 @@ class JanusFtl private: /* Private types */ - struct ActiveStream - { - ftl_channel_id_t ChannelId; - ftl_stream_id_t StreamId; - MediaMetadata Metadata; - std::unordered_set ViewerSessions; - std::time_t streamStartTime; - }; struct ActiveSession { std::optional WatchingChannelId; std::unique_ptr Session; }; - struct ActiveRelay - { - ftl_channel_id_t ChannelId; - std::string TargetHostname; - std::unique_ptr Client; - }; /* Private fields */ janus_plugin* pluginHandle; @@ -124,19 +111,16 @@ class JanusFtl std::unique_ptr watchdog; // Stream/Session/Relay data std::shared_mutex streamDataMutex; // Covers shared access to streams and sessions - std::unordered_map streams; + std::unordered_map> streams; std::unordered_map sessions; std::unordered_map> pendingViewerSessions; - std::unordered_map> relayClients; /* Private methods */ // FtlServer Callbacks Result> ftlServerRequestKey(ftl_channel_id_t channelId); - Result ftlServerStreamStarted(ftl_channel_id_t channelId, + Result ftlServerStreamStarted(ftl_channel_id_t channelId, MediaMetadata mediaMetadata); void ftlServerStreamEnded(ftl_channel_id_t channelId, ftl_stream_id_t streamId); - void ftlServerRtpPacket(ftl_channel_id_t channelId, ftl_stream_id_t streamId, - const std::vector& packetData); // Initialization void initPreviewGenerators(); void initOrchestratorConnection(); @@ -155,8 +139,8 @@ class JanusFtl char* transaction); janus_plugin_result* handleStartMessage(ActiveSession& session, JsonPtr message, char* transaction); - int sendJsep(const ActiveSession& session, const ActiveStream& stream, char* transaction); - std::string generateSdpOffer(const ActiveSession& session, const ActiveStream& stream); + int sendJsep(const ActiveSession& session, const JanusStream& stream, char* transaction); + std::string generateSdpOffer(const ActiveSession& session, const JanusStream& stream); // Orchestrator message handling void onOrchestratorConnectionClosed(); ConnectionResult onOrchestratorIntro(ConnectionIntroPayload payload); diff --git a/src/JanusStream.cpp b/src/JanusStream.cpp new file mode 100644 index 0000000..1abd1af --- /dev/null +++ b/src/JanusStream.cpp @@ -0,0 +1,125 @@ +/** + * @file JanusStream.h + * @author Daniel Stiner (danstiner@gmail.com) + * @date 2021-03-10 + * @copyright Copyright (c) 2021 Daniel Stiner + */ + +#include "JanusStream.h" + +#pragma region Constructor/Destructor +JanusStream::JanusStream( + ftl_channel_id_t channelId, + ftl_stream_id_t streamId, + MediaMetadata mediaMetadata) : + channelId(channelId), + streamId(streamId), + mediaMetadata(mediaMetadata) +{ } +#pragma endregion + +#pragma region Public methods +void JanusStream::SendRtpPacket(const std::vector& packet) +{ + std::lock_guard lock(mutex); + + for (const auto& session : viewerSessions) + { + session->SendRtpPacket(packet, mediaMetadata); + } + + for (const auto& relay : relays) + { + relay.Client->RelayPacket(packet); + } +} + +void JanusStream::AddViewerSession(JanusSession* session) +{ + std::lock_guard lock(mutex); + viewerSessions.insert(session); +} + +size_t JanusStream::RemoveViewerSession(JanusSession* session) +{ + std::lock_guard lock(mutex); + return viewerSessions.erase(session); +} + +std::unordered_set JanusStream::RemoveAllViewerSessions() +{ + std::lock_guard lock(mutex); + std::unordered_set removedSessions; + viewerSessions.swap(removedSessions); + return removedSessions; +} + +size_t JanusStream::GetViewerCount() const +{ + std::lock_guard lock(mutex); + return viewerSessions.size(); +} + +void JanusStream::AddRelayClient(const std::string targetHostname, + std::unique_ptr client) +{ + std::lock_guard lock(mutex); + relays.push_back(Relay { .TargetHostname = targetHostname, .Client = std::move(client) }); +} + +size_t JanusStream::StopRelay(const std::string& targetHostname) +{ + std::list removedRelays; + { + std::lock_guard lock(mutex); + for (auto it = relays.begin(); + it != relays.end();) + { + if (it->TargetHostname == targetHostname) { + removedRelays.push_back(std::move(*it)); + it = relays.erase(it); + } + } + } + for (Relay& relay : removedRelays) + { + spdlog::info("Stopping relay for channel {} / stream {} -> {}...", + channelId, streamId, relay.TargetHostname); + relay.Client->Stop(); + } + return removedRelays.size(); +} + +void JanusStream::StopRelays() +{ + std::list removedRelays; + { + std::lock_guard lock(mutex); + relays.swap(removedRelays); + } + for (const auto& relay : removedRelays) + { + spdlog::info("Stopping relay for channel {} / stream {} -> {}...", + channelId, streamId, relay.TargetHostname); + relay.Client->Stop(); + } +} + +#pragma endregion + +#pragma region Getters/setters + +ftl_channel_id_t JanusStream::GetChannelId() const +{ + return channelId; +} +ftl_stream_id_t JanusStream::GetStreamId() const +{ + return streamId; +} +MediaMetadata JanusStream::GetMetadata() const +{ + return mediaMetadata; +} + +#pragma endregion diff --git a/src/JanusStream.h b/src/JanusStream.h new file mode 100644 index 0000000..42a82db --- /dev/null +++ b/src/JanusStream.h @@ -0,0 +1,59 @@ +/** + * @file JanusStream.h + * @author Daniel Stiner (danstiner@gmail.com) + * @date 2021-03-10 + * @copyright Copyright (c) 2021 Daniel Stiner + */ + +#pragma once + +#include "FtlClient.h" +#include "JanusSession.h" +#include "RtpPacketSink.h" +#include "Utilities/FtlTypes.h" + +#include +#include + +class JanusStream : public RtpPacketSink +{ +public: + /* Constructor/Destructor */ + JanusStream( + ftl_channel_id_t channelId, + ftl_stream_id_t streamId, + MediaMetadata mediaMetadata); + + /* Public methods */ + void SendRtpPacket(const std::vector& packet) override; + + // Session methods + void AddViewerSession(JanusSession* session); + size_t RemoveViewerSession(JanusSession* session); + std::unordered_set RemoveAllViewerSessions(); + size_t GetViewerCount() const; + + // Relay client methods + void AddRelayClient(const std::string targetHostname, std::unique_ptr client); + size_t StopRelay(const std::string& targetHostname); + void StopRelays(); + + /* Getters/Setters */ + ftl_channel_id_t GetChannelId() const; + ftl_stream_id_t GetStreamId() const; + MediaMetadata GetMetadata() const; + +private: + struct Relay + { + std::string TargetHostname; + std::unique_ptr Client; + }; + + ftl_channel_id_t channelId; + ftl_stream_id_t streamId; + MediaMetadata mediaMetadata; + std::unordered_set viewerSessions; + std::list relays; + mutable std::mutex mutex; +}; diff --git a/src/RtpPacketSink.h b/src/RtpPacketSink.h new file mode 100644 index 0000000..2522067 --- /dev/null +++ b/src/RtpPacketSink.h @@ -0,0 +1,19 @@ +/** + * @file RtpPacketSink.h + * @author Daniel Stiner (danstiner@gmail.com) + * @date 2021-03-10 + * @copyright Copyright (c) 2021 Daniel Stiner + */ + +#pragma once + +#include + +class RtpPacketSink +{ +public: + virtual ~RtpPacketSink() {}; + + /* Public methods */ + virtual void SendRtpPacket(const std::vector& packet) = 0; +};