Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

📦 Hand RTP packets directly to JanusStream #106

Merged
merged 13 commits into from
Mar 15, 2021
3 changes: 2 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ sources = files([
'src/FtlControlConnection.cpp',
'src/FtlServer.cpp',
'src/FtlStream.cpp',
'src/JanusSession.cpp',
'src/JanusFtl.cpp',
'src/JanusSession.cpp',
'src/JanusStream.cpp',
# Library entrypoint
'src/entrypoint.cpp',
])
Expand Down
1 change: 0 additions & 1 deletion src/FtlControlConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class FtlControlConnection
FtlStream* ftlStream = nullptr;
bool hmacRequested = false;
bool isAuthenticated = false;
bool mediaPortRequested = false;
bool isStreaming = false;
ftl_channel_id_t channelId = 0;
std::vector<std::byte> hmacPayload;
Expand Down
29 changes: 13 additions & 16 deletions src/FtlServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ FtlServer::FtlServer(
RequestKeyCallback onRequestKey,
StreamStartedCallback onStreamStarted,
StreamEndedCallback onStreamEnded,
RtpPacketCallback onRtpPacket,
uint16_t minMediaPort,
uint16_t maxMediaPort)
:
Expand All @@ -29,7 +28,6 @@ FtlServer::FtlServer(
onRequestKey(onRequestKey),
onStreamStarted(onStreamStarted),
onStreamEnded(onStreamEnded),
onRtpPacket(onRtpPacket),
minMediaPort(minMediaPort),
maxMediaPort(maxMediaPort),
eventQueueThread(std::jthread(&FtlServer::eventQueueThreadBody, this))
Expand Down Expand Up @@ -325,12 +323,6 @@ void FtlServer::onStreamClosed(FtlStream* stream)
}));
}

void FtlServer::onStreamRtpPacket(ftl_channel_id_t channelId, ftl_stream_id_t streamId,
const std::vector<std::byte>& packet)
{
onRtpPacket(channelId, streamId, packet);
}

