Skip to content

Commit

Permalink
Runs room checks prior to performing presence operations via `feature…
Browse files Browse the repository at this point in the history
…Channel.waitToBeAbleToPerformPresenceOperations` which was implemented as part of [1].

It also became necessary to clean up the example app as this uncovered race conditions between concurrently running Tasks.
  • Loading branch information
umair-ably committed Nov 21, 2024
1 parent 5344d7f commit 370d444
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 54 deletions.
104 changes: 65 additions & 39 deletions Example/AblyChatExample/ContentView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,14 @@ struct ContentView: View {
}
}
}
.tryTask { try await setDefaultTitle() }
.tryTask { try await attachRoom() }
.tryTask { try await showMessages() }
.tryTask { try await showReactions() }
.tryTask { try await showPresence() }
.tryTask { try await showOccupancy() }
.tryTask {
try await setDefaultTitle()
try await attachRoom()
try await showMessages()
try await showReactions()
try await showPresence()
try await showOccupancy()
}
.tryTask {
// NOTE: As we implement more features, move them out of the `if Environment.current == .mock` block and into the main block just above.
if Environment.current == .mock {
Expand Down Expand Up @@ -179,70 +181,94 @@ struct ContentView: View {
}
}

for await message in messagesSubscription {
withAnimation {
messages.insert(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text), at: 0)
// Continue listening for messages on a background task so this function can return
Task {
for await message in messagesSubscription {
withAnimation {
messages.insert(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text), at: 0)
}
}
}
}

func showReactions() async throws {
let reactionSubscription = try await room().reactions.subscribe(bufferingPolicy: .unbounded)
for await reaction in reactionSubscription {
withAnimation {
showReaction(reaction.displayedText)

// Continue listening for reactions on a background task so this function can return
Task {
for await reaction in reactionSubscription {
withAnimation {
showReaction(reaction.displayedText)
}
}
}
}

func showPresence() async throws {
try await room().presence.enter(data: .init(userCustomData: ["status": .string("📱 Online")]))

for await event in try await room().presence.subscribe(events: [.enter, .leave, .update]) {
withAnimation {
let status = event.data?.userCustomData?["status"]?.value as? String
let clientPresenceChangeMessage = "\(event.clientID) \(event.action.displayedText)"
let presenceMessage = status != nil ? "\(clientPresenceChangeMessage) with status: \(status!)" : clientPresenceChangeMessage
// Continue listening for new presence events on a background task so this function can return
Task {
for await event in try await room().presence.subscribe(events: [.enter, .leave, .update]) {
withAnimation {
let status = event.data?.userCustomData?["status"]?.value as? String
let clientPresenceChangeMessage = "\(event.clientID) \(event.action.displayedText)"
let presenceMessage = status != nil ? "\(clientPresenceChangeMessage) with status: \(status!)" : clientPresenceChangeMessage

messages.insert(BasicListItem(id: UUID().uuidString, title: "System", text: presenceMessage), at: 0)
messages.insert(BasicListItem(id: UUID().uuidString, title: "System", text: presenceMessage), at: 0)
}
}
}
}

func showTypings() async throws {
for await typing in try await room().typing.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
typingInfo = "Typing: \(typing.currentlyTyping.joined(separator: ", "))..."
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
typingInfo = ""
// Continue listening for typing events on a background task so this function can return
Task {
for await typing in try await room().typing.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
typingInfo = "Typing: \(typing.currentlyTyping.joined(separator: ", "))..."
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
typingInfo = ""
}
}
}
}
}
}

func showOccupancy() async throws {
for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))"
// Continue listening for occupancy events on a background task so this function can return
let currentOccupancy = try await room().occupancy.get()
withAnimation {
occupancyInfo = "Connections: \(currentOccupancy.presenceMembers) (\(currentOccupancy.connections))"
}

Task {
for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))"
}
}
}
}

