Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional unit tests - ChangeStreams and ConfigManager #63

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 175 additions & 24 deletions src/test/java/com/gravity9/mongocse/ChangeStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;

class ChangeStreamTest extends AbstractMongoDbBase {

Expand Down Expand Up @@ -304,9 +303,9 @@ void givenConfigurationWithMatchStageWithSingleCondition_shouldAcceptOnlyMatchin
@Test
void givenConfigurationWithMatchStageWithMultipleConditions_shouldAcceptOnlyMatchingEvents() throws InterruptedException {
var match = Filters.and(
Filters.gt("fullDocument.testValue", 0),
Filters.lt("fullDocument.testValue", 2),
Filters.in("operationType", List.of("insert"))
Filters.gt("fullDocument.testValue", 0),
Filters.lt("fullDocument.testValue", 2),
Filters.in("operationType", List.of("insert"))
);
var config = buildMongoConfig(match);

Expand Down Expand Up @@ -346,16 +345,16 @@ void givenConfigurationWithMatchStageWithIdFilter_shouldAcceptOnlyMatchingEvents
void givenConfigurationWithMatchStage_and_multipleListeners_shouldAcceptOnlyMatchingEvents() throws InterruptedException {
var match = Filters.in("fullDocument.testValue", List.of(0, 2));
var config = MongoConfig.builder()
.connectionUri(getConnectionUri())
.databaseName(getDatabaseName())
.collectionName(getTestCollectionName())
.match(match)
.keyName("testId")
.workerConfigCollectionName(getWorkerConfigCollectionName())
.clusterConfigCollectionName(getClusterConfigCollectionName())
.numberOfPartitions(3)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.build();
.connectionUri(getConnectionUri())
.databaseName(getDatabaseName())
.collectionName(getTestCollectionName())
.match(match)
.keyName("testId")
.workerConfigCollectionName(getWorkerConfigCollectionName())
.clusterConfigCollectionName(getClusterConfigCollectionName())
.numberOfPartitions(3)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.build();

MongoCseManager manager = new MongoCseManager(config);
TestChangeStreamListener listener0 = new TestChangeStreamListener();
Expand Down Expand Up @@ -384,17 +383,159 @@ void givenConfigurationWithMatchStage_and_multipleListeners_shouldAcceptOnlyMatc
assertEquals(2, events2.get(0).getFullDocument().getInteger("testValue"));
}

@Test
void givenDuplicateListener_shouldReceiveEventsIndependently() throws InterruptedException {
MongoCseManager manager = new MongoCseManager(mongoConfig);

// Create the primary listener and duplicate listener.
TestChangeStreamListener primaryListener = new TestChangeStreamListener();
TestChangeStreamListener duplicateListener = new TestChangeStreamListener();

// Register the primary listener for partition 0 and duplicate for the same partition.
manager.registerListener(primaryListener, List.of(0));
manager.registerListener(duplicateListener, List.of(0));

// Start the manager and allow time for listeners to initialize.
manager.start();
Thread.sleep(1000);

// Insert a document into the collection, triggering an event for partition 0.
Document testDoc0 = new Document(Map.of(
"_id", new ObjectId(TestIds.MOD_0_ID),
"testValue", 0
));
collection.insertOne(testDoc0);

// Wait for CDC events to be picked up by the listeners.
Thread.sleep(500);

// Retrieve events for both listeners.
List<ChangeStreamDocument<Document>> primaryEvents = primaryListener.getEvents();
List<ChangeStreamDocument<Document>> duplicateEvents = duplicateListener.getEvents();

// Assert that both listeners receive the event independently.
assertEquals(1, primaryEvents.size(), "Primary listener should receive 1 event.");
assertEquals(1, duplicateEvents.size(), "Duplicate listener should receive 1 event.");

// Verify that both listeners received the same event data.
assertNotEquals(primaryEvents.get(0).getFullDocument(), null);
assertEquals(0, primaryEvents.get(0).getFullDocument().getInteger("testValue"));
assertEquals(0, duplicateEvents.get(0).getFullDocument().getInteger("testValue"));
}

@Test
void givenAfterAllListenersAreDeregistered_shouldStopProcessingEvents() throws InterruptedException {
// Arrange: Create a MongoCseManager and register two listeners for partition 0 and 1
MongoCseManager manager = new MongoCseManager(mongoConfig);

TestChangeStreamListener listener0 = new TestChangeStreamListener();
TestChangeStreamListener listener1 = new TestChangeStreamListener();
manager.registerListener(listener0, List.of(0));
manager.registerListener(listener1, List.of(1));
manager.start();

// Insert a document to trigger an event for both listeners
insertDocumentsToAllPartitions();
Thread.sleep(500);

// Verify that listeners received initial events
assertEquals(1, listener0.getEvents().size(), "Listener 0 should receive 1 event.");
assertEquals(1, listener1.getEvents().size(), "Listener 1 should receive 1 event.");

// Act: Deregister both listeners from all partitions
manager.deregisterListenerFromAllPartitions(listener0);
manager.deregisterListenerFromAllPartitions(listener1);

// Insert another documents to test if deregistered listeners receive any events
insertMultipleDocumentsToAllPartitions(5);
Thread.sleep(500);

// Assert: Verify that no new events are received by either listener
assertEquals(1, listener0.getEvents().size(), "Listener 0 should not receive any new events.");
assertEquals(1, listener1.getEvents().size(), "Listener 1 should not receive any new events.");
}

@Test
void givenListenerIsDeregistered_shouldNotHandleEventsFromSpecificPartition() throws InterruptedException {
// Given: Create a MongoCseManager with a configuration
MongoCseManager manager = new MongoCseManager(mongoConfig);

// Register a single listener to all partitions (0, 1, and 2)
TestChangeStreamListener listener = new TestChangeStreamListener();
manager.registerListenerToAllPartitions(listener);
manager.start();

// Insert documents into all partitions
var numberOfInsertedDocuments = insertDocumentsToAllPartitions();

// Wait for CDC events to be picked up
Thread.sleep(500);

// Verify that the listener received all events for all partitions
List<ChangeStreamDocument<Document>> initialEvents = listener.getEvents();
assertEquals(numberOfInsertedDocuments, initialEvents.size());

// When: Deregister the listener from partition 1 only
manager.deregisterListener(listener, List.of(1));

// Perform a new operation on each partition
var deleteResult0 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_0_ID)));
var deleteResult1 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_1_ID)));
var deleteResult2 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_2_ID)));

