Skip to content

Commit

Permalink
Adding test case for stream isolation in event definition evaluation. (
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisoelkers authored Aug 12, 2024
1 parent 9d2355c commit 1d05c0c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import org.graylog.testing.completebackend.Lifecycle;
import org.graylog.testing.completebackend.WebhookRequest;
import org.graylog.testing.completebackend.WebhookServerInstance;
import org.graylog.testing.completebackend.apis.DefaultStreamMatches;
import org.graylog.testing.completebackend.apis.GraylogApis;
import org.graylog.testing.completebackend.apis.Streams;
import org.graylog.testing.containermatrix.SearchServer;
import org.graylog.testing.containermatrix.annotations.ContainerMatrixTest;
import org.graylog.testing.containermatrix.annotations.ContainerMatrixTestsConfiguration;
import org.graylog.testing.utils.GelfInputUtils;
import org.graylog2.plugin.streams.StreamRuleType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.graylog2.plugin.streams.Stream.DEFAULT_STREAM_ID;

@ContainerMatrixTestsConfiguration(serverLifecycle = Lifecycle.CLASS, searchVersions = {SearchServer.ES7, SearchServer.OS2_LATEST, SearchServer.DATANODE_DEV}, withWebhookServerEnabled = true)
public class PivotAggregationSearchIT {

Expand Down Expand Up @@ -60,14 +64,7 @@ void testPivotAggregationSearchAllKnownFields() throws ExecutionException, Retry

postMessages();

final List<WebhookRequest> requests = webhookTester.waitForRequests((req) -> req.bodyAsJsonPath().read("event_definition_id").equals(eventDefinitionID));

Assertions.assertThat(requests)
.isNotEmpty()
.allSatisfy(req -> {
final String message = req.bodyAsJsonPath().read("event.message");
Assertions.assertThat(message).isEqualTo("my alert def: 200|ssh - count()=3.0");
});
waitForWebHook(eventDefinitionID, "my alert def: 200|ssh - count()=3.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
Expand All @@ -87,14 +84,7 @@ void testPivotAggregationSearchOneUnknownField() throws ExecutionException, Retr

postMessages();

final List<WebhookRequest> requests = webhookTester.waitForRequests((req) -> req.bodyAsJsonPath().read("event_definition_id").equals(eventDefinitionID));

Assertions.assertThat(requests)
.isNotEmpty()
.allSatisfy(req -> {
final String message = req.bodyAsJsonPath().read("event.message");
Assertions.assertThat(message).isEqualTo("my alert def: 200|(Empty Value)|ssh - count()=3.0");
});
waitForWebHook(eventDefinitionID, "my alert def: 200|(Empty Value)|ssh - count()=3.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
Expand All @@ -114,23 +104,52 @@ void testPivotAggregationSearchAllUnknownFields() throws ExecutionException, Ret

postMessages();

waitForWebHook(eventDefinitionID, "my alert def: (Empty Value)|(Empty Value)|(Empty Value) - count()=3.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

@ContainerMatrixTest
void testPivotAggregationIsolatedToStream() throws ExecutionException, RetryException {
graylogApis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());

final String notificationID = graylogApis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());

final String defaultStreamIndexSetId = graylogApis.streams().getStream(DEFAULT_STREAM_ID).extract().path("index_set_id");
final var streamId = graylogApis.streams().createStream(
"Stream for testing event definition isolation",
defaultStreamIndexSetId,
true,
DefaultStreamMatches.KEEP,
new Streams.StreamRule(StreamRuleType.EXACT.toInteger(), "stream_isolation_test", "facility", true)
);

final String eventDefinitionID = graylogApis.eventDefinitions().createEventDefinition(notificationID, List.of("http_response_code"), List.of(streamId));

postMessagesToOtherStream();
postMessages();

waitForWebHook(eventDefinitionID, "my alert def: 200 - count()=3.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

private void waitForWebHook(String eventDefinitionID, String eventMessage) throws ExecutionException, RetryException {
try {
final List<WebhookRequest> requests = webhookTester.waitForRequests((req) -> req.bodyAsJsonPath().read("event_definition_id").equals(eventDefinitionID));
Assertions.assertThat(requests)
.isNotEmpty()
.allSatisfy(req -> {
final String message = req.bodyAsJsonPath().read("event.message");
Assertions.assertThat(message).isEqualTo("my alert def: (Empty Value)|(Empty Value)|(Empty Value) - count()=3.0");
Assertions.assertThat(message).isEqualTo(eventMessage);
});

} catch (ExecutionException | RetryException e) {
LOG.error(this.graylogApis.backend().getLogs());
throw e;
}


graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

private void postMessages() {
Expand Down Expand Up @@ -164,4 +183,19 @@ private void postMessages() {
}""");
graylogApis.search().waitForMessagesCount(3);
}

private void postMessagesToOtherStream() {
graylogApis.gelf().createGelfHttpInput()
.postMessage("""
{
"short_message":"pivot-aggregation-search-test-1",
"host":"example.org",
"type":"ssh",
"source":"example.org",
"http_response_code":200,
"resource": "posts",
"facility": "stream_isolation_test"
}""");
graylogApis.search().waitForMessagesCount(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.restassured.response.ValidatableResponse;

import java.net.URI;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
Expand All @@ -35,6 +34,10 @@ public void deleteDefinition(String notificationID) {
}

public String createEventDefinition(String httpNotificationID, List<String> groupByFields) {
return createEventDefinition(httpNotificationID, groupByFields, List.of());
}

public String createEventDefinition(String httpNotificationID, List<String> groupByFields, List<String> streams) {
final String body = """
{
"title": "my alert def",
Expand All @@ -43,7 +46,7 @@ public String createEventDefinition(String httpNotificationID, List<String> grou
"config": {
"query": "",
"query_parameters": [],
"streams": [],
"streams": [%s],
"search_within_ms": 5000,
"execute_every_ms": 5000,
"event_limit": 100,
Expand Down Expand Up @@ -82,8 +85,9 @@ public String createEventDefinition(String httpNotificationID, List<String> grou
}
""";

final var streamsList = streams.stream().map(stream -> "\"" + stream + "\"").collect(Collectors.joining(","));
final String groupByClause = groupByFields.stream().map(f -> "\"" + f + "\"").collect(Collectors.joining(","));
final String req = String.format(Locale.ROOT, body, groupByClause, httpNotificationID);
final String req = String.format(Locale.ROOT, body, streamsList, groupByClause, httpNotificationID);

final ValidatableResponse response = api.post("/events/definitions", req, 200);
return response.extract().body().jsonPath().getString("id");
Expand Down

0 comments on commit 1d05c0c

Please sign in to comment.