Skip to content

Commit

Permalink
Merge pull request #128 from ably-labs/50-trigger-RETRY
Browse files Browse the repository at this point in the history
[ECO-4985] Trigger the RETRY operation where spec says to
  • Loading branch information
lawrence-forooghian authored Nov 19, 2024
2 parents eacfcb7 + 82fed05 commit 435af70
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Sources/AblyChat/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ internal actor DefaultRoom<LifecycleManagerFactory: RoomLifecycleManagerFactory>
// MARK: - Room status

internal func onStatusChange(bufferingPolicy: BufferingPolicy) async -> Subscription<RoomStatusChange> {
await lifecycleManager.onChange(bufferingPolicy: bufferingPolicy)
await lifecycleManager.onRoomStatusChange(bufferingPolicy: bufferingPolicy)
}

internal var status: RoomStatus {
Expand Down
87 changes: 76 additions & 11 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal protocol RoomLifecycleManager: Sendable {
func performDetachOperation() async throws
func performReleaseOperation() async
var roomStatus: RoomStatus { get async }
func onChange(bufferingPolicy: BufferingPolicy) async -> Subscription<RoomStatusChange>
func onRoomStatusChange(bufferingPolicy: BufferingPolicy) async -> Subscription<RoomStatusChange>
func waitToBeAbleToPerformPresenceOperations(requestedByFeature requester: RoomFeature) async throws(ARTErrorInfo)
}

Expand Down Expand Up @@ -88,7 +88,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
private var contributorAnnotations: ContributorAnnotations
private var listenForStateChangesTask: Task<Void, Never>!
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
private var subscriptions: [Subscription<RoomStatusChange>] = []
private var roomStatusChangeSubscriptions: [Subscription<RoomStatusChange>] = []
private var operationResultContinuations = OperationResultContinuations()

// MARK: - Initializers and `deinit`
Expand Down Expand Up @@ -188,7 +188,8 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case detaching(detachOperationID: UUID)
case detached
case detachedDueToRetryOperation(retryOperationID: UUID)
case suspendedAwaitingStartOfRetryOperation(error: ARTErrorInfo)
// `retryOperationTask` is exposed so that tests can wait for the triggered RETRY operation to complete.
case suspendedAwaitingStartOfRetryOperation(retryOperationTask: Task<Void, Never>, error: ARTErrorInfo)
case suspended(retryOperationID: UUID, error: ARTErrorInfo)
case failed(error: ARTErrorInfo)
case releasing(releaseOperationID: UUID)
Expand All @@ -210,7 +211,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
.detaching
case .detached, .detachedDueToRetryOperation:
.detached
case let .suspendedAwaitingStartOfRetryOperation(error):
case let .suspendedAwaitingStartOfRetryOperation(_, error):
.suspended(error: error)
case let .suspended(_, error):
.suspended(error: error)
Expand Down Expand Up @@ -318,12 +319,36 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
status.toRoomStatus
}

