Skip to content

Commit

Permalink
[0.10.0-SNAPSHOT]
Browse files Browse the repository at this point in the history
AutoClosable signature simplified
KafkaConsumer tests close with resources added
KafkaConnectionImpl reuse consumers if possible
JdbcConnection leak detection threshold increase
  • Loading branch information
GoodforGod committed Apr 14, 2024
1 parent 547d61f commit 7f2b713
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ <T, E extends Throwable> List<T> queryMany(@NotNull @Language("CQL") String cql,
*/
boolean checkQueriesEquals(int expected, @NotNull @Language("CQL") String cql);

@Override
void close();

static CassandraConnection forContainer(CassandraContainer<?> container) {
if (!container.isRunning()) {
throw new IllegalStateException(container.getClass().getSimpleName() + " container is not running");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ void stop() {
}

@Override
public void close() throws Exception {
public void close() {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ <T, E extends Throwable> List<T> queryMany(@NotNull @Language("SQL") String sql,
*/
boolean checkDeleted(@Language("SQL") String sql);

@Override
void close();

static JdbcConnection forParams(String driverProtocol,
String host,
int port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ DataSource dataSource() {
hikariConfig.setMinimumIdle(1);
hikariConfig.setMaximumPoolSize(25);
hikariConfig.setPoolName("jdbc-connection");
hikariConfig.setLeakDetectionThreshold(2000);
hikariConfig.setLeakDetectionThreshold(10000);
this.dataSource = new HikariDataSource(hikariConfig);
}

Expand Down Expand Up @@ -454,7 +454,7 @@ void stop() {
}

@Override
public void close() throws Exception {
public void close() {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ default boolean checkReceivedAtLeast(int expectedAtLeast) {
* @return true if received exactly N events during specified time frame or false
*/
boolean checkReceivedEqualsInTime(int expected, @NotNull Duration timeToWait);

@Override
void close();
}

@NotNull
Expand Down Expand Up @@ -231,4 +234,7 @@ static KafkaConnection forBootstrapServers(@NotNull String bootstrapServers) {
static KafkaConnection forProperties(@NotNull Properties properties) {
return new KafkaConnectionClosableImpl(properties, null);
}

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public String toString() {
private volatile KafkaProducer<byte[], byte[]> producer;
private volatile Admin admin;

private final List<ConsumerImpl> consumers = new CopyOnWriteArrayList<>();
private final Map<String, ConsumerImpl> consumerByTopic = new ConcurrentHashMap<>();
private final ParamsImpl params;
@Nullable
private final ParamsImpl paramsInNetwork;
Expand Down Expand Up @@ -320,8 +320,12 @@ public void reset() {
messageQueue.clear();
}

boolean isClosed() {
return !isActive.get();
}

@Override
public void close() throws Exception {
public void close() {
stop();
}

Expand Down Expand Up @@ -452,10 +456,23 @@ public void send(@NotNull String topic, @NotNull List<Event> events) {
.collect(Collectors.toSet());

final String id = UUID.randomUUID().toString().substring(0, 8);
var kafkaConsumer = getConsumer(id, params.properties());
var consumer = new ConsumerImpl(kafkaConsumer, id, topicPartition);
consumers.add(consumer);
return consumer;
final String consumerTopicKey = topics.stream()
.sorted()
.collect(Collectors.joining(":"));

final ConsumerImpl consumer = consumerByTopic.computeIfAbsent(consumerTopicKey, k -> {
var kafkaConsumer = getConsumer(id, params.properties());
return new ConsumerImpl(kafkaConsumer, id, topicPartition);
});

if (consumer.isClosed()) {
var kafkaConsumer = getConsumer(id, params.properties());
ConsumerImpl activeConsumer = new ConsumerImpl(kafkaConsumer, id, topicPartition);
consumerByTopic.put(consumerTopicKey, activeConsumer);
return activeConsumer;
} else {
return consumer;
}
} catch (Exception e) {
throw new KafkaConnectionException("Can't create KafkaConsumer", e);
}
Expand Down Expand Up @@ -620,14 +637,14 @@ public void dropTopics(@NotNull Set<String> topics) {
}

void clear() {
for (var consumer : consumers) {
for (var consumer : consumerByTopic.values()) {
try {
consumer.stop();
} catch (Exception e) {
// do nothing
}
}
consumers.clear();
consumerByTopic.clear();
}

void stop() {
Expand Down Expand Up @@ -657,7 +674,7 @@ void stop() {
}

@Override
public void close() throws Exception {
public void close() {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,155 +31,170 @@ void admin() throws Exception {
void getReceived() {
// given
var topic = "example";
var consumer = connection.subscribe(topic);

// when
var event = Event.builder()
.withKey("1")
.withValue(new JSONObject().put("name", "bob"))
.withHeader("1", "1")
.withHeader("2", "2")
.build();
connection.send(topic, event);

// then
var received = consumer.getReceived(Duration.ofSeconds(1));
assertTrue(received.isPresent());
assertNotEquals(-1, received.get().offset());
assertNotEquals(-1, received.get().partition());
assertNotEquals(-1, received.get().timestamp());
assertEquals(topic, received.get().topic());
assertNotNull(received.get().datetime());
assertEquals(event.key(), received.get().key());
assertEquals(event.key().toString(), received.get().key().toString());
assertEquals(event.value(), received.get().value());
assertEquals(event.value().toString(), received.get().value().toString());
assertEquals(event.value().asString(), received.get().value().asString());
assertEquals(event.value().asJson().toString(), received.get().value().asJson().toString());
assertEquals(2, event.headers().size());
assertEquals(event.headers(), received.get().headers());
assertEquals(event.headers().get(0), received.get().headers().get(0));
assertEquals(event.headers().get(0).toString(), received.get().headers().get(0).toString());
assertNotNull(received.get().toString());
try (var consumer = connection.subscribe(topic)) {
// when
var event = Event.builder()
.withKey("1")
.withValue(new JSONObject().put("name", "bob"))
.withHeader("1", "1")
.withHeader("2", "2")
.build();
connection.send(topic, event);

// then
var received = consumer.getReceived(Duration.ofSeconds(1));
assertTrue(received.isPresent());
assertNotEquals(-1, received.get().offset());
assertNotEquals(-1, received.get().partition());
assertNotEquals(-1, received.get().timestamp());
assertEquals(topic, received.get().topic());
assertNotNull(received.get().datetime());
assertEquals(event.key(), received.get().key());
assertEquals(event.key().toString(), received.get().key().toString());
assertEquals(event.value(), received.get().value());
assertEquals(event.value().toString(), received.get().value().toString());
assertEquals(event.value().asString(), received.get().value().asString());
assertEquals(event.value().asJson().toString(), received.get().value().asJson().toString());
assertEquals(2, event.headers().size());
assertEquals(event.headers(), received.get().headers());
assertEquals(event.headers().get(0), received.get().headers().get(0));
assertEquals(event.headers().get(0).toString(), received.get().headers().get(0).toString());
assertNotNull(received.get().toString());
}
}

@Test
void getReceivedAtLeast() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
var received = consumer.getReceivedAtLeast(2, Duration.ofSeconds(1));
assertEquals(2, received.size());
assertNotEquals(received.get(0), received.get(1));
assertNotEquals(received.get(0).toString(), received.get(1).toString());
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
var received = consumer.getReceivedAtLeast(2, Duration.ofSeconds(1));
assertEquals(2, received.size());
assertNotEquals(received.get(0), received.get(1));
assertNotEquals(received.get(0).toString(), received.get(1).toString());
}
}

@Test
void assertReceivedNone() {
var topic = "example";
var consumer = connection.subscribe(topic);
consumer.assertReceivedNone(Duration.ofSeconds(1));
try (var consumer = connection.subscribe(topic)) {
consumer.assertReceivedNone(Duration.ofSeconds(1));
}
}

@Test
void assertReceivedNoneThrows() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.ofValue("value"));
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedNone(Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.ofValue("value"));
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedNone(Duration.ofSeconds(1)));
}
}

@Test
void assertReceived() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.builder().withValue("value").withHeader("1", "1").build());
var receivedEvent = consumer.assertReceivedAtLeast(1, Duration.ofSeconds(1));
assertNotNull(receivedEvent.toString());
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.builder().withValue("value").withHeader("1", "1").build());
var receivedEvent = consumer.assertReceivedAtLeast(1, Duration.ofSeconds(1));
assertNotNull(receivedEvent.toString());
}
}

@Test
void assertReceivedThrows() {
var topic = "example";
var consumer = connection.subscribe(topic);
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedAtLeast(1, Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedAtLeast(1, Duration.ofSeconds(1)));
}
}

@Test
void assertReceivedAtLeast() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
consumer.assertReceivedAtLeast(2, Duration.ofSeconds(1));
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
consumer.assertReceivedAtLeast(2, Duration.ofSeconds(1));
}
}

@Test
void assertReceivedAtLeastThrows() {
var topic = "example";
var consumer = connection.subscribe(topic);
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedAtLeast(2, Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedAtLeast(2, Duration.ofSeconds(1)));
}
}

@Test
void assertReceivedEquals() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.builder().withValue("value1").withKey("1").build(),
Event.ofValue("value2"));
var receivedEvents = consumer.assertReceivedEqualsInTime(2, Duration.ofSeconds(1));
assertNotEquals(receivedEvents.get(0), receivedEvents.get(1));
assertNotEquals(receivedEvents.get(0).toString(), receivedEvents.get(1).toString());
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.builder().withValue("value1").withKey("1").build(),
Event.ofValue("value2"));
var receivedEvents = consumer.assertReceivedEqualsInTime(2, Duration.ofSeconds(1));
assertNotEquals(receivedEvents.get(0), receivedEvents.get(1));
assertNotEquals(receivedEvents.get(0).toString(), receivedEvents.get(1).toString());
}
}

