Skip to content

Commit

Permalink
TF-3334 Add more concurrence test cases for WebSocketQueueHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
hoangdat committed Dec 23, 2024
1 parent f8f5350 commit 4082c67
Show file tree
Hide file tree
Showing 2 changed files with 284 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class WebSocketQueueHandler {
_messageQueue.removeFirst();
}

log('WebSocketQueueHandler::enqueue(): ${message.id}');
_messageQueue.add(message);
_queueController.add(message);
}
Expand All @@ -57,6 +58,7 @@ class WebSocketQueueHandler {
try {
while (queueSize > 0) {
final message = _messageQueue.removeFirst();
log('WebSocketQueueHandler::_processQueue(): processing message ${message.id}');

try {
await processMessageCallback(message);
Expand All @@ -78,13 +80,15 @@ class WebSocketQueueHandler {
}

void _addToProcessedMessages(String messageId) {
log('WebSocketQueueHandler::_addToProcessedMessages(): adding message $messageId to processed messages');
if (_processedMessageIds.length >= _maxProcessedIdsSize) {
_processedMessageIds.removeFirst();
}
_processedMessageIds.add(messageId);
}

void removeMessagesUpToCurrent(String messageId) {
log('WebSocketQueueHandler::removeMessagesUpToCurrent(): removing messages up to $messageId');
final isCurrentStateExist = _messageQueue
.any((message) => message.id == messageId);

Expand All @@ -94,6 +98,7 @@ class WebSocketQueueHandler {
}
while (queueSize > 0) {
final removedMessage = _messageQueue.removeFirst();
log('WebSocketQueueHandler::removeMessagesUpToCurrent(): removing message ${removedMessage.id} up to $messageId');
if (removedMessage.id == messageId) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:collection';

import 'package:flutter_test/flutter_test.dart';
Expand Down Expand Up @@ -32,6 +33,17 @@ void main() {

group('Basic Operations', () {
late WebSocketQueueHandler handler;
late List<String> processedMessages;
late List<dynamic> errors;

setUp(() {
processedMessages = [];
errors = [];
});

tearDown(() {
handler.dispose();
});

setUp(() {
handler = createHandler(
Expand All @@ -55,6 +67,30 @@ void main() {
expect(processedMessages, containsAllInOrder(['0', '1', '2', '3', '4']));
});

test('Duplicate messages should be skipped', () async {
final message = MockWebSocketMessage('duplicate_msg');

handler.enqueue(message);
await handler.waitForEmpty();

handler.enqueue(message);
await handler.waitForEmpty();

expect(processedMessages.length, equals(1));
expect(processedMessages, equals(['duplicate_msg']));
});

test('Queue size should not exceed maximum size', () async {
// Enqueue more messages than the max queue size
final messages = List.generate(130, (i) => MockWebSocketMessage('msg_$i'));

for (var message in messages) {
handler.enqueue(message);
}

expect(handler.queueSize, lessThanOrEqualTo(128));
});

test('Should correctly remove messages up to specified ID', () async {
final messages = List.generate(5, (index) => MockWebSocketMessage('$index'));

Expand All @@ -73,8 +109,87 @@ void main() {
});
});

group('Queue Size Management Tests', () {
test('Queue should drop oldest message when full', () async {
late List<dynamic> errors = [];

final handler = WebSocketQueueHandler(
processMessageCallback: (message) async {
await Future.delayed(const Duration(milliseconds: 10)); // Simulate processing time
processedMessages.add(message.id);
},
onErrorCallback: (error, stackTrace) {
errors.add(error);
},
);

// Fill the queue to maximum capacity (128)
for (var i = 0; i < 128; i++) {
handler.enqueue(MockWebSocketMessage('msg_$i'));
}

expect(handler.queueSize, equals(128));

// Add one more message
handler.enqueue(MockWebSocketMessage('msg_128'));

// Queue size should still be 128
expect(handler.queueSize, equals(128));

// Process all messages
await handler.waitForEmpty();

// Verify that msg_0 (the oldest) was dropped and msg_128 (newest) was processed
expect(processedMessages.contains('msg_0'), isFalse);
expect(processedMessages.contains('msg_1'), isTrue);
expect(processedMessages.contains('msg_127'), isTrue);
expect(processedMessages.contains('msg_128'), isTrue);
});

test('Queue should maintain size limit during message removal', () async {
final processedIds = <String>[];
final completer = Completer<void>();
late List<dynamic> errors = [];

final handler = WebSocketQueueHandler(
processMessageCallback: (message) async {
await Future.delayed(const Duration(milliseconds: 10));
processedIds.add(message.id);
},
onErrorCallback: (error, stackTrace) {
errors.add(error);
},
);

// Fill queue to capacity
for (var i = 0; i < 128; i++) {
handler.enqueue(MockWebSocketMessage('msg_$i'));
}

// Remove messages up to msg_64
handler.removeMessagesUpToCurrent('msg_64');

// Add new messages to fill the queue again
for (var i = 128; i < 192; i++) {
handler.enqueue(MockWebSocketMessage('msg_$i'));
}

expect(handler.queueSize, lessThanOrEqualTo(128),
reason: 'Queue size should not exceed maximum after removal and refill');

await handler.waitForEmpty();

// Verify that earlier messages were properly removed
for (var i = 0; i <= 64; i++) {
expect(processedMessages.contains('msg_$i'), isFalse,
reason: 'Message msg_$i should have been removed');
}
});
});

group('Concurrent Operations', () {
late WebSocketQueueHandler handler;
late List<dynamic> errors;

setUp(() {
handler = createHandler(
Expand Down Expand Up @@ -107,6 +222,169 @@ void main() {

expect(processedMessages, containsAllInOrder(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']));
});

test('Should handle concurrent enqueueing while processing is blocked', () async {
final processingCompleter = Completer<void>();
final processedIds = <String>[];
errors = [];
var processingStarted = Completer<void>();

// Create handler with a processing delay to simulate long-running task
handler = WebSocketQueueHandler(
processMessageCallback: (message) async {
if (!processingStarted.isCompleted) {
processingStarted.complete();
}
await processingCompleter.future; // Block processing
processedIds.add(message.id);
},
onErrorCallback: (error, stackTrace) {
errors.add(error);
},
);

// Enqueue first message to start processing
handler.enqueue(MockWebSocketMessage('initial_msg'));

// Wait for processing to start
await processingStarted.future;

// Concurrently enqueue messages while first message is still processing
await Future.wait(
List.generate(150, (i) => Future(() {
handler.enqueue(MockWebSocketMessage('concurrent_$i'));
}))
);

// Verify queue size is capped at max while processing is blocked
expect(handler.queueSize, lessThanOrEqualTo(128));

// Allow processing to continue
processingCompleter.complete();

await handler.waitForEmpty();
// Verify process order and dropped messages
expect(processedIds[0], equals('initial_msg'));
expect(processedIds.length, lessThanOrEqualTo(129));
expect(handler.isMessageProcessed('concurrent_149'), isTrue);
});

test('Should handle rapid enqueueing during active processing', () async {
final processedIds = <String>[];
final processingStarted = Completer<void>();
final batchProcessing = Completer<void>();
errors = [];

// Create handler with controlled processing delays
handler = WebSocketQueueHandler(
processMessageCallback: (message) async {
if (!processingStarted.isCompleted) {
processingStarted.complete();
await batchProcessing.future;
}
processedIds.add(message.id);
},
onErrorCallback: (error, stackTrace) {
errors.add(error);
},
);

// Start with initial batch
for (var i = 0; i < 50; i++) {
handler.enqueue(MockWebSocketMessage('batch1_$i'));
}

// Wait for the first message to start processing
await processingStarted.future;

// Add second batch while first batch is blocked
for (var i = 0; i < 50; i++) {
handler.enqueue(MockWebSocketMessage('batch2_$i'));
}

// Add third batch immediately
for (var i = 0; i < 50; i++) {
handler.enqueue(MockWebSocketMessage('batch3_$i'));
}

// Allow processing to continue
batchProcessing.complete();

// Wait for queue to be empty
await handler.waitForEmpty();

// Verify results
expect(processedIds.length, lessThanOrEqualTo(129),
reason: 'Total processed messages should not exceed queue capacity');

// Check if we have messages from the latest batch
final lastBatchCount = processedIds
.where((id) => id.startsWith('batch3_'))
.length;
expect(lastBatchCount, greaterThan(0),
reason: 'Should have processed some messages from the latest batch');

// Verify that some early messages were dropped
final firstBatchCount = processedIds
.where((id) => id.startsWith('batch1_'))
.length;
expect(firstBatchCount, lessThan(50),
reason: 'Some messages from first batch should have been dropped');
});

test('Should handle concurrent removeMessagesUpToCurrent during processing', () async {
final processedIds = <String>[];
final processingDelay = Completer<void>();
errors = [];

handler = WebSocketQueueHandler(
processMessageCallback: (message) async {
await processingDelay.future;
processedIds.add(message.id);
},
onErrorCallback: (error, stackTrace) {
errors.add(error);
},
);

// Fill queue
for (var i = 0; i < 128; i++) {
handler.enqueue(MockWebSocketMessage('msg_$i'));
}

// Start concurrent operations
final futures = <Future>[];

// Add new messages
futures.add(Future(() async {
for (var i = 128; i < 256; i++) {
handler.enqueue(MockWebSocketMessage('msg_$i'));
await Future.delayed(const Duration(microseconds: 100));
}
}));

// Concurrently remove messages
futures.add(Future(() async {
await Future.delayed(const Duration(milliseconds: 10));
handler.removeMessagesUpToCurrent('msg_64');
}));

// Allow processing to continue after concurrent operations
await Future.delayed(const Duration(milliseconds: 50));
processingDelay.complete();

await Future.wait(futures);
await handler.waitForEmpty();

expect(processedIds.length, lessThanOrEqualTo(192));

// Verify that messages after removal point were processed
for (var id in processedIds.skip(1)) {
final messageNumber = int.parse(id.split('_')[1]);
expect(messageNumber, greaterThan(64),
reason: 'Only messages after msg_64 should be processed');
}
});
});

group('Error Handling', () {
Expand Down Expand Up @@ -179,6 +457,7 @@ void main() {

expect(processedMessages.length, burstSize);
expect(processedMessages, List.generate(burstSize, (i) => '$i'));
expect(handler.queueSize, equals(0));
});

test('Should handle interleaved slow and fast messages', () async {
Expand Down

0 comments on commit 4082c67

Please sign in to comment.