internal func onChange(bufferingPolicy: BufferingPolicy) -> Subscription<RoomStatusChange> {
internal func onRoomStatusChange(bufferingPolicy: BufferingPolicy) -> Subscription<RoomStatusChange> {
let subscription: Subscription<RoomStatusChange> = .init(bufferingPolicy: bufferingPolicy)
subscriptions.append(subscription)
roomStatusChangeSubscriptions.append(subscription)
return subscription
}

#if DEBUG
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
/// Supports the ``testsOnly_onRoomStatusChange()`` method.
private var statusChangeSubscriptions: [Subscription<StatusChange>] = []

internal struct StatusChange {
internal var current: Status
internal var previous: Status
}

/// Allows tests to subscribe to changes to the manager’s internal status (which exposes more cases and additional metadata, compared to the ``RoomStatus`` exposed by ``onRoomStatusChange(bufferingPolicy:)``).
internal func testsOnly_onStatusChange() -> Subscription<StatusChange> {
let subscription: Subscription<StatusChange> = .init(bufferingPolicy: .unbounded)
statusChangeSubscriptions.append(subscription)
return subscription
}

internal func emitStatusChange(_ change: StatusChange) {
for subscription in statusChangeSubscriptions {
subscription.emit(change)
}
}
#endif

/// Updates ``status`` and emits a status change event.
private func changeStatus(to new: Status) {
logger.log(message: "Transitioning from \(status) to \(new)", level: .info)
Expand All @@ -333,12 +358,17 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
// Avoid a double-emit of room status when changing from `.suspendedAwaitingStartOfRetryOperation` to `.suspended`.
if new.toRoomStatus != previous.toRoomStatus {
let statusChange = RoomStatusChange(current: status.toRoomStatus, previous: previous.toRoomStatus)
emitStatusChange(statusChange)
emitRoomStatusChange(statusChange)
}

#if DEBUG
let statusChange = StatusChange(current: status, previous: previous)
emitStatusChange(statusChange)
#endif
}

private func emitStatusChange(_ change: RoomStatusChange) {
for subscription in subscriptions {
private func emitRoomStatusChange(_ change: RoomStatusChange) {
for subscription in roomStatusChangeSubscriptions {
subscription.emit(change)
}
}
Expand Down Expand Up @@ -480,7 +510,14 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor

clearTransientDisconnectTimeouts()

changeStatus(to: .suspendedAwaitingStartOfRetryOperation(error: reason))
// My understanding is that, since this task is being created inside an actor’s synchronous code, the two .suspended* statuses will always come in the right order; i.e. first .suspendedAwaitingStartOfRetryOperation and then .suspended.
let retryOperationTask = scheduleAnOperation(
kind: .retry(
triggeringContributor: contributor,
errorForSuspendedStatus: reason
)
)
changeStatus(to: .suspendedAwaitingStartOfRetryOperation(retryOperationTask: retryOperationTask, error: reason))
}
case .attaching:
if !hasOperationInProgress, !contributorAnnotations[contributor].hasTransientDisconnectTimeout {
Expand Down Expand Up @@ -690,6 +727,27 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
try result.get()
}

/// The kinds of operation that you can schedule using ``scheduleAnOperation(kind:)``.
private enum OperationKind {
/// The RETRY operation.
case retry(triggeringContributor: Contributor, errorForSuspendedStatus: ARTErrorInfo)
}

/// Requests that a room lifecycle operation be performed asynchronously.
private func scheduleAnOperation(kind: OperationKind) -> Task<Void, Never> {
logger.log(message: "Scheduling operation \(kind)", level: .debug)
return Task {
logger.log(message: "Performing scheduled operation \(kind)", level: .debug)
switch kind {
case let .retry(triggeringContributor, errorForSuspendedStatus):
await performRetryOperation(
triggeredByContributor: triggeringContributor,
errorForSuspendedStatus: errorForSuspendedStatus
)
}
}
}

// MARK: - ATTACH operation

internal func performAttachOperation() async throws {
Expand Down Expand Up @@ -752,9 +810,16 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case .suspended:
// CHA-RL1h2
let error = ARTErrorInfo(chatError: .attachmentFailed(feature: contributor.feature, underlyingError: contributorAttachError))
changeStatus(to: .suspendedAwaitingStartOfRetryOperation(error: error))

// CHA-RL1h3
// My understanding is that, since this task is being created inside an actor’s synchronous code, the two .suspended* statuses will always come in the right order; i.e. first .suspendedAwaitingStartOfRetryOperation and then .suspended.
let retryOperationTask = scheduleAnOperation(
kind: .retry(
triggeringContributor: contributor,
errorForSuspendedStatus: error
)
)
changeStatus(to: .suspendedAwaitingStartOfRetryOperation(retryOperationTask: retryOperationTask, error: error))
throw error
case .failed:
// CHA-RL1h4
Expand Down
Loading

0 comments on commit 435af70

Please sign in to comment.