func showRoomStatus() async throws {
for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) {
withAnimation {
if status.current.isAttaching {
statusInfo = "\(status.current)...".capitalized
} else {
statusInfo = "\(status.current)".capitalized
if status.current == .attached {
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
statusInfo = ""
// Continue listening for status change events on a background task so this function can return
Task {
for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) {
withAnimation {
if status.current.isAttaching {
statusInfo = "\(status.current)...".capitalized
} else {
statusInfo = "\(status.current)".capitalized
if status.current == .attached {
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
statusInfo = ""
}
}
}
}
Expand Down
52 changes: 52 additions & 0 deletions Sources/AblyChat/DefaultPresence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR6) It must be possible to retrieve all the @Members of the presence set. The behaviour depends on the current room status, as presence operations in a Realtime Client cause implicit attaches.
internal func get() async throws -> [PresenceMember] {
logger.log(message: "Getting presence", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.get { [processPresenceGet] members, error in
do {
Expand All @@ -36,6 +45,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {

internal func get(params: PresenceQuery) async throws -> [PresenceMember] {
logger.log(message: "Getting presence with params: \(params)", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.get(params.asARTRealtimePresenceQuery()) { [processPresenceGet] members, error in
do {
Expand All @@ -52,6 +70,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR5) It must be possible to query if a given clientId is in the presence set.
internal func isUserPresent(clientID: String) async throws -> Bool {
logger.log(message: "Checking if user is present with clientID: \(clientID)", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.get(ARTRealtimePresenceQuery(clientId: clientID, connectionId: nil)) { [logger] members, error in
guard let members else {
Expand All @@ -68,6 +95,14 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR3a) Users may choose to enter presence, optionally providing custom data to enter with. The overall presence data must retain the format specified in CHA-PR2.
internal func enter(data: PresenceData? = nil) async throws {
logger.log(message: "Entering presence", level: .debug)

// CHA-PR3c to CHA-PR3g
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence enter operation: \(error)", level: .error)
throw error
}
return try await withCheckedThrowingContinuation { continuation in
channel.presence.enterClient(clientID, data: data?.asQueryItems()) { [logger] error in
if let error {
Expand All @@ -83,6 +118,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR10a) Users may choose to update their presence data, optionally providing custom data to update with. The overall presence data must retain the format specified in CHA-PR2.
internal func update(data: PresenceData? = nil) async throws {
logger.log(message: "Updating presence", level: .debug)

// CHA-PR10c to CHA-PR10g
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence update operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.update(data?.asQueryItems()) { [logger] error in
if let error {
Expand All @@ -98,6 +142,14 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR4a) Users may choose to leave presence, which results in them being removed from the Realtime presence set.
internal func leave(data: PresenceData? = nil) async throws {
logger.log(message: "Leaving presence", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence leave operation: \(error)", level: .error)
throw error
}
return try await withCheckedThrowingContinuation { continuation in
channel.presence.leave(data?.asQueryItems()) { [logger] error in
if let error {
Expand Down
33 changes: 18 additions & 15 deletions Tests/AblyChatTests/IntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,20 @@ struct IntegrationTests {
// (13) Subscribe to occupancy
let rxOccupancySubscription = await rxRoom.occupancy.subscribe(bufferingPolicy: .unbounded)

// (14) Enter presence on the other client and check that we receive the updated occupancy on the subscription
// (14) Attach the room so we can perform presence operations
try await txRoom.attach()

// (15) Enter presence on the other client and check that we receive the updated occupancy on the subscription
try await txRoom.presence.enter(data: nil)

// It can take a moment for the occupancy to update from the clients entering presence above, so we’ll wait 2 seconds here.
try await Task.sleep(nanoseconds: 2_000_000_000)

// (15) Check that we received an updated presence count when getting the occupancy
// (16) Check that we received an updated presence count when getting the occupancy
let updatedCurrentOccupancy = try await rxRoom.occupancy.get()
#expect(updatedCurrentOccupancy.presenceMembers == 1) // 1 for txClient entering presence

// (16) Check that we received an updated presence count on the subscription
// (17) Check that we received an updated presence count on the subscription
let rxOccupancyEventFromSubscription = try #require(await rxOccupancySubscription.first { _ in true })

#expect(rxOccupancyEventFromSubscription.presenceMembers == 1) // 1 for txClient entering presence
Expand All @@ -131,64 +134,64 @@ struct IntegrationTests {

// MARK: - Presence

// (17) Subscribe to presence
// (18) Subscribe to presence
let rxPresenceSubscription = await rxRoom.presence.subscribe(events: [.enter, .leave, .update])

// (18) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription
// (19) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription
try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceEnterTxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceEnterTxEvent.action == .enter)
#expect(rxPresenceEnterTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (19) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription
// (20) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription
try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceUpdateTxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceUpdateTxEvent.action == .update)
#expect(rxPresenceUpdateTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (20) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription
// (21) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription
try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceLeaveTxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceLeaveTxEvent.action == .leave)
#expect(rxPresenceLeaveTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (21) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription
// (22) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription
try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceEnterRxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceEnterRxEvent.action == .enter)
#expect(rxPresenceEnterRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (22) Send `.update` presence event with custom data on our client and check that we receive it on the subscription
// (23) Send `.update` presence event with custom data on our client and check that we receive it on the subscription
try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceUpdateRxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceUpdateRxEvent.action == .update)
#expect(rxPresenceUpdateRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (23) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription
// (24) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription
try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceLeaveRxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceLeaveRxEvent.action == .leave)
#expect(rxPresenceLeaveRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// MARK: - Detach

// (24) Detach the room
// (25) Detach the room
try await rxRoom.detach()

// (25) Check that we received a DETACHED status change as a result of detaching the room
// (26) Check that we received a DETACHED status change as a result of detaching the room
_ = try #require(await rxRoomStatusSubscription.first { $0.current == .detached })
#expect(await rxRoom.status == .detached)

// MARK: - Release

// (26) Release the room
// (27) Release the room
try await rxClient.rooms.release(roomID: roomID)

// (27) Check that we received a RELEASED status change as a result of releasing the room
// (28) Check that we received a RELEASED status change as a result of releasing the room
_ = try #require(await rxRoomStatusSubscription.first { $0.current == .released })
#expect(await rxRoom.status == .released)

// (28) Fetch the room we just released and check it’s a new object
// (29) Fetch the room we just released and check it’s a new object
let postReleaseRxRoom = try await rxClient.rooms.get(roomID: roomID, options: .init())
#expect(postReleaseRxRoom !== rxRoom)
}
Expand Down

0 comments on commit 370d444

Please sign in to comment.