assertEquals(1, deleteResult0.getDeletedCount());
assertEquals(1, deleteResult1.getDeletedCount());
assertEquals(1, deleteResult2.getDeletedCount());

// Wait for CDC events to be picked up
Thread.sleep(500);

// Then: Only events from partition 0 and 2 should be handled (since partition 1 was deregistered)
var expectedNewEventCount = 2; // 1 delete event from partition 0 and 1 delete event from partition 2
var totalExpectedEvents = numberOfInsertedDocuments + expectedNewEventCount;

List<ChangeStreamDocument<Document>> finalEvents = listener.getEvents();
assertEquals(totalExpectedEvents, finalEvents.size());
}

@Test
void givenListenerOnSpecificPartition_shouldNotReceiveEventsFromOtherPartitions() throws InterruptedException {
MongoCseManager manager = new MongoCseManager(mongoConfig);

// Create a listener and register it only for partition 1.
TestChangeStreamListener partition1Listener = new TestChangeStreamListener();
manager.registerListener(partition1Listener, List.of(1));

// Start the manager and allow time for listeners to initialize.
manager.start();
Thread.sleep(1000);

// Insert a document in partition 0, which should not trigger an event for partition 1.
Document testDocPartition0 = new Document(Map.of(
"_id", new ObjectId(TestIds.MOD_0_ID),
"testValue", 0
));
collection.insertOne(testDocPartition0);

// Wait for CDC events to be picked up by the listener.
Thread.sleep(500);

// Verify that partition1Listener did not receive any events.
List<ChangeStreamDocument<Document>> partition1Events = partition1Listener.getEvents();
assertEquals(0, partition1Events.size(), "Listener registered for partition 1 should not receive events from partition 0.");
}

private MongoConfig buildMongoConfig(Bson match) {
return MongoConfig.builder()
.connectionUri(getConnectionUri())
.databaseName(getDatabaseName())
.collectionName(getTestCollectionName())
.match(match)
.workerConfigCollectionName(getWorkerConfigCollectionName())
.clusterConfigCollectionName(getClusterConfigCollectionName())
.numberOfPartitions(3)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.build();
.connectionUri(getConnectionUri())
.databaseName(getDatabaseName())
.collectionName(getTestCollectionName())
.match(match)
.workerConfigCollectionName(getWorkerConfigCollectionName())
.clusterConfigCollectionName(getClusterConfigCollectionName())
.numberOfPartitions(3)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.build();
}

