Skip to content

Commit

Permalink
📦 Hand RTP packets directly to JanusStream (#106)
Browse files Browse the repository at this point in the history
Going back and doing ebefe00 the right way this time.

This shortens the path for RTP packets to only require a single lock at the stream level.
  • Loading branch information
danstiner authored Mar 15, 2021
1 parent 9315b8b commit 2231138
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 181 deletions.
3 changes: 2 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ sources = files([
'src/FtlControlConnection.cpp',
'src/FtlServer.cpp',
'src/FtlStream.cpp',
'src/JanusSession.cpp',
'src/JanusFtl.cpp',
'src/JanusSession.cpp',
'src/JanusStream.cpp',
# Library entrypoint
'src/entrypoint.cpp',
])
Expand Down
1 change: 0 additions & 1 deletion src/FtlControlConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class FtlControlConnection
FtlStream* ftlStream = nullptr;
bool hmacRequested = false;
bool isAuthenticated = false;
bool mediaPortRequested = false;
bool isStreaming = false;
ftl_channel_id_t channelId = 0;
std::vector<std::byte> hmacPayload;
Expand Down
29 changes: 13 additions & 16 deletions src/FtlServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ FtlServer::FtlServer(
RequestKeyCallback onRequestKey,
StreamStartedCallback onStreamStarted,
StreamEndedCallback onStreamEnded,
RtpPacketCallback onRtpPacket,
uint16_t minMediaPort,
uint16_t maxMediaPort)
:
Expand All @@ -29,7 +28,6 @@ FtlServer::FtlServer(
onRequestKey(onRequestKey),
onStreamStarted(onStreamStarted),
onStreamEnded(onStreamEnded),
onRtpPacket(onRtpPacket),
minMediaPort(minMediaPort),
maxMediaPort(maxMediaPort),
eventQueueThread(std::jthread(&FtlServer::eventQueueThreadBody, this))
Expand Down Expand Up @@ -325,12 +323,6 @@ void FtlServer::onStreamClosed(FtlStream* stream)
}));
}

void FtlServer::onStreamRtpPacket(ftl_channel_id_t channelId, ftl_stream_id_t streamId,
const std::vector<std::byte>& packet)
{
onRtpPacket(channelId, streamId, packet);
}

