diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart index 74f5ac28b9..9e9fd28c3f 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart @@ -42,7 +42,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable { } @override - FutureOr isFull(int maxSizeInMB) { + bool isFull(int maxSizeInMB) { throw UnimplementedError('isFull() has not been implemented.'); } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart index 627213d0e2..7acc8ee13c 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart @@ -56,7 +56,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable { } @override - Future isFull(int maxSizeInMB) { + bool isFull(int maxSizeInMB) { return _database.isFull(maxSizeInMB); } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart index 0dc36a615c..c20db8c60b 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart @@ -14,8 +14,8 @@ class DartQueuedItemStore // ignore: avoid_unused_constructor_parameters DartQueuedItemStore(String? storagePath); - late final Future _database = () async { - if (await IndexedDbAdapter.checkIsIndexedDBSupported()) { + late final QueuedItemStore _database = () { + if (IndexedDbAdapter.checkIsIndexedDBSupported()) { return IndexedDbAdapter(); } logger.warn( @@ -34,8 +34,7 @@ class DartQueuedItemStore String timestamp, { bool enableQueueRotation = false, }) async { - final db = await _database; - await db.addItem( + await _database.addItem( string, timestamp, enableQueueRotation: enableQueueRotation, @@ -44,34 +43,29 @@ class DartQueuedItemStore @override Future deleteItems(Iterable items) async { - final db = await _database; - await db.deleteItems(items); + await _database.deleteItems(items); } @override Future> getCount(int count) async { - final db = await _database; - return db.getCount(count); + return _database.getCount(count); } @override Future> getAll() async { - final db = await _database; - return db.getAll(); + return _database.getAll(); } @override - Future isFull(int maxSizeInMB) async { - final db = await _database; - return db.isFull(maxSizeInMB); + bool isFull(int maxSizeInMB) { + return _database.isFull(maxSizeInMB); } /// Clear IndexedDB data. @override @visibleForTesting Future clear() async { - final db = await _database; - return db.clear(); + return _database.clear(); } @override diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart index a7d6026cce..d29f32777c 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart @@ -104,7 +104,7 @@ class DriftQueuedItemStore extends _$DriftQueuedItemStore } @override - Future isFull(int maxSizeInMB) async { + bool isFull(int maxSizeInMB) { final maxBytes = maxSizeInMB * 1024 * 1024; return _currentTotalByteSize >= maxBytes; } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart index 37c1812e78..c4e4c13763 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart @@ -150,7 +150,7 @@ class IndexedDbAdapter implements QueuedItemStore { } @override - Future isFull(int maxSizeInMB) async { + bool isFull(int maxSizeInMB) { final maxBytes = maxSizeInMB * 1024 * 1024; return _currentTotalByteSize >= maxBytes; } @@ -167,15 +167,14 @@ class IndexedDbAdapter implements QueuedItemStore { void close() {} /// Check that IndexDB will work on this device. - static Future checkIsIndexedDBSupported() async { + static bool checkIsIndexedDBSupported() { if (indexedDB == null) { return false; } // indexedDB will be non-null in Firefox private browsing, // but will fail to open. try { - final openRequest = indexedDB!.open('test', 1); - await openRequest.future; + indexedDB!.open('test', 1).result; return true; } on Object { return false; diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart index 4c18fc12d8..c49b105780 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart @@ -233,14 +233,14 @@ void main() { await db.addItem(largeItem, DateTime.now().toIso8601String()); } - var result = await db.isFull(capacityLimit); + var result = db.isFull(capacityLimit); expect(result, isFalse); for (var i = 0; i < 100; i++) { await db.addItem(largeItem, DateTime.now().toIso8601String()); } - result = await db.isFull(capacityLimit); + result = db.isFull(capacityLimit); expect(result, isTrue); }, ); diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart index 93da31d040..5e302b497e 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart @@ -32,9 +32,10 @@ import 'package:meta/meta.dart'; const int _maxNumberOfLogEventsInBatch = 10000; const int _maxLogEventsBatchSize = 1048576; const int _baseBufferSize = 26; +const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay; const int _maxLogEventSize = 256000; -final int _maxLogEventsTimeSpanInBatch = - const Duration(hours: 24).inMilliseconds; +const Duration _minusMaxLogEventTimeInFuture = Duration(hours: -2); +const Duration _baseRetryInterval = Duration(seconds: 10); typedef _LogBatch = (List logQueues, List logEvents); @@ -113,7 +114,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin bool _enabled; StoppableTimer? _timer; RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider; - + int _retryCount = 0; + DateTime? _retryTime; set remoteLoggingConstraintProvider( RemoteLoggingConstraintProvider remoteProvider, ) { @@ -129,32 +131,89 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin Future startSyncing() async { final batchStream = _getLogBatchesToSync(); await for (final (logs, events) in batchStream) { - final response = await _sendToCloudWatch(events); - // TODO(nikahsn): handle tooOldLogEventEndIndex - // and expiredLogEventEndIndex. - if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) { - // TODO(nikahsn): throw and exception to enable log rotation if the - // log store is full. - break; + _TooNewLogEventException? tooNewException; + while (logs.isNotEmpty && events.isNotEmpty) { + final rejectedLogEventsInfo = + (await _sendToCloudWatch(events)).rejectedLogEventsInfo; + if (rejectedLogEventsInfo == null) { + await _logStore.deleteItems(logs); + break; + } + + final (tooOldEndIndex, tooNewStartIndex) = + rejectedLogEventsInfo.parse(events.length); + + if (_isValidIndex(tooNewStartIndex, events.length)) { + tooNewException = _TooNewLogEventException( + events[tooNewStartIndex!].timestamp.toInt(), + ); + // set logs to end before the index. + logs.removeRange(tooNewStartIndex, events.length); + // set events to end before the index. + events.removeRange(tooNewStartIndex, events.length); + } + if (_isValidIndex(tooOldEndIndex, events.length)) { + // remove old logs from log store. + await _logStore.deleteItems(logs.sublist(0, tooOldEndIndex! + 1)); + // set logs to start after the index. + logs.removeRange(0, tooOldEndIndex + 1); + // set events to start after the index. + events.removeRange(0, tooOldEndIndex + 1); + } + } + // after sending each batch to CloudWatch check if the batch has + // `tooNewException` and throw to stop syncing next batches. + if (tooNewException != null) { + throw tooNewException; } - await _logStore.deleteItems(logs); } } if (!_syncing) { - // TODO(nikahsn): disable log rotation. _syncing = true; + DateTime? nextRetry; try { await startSyncing(); + } on _TooNewLogEventException catch (e) { + nextRetry = + DateTime.fromMillisecondsSinceEpoch(e.timeInMillisecondsSinceEpoch) + .add(_minusMaxLogEventTimeInFuture); } on Exception catch (e) { logger.error('Failed to sync logs to CloudWatch.', e); - // TODO(nikahsn): enable log rotation if the log store is full } finally { + _handleFullLogStoreAfterSync( + retryTime: nextRetry, + ); _syncing = false; } } } + void _handleFullLogStoreAfterSync({ + DateTime? retryTime, + }) { + final isLogStoreFull = + _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); + if (!isLogStoreFull) { + _retryCount = 0; + _retryTime = null; + return; + } + if (retryTime != null && retryTime.isAfter(DateTime.timestamp())) { + _retryTime = retryTime; + return; + } + _retryCount += 1; + _retryTime = DateTime.timestamp().add((_baseRetryInterval * _retryCount)); + } + + bool _shouldSyncOnFullLogStore() { + if (_retryTime == null) { + return true; + } + return !(_retryTime!.isAfter(DateTime.timestamp())); + } + void _onTimerError(Object e) { logger.error('Failed to sync logs to CloudWatch.', e); } @@ -225,11 +284,17 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin return; } final item = logEntry.toQueuedItem(); + final isLogStoreFull = + _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); + final shouldEnableQueueRotation = isLogStoreFull && _retryTime != null; + await _logStore.addItem( item.value, item.timestamp, + enableQueueRotation: shouldEnableQueueRotation, ); - if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) { + + if (isLogStoreFull && _shouldSyncOnFullLogStore()) { await _startSyncingIfNotInProgress(); } } @@ -253,6 +318,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin _enabled = false; _timer?.stop(); await _logStore.clear(); + _retryCount = 0; + _retryTime = null; } /// Sends logs on-demand to CloudWatch. @@ -285,3 +352,34 @@ extension on LogEntry { ); } } + +extension on RejectedLogEventsInfo { + (int? pastEndIndex, int? futureStartIndex) parse(int length) { + int? pastEndIndex; + int? futureStartIndex; + + if (_isValidIndex(tooOldLogEventEndIndex, length)) { + pastEndIndex = tooOldLogEventEndIndex; + } + if (_isValidIndex(expiredLogEventEndIndex, length)) { + pastEndIndex = pastEndIndex == null + ? expiredLogEventEndIndex + : max(pastEndIndex, expiredLogEventEndIndex!); + } + if (_isValidIndex(tooNewLogEventStartIndex, length)) { + futureStartIndex = tooNewLogEventStartIndex; + } + return (pastEndIndex, futureStartIndex); + } +} + +class _TooNewLogEventException implements Exception { + const _TooNewLogEventException( + this.timeInMillisecondsSinceEpoch, + ); + final int timeInMillisecondsSinceEpoch; +} + +bool _isValidIndex(int? index, int length) { + return index != null && index >= 0 && index <= length - 1; +} diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart index 24e197d449..9317aa4878 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart @@ -24,7 +24,7 @@ abstract interface class QueuedItemStore { FutureOr> getAll(); /// Whether the queue size is reached [maxSizeInMB]. - FutureOr isFull(int maxSizeInMB); + bool isFull(int maxSizeInMB); /// Clear the queue of items. FutureOr clear(); diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart index fe69936682..4e4f57f39e 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart @@ -45,7 +45,17 @@ void main() { ), QueuedItem( id: 2, - value: 'second og message', + value: 'second log message', + timestamp: DateTime.timestamp().toIso8601String(), + ), + QueuedItem( + id: 3, + value: 'third log message', + timestamp: DateTime.timestamp().toIso8601String(), + ), + QueuedItem( + id: 4, + value: 'forth log message', timestamp: DateTime.timestamp().toIso8601String(), ), ]; @@ -62,17 +72,34 @@ void main() { logStreamProvider: mockCloudWatchLogStreamProvider, ); }); + test('when enabled, logs are added to the item store', () async { - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) - .thenAnswer((_) async => Future.value(false)); + .thenReturn(false); + plugin.enable(); + await expectLater( plugin.handleLogEntry(errorLog), completes, ); - verify(() => mockQueuedItemStore.addItem(any(), any())).called(1); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).called(1); + verify( () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), ).called(1); @@ -138,8 +165,8 @@ void main() { when(() => mockCloudWatchLogStreamProvider.defaultLogStream) .thenAnswer((_) async => Future.value('log stream name')); - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); when(() => mockQueuedItemStore.deleteItems(any())) .thenAnswer((_) async => {}); @@ -184,8 +211,8 @@ void main() { when(() => mockCloudWatchLogStreamProvider.defaultLogStream) .thenAnswer((_) async => Future.value('log stream name')); - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); when(() => mockQueuedItemStore.getAll()).thenAnswer( (_) async => Future>.value(queuedItems), @@ -239,6 +266,9 @@ void main() { (_) async => Future>.value(queuedItems), ); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); + await expectLater( plugin.flushLogs(), completes, @@ -276,6 +306,9 @@ void main() { (_) async => Future>.value(queuedItems), ); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); + await expectLater( plugin.flushLogs(), completes, @@ -289,5 +322,405 @@ void main() { () => mockCloudWatchLogStreamProvider.createLogStream(any()), ).called(1); }); + + test('it enables log rotation when log store is full and retry is set', + () async { + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(true); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: true, + ), + ).called(1); + }); + + test( + 'it does not enable log rotation when log store is full if retry is not ' + 'set. it start sync on full log store.', + () async { + final isFullResponse = [false, true, false]; + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenAnswer((_) => isFullResponse.removeAt(0)); + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => + Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(2); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).called(1); + }, + ); + + test('it does not sync on full log store if retry time not reached', + () async { + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(true); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(1); + + verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1); + }); + + test( + 'it deletes too old logs in the batch and sync the rest', + () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(tooOldLogEventEndIndex: 0), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => mockQueuedItemStore + .deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable).first, + queuedItems.first, + ); + expect( + (captures.captured.last as Iterable), + queuedItems.sublist(1), + ); + }, + ); + + test('it deletes expired logs in the batch and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(expiredLogEventEndIndex: 0), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable).first, + queuedItems.first, + ); + expect( + (captures.captured.last as Iterable), + queuedItems.sublist(1), + ); + }); + + test('it leaves too new logs in the batch and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(tooNewLogEventStartIndex: 1), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 1); + expect(captures.captured.length, 1); + expect( + (captures.captured.last as Iterable).first, + queuedItems.first, + ); + }); + + test( + 'it deltes old logs and leaves too new logs in the batch' + ' and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: RejectedLogEventsInfo( + expiredLogEventEndIndex: 0, + tooOldLogEventEndIndex: 1, + tooNewLogEventStartIndex: 3, + ), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable), + queuedItems.sublist(0, 2), + ); + + expect( + (captures.captured.last as Iterable), + queuedItems.sublist(2, 3), + ); + }); }); } diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart index ec2bca8874..ab035a6ca2 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart @@ -231,7 +231,7 @@ void main() { await db.addItem('0', DateTime.now().toIso8601String()); } - var result = await db.isFull(capacityLimit); + var result = db.isFull(capacityLimit); expect(result, isFalse); // add enough items to exceed capacity limit of 1mb @@ -239,7 +239,7 @@ void main() { await db.addItem('0', DateTime.now().toIso8601String()); } - result = await db.isFull(capacityLimit); + result = db.isFull(capacityLimit); expect(result, isTrue); }, );