Skip to content

Commit

Permalink
Data channel improvements (#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie authored Jul 3, 2024
1 parent efee044 commit b11a524
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,53 @@ internal import LiveKitWebRTC
@_implementationOnly import LiveKitWebRTC
#endif

actor DataChannelPairActor: NSObject, Loggable {
// MARK: - Types
// MARK: - Internal delegate

public typealias OnDataPacket = (_ dataPacket: Livekit_DataPacket) -> Void
protocol DataChannelDelegate {
func dataChannel(_ dataChannelPair: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket)
}

class DataChannelPair: NSObject, Loggable {
// MARK: - Public

public let delegates = MulticastDelegate<DataChannelDelegate>(label: "DataChannelDelegate")

public let openCompleter = AsyncCompleter<Void>(label: "Data channel open", defaultTimeout: .defaultPublisherDataChannelOpen)

public var isOpen: Bool {
guard let reliable = _reliableChannel, let lossy = _lossyChannel else { return false }
return reliable.readyState == .open && lossy.readyState == .open
}
public var isOpen: Bool { _state.isOpen }

// MARK: - Private

private let _onDataPacket: OnDataPacket?
private var _reliableChannel: LKRTCDataChannel?
private var _lossyChannel: LKRTCDataChannel?
private struct State {
var lossy: LKRTCDataChannel?
var reliable: LKRTCDataChannel?

public init(reliableChannel: LKRTCDataChannel? = nil,
var isOpen: Bool {
guard let lossy, let reliable else { return false }
return reliable.readyState == .open && lossy.readyState == .open
}
}

private let _state: StateSync<State>

public init(delegate: DataChannelDelegate? = nil,
lossyChannel: LKRTCDataChannel? = nil,
onDataPacket: OnDataPacket? = nil)
reliableChannel: LKRTCDataChannel? = nil)
{
_reliableChannel = reliableChannel
_lossyChannel = lossyChannel
_onDataPacket = onDataPacket
_state = StateSync(State(lossy: lossyChannel,
reliable: reliableChannel))

if let delegate {
delegates.add(delegate: delegate)
}
}

public func set(reliable channel: LKRTCDataChannel?) {
_reliableChannel = channel
let isOpen = _state.mutate {
$0.reliable = channel
return $0.isOpen
}

channel?.delegate = self

if isOpen {
Expand All @@ -61,7 +77,11 @@ actor DataChannelPairActor: NSObject, Loggable {
}

public func set(lossy channel: LKRTCDataChannel?) {
_lossyChannel = channel
let isOpen = _state.mutate {
$0.lossy = channel
return $0.isOpen
}

channel?.delegate = self

if isOpen {
Expand All @@ -70,10 +90,15 @@ actor DataChannelPairActor: NSObject, Loggable {
}

public func reset() {
_reliableChannel?.close()
_lossyChannel?.close()
_reliableChannel = nil
_lossyChannel = nil
let (lossy, reliable) = _state.mutate {
let result = ($0.lossy, $0.reliable)
$0.reliable = nil
$0.lossy = nil
return result
}

lossy?.close()
reliable?.close()

openCompleter.reset()
}
Expand All @@ -91,37 +116,36 @@ actor DataChannelPairActor: NSObject, Loggable {
let serializedData = try packet.serializedData()
let rtcData = RTC.createDataBuffer(data: serializedData)

let channel = (kind == .reliable) ? _reliableChannel : _lossyChannel
let channel = _state.read { kind == .reliable ? $0.reliable : $0.lossy }
guard let sendDataResult = channel?.sendData(rtcData), sendDataResult else {
throw LiveKitError(.invalidState, message: "sendData failed")
}
}

public func infos() -> [Livekit_DataChannelInfo] {
[_lossyChannel, _reliableChannel]
_state.read { [$0.lossy, $0.reliable] }
.compactMap { $0 }
.map { $0.toLKInfoType() }
}
}

// MARK: - RTCDataChannelDelegate

extension DataChannelPairActor: LKRTCDataChannelDelegate {
nonisolated func dataChannelDidChangeState(_: LKRTCDataChannel) {
Task.detached {
if await self.isOpen {
self.openCompleter.resume(returning: ())
}
extension DataChannelPair: LKRTCDataChannelDelegate {
func dataChannelDidChangeState(_: LKRTCDataChannel) {
if isOpen {
openCompleter.resume(returning: ())
}
}

nonisolated func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) {
log("dataChannel(didReceiveMessageWith:)")
func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) {
guard let dataPacket = try? Livekit_DataPacket(contiguousBytes: buffer.data) else {
log("could not decode data message", .error)
log("Could not decode data message", .error)
return
}

_onDataPacket?(dataPacket)
delegates.notify {
$0.dataChannel(self, didReceiveDataPacket: dataPacket)
}
}
}
12 changes: 6 additions & 6 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ extension Room {
// Resets state of transports
func cleanUpRTC() async {
// Close data channels
await publisherDataChannel.reset()
await subscriberDataChannel.reset()
publisherDataChannel.reset()
subscriberDataChannel.reset()

let (subscriber, publisher) = _state.read { ($0.subscriber, $0.publisher) }

Expand Down Expand Up @@ -90,13 +90,13 @@ extension Room {
log("publisher is not .connected", .error)
}

let dataChannelIsOpen = await publisherDataChannel.isOpen
let dataChannelIsOpen = publisherDataChannel.isOpen
if !dataChannelIsOpen {
log("publisher data channel is not .open", .error)
}

// Should return true if successful
try await publisherDataChannel.send(userPacket: userPacket, kind: kind)
try publisherDataChannel.send(userPacket: userPacket, kind: kind)
}
}

Expand Down Expand Up @@ -163,8 +163,8 @@ extension Room {
let lossyDataChannel = await publisher.dataChannel(for: LKRTCDataChannel.labels.lossy,
configuration: RTC.createDataChannelConfiguration(maxRetransmits: 0))

await publisherDataChannel.set(reliable: reliableDataChannel)
await publisherDataChannel.set(lossy: lossyDataChannel)
publisherDataChannel.set(reliable: reliableDataChannel)
publisherDataChannel.set(lossy: lossyDataChannel)

log("dataChannel.\(String(describing: reliableDataChannel?.label)) : \(String(describing: reliableDataChannel?.channelId))")
log("dataChannel.\(String(describing: lossyDataChannel?.label)) : \(String(describing: lossyDataChannel?.channelId))")
Expand Down
4 changes: 2 additions & 2 deletions Sources/LiveKit/Core/Room+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ extension Room: TransportDelegate {

if _state.isSubscriberPrimary, transport.target == .subscriber {
switch dataChannel.label {
case LKRTCDataChannel.labels.reliable: await subscriberDataChannel.set(reliable: dataChannel)
case LKRTCDataChannel.labels.lossy: await subscriberDataChannel.set(lossy: dataChannel)
case LKRTCDataChannel.labels.reliable: subscriberDataChannel.set(reliable: dataChannel)
case LKRTCDataChannel.labels.lossy: subscriberDataChannel.set(lossy: dataChannel)
default: log("Unknown data channel label \(dataChannel.label)", .warning)
}
}
Expand Down
24 changes: 14 additions & 10 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,8 @@ public class Room: NSObject, ObservableObject, Loggable {

// MARK: - DataChannels

lazy var subscriberDataChannel: DataChannelPairActor = .init(onDataPacket: { [weak self] dataPacket in
guard let self else { return }
switch dataPacket.value {
case let .speaker(update): self.engine(self, didUpdateSpeakers: update.speakers)
case let .user(userPacket): self.engine(self, didReceiveUserPacket: userPacket)
default: return
}
})

let publisherDataChannel = DataChannelPairActor()
lazy var subscriberDataChannel = DataChannelPair(delegate: self)
lazy var publisherDataChannel = DataChannelPair(delegate: self)

var _blockProcessQueue = DispatchQueue(label: "LiveKitSDK.engine.pendingBlocks",
qos: .default)
Expand Down Expand Up @@ -513,3 +505,15 @@ public extension Room {
set { RTC.bypassVoiceProcessing = newValue }
}
}

// MARK: - DataChannelDelegate

extension Room: DataChannelDelegate {
func dataChannel(_: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket) {
switch dataPacket.value {
case let .speaker(update): engine(self, didUpdateSpeakers: update.speakers)
case let .user(userPacket): engine(self, didReceiveUserPacket: userPacket)
default: return
}
}
}
18 changes: 14 additions & 4 deletions Tests/LiveKitTests/PublishDataTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@
import XCTest

class PublishDataTests: XCTestCase {
struct TestDataPayload: Codable {
let content: String
// Test with canSubscribe: true
func testPublishDataReceiverCanSubscribe() async throws {
try await _publishDataTest(receiverRoomOptions: RoomTestingOptions(canSubscribe: true))
}

func testPublishData() async throws {
try await withRooms([RoomTestingOptions(canPublishData: true), RoomTestingOptions()]) { rooms in
// Test with canSubscribe: false
func testPublishDataReceiverCanNotSubscribe() async throws {
try await _publishDataTest(receiverRoomOptions: RoomTestingOptions(canSubscribe: false))
}

private func _publishDataTest(receiverRoomOptions: RoomTestingOptions) async throws {
struct TestDataPayload: Codable {
let content: String
}

try await withRooms([RoomTestingOptions(canPublishData: true), receiverRoomOptions]) { rooms in
// Alias to Rooms
let room1 = rooms[0]
let room2 = rooms[1]
Expand Down

0 comments on commit b11a524

Please sign in to comment.