Skip to content

Commit

Permalink
fix(Analytics): Fixing crash when attempting to submit events while a…
Browse files Browse the repository at this point in the history
… previous submission is in progress (#3331)
  • Loading branch information
ruisebas authored Nov 1, 2023
1 parent 44009a9 commit ccd0545
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public protocol AnalyticsClientBehaviour: Actor {
func removeGlobalAttribute(forKey key: String, forEventType eventType: String)
func removeGlobalMetric(forKey key: String)
func removeGlobalMetric(forKey key: String, forEventType eventType: String)
func record(_ event: PinpointEvent) throws
func record(_ event: PinpointEvent) async throws

func setRemoteGlobalAttributes(_ attributes: [String: String])
func removeAllRemoteGlobalAttributes()
Expand Down Expand Up @@ -231,7 +231,7 @@ actor AnalyticsClient: AnalyticsClientBehaviour {
session: sessionProvider())
}

func record(_ event: PinpointEvent) throws {
func record(_ event: PinpointEvent) async throws {
// Add event type attributes
if let eventAttributes = eventTypeAttributes[event.eventType] {
for (key, attribute) in eventAttributes {
Expand All @@ -256,7 +256,7 @@ actor AnalyticsClient: AnalyticsClientBehaviour {
event.addMetric(metric, forKey: key)
}

try eventRecorder.save(event)
try await eventRecorder.save(event)
}

@discardableResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import enum AwsCommonRuntimeKit.CommonRunTimeError
import Foundation

/// AnalyticsEventRecording saves and submits pinpoint events
protocol AnalyticsEventRecording {
var pinpointClient: PinpointClientProtocol { get }
protocol AnalyticsEventRecording: Actor {
nonisolated var pinpointClient: PinpointClientProtocol { get }

/// Saves a pinpoint event to storage
/// - Parameter event: A PinpointEvent
Expand All @@ -35,12 +35,13 @@ protocol AnalyticsEventRecording {
}

/// An AnalyticsEventRecording implementation that stores and submits pinpoint events
class EventRecorder: AnalyticsEventRecording {
let appId: String
let storage: AnalyticsEventStorage
let pinpointClient: PinpointClientProtocol
let endpointClient: EndpointClientBehaviour
actor EventRecorder: AnalyticsEventRecording {
private let appId: String
private let storage: AnalyticsEventStorage
private var submittedEvents: [PinpointEvent] = []
private var submissionTask: Task<[PinpointEvent], Error>?
nonisolated let endpointClient: EndpointClientBehaviour
nonisolated let pinpointClient: PinpointClientProtocol

/// Initializer for Event Recorder
/// - Parameters:
Expand All @@ -66,31 +67,37 @@ class EventRecorder: AnalyticsEventRecording {
func save(_ event: PinpointEvent) throws {
log.verbose("saveEvent: \(event)")
try storage.saveEvent(event)
try self.storage.checkDiskSize(limit: Constants.pinpointClientByteLimitDefault)
try storage.checkDiskSize(limit: Constants.pinpointClientByteLimitDefault)
}

func updateAttributesOfEvents(ofType eventType: String,
withSessionId sessionId: PinpointSession.SessionId,
setAttributes attributes: [String: String]) throws {
try self.storage.updateEvents(ofType: eventType,
try storage.updateEvents(ofType: eventType,
withSessionId: sessionId,
setAttributes: attributes)
}

/// Submit all locally stored events in batches
/// If event submission fails, the event retry count is increment otherwise event is marked dirty and available for deletion in the local storage if retry count exceeds 3
/// If event submission succeeds, the event is removed from local storage
/// Submit all locally stored events in batches. If a previous submission is in progress, it waits until it's completed before proceeding.
/// When the submission for an event is accepted, the event is removed from local storage
/// When the submission for an event is rejected, the event retry count is incremented in the local storage. Events that exceed the maximum retry count (3) are purged.
/// - Returns: A collection of events submitted to Pinpoint
func submitAllEvents() async throws -> [PinpointEvent] {
submittedEvents = []
let eventsBatch = try getBatchRecords()
if eventsBatch.count > 0 {
let endpointProfile = await endpointClient.currentEndpointProfile()
try await processBatch(eventsBatch, endpointProfile: endpointProfile)
} else {
log.verbose("No events to submit")
let task = Task { [submissionTask] in
// Wait for the previous submission to complete, regardless of its result
_ = try? await submissionTask?.value
submittedEvents = []
let eventsBatch = try getBatchRecords()
if eventsBatch.count > 0 {
let endpointProfile = await endpointClient.currentEndpointProfile()
try await processBatch(eventsBatch, endpointProfile: endpointProfile)
} else {
log.verbose("No events to submit")
}
return submittedEvents
}
return submittedEvents
submissionTask = task
return try await task.value
}

private func getBatchRecords() throws -> [PinpointEvent] {
Expand Down Expand Up @@ -343,7 +350,7 @@ extension EventRecorder: DefaultLogger {
public static var log: Logger {
Amplify.Logging.logger(forCategory: CategoryType.analytics.displayName, forNamespace: String(describing: self))
}
public var log: Logger {
nonisolated public var log: Logger {
Self.log
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ class AnalyticsClientTests: XCTestCase {

do {
try await analyticsClient.record(event)
XCTAssertEqual(eventRecorder.saveCount, 1)
guard let savedEvent = eventRecorder.lastSavedEvent else {
let saveCount = await eventRecorder.saveCount
XCTAssertEqual(saveCount, 1)
guard let savedEvent = await eventRecorder.lastSavedEvent else {
XCTFail("Expected saved event")
return
}
Expand Down Expand Up @@ -118,8 +119,9 @@ class AnalyticsClientTests: XCTestCase {

do {
try await analyticsClient.record(event)
XCTAssertEqual(eventRecorder.saveCount, 1)
guard let savedEvent = eventRecorder.lastSavedEvent else {
let saveCount = await eventRecorder.saveCount
XCTAssertEqual(saveCount, 1)
guard let savedEvent = await eventRecorder.lastSavedEvent else {
XCTFail("Expected saved event")
return
}
Expand All @@ -142,7 +144,8 @@ class AnalyticsClientTests: XCTestCase {
func testSubmit() async {
do {
try await analyticsClient.submitEvents()
XCTAssertEqual(eventRecorder.submitCount, 1)
let submitCount = await eventRecorder.submitCount
XCTAssertEqual(submitCount, 1)
} catch {
XCTFail("Unexpected exception while attempting to submit events")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ class EventRecorderTests: XCTestCase {
/// - Given: a event recorder
/// - When: a new pinpoint event is aved
/// - Then: the event is saved to storage followed by a disk size check
func testSaveEvent() {
func testSaveEvent() async {
let session = PinpointSession(sessionId: "1", startTime: Date(), stopTime: nil)
let event = PinpointEvent(id: "1", eventType: "eventType", eventDate: Date(), session: session)

XCTAssertEqual(storage.events.count, 0)
XCTAssertEqual(storage.checkDiskSizeCallCount, 1)

do {
try recorder.save(event)
try await recorder.save(event)
} catch {
XCTFail("Failed to save events")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import AWSPinpoint
@_spi(InternalAWSPinpoint) @testable import InternalAWSPinpoint

class MockEventRecorder: AnalyticsEventRecording {
var pinpointClient: PinpointClientProtocol = MockPinpointClient()
actor MockEventRecorder: AnalyticsEventRecording {
nonisolated var pinpointClient: PinpointClientProtocol {
MockPinpointClient()
}

var saveCount = 0
var lastSavedEvent: PinpointEvent?
Expand Down

0 comments on commit ccd0545

Please sign in to comment.