diff --git a/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java b/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java index 45dd396..f72218c 100644 --- a/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java +++ b/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java @@ -22,8 +22,7 @@ import java.util.List; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; class ChangeStreamTest extends AbstractMongoDbBase { @@ -304,9 +303,9 @@ void givenConfigurationWithMatchStageWithSingleCondition_shouldAcceptOnlyMatchin @Test void givenConfigurationWithMatchStageWithMultipleConditions_shouldAcceptOnlyMatchingEvents() throws InterruptedException { var match = Filters.and( - Filters.gt("fullDocument.testValue", 0), - Filters.lt("fullDocument.testValue", 2), - Filters.in("operationType", List.of("insert")) + Filters.gt("fullDocument.testValue", 0), + Filters.lt("fullDocument.testValue", 2), + Filters.in("operationType", List.of("insert")) ); var config = buildMongoConfig(match); @@ -346,16 +345,16 @@ void givenConfigurationWithMatchStageWithIdFilter_shouldAcceptOnlyMatchingEvents void givenConfigurationWithMatchStage_and_multipleListeners_shouldAcceptOnlyMatchingEvents() throws InterruptedException { var match = Filters.in("fullDocument.testValue", List.of(0, 2)); var config = MongoConfig.builder() - .connectionUri(getConnectionUri()) - .databaseName(getDatabaseName()) - .collectionName(getTestCollectionName()) - .match(match) - .keyName("testId") - .workerConfigCollectionName(getWorkerConfigCollectionName()) - .clusterConfigCollectionName(getClusterConfigCollectionName()) - .numberOfPartitions(3) - .fullDocument(FullDocument.UPDATE_LOOKUP) - .build(); + .connectionUri(getConnectionUri()) + .databaseName(getDatabaseName()) + .collectionName(getTestCollectionName()) + .match(match) + .keyName("testId") + .workerConfigCollectionName(getWorkerConfigCollectionName()) + .clusterConfigCollectionName(getClusterConfigCollectionName()) + .numberOfPartitions(3) + .fullDocument(FullDocument.UPDATE_LOOKUP) + .build(); MongoCseManager manager = new MongoCseManager(config); TestChangeStreamListener listener0 = new TestChangeStreamListener(); @@ -384,17 +383,159 @@ void givenConfigurationWithMatchStage_and_multipleListeners_shouldAcceptOnlyMatc assertEquals(2, events2.get(0).getFullDocument().getInteger("testValue")); } + @Test + void givenDuplicateListener_shouldReceiveEventsIndependently() throws InterruptedException { + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Create the primary listener and duplicate listener. + TestChangeStreamListener primaryListener = new TestChangeStreamListener(); + TestChangeStreamListener duplicateListener = new TestChangeStreamListener(); + + // Register the primary listener for partition 0 and duplicate for the same partition. + manager.registerListener(primaryListener, List.of(0)); + manager.registerListener(duplicateListener, List.of(0)); + + // Start the manager and allow time for listeners to initialize. + manager.start(); + Thread.sleep(1000); + + // Insert a document into the collection, triggering an event for partition 0. + Document testDoc0 = new Document(Map.of( + "_id", new ObjectId(TestIds.MOD_0_ID), + "testValue", 0 + )); + collection.insertOne(testDoc0); + + // Wait for CDC events to be picked up by the listeners. + Thread.sleep(500); + + // Retrieve events for both listeners. + List> primaryEvents = primaryListener.getEvents(); + List> duplicateEvents = duplicateListener.getEvents(); + + // Assert that both listeners receive the event independently. + assertEquals(1, primaryEvents.size(), "Primary listener should receive 1 event."); + assertEquals(1, duplicateEvents.size(), "Duplicate listener should receive 1 event."); + + // Verify that both listeners received the same event data. + assertNotEquals(primaryEvents.get(0).getFullDocument(), null); + assertEquals(0, primaryEvents.get(0).getFullDocument().getInteger("testValue")); + assertEquals(0, duplicateEvents.get(0).getFullDocument().getInteger("testValue")); + } + + @Test + void givenAfterAllListenersAreDeregistered_shouldStopProcessingEvents() throws InterruptedException { + // Arrange: Create a MongoCseManager and register two listeners for partition 0 and 1 + MongoCseManager manager = new MongoCseManager(mongoConfig); + + TestChangeStreamListener listener0 = new TestChangeStreamListener(); + TestChangeStreamListener listener1 = new TestChangeStreamListener(); + manager.registerListener(listener0, List.of(0)); + manager.registerListener(listener1, List.of(1)); + manager.start(); + + // Insert a document to trigger an event for both listeners + insertDocumentsToAllPartitions(); + Thread.sleep(500); + + // Verify that listeners received initial events + assertEquals(1, listener0.getEvents().size(), "Listener 0 should receive 1 event."); + assertEquals(1, listener1.getEvents().size(), "Listener 1 should receive 1 event."); + + // Act: Deregister both listeners from all partitions + manager.deregisterListenerFromAllPartitions(listener0); + manager.deregisterListenerFromAllPartitions(listener1); + + // Insert another documents to test if deregistered listeners receive any events + insertMultipleDocumentsToAllPartitions(5); + Thread.sleep(500); + + // Assert: Verify that no new events are received by either listener + assertEquals(1, listener0.getEvents().size(), "Listener 0 should not receive any new events."); + assertEquals(1, listener1.getEvents().size(), "Listener 1 should not receive any new events."); + } + + @Test + void givenListenerIsDeregistered_shouldNotHandleEventsFromSpecificPartition() throws InterruptedException { + // Given: Create a MongoCseManager with a configuration + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Register a single listener to all partitions (0, 1, and 2) + TestChangeStreamListener listener = new TestChangeStreamListener(); + manager.registerListenerToAllPartitions(listener); + manager.start(); + + // Insert documents into all partitions + var numberOfInsertedDocuments = insertDocumentsToAllPartitions(); + + // Wait for CDC events to be picked up + Thread.sleep(500); + + // Verify that the listener received all events for all partitions + List> initialEvents = listener.getEvents(); + assertEquals(numberOfInsertedDocuments, initialEvents.size()); + + // When: Deregister the listener from partition 1 only + manager.deregisterListener(listener, List.of(1)); + + // Perform a new operation on each partition + var deleteResult0 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_0_ID))); + var deleteResult1 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_1_ID))); + var deleteResult2 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_2_ID))); + + assertEquals(1, deleteResult0.getDeletedCount()); + assertEquals(1, deleteResult1.getDeletedCount()); + assertEquals(1, deleteResult2.getDeletedCount()); + + // Wait for CDC events to be picked up + Thread.sleep(500); + + // Then: Only events from partition 0 and 2 should be handled (since partition 1 was deregistered) + var expectedNewEventCount = 2; // 1 delete event from partition 0 and 1 delete event from partition 2 + var totalExpectedEvents = numberOfInsertedDocuments + expectedNewEventCount; + + List> finalEvents = listener.getEvents(); + assertEquals(totalExpectedEvents, finalEvents.size()); + } + + @Test + void givenListenerOnSpecificPartition_shouldNotReceiveEventsFromOtherPartitions() throws InterruptedException { + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Create a listener and register it only for partition 1. + TestChangeStreamListener partition1Listener = new TestChangeStreamListener(); + manager.registerListener(partition1Listener, List.of(1)); + + // Start the manager and allow time for listeners to initialize. + manager.start(); + Thread.sleep(1000); + + // Insert a document in partition 0, which should not trigger an event for partition 1. + Document testDocPartition0 = new Document(Map.of( + "_id", new ObjectId(TestIds.MOD_0_ID), + "testValue", 0 + )); + collection.insertOne(testDocPartition0); + + // Wait for CDC events to be picked up by the listener. + Thread.sleep(500); + + // Verify that partition1Listener did not receive any events. + List> partition1Events = partition1Listener.getEvents(); + assertEquals(0, partition1Events.size(), "Listener registered for partition 1 should not receive events from partition 0."); + } + private MongoConfig buildMongoConfig(Bson match) { return MongoConfig.builder() - .connectionUri(getConnectionUri()) - .databaseName(getDatabaseName()) - .collectionName(getTestCollectionName()) - .match(match) - .workerConfigCollectionName(getWorkerConfigCollectionName()) - .clusterConfigCollectionName(getClusterConfigCollectionName()) - .numberOfPartitions(3) - .fullDocument(FullDocument.UPDATE_LOOKUP) - .build(); + .connectionUri(getConnectionUri()) + .databaseName(getDatabaseName()) + .collectionName(getTestCollectionName()) + .match(match) + .workerConfigCollectionName(getWorkerConfigCollectionName()) + .clusterConfigCollectionName(getClusterConfigCollectionName()) + .numberOfPartitions(3) + .fullDocument(FullDocument.UPDATE_LOOKUP) + .build(); } private int insertDocumentsToAllPartitions() { @@ -433,6 +574,16 @@ private int insertDocumentsWithTestIdToAllPartitions() { return result.getInsertedIds().size(); } + private void insertMultipleDocumentsToAllPartitions(int numberOfDocuments) { + for (int i = 0; i < numberOfDocuments; i++) { + Document testDoc = new Document(Map.of( + "testId", new ObjectId(TestIds.MOD_3_ID), + "testValue", i + )); + collection.insertOne(testDoc); + } + } + private void assertEventsContainNumberOfOpTypes(List> events, OperationType operationType, long expectedCount) { long actualCount = events.stream().filter(op -> op.getOperationType() == operationType).count(); assertEquals(expectedCount, actualCount); diff --git a/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java b/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java index a88b906..cecfbf3 100644 --- a/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java +++ b/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java @@ -1,51 +1,88 @@ package com.gravity9.mongocse; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertThrows; public class ConfigManagerTest extends AbstractMongoDbBase { - private MongoConfig.MongoConfigBuilder mongoConfigBuilder; - - @BeforeEach - public void setup() { - super.setup(); - mongoConfigBuilder = new MongoConfig.MongoConfigBuilder() - .connectionUri(getConnectionUri()) - .databaseName(getDatabaseName()) - .collectionName(getTestCollectionName()) - .workerConfigCollectionName(getWorkerConfigCollectionName()) - .clusterConfigCollectionName(getClusterConfigCollectionName()); - } - - @Test - void givenSameConfiguration_shouldNotThrowException() { - int partitions = 3; - var mongoConfig = mongoConfigBuilder - .numberOfPartitions(partitions) - .build(); - new MongoCseManager(mongoConfig); - assertDoesNotThrow(() -> new MongoCseManager(mongoConfig)); - - WorkerClusterConfig config = new ConfigManager(mongoConfig, CLIENT_PROVIDER).getOrInitClusterConfig(getTestCollectionName(), partitions); - assertEquals(getTestCollectionName(), config.getCollection()); - assertEquals(partitions, config.getPartitions()); - } - - @Test - void givenNewConfigWithDifferentNumberOfPartitions_shouldThrowException() { - var firstConfig = mongoConfigBuilder - .numberOfPartitions(3) - .build(); - var secondConfig = mongoConfigBuilder - .numberOfPartitions(1) - .build(); - new MongoCseManager(firstConfig); - assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(secondConfig)); - } + private MongoConfig.MongoConfigBuilder mongoConfigBuilder; + + @BeforeEach + public void setup() { + super.setup(); + mongoConfigBuilder = new MongoConfig.MongoConfigBuilder() + .connectionUri(getConnectionUri()) + .databaseName(getDatabaseName()) + .collectionName(getTestCollectionName()) + .workerConfigCollectionName(getWorkerConfigCollectionName()) + .clusterConfigCollectionName(getClusterConfigCollectionName()); + } + + @Test + void givenSameConfiguration_shouldNotThrowException() { + int partitions = 3; + var mongoConfig = mongoConfigBuilder + .numberOfPartitions(partitions) + .build(); + new MongoCseManager(mongoConfig); + assertDoesNotThrow(() -> new MongoCseManager(mongoConfig)); + + WorkerClusterConfig config = new ConfigManager(mongoConfig, CLIENT_PROVIDER).getOrInitClusterConfig(getTestCollectionName(), partitions); + assertEquals(getTestCollectionName(), config.getCollection()); + assertEquals(partitions, config.getPartitions()); + } + + @Test + void givenNewConfigWithDifferentNumberOfPartitions_shouldThrowException() { + var firstConfig = mongoConfigBuilder + .numberOfPartitions(3) + .build(); + var secondConfig = mongoConfigBuilder + .numberOfPartitions(1) + .build(); + new MongoCseManager(firstConfig); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(secondConfig)); + } + + @Test + void givenNullConfig_shouldThrowNullPointerException() { + assertThrows(NullPointerException.class, () -> new MongoCseManager(null)); + } + + @Test + void givenWrongDatabaseName_shouldThrowException() { + var mongoConfig = mongoConfigBuilder + .databaseName("invalid_db_name") + .build(); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig)); + } + + @Test + void givenInvalidConnectionUri_shouldThrowException() { + var mongoConfig = mongoConfigBuilder + .connectionUri("invalid_uri") + .build(); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig)); + } + + @Test + void givenInvalidCollectionName_shouldThrowException() { + var mongoConfig = mongoConfigBuilder + .collectionName("invalid_collection_name") + .build(); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig)); + } + + @Test + void givenSameConfigTwice_shouldCreateSeparateInstances() { + var mongoConfig = mongoConfigBuilder.numberOfPartitions(2).build(); + MongoCseManager firstInstance = new MongoCseManager(mongoConfig); + MongoCseManager secondInstance = new MongoCseManager(mongoConfig); + assertNotSame(firstInstance, secondInstance); + } } diff --git a/src/test/java/com/gravity9/mongocse/constants/TestIds.java b/src/test/java/com/gravity9/mongocse/constants/TestIds.java index ed480e0..5e2a03d 100644 --- a/src/test/java/com/gravity9/mongocse/constants/TestIds.java +++ b/src/test/java/com/gravity9/mongocse/constants/TestIds.java @@ -2,9 +2,11 @@ public class TestIds { - public static final String MOD_0_ID = "652e9fdc597d12ddbf7380e7"; + public static final String MOD_0_ID = "652e9fdc597d12ddbf7380e7"; - public static final String MOD_1_ID = "652e9f6fcd6b9a316b067843"; + public static final String MOD_1_ID = "652e9f6fcd6b9a316b067843"; - public static final String MOD_2_ID = "652ea13969adc932efb550d4"; + public static final String MOD_2_ID = "652ea13969adc932efb550d4"; + + public static final String MOD_3_ID = "000e9fdc597d12ddbf738000"; }