@Test
void assertReceivedEqualsThrows() {
var topic = "example";
var consumer = connection.subscribe(topic);
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedEqualsInTime(2, Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
assertThrows(AssertionFailedError.class, () -> consumer.assertReceivedEqualsInTime(2, Duration.ofSeconds(1)));
}
}

@Test
void checkReceivedNone() {
var topic = "example";
var consumer = connection.subscribe(topic);
assertTrue(consumer.checkReceivedNone(Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
assertTrue(consumer.checkReceivedNone(Duration.ofSeconds(1)));
}
}

@Test
void checkReceivedNoneThrows() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.ofValue("value"));
assertFalse(consumer.checkReceivedNone(Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.ofValue("value"));
assertFalse(consumer.checkReceivedNone(Duration.ofSeconds(1)));
}
}

@Test
void checkReceivedAtLeast() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
assertTrue(consumer.checkReceivedAtLeast(2, Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
assertTrue(consumer.checkReceivedAtLeast(2, Duration.ofSeconds(1)));
}
}

@Test
void checkReceivedAtLeastThrows() {
var topic = "example";
var consumer = connection.subscribe(topic);
assertFalse(consumer.checkReceivedAtLeast(2, Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
assertFalse(consumer.checkReceivedAtLeast(2, Duration.ofSeconds(1)));
}
}

@Test
void checkReceivedEquals() {
var topic = "example";
var consumer = connection.subscribe(topic);
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
assertTrue(consumer.checkReceivedEqualsInTime(2, Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
connection.send(topic, Event.ofValue("value1"), Event.ofValue("value2"));
assertTrue(consumer.checkReceivedEqualsInTime(2, Duration.ofSeconds(1)));
}
}

@Test
void checkReceivedEqualsThrows() {
var topic = "example";
var consumer = connection.subscribe(topic);
assertFalse(consumer.checkReceivedEqualsInTime(2, Duration.ofSeconds(1)));
try (var consumer = connection.subscribe(topic)) {
assertFalse(consumer.checkReceivedEqualsInTime(2, Duration.ofSeconds(1)));
}
}
}
Loading

0 comments on commit 7f2b713

Please sign in to comment.