feat(logging): enable log rotation and set retry on full log store sync #3699
Changes from 1 commit
@@ -150,7 +150,7 @@ class IndexedDbAdapter implements QueuedItemStore {
   }

   @override
-  Future<bool> isFull(int maxSizeInMB) async {
+  bool isFull(int maxSizeInMB) {
     final maxBytes = maxSizeInMB * 1024 * 1024;
     return _currentTotalByteSize >= maxBytes;
   }
@@ -167,15 +167,14 @@ class IndexedDbAdapter implements QueuedItemStore {
   void close() {}

   /// Check that IndexDB will work on this device.
-  static Future<bool> checkIsIndexedDBSupported() async {
+  static bool checkIsIndexedDBSupported() {
     if (indexedDB == null) {
       return false;
     }
     // indexedDB will be non-null in Firefox private browsing,
     // but will fail to open.
     try {
-      final openRequest = indexedDB!.open('test', 1);
-      await openRequest.future;
+      indexedDB!.open('test', 1).result;
[Review comment] Can you explain this change?
[Review comment] But why would this be guaranteed to throw in the same way as before?
[Author reply] Based on the docs: "If the operation is successful, the value of result is a connection to the database. If the request failed and the result is not available, an InvalidStateError exception is thrown."
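A minimal sketch (not part of the PR) of the probe pattern being discussed, written against the published `package:web` bindings rather than whatever interop layer this adapter actually imports; `probeIndexedDbSupport` is a hypothetical name, and whether `result` throws in every failure mode is exactly the reviewer's open question.

```dart
import 'package:web/web.dart' as web;

/// Sketch only: synchronous IndexedDB availability probe.
bool probeIndexedDbSupport() {
  try {
    // Per the docs quoted above: if the open request failed and its result
    // is not available, reading `result` throws an InvalidStateError.
    web.window.indexedDB.open('probe-db', 1).result;
    return true;
  } on Object {
    return false;
  }
}
```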
       return true;
     } on Object {
       return false;
@@ -32,9 +32,11 @@ import 'package:meta/meta.dart';
 const int _maxNumberOfLogEventsInBatch = 10000;
 const int _maxLogEventsBatchSize = 1048576;
 const int _baseBufferSize = 26;
+const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay;
 const int _maxLogEventSize = 256000;
-final int _maxLogEventsTimeSpanInBatch =
-    const Duration(hours: 24).inMilliseconds;
+const int _maxLogEventTimeInFutureInMilliseconds =
+    Duration.millisecondsPerHour * 2;
+const int _baseRetryIntervalInMilliseconds = Duration.millisecondsPerMinute;

 typedef _LogBatch = (List<QueuedItem> logQueues, List<InputLogEvent> logEvents);
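The rewritten constants keep the old values while becoming compile-time constants (the removed `const Duration(hours: 24).inMilliseconds` is a getter call, which is why that declaration had to be `final`). A small standalone check of the values, not part of the PR:

```dart
void main() {
  // All three right-hand sides are compile-time constants with the same
  // values the old expressions produced.
  print(Duration.millisecondsPerDay ==
      const Duration(hours: 24).inMilliseconds); // true (86400000)
  print(Duration.millisecondsPerHour * 2 ==
      const Duration(hours: 2).inMilliseconds); // true (7200000)
  print(Duration.millisecondsPerMinute ==
      const Duration(minutes: 1).inMilliseconds); // true (60000)
}
```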
@@ -123,7 +125,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
   bool _enabled;
   StoppableTimer? _timer;
   RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider;
-
+  int _retryCount = 0;
+  int? _retryTimeInMillisecondsSinceEpoch;
   set remoteLoggingConstraintProvider(
     RemoteLoggingConstraintProvider remoteProvider,
   ) {
@@ -139,32 +142,110 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
     Future<void> startSyncing() async {
       final batchStream = _getLogBatchesToSync();
       await for (final (logs, events) in batchStream) {
-        final response = await _sendToCloudWatch(events);
-        // TODO(nikahsn): handle tooOldLogEventEndIndex
-        // and expiredLogEventEndIndex.
-        if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
-          // TODO(nikahsn): throw and exception to enable log rotation if the
-          // log store is full.
+        _TooNewLogEventException? tooNewException;
+        while (logs.isNotEmpty && events.isNotEmpty) {
+          final rejectedLogEventsInfo =
+              (await _sendToCloudWatch(events)).rejectedLogEventsInfo;
+          final expiredLogEventEndIndex =
+              rejectedLogEventsInfo?.expiredLogEventEndIndex;
+          final tooOldLogEventEndIndex =
+              rejectedLogEventsInfo?.tooOldLogEventEndIndex;
+          final tooNewLogEventStartIndex =
+              rejectedLogEventsInfo?.tooNewLogEventStartIndex;
+
+          if (expiredLogEventEndIndex != null &&
[Review comment] Can you extract this logic into a function so it's not duplicated 3 times?
(A sketch of such a helper appears below, after the sync method.)
+              expiredLogEventEndIndex >= 0 &&
+              expiredLogEventEndIndex <= events.length - 1) {
+            // delete old logs from log store
+            await _logStore
+                .deleteItems(logs.sublist(0, expiredLogEventEndIndex + 1));
+            // set logs to start from next
+            logs.removeRange(0, expiredLogEventEndIndex + 1);
+            // set events to start from next
+            events.removeRange(0, expiredLogEventEndIndex + 1);
+            continue;
[Review comment] Why can't multiple of these blocks apply?
+          }
+          if (tooOldLogEventEndIndex != null &&
+              tooOldLogEventEndIndex >= 0 &&
+              tooOldLogEventEndIndex <= events.length - 1) {
+            // delete old logs from log store
+            await _logStore
+                .deleteItems(logs.sublist(0, tooOldLogEventEndIndex + 1));
+            // set logs to start from next
+            logs.removeRange(0, tooOldLogEventEndIndex + 1);
+            // set events to start from next
+            events.removeRange(0, tooOldLogEventEndIndex + 1);
+            continue;
+          }
+          if (tooNewLogEventStartIndex != null &&
+              tooNewLogEventStartIndex >= 0 &&
+              tooNewLogEventStartIndex <= events.length - 1) {
+            tooNewException = _TooNewLogEventException(
[Review comment] Why not throw it here? The control flow would be easier to follow.
[Author reply] We want to continue syncing the current batch after sanitizing its events, and then throw to stop syncing the next batches.
+              events[tooNewLogEventStartIndex].timestamp.toInt(),
+            );
+            // set logs to end at the index
+            logs.removeRange(tooNewLogEventStartIndex, logs.length);
+            // set events to end at the index
+            events.removeRange(tooNewLogEventStartIndex, events.length);
+            continue;
[Review comment] Why continue if all batches after this will be newer?
[Author reply] We want to continue syncing the current batch after sanitizing its events, and then throw to stop syncing the next batches.
[Review comment] Can remove.
+          }
+          await _logStore.deleteItems(logs);
           break;
         }
-        await _logStore.deleteItems(logs);
+        if (tooNewException != null) {
+          throw tooNewException;
[Review comment] I think this would be cleaner if you just returned the retry time instead of throwing.
(A sketch of that alternative appears after the _TooNewLogEventException class at the end of this diff.)
+        }
       }
     }

     if (!_syncing) {
-      // TODO(nikahsn): disable log rotation.
       _syncing = true;
+      var nextRetry = 0;
       try {
         await startSyncing();
+      } on _TooNewLogEventException catch (e) {
+        nextRetry = e.timeInMillisecondsSinceEpoch -
+            _maxLogEventTimeInFutureInMilliseconds;
       } on Exception catch (e) {
         logger.error('Failed to sync logs to CloudWatch.', e);
-        // TODO(nikahsn): enable log rotation if the log store is full
       } finally {
+        _handleFullLogStoreAfterSync(
+          retryTimeInMillisecondsSinceEpoch: nextRetry,
+        );
         _syncing = false;
       }
     }
   }
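On the duplication comment in the loop above: one possible shape for the extracted helper, sketched as an instance method on the plugin. The name `_trimBatchPrefix` and its exact signature are hypothetical, not part of the PR.

```dart
/// Sketch only: drops entries 0..endIndex (inclusive) from the persisted
/// store and from the in-memory batch so the loop can retry with the rest.
Future<void> _trimBatchPrefix(
  List<QueuedItem> logs,
  List<InputLogEvent> events,
  int endIndex,
) async {
  // Delete the expired/rejected prefix from the local log store.
  await _logStore.deleteItems(logs.sublist(0, endIndex + 1));
  // Drop the same range from the in-memory batch.
  logs.removeRange(0, endIndex + 1);
  events.removeRange(0, endIndex + 1);
}
```

The `expired` and `tooOld` branches would then reduce to `await _trimBatchPrefix(logs, events, index); continue;`, while the `tooNew` branch keeps its suffix-trimming logic.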
+
+  void _handleFullLogStoreAfterSync({
+    int retryTimeInMillisecondsSinceEpoch = 0,
[Review comment] Can you change this logic to work with
+  }) {
+    final isLogStoreFull =
+        _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB);
+    if (!isLogStoreFull) {
+      _retryCount = 0;
+      _retryTimeInMillisecondsSinceEpoch = null;
+      return;
+    }
+    if (retryTimeInMillisecondsSinceEpoch > 0 &&
+        retryTimeInMillisecondsSinceEpoch >
+            DateTime.now().millisecondsSinceEpoch) {
+      _retryTimeInMillisecondsSinceEpoch = retryTimeInMillisecondsSinceEpoch;
+      return;
+    }
+    _retryCount += 1;
+    _retryTimeInMillisecondsSinceEpoch = DateTime.now().millisecondsSinceEpoch +
+        (_retryCount * _baseRetryIntervalInMilliseconds);
+  }
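While the local store stays full, the retry window backs off linearly: with `_baseRetryIntervalInMilliseconds` equal to one minute, the n-th consecutive full-store sync pushes the next attempt roughly n minutes out, unless a too-new timestamp supplied a later explicit retry time. A standalone illustration of that arithmetic (not part of the PR; names are local to the example):

```dart
void main() {
  const baseRetryIntervalInMilliseconds = Duration.millisecondsPerMinute;
  final now = DateTime.now().millisecondsSinceEpoch;
  // Simulate three consecutive syncs that all end with a full log store and
  // no explicit retry time from CloudWatch.
  var retryCount = 0;
  for (var attempt = 1; attempt <= 3; attempt++) {
    retryCount += 1;
    final nextRetry = now + retryCount * baseRetryIntervalInMilliseconds;
    final delayMinutes = (nextRetry - now) ~/ Duration.millisecondsPerMinute;
    print('after sync #$attempt: next retry allowed in $delayMinutes minute(s)');
  }
  // Prints delays of 1, 2 and 3 minutes respectively.
}
```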
+
+  bool _shouldSyncOnFullLogStore() {
+    if (_retryTimeInMillisecondsSinceEpoch == null) {
+      return true;
+    }
+    return DateTime.now().millisecondsSinceEpoch >=
+        _retryTimeInMillisecondsSinceEpoch!;
+  }

   void _onTimerError(Object e) {
     logger.error('Failed to sync logs to CloudWatch.', e);
   }
@@ -235,11 +316,18 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
       return;
     }
     final item = logEntry.toQueuedItem();
+    final isLogStoreFull =
+        _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB);
+    final shouldEnableQueueRotation =
+        isLogStoreFull && _retryTimeInMillisecondsSinceEpoch != null;
+
     await _logStore.addItem(
       item.value,
       item.timestamp,
+      enableQueueRotation: shouldEnableQueueRotation,
     );
-    if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
+
+    if (isLogStoreFull && _shouldSyncOnFullLogStore()) {
       await _startSyncingIfNotInProgress();
     }
   }
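A compact restatement (not part of the PR) of the write-path policy above, pulled into a pure function; the name `writePolicy` and the record return type are illustrative only.

```dart
/// Sketch only: mirrors `shouldEnableQueueRotation` and
/// `isLogStoreFull && _shouldSyncOnFullLogStore()` from the diff above.
({bool rotateQueue, bool startSync}) writePolicy({
  required bool isLogStoreFull,
  required int? retryTimeInMillisecondsSinceEpoch,
  required int nowInMillisecondsSinceEpoch,
}) {
  final retryPending = retryTimeInMillisecondsSinceEpoch != null;
  final retryElapsed = retryTimeInMillisecondsSinceEpoch == null ||
      nowInMillisecondsSinceEpoch >= retryTimeInMillisecondsSinceEpoch;
  return (
    // Rotate (evict oldest entries) only when the store is full and a retry
    // is already scheduled.
    rotateQueue: isLogStoreFull && retryPending,
    // Kick off a sync only when the store is full and either no retry is
    // scheduled or its time has passed.
    startSync: isLogStoreFull && retryElapsed,
  );
}
```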
@@ -263,6 +351,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
     _enabled = false;
     _timer?.stop();
     await _logStore.clear();
+    _retryCount = 0;
+    _retryTimeInMillisecondsSinceEpoch = null;
   }

   /// Sends logs on-demand to CloudWatch.
@@ -295,3 +385,10 @@ extension on LogEntry {
     );
   }
 }
+
+class _TooNewLogEventException implements Exception {
+  const _TooNewLogEventException(
+    this.timeInMillisecondsSinceEpoch,
+  );
+  final int timeInMillisecondsSinceEpoch;
+}
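On the review suggestion to return the retry time instead of throwing: a hedged sketch of how the call site could look if `startSyncing` returned the computed retry time (0 meaning none) after finishing the current batch, making this exception class unnecessary. This illustrates the reviewer's alternative, not what the PR implements.

```dart
// Sketch only: startSyncing() would return
//   events[tooNewLogEventStartIndex].timestamp.toInt() -
//       _maxLogEventTimeInFutureInMilliseconds
// after finishing the sanitized batch, or 0 when nothing was too new.
if (!_syncing) {
  _syncing = true;
  var nextRetry = 0;
  try {
    nextRetry = await startSyncing();
  } on Exception catch (e) {
    logger.error('Failed to sync logs to CloudWatch.', e);
  } finally {
    _handleFullLogStoreAfterSync(
      retryTimeInMillisecondsSinceEpoch: nextRetry,
    );
    _syncing = false;
  }
}
```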
[Review comment] Here and throughout