diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/amplify_cloudwatch_logger_plugin.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/amplify_cloudwatch_logger_plugin.dart new file mode 100644 index 00000000000..929557d7788 --- /dev/null +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/amplify_cloudwatch_logger_plugin.dart @@ -0,0 +1,19 @@ +import 'package:amplify_logging_cloudwatch/src/amplify_log_stream_name_provider.dart'; +import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart'; + +/// {@macro aws_logging_cloudwatch.cloudwatch_logger_plugin} +class AmplifyCloudWatchLoggerPlugin extends CloudWatchLoggerPlugin { + /// {@macro aws_logging_cloudwatch.cloudwatch_logger_plugin} + AmplifyCloudWatchLoggerPlugin({ + required super.credentialsProvider, + required super.pluginConfig, + }) : super( + logStreamProvider: DefaultCloudWatchLogStreamProvider( + credentialsProvider: credentialsProvider, + region: pluginConfig.region, + logGroupName: pluginConfig.logGroupName, + defaultLogStreamNameProvider: + AmplifyLogStreamNameProvider().defaultLogStreamName, + ), + ); +} diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/amplify_log_stream_name_provider.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/amplify_log_stream_name_provider.dart new file mode 100644 index 00000000000..28fc1b66d7e --- /dev/null +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/amplify_log_stream_name_provider.dart @@ -0,0 +1,39 @@ +import 'package:amplify_core/amplify_core.dart'; +import 'package:amplify_logging_cloudwatch/src/device_info/device_info.dart'; +import 'package:intl/intl.dart'; + +const _guestUserId = 'guest'; + +/// {@template amplify_logging_cloudwatch.amplify_log_stream_name_provider} +/// It uses {mm-dd-yyyy}.{deviceId}.{userId|guest} format for log stream name. +/// +/// It gets deviceId from platform inforamtion or uuid if it is `null`. +/// It gets userId from `Amplify.Auth.getCurrentUser()).userId` or uses `guest` +/// if userId is not available. +/// {@endtemplate} +class AmplifyLogStreamNameProvider { + /// {@macro amplify_logging_cloudwatch.amplify_log_stream_name_provider} + AmplifyLogStreamNameProvider(); + static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd'); + String? _deviceID; + + /// Returns log stream name in `{mm-dd-yyyy}.{deviceId}.{userId|guest}` format + Future defaultLogStreamName() async { + _deviceID ??= await getDeviceId() ?? UUID.getUUID(); + String userId; + userId = await _getUserId(); + return '${_dateFormat.format(DateTime.timestamp())}.$_deviceID.$userId'; + } + + Future _getUserId() async { + String userId; + try { + userId = (await Amplify.Auth.getCurrentUser()).userId; + } on Error { + userId = _guestUserId; + } on Exception { + userId = _guestUserId; + } + return userId; + } +} diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.dart new file mode 100644 index 00000000000..b5c3a0672cf --- /dev/null +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.dart @@ -0,0 +1,3 @@ +export 'device_info.stub.dart' + if (dart.library.io) 'device_info.vm.dart' + if (dart.library.html) 'device_info.web.dart'; diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.stub.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.stub.dart new file mode 100644 index 00000000000..5929bec1e24 --- /dev/null +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.stub.dart @@ -0,0 +1,7 @@ +/// {@template amplify_logging_cloudwatch.device_info} +/// Returns device Id from platform information on `vm`. +/// Returns a UUID across browser sessions for web. +/// {@endtemplate} +Future getDeviceId() { + throw UnimplementedError('getDeviceId() has not been implemented.'); +} diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.vm.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.vm.dart new file mode 100644 index 00000000000..246e600f035 --- /dev/null +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.vm.dart @@ -0,0 +1,30 @@ +import 'dart:io'; + +import 'package:device_info_plus/device_info_plus.dart'; + +/// {@macro amplify_logging_cloudwatch.device_info} +Future getDeviceId() async { + final deviceInfo = DeviceInfoPlugin(); + String? deviceID; + try { + if (Platform.isAndroid) { + final androidInfo = await deviceInfo.androidInfo; + deviceID = androidInfo.id; + } else if (Platform.isIOS) { + final iosInfo = await deviceInfo.iosInfo; + deviceID = iosInfo.identifierForVendor ?? ''; + } else if (Platform.isLinux) { + final linuxInfo = await deviceInfo.linuxInfo; + deviceID = linuxInfo.machineId ?? ''; + } else if (Platform.isMacOS) { + final macInfo = await deviceInfo.macOsInfo; + deviceID = macInfo.systemGUID ?? ''; + } else if (Platform.isWindows) { + final windowsInfo = await deviceInfo.windowsInfo; + deviceID = windowsInfo.deviceId; + } + } on Exception { + return null; + } + return deviceID; +} diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.web.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.web.dart new file mode 100644 index 00000000000..a6ee9e49722 --- /dev/null +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/device_info/device_info.web.dart @@ -0,0 +1,16 @@ +import 'dart:html'; + +import 'package:amplify_core/amplify_core.dart'; + +const _localStorageKey = 'amplify-cloudwatch-logger-device-id'; + +/// {@macro amplify_logging_cloudwatch.device_info} +Future getDeviceId() async { + var deviceID = window.localStorage[_localStorageKey]; + if (deviceID != null) { + return deviceID; + } + deviceID = UUID.getUUID(); + window.localStorage.putIfAbsent(_localStorageKey, () => deviceID!); + return deviceID; +} diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart index 74f5ac28b97..9e9fd28c3f1 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart @@ -42,7 +42,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable { } @override - FutureOr isFull(int maxSizeInMB) { + bool isFull(int maxSizeInMB) { throw UnimplementedError('isFull() has not been implemented.'); } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart index 627213d0e22..7acc8ee13c7 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart @@ -56,7 +56,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable { } @override - Future isFull(int maxSizeInMB) { + bool isFull(int maxSizeInMB) { return _database.isFull(maxSizeInMB); } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart index 0dc36a615cf..c20db8c60be 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart @@ -14,8 +14,8 @@ class DartQueuedItemStore // ignore: avoid_unused_constructor_parameters DartQueuedItemStore(String? storagePath); - late final Future _database = () async { - if (await IndexedDbAdapter.checkIsIndexedDBSupported()) { + late final QueuedItemStore _database = () { + if (IndexedDbAdapter.checkIsIndexedDBSupported()) { return IndexedDbAdapter(); } logger.warn( @@ -34,8 +34,7 @@ class DartQueuedItemStore String timestamp, { bool enableQueueRotation = false, }) async { - final db = await _database; - await db.addItem( + await _database.addItem( string, timestamp, enableQueueRotation: enableQueueRotation, @@ -44,34 +43,29 @@ class DartQueuedItemStore @override Future deleteItems(Iterable items) async { - final db = await _database; - await db.deleteItems(items); + await _database.deleteItems(items); } @override Future> getCount(int count) async { - final db = await _database; - return db.getCount(count); + return _database.getCount(count); } @override Future> getAll() async { - final db = await _database; - return db.getAll(); + return _database.getAll(); } @override - Future isFull(int maxSizeInMB) async { - final db = await _database; - return db.isFull(maxSizeInMB); + bool isFull(int maxSizeInMB) { + return _database.isFull(maxSizeInMB); } /// Clear IndexedDB data. @override @visibleForTesting Future clear() async { - final db = await _database; - return db.clear(); + return _database.clear(); } @override diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart index a7d6026ccea..d29f32777c1 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart @@ -104,7 +104,7 @@ class DriftQueuedItemStore extends _$DriftQueuedItemStore } @override - Future isFull(int maxSizeInMB) async { + bool isFull(int maxSizeInMB) { final maxBytes = maxSizeInMB * 1024 * 1024; return _currentTotalByteSize >= maxBytes; } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart index 37c1812e789..c4e4c13763a 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart @@ -150,7 +150,7 @@ class IndexedDbAdapter implements QueuedItemStore { } @override - Future isFull(int maxSizeInMB) async { + bool isFull(int maxSizeInMB) { final maxBytes = maxSizeInMB * 1024 * 1024; return _currentTotalByteSize >= maxBytes; } @@ -167,15 +167,14 @@ class IndexedDbAdapter implements QueuedItemStore { void close() {} /// Check that IndexDB will work on this device. - static Future checkIsIndexedDBSupported() async { + static bool checkIsIndexedDBSupported() { if (indexedDB == null) { return false; } // indexedDB will be non-null in Firefox private browsing, // but will fail to open. try { - final openRequest = indexedDB!.open('test', 1); - await openRequest.future; + indexedDB!.open('test', 1).result; return true; } on Object { return false; diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/pubspec.yaml b/packages/logging_cloudwatch/amplify_logging_cloudwatch/pubspec.yaml index 2c5bf793db5..d378666f82f 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/pubspec.yaml +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/pubspec.yaml @@ -15,9 +15,11 @@ dependencies: aws_common: ">=0.6.0 <0.7.0" aws_logging_cloudwatch: ^0.1.0 collection: ^1.15.0 + device_info_plus: ^9.0.0 drift: ">=2.11.0 <2.12.0" flutter: sdk: flutter + intl: ">=0.18.0 <1.0.0" meta: ^1.7.0 path_provider: ^2.0.0 @@ -27,4 +29,6 @@ dev_dependencies: build_test: ^2.0.0 build_web_compilers: ^4.0.0 drift_dev: ^2.2.0+1 + flutter_test: + sdk: flutter test: ^1.22.1 diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/amplify_log_stream_provider_test.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/amplify_log_stream_provider_test.dart new file mode 100644 index 00000000000..b9e04b2b56c --- /dev/null +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/amplify_log_stream_provider_test.dart @@ -0,0 +1,21 @@ +@TestOn('vm') + +import 'package:amplify_logging_cloudwatch/src/amplify_log_stream_name_provider.dart'; +import 'package:flutter_test/flutter_test.dart' as flutter; +import 'package:test/test.dart'; + +void main() { + flutter.TestWidgetsFlutterBinding.ensureInitialized(); + test('it uses uuid and guest when their values are not provided', () async { + final logStreamNameProvider = AmplifyLogStreamNameProvider(); + await expectLater(logStreamNameProvider.defaultLogStreamName(), completes); + }); + + test('it caches the device Id', () async { + final logStreamNameProvider = AmplifyLogStreamNameProvider(); + final logStreamName1 = await logStreamNameProvider.defaultLogStreamName(); + final logStreamName2 = await logStreamNameProvider.defaultLogStreamName(); + + expect(logStreamName1, logStreamName2); + }); +} diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart index 4c18fc12d88..c49b1057808 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart @@ -233,14 +233,14 @@ void main() { await db.addItem(largeItem, DateTime.now().toIso8601String()); } - var result = await db.isFull(capacityLimit); + var result = db.isFull(capacityLimit); expect(result, isFalse); for (var i = 0; i < 100; i++) { await db.addItem(largeItem, DateTime.now().toIso8601String()); } - result = await db.isFull(capacityLimit); + result = db.isFull(capacityLimit); expect(result, isTrue); }, ); diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart index 6476e16cdbc..b3eecf74bbd 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart @@ -32,9 +32,10 @@ import 'package:meta/meta.dart'; const int _maxNumberOfLogEventsInBatch = 10000; const int _maxLogEventsBatchSize = 1048576; const int _baseBufferSize = 26; +const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay; const int _maxLogEventSize = 256000; -final int _maxLogEventsTimeSpanInBatch = - const Duration(hours: 24).inMilliseconds; +const Duration _minusMaxLogEventTimeInFuture = Duration(hours: -2); +const Duration _baseRetryInterval = Duration(seconds: 10); typedef _LogBatch = (List logQueues, List logEvents); @@ -72,10 +73,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin _logStreamProvider = logStreamProvider ?? DefaultCloudWatchLogStreamProvider( logGroupName: pluginConfig.logGroupName, - client: CloudWatchLogsClient( - region: pluginConfig.region, - credentialsProvider: credentialsProvider, - ), + region: pluginConfig.region, + credentialsProvider: credentialsProvider, ) { _timer = pluginConfig.flushInterval > Duration.zero ? StoppableTimer( @@ -113,7 +112,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin bool _enabled; StoppableTimer? _timer; RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider; - + int _retryCount = 0; + DateTime? _retryTime; set remoteLoggingConstraintProvider( RemoteLoggingConstraintProvider remoteProvider, ) { @@ -129,32 +129,89 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin Future startSyncing() async { final batchStream = _getLogBatchesToSync(); await for (final (logs, events) in batchStream) { - final response = await _sendToCloudWatch(events); - // TODO(nikahsn): handle tooOldLogEventEndIndex - // and expiredLogEventEndIndex. - if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) { - // TODO(nikahsn): throw and exception to enable log rotation if the - // log store is full. - break; + _TooNewLogEventException? tooNewException; + while (logs.isNotEmpty && events.isNotEmpty) { + final rejectedLogEventsInfo = + (await _sendToCloudWatch(events)).rejectedLogEventsInfo; + if (rejectedLogEventsInfo == null) { + await _logStore.deleteItems(logs); + break; + } + + final (tooOldEndIndex, tooNewStartIndex) = + rejectedLogEventsInfo.parse(events.length); + + if (_isValidIndex(tooNewStartIndex, events.length)) { + tooNewException = _TooNewLogEventException( + events[tooNewStartIndex!].timestamp.toInt(), + ); + // set logs to end before the index. + logs.removeRange(tooNewStartIndex, events.length); + // set events to end before the index. + events.removeRange(tooNewStartIndex, events.length); + } + if (_isValidIndex(tooOldEndIndex, events.length)) { + // remove old logs from log store. + await _logStore.deleteItems(logs.sublist(0, tooOldEndIndex! + 1)); + // set logs to start after the index. + logs.removeRange(0, tooOldEndIndex + 1); + // set events to start after the index. + events.removeRange(0, tooOldEndIndex + 1); + } + } + // after sending each batch to CloudWatch check if the batch has + // `tooNewException` and throw to stop syncing next batches. + if (tooNewException != null) { + throw tooNewException; } - await _logStore.deleteItems(logs); } } if (!_syncing) { - // TODO(nikahsn): disable log rotation. _syncing = true; + DateTime? nextRetry; try { await startSyncing(); + } on _TooNewLogEventException catch (e) { + nextRetry = + DateTime.fromMillisecondsSinceEpoch(e.timeInMillisecondsSinceEpoch) + .add(_minusMaxLogEventTimeInFuture); } on Exception catch (e) { logger.error('Failed to sync logs to CloudWatch.', e); - // TODO(nikahsn): enable log rotation if the log store is full } finally { + _handleFullLogStoreAfterSync( + retryTime: nextRetry, + ); _syncing = false; } } } + void _handleFullLogStoreAfterSync({ + DateTime? retryTime, + }) { + final isLogStoreFull = + _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); + if (!isLogStoreFull) { + _retryCount = 0; + _retryTime = null; + return; + } + if (retryTime != null && retryTime.isAfter(DateTime.timestamp())) { + _retryTime = retryTime; + return; + } + _retryCount += 1; + _retryTime = DateTime.timestamp().add((_baseRetryInterval * _retryCount)); + } + + bool _shouldSyncOnFullLogStore() { + if (_retryTime == null) { + return true; + } + return !(_retryTime!.isAfter(DateTime.timestamp())); + } + void _onTimerError(Object e) { logger.error('Failed to sync logs to CloudWatch.', e); } @@ -225,11 +282,17 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin return; } final item = logEntry.toQueuedItem(); + final isLogStoreFull = + _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); + final shouldEnableQueueRotation = isLogStoreFull && _retryTime != null; + await _logStore.addItem( item.value, item.timestamp, + enableQueueRotation: shouldEnableQueueRotation, ); - if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) { + + if (isLogStoreFull && _shouldSyncOnFullLogStore()) { await _startSyncingIfNotInProgress(); } } @@ -253,6 +316,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin _enabled = false; _timer?.stop(); await _logStore.clear(); + _retryCount = 0; + _retryTime = null; } /// Sends logs on-demand to CloudWatch. @@ -285,3 +350,34 @@ extension on LogEntry { ); } } + +extension on RejectedLogEventsInfo { + (int? pastEndIndex, int? futureStartIndex) parse(int length) { + int? pastEndIndex; + int? futureStartIndex; + + if (_isValidIndex(tooOldLogEventEndIndex, length)) { + pastEndIndex = tooOldLogEventEndIndex; + } + if (_isValidIndex(expiredLogEventEndIndex, length)) { + pastEndIndex = pastEndIndex == null + ? expiredLogEventEndIndex + : max(pastEndIndex, expiredLogEventEndIndex!); + } + if (_isValidIndex(tooNewLogEventStartIndex, length)) { + futureStartIndex = tooNewLogEventStartIndex; + } + return (pastEndIndex, futureStartIndex); + } +} + +class _TooNewLogEventException implements Exception { + const _TooNewLogEventException( + this.timeInMillisecondsSinceEpoch, + ); + final int timeInMillisecondsSinceEpoch; +} + +bool _isValidIndex(int? index, int length) { + return index != null && index >= 0 && index <= length - 1; +} diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/log_stream_provider.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/log_stream_provider.dart index 963c47e5e13..8828b2609e3 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/log_stream_provider.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/log_stream_provider.dart @@ -3,6 +3,7 @@ import 'dart:async'; +import 'package:aws_common/aws_common.dart'; import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart'; import 'package:intl/intl.dart'; @@ -22,30 +23,36 @@ abstract class CloudWatchLogStreamProvider { /// {@template aws_logging_cloudwatch.default_cloudwatch_logstream_provider} /// The default implementaion of [CloudWatchLogStreamProvider]. /// -/// It uses `defaultLogStreamName` if provided otherwise uses `yyyy-MM-dd` -/// date format of UTC time now for the `defaultLogStreamName`. +/// It uses `defaultLogStreamNameProvider` if provided otherwise uses +/// `yyyy-MM-dd` date format of UTC time now for the `defaultLogStreamName`. /// {@endtemplate} class DefaultCloudWatchLogStreamProvider implements CloudWatchLogStreamProvider { /// {@macro aws_logging_cloudwatch.default_cloudwatch_logstream_provider} DefaultCloudWatchLogStreamProvider({ - required CloudWatchLogsClient client, + required AWSCredentialsProvider credentialsProvider, + required String region, required String logGroupName, - String? defaultLogStreamName, - }) : _defaultLogStreamName = defaultLogStreamName, - _logGroupName = logGroupName, - _client = client; + FutureOr Function()? defaultLogStreamNameProvider, + }) : _logGroupName = logGroupName, + _client = CloudWatchLogsClient( + region: region, + credentialsProvider: credentialsProvider, + ), + _defaultLogStreamNameProvider = defaultLogStreamNameProvider; - final String? _defaultLogStreamName; + final FutureOr Function()? _defaultLogStreamNameProvider; final String _logGroupName; final CloudWatchLogsClient _client; + static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd'); final _createdLogStreams = {}; @override Future get defaultLogStream async { - final logStreamName = - _defaultLogStreamName ?? _dateFormat.format(DateTime.timestamp()); + final logStreamNameProvider = + _defaultLogStreamNameProvider ?? _timeBasedLogStreamNameProvider; + final logStreamName = await logStreamNameProvider(); if (_createdLogStreams.contains(logStreamName)) { return logStreamName; } @@ -54,6 +61,10 @@ class DefaultCloudWatchLogStreamProvider return logStreamName; } + String _timeBasedLogStreamNameProvider() { + return _dateFormat.format(DateTime.timestamp()); + } + @override Future createLogStream(String logStreamName) async { try { diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart index 24e197d449c..9317aa48789 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart @@ -24,7 +24,7 @@ abstract interface class QueuedItemStore { FutureOr> getAll(); /// Whether the queue size is reached [maxSizeInMB]. - FutureOr isFull(int maxSizeInMB); + bool isFull(int maxSizeInMB); /// Clear the queue of items. FutureOr clear(); diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart index fe699366825..4e4f57f39e2 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart @@ -45,7 +45,17 @@ void main() { ), QueuedItem( id: 2, - value: 'second og message', + value: 'second log message', + timestamp: DateTime.timestamp().toIso8601String(), + ), + QueuedItem( + id: 3, + value: 'third log message', + timestamp: DateTime.timestamp().toIso8601String(), + ), + QueuedItem( + id: 4, + value: 'forth log message', timestamp: DateTime.timestamp().toIso8601String(), ), ]; @@ -62,17 +72,34 @@ void main() { logStreamProvider: mockCloudWatchLogStreamProvider, ); }); + test('when enabled, logs are added to the item store', () async { - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) - .thenAnswer((_) async => Future.value(false)); + .thenReturn(false); + plugin.enable(); + await expectLater( plugin.handleLogEntry(errorLog), completes, ); - verify(() => mockQueuedItemStore.addItem(any(), any())).called(1); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).called(1); + verify( () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), ).called(1); @@ -138,8 +165,8 @@ void main() { when(() => mockCloudWatchLogStreamProvider.defaultLogStream) .thenAnswer((_) async => Future.value('log stream name')); - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); when(() => mockQueuedItemStore.deleteItems(any())) .thenAnswer((_) async => {}); @@ -184,8 +211,8 @@ void main() { when(() => mockCloudWatchLogStreamProvider.defaultLogStream) .thenAnswer((_) async => Future.value('log stream name')); - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); when(() => mockQueuedItemStore.getAll()).thenAnswer( (_) async => Future>.value(queuedItems), @@ -239,6 +266,9 @@ void main() { (_) async => Future>.value(queuedItems), ); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); + await expectLater( plugin.flushLogs(), completes, @@ -276,6 +306,9 @@ void main() { (_) async => Future>.value(queuedItems), ); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); + await expectLater( plugin.flushLogs(), completes, @@ -289,5 +322,405 @@ void main() { () => mockCloudWatchLogStreamProvider.createLogStream(any()), ).called(1); }); + + test('it enables log rotation when log store is full and retry is set', + () async { + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(true); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: true, + ), + ).called(1); + }); + + test( + 'it does not enable log rotation when log store is full if retry is not ' + 'set. it start sync on full log store.', + () async { + final isFullResponse = [false, true, false]; + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenAnswer((_) => isFullResponse.removeAt(0)); + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => + Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(2); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).called(1); + }, + ); + + test('it does not sync on full log store if retry time not reached', + () async { + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(true); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(1); + + verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1); + }); + + test( + 'it deletes too old logs in the batch and sync the rest', + () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(tooOldLogEventEndIndex: 0), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => mockQueuedItemStore + .deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable).first, + queuedItems.first, + ); + expect( + (captures.captured.last as Iterable), + queuedItems.sublist(1), + ); + }, + ); + + test('it deletes expired logs in the batch and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(expiredLogEventEndIndex: 0), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable).first, + queuedItems.first, + ); + expect( + (captures.captured.last as Iterable), + queuedItems.sublist(1), + ); + }); + + test('it leaves too new logs in the batch and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(tooNewLogEventStartIndex: 1), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 1); + expect(captures.captured.length, 1); + expect( + (captures.captured.last as Iterable).first, + queuedItems.first, + ); + }); + + test( + 'it deltes old logs and leaves too new logs in the batch' + ' and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: RejectedLogEventsInfo( + expiredLogEventEndIndex: 0, + tooOldLogEventEndIndex: 1, + tooNewLogEventStartIndex: 3, + ), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable), + queuedItems.sublist(0, 2), + ); + + expect( + (captures.captured.last as Iterable), + queuedItems.sublist(2, 3), + ); + }); }); } diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart index ec2bca88743..ab035a6ca2e 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart @@ -231,7 +231,7 @@ void main() { await db.addItem('0', DateTime.now().toIso8601String()); } - var result = await db.isFull(capacityLimit); + var result = db.isFull(capacityLimit); expect(result, isFalse); // add enough items to exceed capacity limit of 1mb @@ -239,7 +239,7 @@ void main() { await db.addItem('0', DateTime.now().toIso8601String()); } - result = await db.isFull(capacityLimit); + result = db.isFull(capacityLimit); expect(result, isTrue); }, );