Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logging): enable log rotation and set retry on full log store sync #3699

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable {
}

@override
FutureOr<bool> isFull(int maxSizeInMB) {
bool isFull(int maxSizeInMB) {
throw UnimplementedError('isFull() has not been implemented.');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable {
}

@override
Future<bool> isFull(int maxSizeInMB) {
bool isFull(int maxSizeInMB) {
return _database.isFull(maxSizeInMB);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class DartQueuedItemStore
// ignore: avoid_unused_constructor_parameters
DartQueuedItemStore(String? storagePath);

late final Future<QueuedItemStore> _database = () async {
if (await IndexedDbAdapter.checkIsIndexedDBSupported()) {
late final QueuedItemStore _database = () {
if (IndexedDbAdapter.checkIsIndexedDBSupported()) {
return IndexedDbAdapter();
}
logger.warn(
Expand All @@ -34,7 +34,7 @@ class DartQueuedItemStore
String timestamp, {
bool enableQueueRotation = false,
}) async {
final db = await _database;
final db = _database;
await db.addItem(
string,
timestamp,
Expand All @@ -44,33 +44,33 @@ class DartQueuedItemStore

@override
Future<void> deleteItems(Iterable<QueuedItem> items) async {
final db = await _database;
final db = _database;
await db.deleteItems(items);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and throughout

Suggested change
final db = _database;
await db.deleteItems(items);
await _database.deleteItems(items);

}

@override
Future<Iterable<QueuedItem>> getCount(int count) async {
final db = await _database;
final db = _database;
return db.getCount(count);
}

@override
Future<Iterable<QueuedItem>> getAll() async {
final db = await _database;
final db = _database;
return db.getAll();
}

@override
Future<bool> isFull(int maxSizeInMB) async {
final db = await _database;
bool isFull(int maxSizeInMB) {
final db = _database;
return db.isFull(maxSizeInMB);
}

/// Clear IndexedDB data.
@override
@visibleForTesting
Future<void> clear() async {
final db = await _database;
final db = _database;
return db.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DriftQueuedItemStore extends _$DriftQueuedItemStore
}

@override
Future<bool> isFull(int maxSizeInMB) async {
bool isFull(int maxSizeInMB) {
final maxBytes = maxSizeInMB * 1024 * 1024;
return _currentTotalByteSize >= maxBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class IndexedDbAdapter implements QueuedItemStore {
}

@override
Future<bool> isFull(int maxSizeInMB) async {
bool isFull(int maxSizeInMB) {
final maxBytes = maxSizeInMB * 1024 * 1024;
return _currentTotalByteSize >= maxBytes;
}
Expand All @@ -167,15 +167,14 @@ class IndexedDbAdapter implements QueuedItemStore {
void close() {}

/// Check that IndexDB will work on this device.
static Future<bool> checkIsIndexedDBSupported() async {
static bool checkIsIndexedDBSupported() {
if (indexedDB == null) {
return false;
}
// indexedDB will be non-null in Firefox private browsing,
// but will fail to open.
try {
final openRequest = indexedDB!.open('test', 1);
await openRequest.future;
indexedDB!.open('test', 1).result;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueuedItemStore.isFull() is a sync method. However, it was implemented async because the web implementation calls checkIsIndexedDBSupported to decide whether to use IndexedDB or InMemoryQueuedItemStore. Because checkIsIndexedDBSupported was async, all of the web APIs had to be async as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why would this be guaranteed to throw in the same way as before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the docs: "If the operation is successful, the value of result is a connection to the database. If the request failed and the result is not available, an InvalidStateError exception is thrown."

return true;
} on Object {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ void main() {
await db.addItem(largeItem, DateTime.now().toIso8601String());
}

var result = await db.isFull(capacityLimit);
var result = db.isFull(capacityLimit);
expect(result, isFalse);

for (var i = 0; i < 100; i++) {
await db.addItem(largeItem, DateTime.now().toIso8601String());
}

result = await db.isFull(capacityLimit);
result = db.isFull(capacityLimit);
expect(result, isTrue);
},
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import 'package:meta/meta.dart';
const int _maxNumberOfLogEventsInBatch = 10000;
const int _maxLogEventsBatchSize = 1048576;
const int _baseBufferSize = 26;
const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay;
const int _maxLogEventSize = 256000;
final int _maxLogEventsTimeSpanInBatch =
const Duration(hours: 24).inMilliseconds;
const int _maxLogEventTimeInFutureInMilliseconds =
Duration.millisecondsPerHour * 2;
const int _baseRetryIntervalInMilliseconds = Duration.millisecondsPerMinute;

typedef _LogBatch = (List<QueuedItem> logQueues, List<InputLogEvent> logEvents);

Expand Down Expand Up @@ -123,7 +125,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
bool _enabled;
StoppableTimer? _timer;
RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider;

int _retryCount = 0;
int? _retryTimeInMillisecondsSinceEpoch;
set remoteLoggingConstraintProvider(
RemoteLoggingConstraintProvider remoteProvider,
) {
Expand All @@ -139,32 +142,110 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
Future<void> startSyncing() async {
final batchStream = _getLogBatchesToSync();
await for (final (logs, events) in batchStream) {
final response = await _sendToCloudWatch(events);
// TODO(nikahsn): handle tooOldLogEventEndIndex
// and expiredLogEventEndIndex.
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
// TODO(nikahsn): throw and exception to enable log rotation if the
// log store is full.
_TooNewLogEventException? tooNewException;
while (logs.isNotEmpty && events.isNotEmpty) {
final rejectedLogEventsInfo =
(await _sendToCloudWatch(events)).rejectedLogEventsInfo;
final expiredLogEventEndIndex =
rejectedLogEventsInfo?.expiredLogEventEndIndex;
final tooOldLogEventEndIndex =
rejectedLogEventsInfo?.tooOldLogEventEndIndex;
final tooNewLogEventStartIndex =
rejectedLogEventsInfo?.tooNewLogEventStartIndex;

if (expiredLogEventEndIndex != null &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you extract this logic into a function so it's not duplicated 3 times?

expiredLogEventEndIndex >= 0 &&
expiredLogEventEndIndex <= events.length - 1) {
// delete old logs from log store
await _logStore
.deleteItems(logs.sublist(0, expiredLogEventEndIndex + 1));
// set logs to start from next
logs.removeRange(0, expiredLogEventEndIndex + 1);
// set events to start from next
events.removeRange(0, expiredLogEventEndIndex + 1);
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't multiple of these blocks apply?

}
if (tooOldLogEventEndIndex != null &&
tooOldLogEventEndIndex >= 0 &&
tooOldLogEventEndIndex <= events.length - 1) {
// delete old logs from log store
await _logStore
.deleteItems(logs.sublist(0, tooOldLogEventEndIndex + 1));
// set logs to start from next
logs.removeRange(0, tooOldLogEventEndIndex + 1);
// set events to start from next
events.removeRange(0, tooOldLogEventEndIndex + 1);
continue;
}
if (tooNewLogEventStartIndex != null &&
tooNewLogEventStartIndex >= 0 &&
tooNewLogEventStartIndex <= events.length - 1) {
tooNewException = _TooNewLogEventException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throw it here? The control flow would be easier to follow

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to continue syncing the current batch after sanitizing its events, and then throw to stop syncing the subsequent batches.

events[tooNewLogEventStartIndex].timestamp.toInt(),
);
// set logs to end at the index
logs.removeRange(tooNewLogEventStartIndex, logs.length);
// set events to end at the index
events.removeRange(tooNewLogEventStartIndex, events.length);
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why continue if all batches after this will be newer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to continue syncing the current batch after sanitizing its events, and then throw to stop syncing the subsequent batches.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove continue now

}
await _logStore.deleteItems(logs);
break;
}
await _logStore.deleteItems(logs);
if (tooNewException != null) {
throw tooNewException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be cleaner if you just returned the retry time instead of throwing

}
}
}

if (!_syncing) {
// TODO(nikahsn): disable log rotation.
_syncing = true;
var nextRetry = 0;
try {
await startSyncing();
} on _TooNewLogEventException catch (e) {
nextRetry = e.timeInMillisecondsSinceEpoch -
_maxLogEventTimeInFutureInMilliseconds;
} on Exception catch (e) {
logger.error('Failed to sync logs to CloudWatch.', e);
// TODO(nikahsn): enable log rotation if the log store is full
} finally {
_handleFullLogStoreAfterSync(
retryTimeInMillisecondsSinceEpoch: nextRetry,
);
_syncing = false;
}
}
}

void _handleFullLogStoreAfterSync({
int retryTimeInMillisecondsSinceEpoch = 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change this logic to work with DateTime and Duration objects instead of integers? This is very opaque and hard to follow

}) {
final isLogStoreFull =
_logStore.isFull(_pluginConfig.localStoreMaxSizeInMB);
if (!isLogStoreFull) {
_retryCount = 0;
_retryTimeInMillisecondsSinceEpoch = null;
return;
}
if (retryTimeInMillisecondsSinceEpoch > 0 &&
retryTimeInMillisecondsSinceEpoch >
DateTime.now().millisecondsSinceEpoch) {
_retryTimeInMillisecondsSinceEpoch = retryTimeInMillisecondsSinceEpoch;
return;
}
_retryCount += 1;
_retryTimeInMillisecondsSinceEpoch = DateTime.now().millisecondsSinceEpoch +
(_retryCount * _baseRetryIntervalInMilliseconds);
}

bool _shouldSyncOnFullLogStore() {
if (_retryTimeInMillisecondsSinceEpoch == null) {
return true;
}
return DateTime.now().millisecondsSinceEpoch >=
_retryTimeInMillisecondsSinceEpoch!;
}

void _onTimerError(Object e) {
logger.error('Failed to sync logs to CloudWatch.', e);
}
Expand Down Expand Up @@ -235,11 +316,18 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
return;
}
final item = logEntry.toQueuedItem();
final isLogStoreFull =
_logStore.isFull(_pluginConfig.localStoreMaxSizeInMB);
final shouldEnableQueueRotation =
isLogStoreFull && _retryTimeInMillisecondsSinceEpoch != null;

await _logStore.addItem(
item.value,
item.timestamp,
enableQueueRotation: shouldEnableQueueRotation,
);
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {

if (isLogStoreFull && _shouldSyncOnFullLogStore()) {
await _startSyncingIfNotInProgress();
}
}
Expand All @@ -263,6 +351,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
_enabled = false;
_timer?.stop();
await _logStore.clear();
_retryCount = 0;
_retryTimeInMillisecondsSinceEpoch = null;
}

/// Sends logs on-demand to CloudWatch.
Expand Down Expand Up @@ -295,3 +385,10 @@ extension on LogEntry {
);
}
}

class _TooNewLogEventException implements Exception {
const _TooNewLogEventException(
this.timeInMillisecondsSinceEpoch,
);
final int timeInMillisecondsSinceEpoch;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ abstract interface class QueuedItemStore {
FutureOr<Iterable<QueuedItem>> getAll();

/// Whether the queue size is reached [maxSizeInMB].
FutureOr<bool> isFull(int maxSizeInMB);
bool isFull(int maxSizeInMB);

/// Clear the queue of items.
FutureOr<void> clear();
Expand Down
Loading