Skip to content

Commit

Permalink
Accurate audio track counting (#477)
Browse files Browse the repository at this point in the history
~~Not sure what can cause trackDidStop to be called unbalanced with
trackDidStart but better guard this.~~
Accurate counting of audio tracks instead of counting publish /
unpublish.
  • Loading branch information
hiroshihorie authored Sep 9, 2024
1 parent 4d8f2f5 commit 4ca5fa2
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 35 deletions.
13 changes: 12 additions & 1 deletion Sources/LiveKit/Support/SerialRunnerActor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,26 @@ actor SerialRunnerActor<Value: Sendable> {

func run(block: @Sendable @escaping () async throws -> Value) async throws -> Value {
let task = Task { [previousTask] in
let _ = try? await previousTask?.value
// Wait for the previous task to complete, but cancel it if needed
if let previousTask, !Task.isCancelled {
// If previous task is still running, wait for it
_ = try? await previousTask.value
}

// Check for cancellation before running the block
try Task.checkCancellation()

// Run the new block
return try await block()
}

previousTask = task

return try await withTaskCancellationHandler {
// Await the current task's result
try await task.value
} onCancel: {
// Ensure the task is canceled when requested
task.cancel()
}
}
Expand Down
6 changes: 2 additions & 4 deletions Sources/LiveKit/Track/AudioManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,16 @@ public class AudioManager: Loggable {
}

func trackDidStart(_ type: Type) {
// async mutation
_state.mutate { state in
if type == .local { state.localTracksCount += 1 }
if type == .remote { state.remoteTracksCount += 1 }
}
}

func trackDidStop(_ type: Type) {
// async mutation
_state.mutate { state in
if type == .local { state.localTracksCount -= 1 }
if type == .remote { state.remoteTracksCount -= 1 }
if type == .local { state.localTracksCount = max(state.localTracksCount - 1, 0) }
if type == .remote { state.remoteTracksCount = max(state.remoteTracksCount - 1, 0) }
}
}

Expand Down
28 changes: 10 additions & 18 deletions Sources/LiveKit/Track/Local/LocalAudioTrack.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,31 +73,23 @@ public class LocalAudioTrack: Track, LocalTrack, AudioTrack {
captureOptions: options)
}

@discardableResult
override func onPublish() async throws -> Bool {
let didPublish = try await super.onPublish()
if didPublish {
AudioManager.shared.trackDidStart(.local)
}
return didPublish
}

@discardableResult
override func onUnpublish() async throws -> Bool {
let didUnpublish = try await super.onUnpublish()
if didUnpublish {
AudioManager.shared.trackDidStop(.local)
}
return didUnpublish
}

public func mute() async throws {
try await super._mute()
}

public func unmute() async throws {
try await super._unmute()
}

// MARK: - Internal

override func startCapture() async throws {
AudioManager.shared.trackDidStart(.local)
}

override func stopCapture() async throws {
AudioManager.shared.trackDidStop(.local)
}
}

public extension LocalAudioTrack {
Expand Down
32 changes: 20 additions & 12 deletions Sources/LiveKit/Track/Track.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public class Track: NSObject, Loggable {

private let _statisticsTimer = AsyncTimer(interval: 1.0)

private let _startStopSerialRunner = SerialRunnerActor<Void>()

init(name: String,
kind: Kind,
source: Source,
Expand Down Expand Up @@ -230,24 +232,30 @@ public class Track: NSObject, Loggable {

@objc
public final func start() async throws {
guard _state.trackState != .started else {
log("Already started", .warning)
return
try await _startStopSerialRunner.run { [weak self] in
guard let self else { return }
guard self._state.trackState != .started else {
self.log("Already started", .warning)
return
}
try await self.startCapture()
if self is RemoteTrack { try await self.enable() }
self._state.mutate { $0.trackState = .started }
}
try await startCapture()
if self is RemoteTrack { try await enable() }
_state.mutate { $0.trackState = .started }
}

@objc
public final func stop() async throws {
guard _state.trackState != .stopped else {
log("Already stopped", .warning)
return
try await _startStopSerialRunner.run { [weak self] in
guard let self else { return }
guard self._state.trackState != .stopped else {
self.log("Already stopped", .warning)
return
}
try await self.stopCapture()
if self is RemoteTrack { try await self.disable() }
self._state.mutate { $0.trackState = .stopped }
}
try await stopCapture()
if self is RemoteTrack { try await disable() }
_state.mutate { $0.trackState = .stopped }
}

// Returns true if didEnable
Expand Down

0 comments on commit 4ca5fa2

Please sign in to comment.