Skip to content

Commit

Permalink
feat(logging): log stream provider to cache log stream (#3646)
Browse files Browse the repository at this point in the history
  • Loading branch information
NikaHsn authored Aug 31, 2023
1 parent 5b102a2 commit 5206d87
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import 'dart:math';

import 'package:aws_common/aws_common.dart';
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/in_memory_queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:aws_logging_cloudwatch/src/stoppable_timer.dart';
import 'package:fixnum/fixnum.dart';
Expand All @@ -32,10 +30,11 @@ import 'package:meta/meta.dart';
// The maximum number of log events in a batch is 10,000.

const int _maxNumberOfLogEventsInBatch = 10000;
const int _maxLogEventsTimeSpanInBatch = 24 * 3600;
const int _maxLogEventsBatchSize = 1048576;
const int _baseBufferSize = 26;
const int _maxLogEventSize = 256000;
final int _maxLogEventsTimeSpanInBatch =
const Duration(hours: 24).inMilliseconds;

typedef _LogBatch = (List<QueuedItem> logQueues, List<InputLogEvent> logEvents);

Expand Down Expand Up @@ -138,51 +137,28 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin

Future<void> _startSyncingIfNotInProgress() async {
Future<void> startSyncing() async {
String logStream;
try {
logStream = await _logStreamProvider.logStream;
} on Exception catch (e) {
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
await _logStore.clear();
logger.warn(
'Reached local store max size of: '
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted from '
'local store.',
);
}
logger.error('Failed to create CloudWatch log stream', e);
return;
}

final batcheStream = _getLogBatchesToSync();
await for (final (logs, events) in batcheStream) {
try {
final response = await _sendToCloudWatch(events, logStream);
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex !=
null) {
break;
}
await _logStore.deleteItems(logs);
} on Exception catch (e) {
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
await _logStore.deleteItems(logs);
logger.warn(
'Reached local store max size of: '
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted '
'from local store.',
);
}
logger.error('Failed to sync batched logs to CloudWatch', e);
final batchStream = _getLogBatchesToSync();
await for (final (logs, events) in batchStream) {
final response = await _sendToCloudWatch(events);
// TODO(nikahsn): handle tooOldLogEventEndIndex
// and expiredLogEventEndIndex.
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
// TODO(nikahsn): throw and exception to enable log rotation if the
// log store is full.
break;
}
await _logStore.deleteItems(logs);
}
}