void FtlServer::eventStopStream(std::shared_ptr<FtlServerStopStreamEvent> event)
{
spdlog::debug("FtlServer::eventStopStream processing StopStream event...");
Expand Down Expand Up @@ -462,7 +454,7 @@ void FtlServer::eventTerminateControlConnection(
std::move(pendingControlConnections.at(event->Connection).first);
pendingControlConnections.erase(event->Connection);
dispatchAsyncCall(
[this, event, control = std::move(control)]() mutable
[event, control = std::move(control)]() mutable
{
control->Stop(event->ResponseCode);
});
Expand All @@ -480,13 +472,13 @@ void FtlServer::eventControlRequestMediaPort(
[this, event]()
{
// Attempt to start stream
Result<ftl_stream_id_t> streamIdResult =
Result<StartedStreamInfo> streamStartResult =
onStreamStarted(event->ChannelId, event->Metadata);
if (streamIdResult.IsError)
if (streamStartResult.IsError)
{
spdlog::debug("FtlServer::eventControlRequestMediaPort processing thread "
"error starting stream - queueing TerminateControlConnection event: {}",
streamIdResult.ErrorMessage);
streamStartResult.ErrorMessage);
eventQueue.enqueue(FtlServerEventKind::TerminateControlConnection,
std::shared_ptr<FtlServerTerminateControlConnectionEvent>(
new FtlServerTerminateControlConnectionEvent
Expand All @@ -500,7 +492,8 @@ void FtlServer::eventControlRequestMediaPort(
{
spdlog::debug("FtlServer::eventControlRequestMediaPort processing thread "
"successfully received Stream ID - queueing StreamIdAssigned event...");
ftl_stream_id_t streamId = streamIdResult.Value;
ftl_stream_id_t streamId = streamStartResult.Value.StreamId;
std::shared_ptr<RtpPacketSink> packetSink = streamStartResult.Value.PacketSink;
eventQueue.enqueue(FtlServerEventKind::StreamIdAssigned,
std::shared_ptr<FtlServerStreamIdAssignedEvent>(
new FtlServerStreamIdAssignedEvent
Expand All @@ -510,6 +503,7 @@ void FtlServer::eventControlRequestMediaPort(
.StreamId = streamId,
.Metadata = event->Metadata,
.TargetAddr = event->TargetAddr,
.PacketSink = packetSink,
}));
}
});
Expand Down Expand Up @@ -554,18 +548,21 @@ void FtlServer::eventStreamIdAssigned(std::shared_ptr<FtlServerStreamIdAssignedE
return;
}
uint16_t mediaPort = portResult.Value;
std::shared_ptr<RtpPacketSink> rtpPacketSink = event->PacketSink;

// Attempt to fire up the new FtlStream. Queue a new event when we're done.
dispatchAsyncCall(
[this, event, control = std::move(control), mediaPort]() mutable
[this, event, control = std::move(control), mediaPort, rtpPacketSink]() mutable
{
std::unique_ptr<ConnectionTransport> mediaTransport =
mediaConnectionCreator->CreateConnection(mediaPort, event->TargetAddr);
auto stream = std::make_shared<FtlStream>(
std::move(control), std::move(mediaTransport), event->Metadata, event->StreamId,
std::bind(&FtlServer::onStreamClosed, this, std::placeholders::_1),
std::bind(&FtlServer::onStreamRtpPacket, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
[rtpPacketSink](ftl_channel_id_t channelId, ftl_stream_id_t steamId, const std::vector<std::byte> packet)
danstiner marked this conversation as resolved.
Show resolved Hide resolved
{
rtpPacketSink->SendRtpPacket(packet);
});

Result<void> streamStartResult = stream->StartAsync(mediaPort);
if (streamStartResult.IsError)
Expand Down
13 changes: 9 additions & 4 deletions src/FtlServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "FtlControlConnection.h"
#include "FtlStream.h"
#include "RtpPacketSink.h"
#include "Utilities/FtlTypes.h"
#include "Utilities/Result.h"

Expand Down Expand Up @@ -37,12 +38,17 @@ class ConnectionTransport;
class FtlServer
{
public:
/* Public types */
struct StartedStreamInfo {
ftl_stream_id_t StreamId;
std::shared_ptr<RtpPacketSink> PacketSink;
};

/* Callback types */
using RequestKeyCallback = std::function<Result<std::vector<std::byte>>(ftl_channel_id_t)>;
using StreamStartedCallback =
std::function<Result<ftl_stream_id_t>(ftl_channel_id_t, MediaMetadata)>;
std::function<Result<StartedStreamInfo>(ftl_channel_id_t, MediaMetadata)>;
using StreamEndedCallback = std::function<void(ftl_channel_id_t, ftl_stream_id_t)>;
using RtpPacketCallback = FtlStream::RtpPacketCallback;

/* Constructor/Destructor */
FtlServer(
Expand All @@ -51,7 +57,6 @@ class FtlServer
RequestKeyCallback onRequestKey,
StreamStartedCallback onStreamStarted,
StreamEndedCallback onStreamEnded,
RtpPacketCallback onRtpPacket,
uint16_t minMediaPort = DEFAULT_MEDIA_MIN_PORT,
uint16_t maxMediaPort = DEFAULT_MEDIA_MAX_PORT);
~FtlServer() = default;
Expand Down Expand Up @@ -173,6 +178,7 @@ class FtlServer
ftl_stream_id_t StreamId;
MediaMetadata Metadata;
in_addr TargetAddr;
std::shared_ptr<RtpPacketSink> PacketSink;
};
struct FtlServerStreamStartedEvent : public FtlServerEvent
{
Expand Down Expand Up @@ -211,7 +217,6 @@ class FtlServer
const RequestKeyCallback onRequestKey;
const StreamStartedCallback onStreamStarted;
const StreamEndedCallback onStreamEnded;
const RtpPacketCallback onRtpPacket;
// Media ports
const uint16_t minMediaPort;
const uint16_t maxMediaPort;
Expand Down
Loading