void FtlServer::eventStopStream(std::shared_ptr<FtlServerStopStreamEvent> event)
{
spdlog::debug("FtlServer::eventStopStream processing StopStream event...");
Expand Down Expand Up @@ -462,7 +454,7 @@ void FtlServer::eventTerminateControlConnection(
std::move(pendingControlConnections.at(event->Connection).first);
pendingControlConnections.erase(event->Connection);
dispatchAsyncCall(
[this, event, control = std::move(control)]() mutable
[event, control = std::move(control)]() mutable
{
control->Stop(event->ResponseCode);
});
Expand All @@ -480,13 +472,13 @@ void FtlServer::eventControlRequestMediaPort(
[this, event]()
{
// Attempt to start stream
Result<ftl_stream_id_t> streamIdResult =
Result<StartedStreamInfo> streamStartResult =
onStreamStarted(event->ChannelId, event->Metadata);
if (streamIdResult.IsError)
if (streamStartResult.IsError)
{
spdlog::debug("FtlServer::eventControlRequestMediaPort processing thread "
"error starting stream - queueing TerminateControlConnection event: {}",
streamIdResult.ErrorMessage);
streamStartResult.ErrorMessage);
eventQueue.enqueue(FtlServerEventKind::TerminateControlConnection,
std::shared_ptr<FtlServerTerminateControlConnectionEvent>(
new FtlServerTerminateControlConnectionEvent
Expand All @@ -500,7 +492,8 @@ void FtlServer::eventControlRequestMediaPort(
{
spdlog::debug("FtlServer::eventControlRequestMediaPort processing thread "
"successfully received Stream ID - queueing StreamIdAssigned event...");
ftl_stream_id_t streamId = streamIdResult.Value;
ftl_stream_id_t streamId = streamStartResult.Value.StreamId;
std::shared_ptr<RtpPacketSink> packetSink = streamStartResult.Value.PacketSink;
eventQueue.enqueue(FtlServerEventKind::StreamIdAssigned,
std::shared_ptr<FtlServerStreamIdAssignedEvent>(
new FtlServerStreamIdAssignedEvent
Expand All @@ -510,6 +503,7 @@ void FtlServer::eventControlRequestMediaPort(
.StreamId = streamId,
.Metadata = event->Metadata,
.TargetAddr = event->TargetAddr,
.PacketSink = packetSink,
}));
}
});
Expand Down Expand Up @@ -554,18 +548,21 @@ void FtlServer::eventStreamIdAssigned(std::shared_ptr<FtlServerStreamIdAssignedE
return;
}
uint16_t mediaPort = portResult.Value;
std::shared_ptr<RtpPacketSink> rtpPacketSink = event->PacketSink;

// Attempt to fire up the new FtlStream. Queue a new event when we're done.
dispatchAsyncCall(
[this, event, control = std::move(control), mediaPort]() mutable
[this, event, control = std::move(control), mediaPort, rtpPacketSink]() mutable
{
std::unique_ptr<ConnectionTransport> mediaTransport =
mediaConnectionCreator->CreateConnection(mediaPort, event->TargetAddr);
auto stream = std::make_shared<FtlStream>(
std::move(control), std::move(mediaTransport), event->Metadata, event->StreamId,
std::bind(&FtlServer::onStreamClosed, this, std::placeholders::_1),
std::bind(&FtlServer::onStreamRtpPacket, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
[rtpPacketSink](const std::vector<std::byte> packet)
{
rtpPacketSink->SendRtpPacket(packet);
});

Result<void> streamStartResult = stream->StartAsync(mediaPort);
if (streamStartResult.IsError)
Expand Down
13 changes: 9 additions & 4 deletions src/FtlServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "FtlControlConnection.h"
#include "FtlStream.h"
#include "RtpPacketSink.h"
#include "Utilities/FtlTypes.h"
#include "Utilities/Result.h"

Expand Down Expand Up @@ -37,12 +38,17 @@ class ConnectionTransport;
class FtlServer
{
public:
/* Public types */
struct StartedStreamInfo {
ftl_stream_id_t StreamId;
std::shared_ptr<RtpPacketSink> PacketSink;
};

/* Callback types */
using RequestKeyCallback = std::function<Result<std::vector<std::byte>>(ftl_channel_id_t)>;
using StreamStartedCallback =
std::function<Result<ftl_stream_id_t>(ftl_channel_id_t, MediaMetadata)>;
std::function<Result<StartedStreamInfo>(ftl_channel_id_t, MediaMetadata)>;
using StreamEndedCallback = std::function<void(ftl_channel_id_t, ftl_stream_id_t)>;
using RtpPacketCallback = FtlStream::RtpPacketCallback;

/* Constructor/Destructor */
FtlServer(
Expand All @@ -51,7 +57,6 @@ class FtlServer
RequestKeyCallback onRequestKey,
StreamStartedCallback onStreamStarted,
StreamEndedCallback onStreamEnded,
RtpPacketCallback onRtpPacket,
uint16_t minMediaPort = DEFAULT_MEDIA_MIN_PORT,
uint16_t maxMediaPort = DEFAULT_MEDIA_MAX_PORT);
~FtlServer() = default;
Expand Down Expand Up @@ -173,6 +178,7 @@ class FtlServer
ftl_stream_id_t StreamId;
MediaMetadata Metadata;
in_addr TargetAddr;
std::shared_ptr<RtpPacketSink> PacketSink;
};
struct FtlServerStreamStartedEvent : public FtlServerEvent
{
Expand Down Expand Up @@ -211,7 +217,6 @@ class FtlServer
const RequestKeyCallback onRequestKey;
const StreamStartedCallback onStreamStarted;
const StreamEndedCallback onStreamEnded;
const RtpPacketCallback onRtpPacket;
// Media ports
const uint16_t minMediaPort;
const uint16_t maxMediaPort;
Expand Down
2 changes: 1 addition & 1 deletion src/FtlStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ void FtlStream::processAudioVideoRtpPacket(const std::vector<std::byte>& rtpPack
if (onRtpPacket)
{
dataLock.unlock(); // Unlock while we call out
onRtpPacket(GetChannelId(), streamId, rtpPacket);
onRtpPacket(rtpPacket);
dataLock.lock();
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/FtlStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class FtlStream
public:
/* Public types */
using ClosedCallback = std::function<void(FtlStream*)>;
using RtpPacketCallback = std::function<void(
ftl_channel_id_t, ftl_stream_id_t, const std::vector<std::byte>&)>;
using RtpPacketCallback = std::function<void(const std::vector<std::byte>&)>;
struct FtlStreamStats
{
time_t StartTime;
Expand Down
Loading

0 comments on commit 2231138

Please sign in to comment.