if (!_syncing) {
// TODO(nikahsn): disable log rotation.
_syncing = true;
try {
await startSyncing();
} on Exception catch (e) {
logger.error('Failed to sync logs to CloudWatch.', e);
// TODO(nikahsn): enable log rotation if the log store is full
} finally {
_syncing = false;
}
Expand All @@ -200,20 +176,25 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin

Future<PutLogEventsResponse> _sendToCloudWatch(
List<InputLogEvent> logEvents,
String logStreamName,
) async {
final logStreamName = await _logStreamProvider.defaultLogStream;
final request = PutLogEventsRequest(
logGroupName: _pluginConfig.logGroupName,
logStreamName: logStreamName,
logEvents: logEvents,
);
return _client.putLogEvents(request).result;
try {
return await _client.putLogEvents(request).result;
} on ResourceNotFoundException {
await _logStreamProvider.createLogStream(logStreamName);
return _client.putLogEvents(request).result;
}
}

Stream<_LogBatch> _getLogBatchesToSync() async* {
final queuedLogs = (await _logStore.getAll()).toList();
if (queuedLogs.isEmpty) {
yield ([], []);
return;
}
final logEvents = <InputLogEvent>[];
final logQueues = <QueuedItem>[];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import 'dart:async';

import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:intl/intl.dart';

Expand All @@ -9,42 +11,51 @@ import 'package:intl/intl.dart';
/// [CloudWatchLogStreamProvider]
/// {@endtemplate}
abstract class CloudWatchLogStreamProvider {
/// Returns CloudWatch log stream name to use for sending logs to CloudWatch.
///
/// It creates the log stream if it does not exists.
Future<String> get logStream;
/// Returns the default log stream name from cache. if cache is missing it
/// calls [createLogStream] and return the log stream name.
FutureOr<String> get defaultLogStream;

/// Creates the log stream and add it to the cache.
Future<void> createLogStream(String logStreamName);
}

/// {@template aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
/// The default implementaion of [CloudWatchLogStreamProvider].
///
/// It uses `logStreamName` if provided otherwise uses `yyyy-MM-dd` date format
/// of UTC time now for the `logStreamName`.
/// It uses `defaultLogStreamName` if provided otherwise uses `yyyy-MM-dd`
/// date format of UTC time now for the `defaultLogStreamName`.
/// {@endtemplate}
class DefaultCloudWatchLogStreamProvider
implements CloudWatchLogStreamProvider {
/// {@macro aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
DefaultCloudWatchLogStreamProvider({
required CloudWatchLogsClient client,
required String logGroupName,
String? logStreamName,
}) : _logStreamName = logStreamName,
String? defaultLogStreamName,
}) : _defaultLogStreamName = defaultLogStreamName,
_logGroupName = logGroupName,
_client = client;

final String? _logStreamName;
final String? _defaultLogStreamName;
final String _logGroupName;
final CloudWatchLogsClient _client;
static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd');
final _createdLogStreams = <String>{};

/// Creates CloudWatch log stream if does not exists and returns
/// the log stream name.
///
/// Throws an [Exception] if fails to create the log stream.
@override
Future<String> get logStream async {
Future<String> get defaultLogStream async {
final logStreamName =
_logStreamName ?? _dateFormat.format(DateTime.timestamp());
_defaultLogStreamName ?? _dateFormat.format(DateTime.timestamp());
if (_createdLogStreams.contains(logStreamName)) {
return logStreamName;
}
await createLogStream(logStreamName);
_createdLogStreams.add(logStreamName);
return logStreamName;
}

@override
Future<void> createLogStream(String logStreamName) async {
try {
await _client
.createLogStream(
Expand All @@ -55,10 +66,7 @@ class DefaultCloudWatchLogStreamProvider
)
.result;
} on ResourceAlreadyExistsException {
return logStreamName;
} on Exception {
rethrow;
return;
}
return logStreamName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class StoppableTimer {
required Future<void> Function() callback,
required void Function(Object) onError,
}) : _callback = callback,
_onError = onError,
_timer = Timer.periodic(duration, (Timer t) {
callback().catchError((Object e) {
onError(e);
Expand All @@ -25,12 +26,15 @@ class StoppableTimer {

/// [Duration] between invocations of the provided callback function.
final Duration duration;
final void Function() _callback;
final Future<void> Function() _callback;
final void Function(Object) _onError;

/// Start the timer.
void start() {
if (_timer.isActive) return;
_timer = Timer.periodic(duration, (Timer t) => _callback());
_timer = Timer.periodic(duration, (Timer t) {
_callback().catchError(_onError);
});
}

/// Stop the timer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import 'dart:async';

import 'package:aws_common/aws_common.dart';
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:mocktail/mocktail.dart';
import 'package:test/test.dart';
Expand Down Expand Up @@ -132,7 +131,101 @@ void main() {
(_) async => Future<PutLogEventsResponse>.value(PutLogEventsResponse()),
);

when(() => mockCloudWatchLogStreamProvider.logStream)
when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
(_) => mockPutLogEventsOperation,
);

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockQueuedItemStore.addItem(any(), any()))
.thenAnswer((_) async => {});

when(() => mockQueuedItemStore.deleteItems(any()))
.thenAnswer((_) async => {});

when(() => mockQueuedItemStore.getAll()).thenAnswer(
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

await expectLater(
plugin.flushLogs(),
completes,
);

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verifyNever(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
);

verify(
() => mockCloudWatchLogsClient.putLogEvents(any()),
).called(1);

verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1);
});

test('does not start a sync if a sync is in progress', () async {
when(
() => mockPutLogEventsOperation.result,
).thenAnswer(
(_) async => Future<PutLogEventsResponse>.value(
PutLogEventsResponse(),
),
);

when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
(_) => mockPutLogEventsOperation,
);

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockQueuedItemStore.addItem(any(), any()))
.thenAnswer((_) async => {});

when(() => mockQueuedItemStore.getAll()).thenAnswer(
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

final flushLogs = plugin.flushLogs();
await expectLater(
plugin.flushLogs(),
completes,
);
await flushLogs;

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verifyNever(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
);

verify(
() => mockCloudWatchLogsClient.putLogEvents(any()),
).called(1);

verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1);
});

test('does not delete logs if they are too new', () async {
when(
() => mockPutLogEventsOperation.result,
).thenAnswer(
(_) async => Future<PutLogEventsResponse>.value(
PutLogEventsResponse(
rejectedLogEventsInfo:
RejectedLogEventsInfo(tooNewLogEventStartIndex: 0),
),
),
);

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
Expand All @@ -146,11 +239,55 @@ void main() {
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

await plugin.handleLogEntry(errorLog);
await expectLater(
plugin.flushLogs(),
completes,
);

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verifyNever(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
);

verify(
() => mockCloudWatchLogsClient.putLogEvents(any()),
).called(1);

verifyNever(
() => mockQueuedItemStore.deleteItems(any()),
);
});

test('it calls create log stream on resource not found exception',
() async {
when(() => mockCloudWatchLogsClient.putLogEvents(any()))
.thenThrow(ResourceNotFoundException());

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockCloudWatchLogStreamProvider.createLogStream(any()))
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockQueuedItemStore.getAll()).thenAnswer(
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

await expectLater(
plugin.flushLogs(),
completes,
);

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verify(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
).called(1);
});
});
}
Loading

0 comments on commit 5206d87

Please sign in to comment.