From b11a524d2fb5717eb17431422f15e968dc37926e Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 3 Jul 2024 17:43:59 +0900 Subject: [PATCH] Data channel improvements (#418) --- ...lPairActor.swift => DataChannelPair.swift} | 90 ++++++++++++------- Sources/LiveKit/Core/Room+Engine.swift | 12 +-- .../LiveKit/Core/Room+TransportDelegate.swift | 4 +- Sources/LiveKit/Core/Room.swift | 24 ++--- Tests/LiveKitTests/PublishDataTests.swift | 18 +++- 5 files changed, 93 insertions(+), 55 deletions(-) rename Sources/LiveKit/Core/{DataChannelPairActor.swift => DataChannelPair.swift} (53%) diff --git a/Sources/LiveKit/Core/DataChannelPairActor.swift b/Sources/LiveKit/Core/DataChannelPair.swift similarity index 53% rename from Sources/LiveKit/Core/DataChannelPairActor.swift rename to Sources/LiveKit/Core/DataChannelPair.swift index 310456f15..108eefd73 100644 --- a/Sources/LiveKit/Core/DataChannelPairActor.swift +++ b/Sources/LiveKit/Core/DataChannelPair.swift @@ -22,37 +22,53 @@ internal import LiveKitWebRTC @_implementationOnly import LiveKitWebRTC #endif -actor DataChannelPairActor: NSObject, Loggable { - // MARK: - Types +// MARK: - Internal delegate - public typealias OnDataPacket = (_ dataPacket: Livekit_DataPacket) -> Void +protocol DataChannelDelegate { + func dataChannel(_ dataChannelPair: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket) +} +class DataChannelPair: NSObject, Loggable { // MARK: - Public + public let delegates = MulticastDelegate(label: "DataChannelDelegate") + public let openCompleter = AsyncCompleter(label: "Data channel open", defaultTimeout: .defaultPublisherDataChannelOpen) - public var isOpen: Bool { - guard let reliable = _reliableChannel, let lossy = _lossyChannel else { return false } - return reliable.readyState == .open && lossy.readyState == .open - } + public var isOpen: Bool { _state.isOpen } // MARK: - Private - private let _onDataPacket: OnDataPacket? - private var _reliableChannel: LKRTCDataChannel? - private var _lossyChannel: LKRTCDataChannel? + private struct State { + var lossy: LKRTCDataChannel? + var reliable: LKRTCDataChannel? - public init(reliableChannel: LKRTCDataChannel? = nil, + var isOpen: Bool { + guard let lossy, let reliable else { return false } + return reliable.readyState == .open && lossy.readyState == .open + } + } + + private let _state: StateSync + + public init(delegate: DataChannelDelegate? = nil, lossyChannel: LKRTCDataChannel? = nil, - onDataPacket: OnDataPacket? = nil) + reliableChannel: LKRTCDataChannel? = nil) { - _reliableChannel = reliableChannel - _lossyChannel = lossyChannel - _onDataPacket = onDataPacket + _state = StateSync(State(lossy: lossyChannel, + reliable: reliableChannel)) + + if let delegate { + delegates.add(delegate: delegate) + } } public func set(reliable channel: LKRTCDataChannel?) { - _reliableChannel = channel + let isOpen = _state.mutate { + $0.reliable = channel + return $0.isOpen + } + channel?.delegate = self if isOpen { @@ -61,7 +77,11 @@ actor DataChannelPairActor: NSObject, Loggable { } public func set(lossy channel: LKRTCDataChannel?) { - _lossyChannel = channel + let isOpen = _state.mutate { + $0.lossy = channel + return $0.isOpen + } + channel?.delegate = self if isOpen { @@ -70,10 +90,15 @@ actor DataChannelPairActor: NSObject, Loggable { } public func reset() { - _reliableChannel?.close() - _lossyChannel?.close() - _reliableChannel = nil - _lossyChannel = nil + let (lossy, reliable) = _state.mutate { + let result = ($0.lossy, $0.reliable) + $0.reliable = nil + $0.lossy = nil + return result + } + + lossy?.close() + reliable?.close() openCompleter.reset() } @@ -91,14 +116,14 @@ actor DataChannelPairActor: NSObject, Loggable { let serializedData = try packet.serializedData() let rtcData = RTC.createDataBuffer(data: serializedData) - let channel = (kind == .reliable) ? _reliableChannel : _lossyChannel + let channel = _state.read { kind == .reliable ? $0.reliable : $0.lossy } guard let sendDataResult = channel?.sendData(rtcData), sendDataResult else { throw LiveKitError(.invalidState, message: "sendData failed") } } public func infos() -> [Livekit_DataChannelInfo] { - [_lossyChannel, _reliableChannel] + _state.read { [$0.lossy, $0.reliable] } .compactMap { $0 } .map { $0.toLKInfoType() } } @@ -106,22 +131,21 @@ actor DataChannelPairActor: NSObject, Loggable { // MARK: - RTCDataChannelDelegate -extension DataChannelPairActor: LKRTCDataChannelDelegate { - nonisolated func dataChannelDidChangeState(_: LKRTCDataChannel) { - Task.detached { - if await self.isOpen { - self.openCompleter.resume(returning: ()) - } +extension DataChannelPair: LKRTCDataChannelDelegate { + func dataChannelDidChangeState(_: LKRTCDataChannel) { + if isOpen { + openCompleter.resume(returning: ()) } } - nonisolated func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) { - log("dataChannel(didReceiveMessageWith:)") + func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) { guard let dataPacket = try? Livekit_DataPacket(contiguousBytes: buffer.data) else { - log("could not decode data message", .error) + log("Could not decode data message", .error) return } - _onDataPacket?(dataPacket) + delegates.notify { + $0.dataChannel(self, didReceiveDataPacket: dataPacket) + } } } diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 1e1a5ba61..73d0c9c53 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -43,8 +43,8 @@ extension Room { // Resets state of transports func cleanUpRTC() async { // Close data channels - await publisherDataChannel.reset() - await subscriberDataChannel.reset() + publisherDataChannel.reset() + subscriberDataChannel.reset() let (subscriber, publisher) = _state.read { ($0.subscriber, $0.publisher) } @@ -90,13 +90,13 @@ extension Room { log("publisher is not .connected", .error) } - let dataChannelIsOpen = await publisherDataChannel.isOpen + let dataChannelIsOpen = publisherDataChannel.isOpen if !dataChannelIsOpen { log("publisher data channel is not .open", .error) } // Should return true if successful - try await publisherDataChannel.send(userPacket: userPacket, kind: kind) + try publisherDataChannel.send(userPacket: userPacket, kind: kind) } } @@ -163,8 +163,8 @@ extension Room { let lossyDataChannel = await publisher.dataChannel(for: LKRTCDataChannel.labels.lossy, configuration: RTC.createDataChannelConfiguration(maxRetransmits: 0)) - await publisherDataChannel.set(reliable: reliableDataChannel) - await publisherDataChannel.set(lossy: lossyDataChannel) + publisherDataChannel.set(reliable: reliableDataChannel) + publisherDataChannel.set(lossy: lossyDataChannel) log("dataChannel.\(String(describing: reliableDataChannel?.label)) : \(String(describing: reliableDataChannel?.channelId))") log("dataChannel.\(String(describing: lossyDataChannel?.label)) : \(String(describing: lossyDataChannel?.channelId))") diff --git a/Sources/LiveKit/Core/Room+TransportDelegate.swift b/Sources/LiveKit/Core/Room+TransportDelegate.swift index 2be2631fb..6f32967e6 100644 --- a/Sources/LiveKit/Core/Room+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Room+TransportDelegate.swift @@ -106,8 +106,8 @@ extension Room: TransportDelegate { if _state.isSubscriberPrimary, transport.target == .subscriber { switch dataChannel.label { - case LKRTCDataChannel.labels.reliable: await subscriberDataChannel.set(reliable: dataChannel) - case LKRTCDataChannel.labels.lossy: await subscriberDataChannel.set(lossy: dataChannel) + case LKRTCDataChannel.labels.reliable: subscriberDataChannel.set(reliable: dataChannel) + case LKRTCDataChannel.labels.lossy: subscriberDataChannel.set(lossy: dataChannel) default: log("Unknown data channel label \(dataChannel.label)", .warning) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 7dc64c1cc..25f7f6955 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -105,16 +105,8 @@ public class Room: NSObject, ObservableObject, Loggable { // MARK: - DataChannels - lazy var subscriberDataChannel: DataChannelPairActor = .init(onDataPacket: { [weak self] dataPacket in - guard let self else { return } - switch dataPacket.value { - case let .speaker(update): self.engine(self, didUpdateSpeakers: update.speakers) - case let .user(userPacket): self.engine(self, didReceiveUserPacket: userPacket) - default: return - } - }) - - let publisherDataChannel = DataChannelPairActor() + lazy var subscriberDataChannel = DataChannelPair(delegate: self) + lazy var publisherDataChannel = DataChannelPair(delegate: self) var _blockProcessQueue = DispatchQueue(label: "LiveKitSDK.engine.pendingBlocks", qos: .default) @@ -513,3 +505,15 @@ public extension Room { set { RTC.bypassVoiceProcessing = newValue } } } + +// MARK: - DataChannelDelegate + +extension Room: DataChannelDelegate { + func dataChannel(_: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket) { + switch dataPacket.value { + case let .speaker(update): engine(self, didUpdateSpeakers: update.speakers) + case let .user(userPacket): engine(self, didReceiveUserPacket: userPacket) + default: return + } + } +} diff --git a/Tests/LiveKitTests/PublishDataTests.swift b/Tests/LiveKitTests/PublishDataTests.swift index 036e1f529..85854cee8 100644 --- a/Tests/LiveKitTests/PublishDataTests.swift +++ b/Tests/LiveKitTests/PublishDataTests.swift @@ -18,12 +18,22 @@ import XCTest class PublishDataTests: XCTestCase { - struct TestDataPayload: Codable { - let content: String + // Test with canSubscribe: true + func testPublishDataReceiverCanSubscribe() async throws { + try await _publishDataTest(receiverRoomOptions: RoomTestingOptions(canSubscribe: true)) } - func testPublishData() async throws { - try await withRooms([RoomTestingOptions(canPublishData: true), RoomTestingOptions()]) { rooms in + // Test with canSubscribe: false + func testPublishDataReceiverCanNotSubscribe() async throws { + try await _publishDataTest(receiverRoomOptions: RoomTestingOptions(canSubscribe: false)) + } + + private func _publishDataTest(receiverRoomOptions: RoomTestingOptions) async throws { + struct TestDataPayload: Codable { + let content: String + } + + try await withRooms([RoomTestingOptions(canPublishData: true), receiverRoomOptions]) { rooms in // Alias to Rooms let room1 = rooms[0] let room2 = rooms[1]