private int insertDocumentsToAllPartitions() {
Expand Down Expand Up @@ -433,6 +574,16 @@ private int insertDocumentsWithTestIdToAllPartitions() {
return result.getInsertedIds().size();
}

private void insertMultipleDocumentsToAllPartitions(int numberOfDocuments) {
for (int i = 0; i < numberOfDocuments; i++) {
Document testDoc = new Document(Map.of(
"testId", new ObjectId(TestIds.MOD_3_ID),
"testValue", i
));
collection.insertOne(testDoc);
}
}

private void assertEventsContainNumberOfOpTypes(List<ChangeStreamDocument<Document>> events, OperationType operationType, long expectedCount) {
long actualCount = events.stream().filter(op -> op.getOperationType() == operationType).count();
assertEquals(expectedCount, actualCount);
Expand Down
115 changes: 76 additions & 39 deletions src/test/java/com/gravity9/mongocse/ConfigManagerTest.java
Original file line number Diff line number Diff line change
@@ -1,51 +1,88 @@
package com.gravity9.mongocse;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class ConfigManagerTest extends AbstractMongoDbBase {

private MongoConfig.MongoConfigBuilder mongoConfigBuilder;

@BeforeEach
public void setup() {
super.setup();
mongoConfigBuilder = new MongoConfig.MongoConfigBuilder()
.connectionUri(getConnectionUri())
.databaseName(getDatabaseName())
.collectionName(getTestCollectionName())
.workerConfigCollectionName(getWorkerConfigCollectionName())
.clusterConfigCollectionName(getClusterConfigCollectionName());
}

@Test
void givenSameConfiguration_shouldNotThrowException() {
int partitions = 3;
var mongoConfig = mongoConfigBuilder
.numberOfPartitions(partitions)
.build();
new MongoCseManager(mongoConfig);
assertDoesNotThrow(() -> new MongoCseManager(mongoConfig));

WorkerClusterConfig config = new ConfigManager(mongoConfig, CLIENT_PROVIDER).getOrInitClusterConfig(getTestCollectionName(), partitions);
assertEquals(getTestCollectionName(), config.getCollection());
assertEquals(partitions, config.getPartitions());
}

@Test
void givenNewConfigWithDifferentNumberOfPartitions_shouldThrowException() {
var firstConfig = mongoConfigBuilder
.numberOfPartitions(3)
.build();
var secondConfig = mongoConfigBuilder
.numberOfPartitions(1)
.build();
new MongoCseManager(firstConfig);
assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(secondConfig));
}
private MongoConfig.MongoConfigBuilder mongoConfigBuilder;

@BeforeEach
public void setup() {
super.setup();
mongoConfigBuilder = new MongoConfig.MongoConfigBuilder()
.connectionUri(getConnectionUri())
.databaseName(getDatabaseName())
.collectionName(getTestCollectionName())
.workerConfigCollectionName(getWorkerConfigCollectionName())
.clusterConfigCollectionName(getClusterConfigCollectionName());
}

@Test
void givenSameConfiguration_shouldNotThrowException() {
int partitions = 3;
var mongoConfig = mongoConfigBuilder
.numberOfPartitions(partitions)
.build();
new MongoCseManager(mongoConfig);
assertDoesNotThrow(() -> new MongoCseManager(mongoConfig));

WorkerClusterConfig config = new ConfigManager(mongoConfig, CLIENT_PROVIDER).getOrInitClusterConfig(getTestCollectionName(), partitions);
assertEquals(getTestCollectionName(), config.getCollection());
assertEquals(partitions, config.getPartitions());
}

@Test
void givenNewConfigWithDifferentNumberOfPartitions_shouldThrowException() {
var firstConfig = mongoConfigBuilder
.numberOfPartitions(3)
.build();
var secondConfig = mongoConfigBuilder
.numberOfPartitions(1)
.build();
new MongoCseManager(firstConfig);
assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(secondConfig));
}

@Test
void givenNullConfig_shouldThrowNullPointerException() {
assertThrows(NullPointerException.class, () -> new MongoCseManager(null));
}

@Test
void givenWrongDatabaseName_shouldThrowException() {
var mongoConfig = mongoConfigBuilder
.databaseName("invalid_db_name")
.build();
assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig));
}

@Test
void givenInvalidConnectionUri_shouldThrowException() {
var mongoConfig = mongoConfigBuilder
.connectionUri("invalid_uri")
.build();
assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig));
}

@Test
void givenInvalidCollectionName_shouldThrowException() {
var mongoConfig = mongoConfigBuilder
.collectionName("invalid_collection_name")
.build();
assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig));
}

@Test
void givenSameConfigTwice_shouldCreateSeparateInstances() {
var mongoConfig = mongoConfigBuilder.numberOfPartitions(2).build();
MongoCseManager firstInstance = new MongoCseManager(mongoConfig);
MongoCseManager secondInstance = new MongoCseManager(mongoConfig);
assertNotSame(firstInstance, secondInstance);
}
}
8 changes: 5 additions & 3 deletions src/test/java/com/gravity9/mongocse/constants/TestIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

public class TestIds {

public static final String MOD_0_ID = "652e9fdc597d12ddbf7380e7";
public static final String MOD_0_ID = "652e9fdc597d12ddbf7380e7";

public static final String MOD_1_ID = "652e9f6fcd6b9a316b067843";
public static final String MOD_1_ID = "652e9f6fcd6b9a316b067843";

public static final String MOD_2_ID = "652ea13969adc932efb550d4";
public static final String MOD_2_ID = "652ea13969adc932efb550d4";

public static final String MOD_3_ID = "000e9fdc597d12ddbf738000";
}