Skip to content

Commit

Permalink
Merge branch 'add-stream-category-filter' into add-stream-categories-…
Browse files Browse the repository at this point in the history
…to-dashboard-widgets
  • Loading branch information
kingzacko1 authored Aug 19, 2024
2 parents 1f84f6e + 865a36d commit 10f384a
Show file tree
Hide file tree
Showing 40 changed files with 598 additions and 155 deletions.
29 changes: 17 additions & 12 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ refactoring. Regular users should not change the setting.
compatibility, the default value is still count-based (`500`). Previously configured count-based values are
still supported.

- The legacy mode configuration setting of newly created Kafka based inputs has been changed to false. The default mode
will now use the high level consumer API that has been available since Kafka 1.x.

## Java API Changes

The following Java Code API changes have been made.
Expand All @@ -30,18 +33,20 @@ The following Java Code API changes have been made.

The following REST API changes have been made.

| Endpoint | Description |
|------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------|
| `PUT /system/indices/index_set_defaults` | This endpoint now expects an index set template id as payload. The values of the index set template are used as default values. |
| `GET licenses/{licenseId}` | deprecated |
| `GET licenses` | deprecated |
| `GET licenses/status` | deprecated |
| `GET licenses/status/active` | New: Show status for currently active license |
| `GET licenses/validity/for-subject` | Check for valid license for given subject |
| `GET licenses/status/for-subject` | deprecated |
| `DELETE licenses/{licenseId}` | When called with a contract ID it will delete the contract and all associated licenses |
| `GET licenses/traffic-remaining` | Get the time series data for remaining provisioned traffic |
| `GET licenses/metrics` | Get the stats for consumed and remaining provisioned traffic |
| Endpoint | Description |
|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------|
| `PUT /system/indices/index_set_defaults` | This endpoint now expects an index set template id as payload. The values of the index set template are used as default values. |
| `GET licenses/{licenseId}` | deprecated |
| `GET licenses` | deprecated |
| `GET licenses/status` | deprecated |
| `GET licenses/status/active` | New: Show status for currently active license |
| `GET licenses/validity/for-subject` | Check for valid license for given subject |
| `GET licenses/status/for-subject` | deprecated |
| `DELETE licenses/{licenseId}` | When called with a contract ID it will delete the contract and all associated licenses |
| `GET licenses/traffic-remaining` | Get the time series data for remaining provisioned traffic |
| `GET licenses/metrics` | Get the stats for consumed and remaining provisioned traffic |
| `GET licenses/traffic-threshold` | Get info about license traffic threshold warning |
| `PUT licenses/traffic-threshold/acknowledgement` | Acknowledge current traffic threshold warning |

## Deprecated Inputs

Expand Down
4 changes: 4 additions & 0 deletions changelog/unreleased/issue-19693.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type="c"
message="Changed default setting for Kafka transports to non-legacy mode."

issues=["19693"]
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-20081.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "f"
message = "Add support for remote-reindex migration of closed indices"

pulls = ["20081"]
issues = ["20068"]
1 change: 1 addition & 0 deletions changelog/unreleased/pr-20110.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
type = "a"
message = "Added categories to Streams to allow Illuminate content to be scoped to multiple products."

issues = ["graylog-plugin-enterprise#7945"]
pulls = ["20110"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.rest;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.graylog.datanode.configuration.DatanodeTrustManagerProvider;
import org.graylog.storage.opensearch2.IndexState;
import org.graylog.storage.opensearch2.IndexStateChangeRequest;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Locale;
import java.util.Objects;

import static org.graylog.datanode.rest.OpensearchConnectionCheckController.CONNECT_TIMEOUT;
import static org.graylog.datanode.rest.OpensearchConnectionCheckController.READ_TIMEOUT;
import static org.graylog.datanode.rest.OpensearchConnectionCheckController.WRITE_TIMEOUT;

@Path("/index-state")
@Produces(MediaType.APPLICATION_JSON)
public class IndexStateController {

private final DatanodeTrustManagerProvider datanodeTrustManagerProvider;
private final OkHttpClient httpClient;

@Inject
public IndexStateController(DatanodeTrustManagerProvider datanodeTrustManagerProvider) {
this.datanodeTrustManagerProvider = datanodeTrustManagerProvider;
this.httpClient = new OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.connectTimeout(CONNECT_TIMEOUT)
.writeTimeout(WRITE_TIMEOUT)
.readTimeout(READ_TIMEOUT)
.build();
}

@POST
@Path("/get")
public IndexState get(IndexStateChangeRequest indexStateChangeRequest) {
final String host = indexStateChangeRequest.host().endsWith("/") ? indexStateChangeRequest.host() : indexStateChangeRequest.host() + "/";
final Request.Builder request = new Request.Builder()
.url(host + "_cat/indices/" + indexStateChangeRequest.indexName() + "/?h=status");
if (Objects.nonNull(indexStateChangeRequest.username()) && Objects.nonNull(indexStateChangeRequest.password())) {
request.header("Authorization", Credentials.basic(indexStateChangeRequest.username(), indexStateChangeRequest.password()));
}
try (var response = getClient().newCall(request.build()).execute()) {
if (response.isSuccessful() && response.body() != null) {
final String state = response.body().string().trim().toUpperCase(Locale.ROOT);
return IndexState.valueOf(state);
} else {
throw new RuntimeException("Failed to detect open/close index status " + indexStateChangeRequest.indexName() + ". Code: " + response.code() + "; message=" + response.message());
}
} catch (IOException e) {
throw new RuntimeException("Failed to open/close index" + indexStateChangeRequest.indexName(), e);
}
}

@POST
@Path("/set")
public IndexState change(IndexStateChangeRequest indexStateChangeRequest) {
return performAction(indexStateChangeRequest);
}

private IndexState performAction(IndexStateChangeRequest indexStateChangeRequest) {
final String host = indexStateChangeRequest.host().endsWith("/") ? indexStateChangeRequest.host() : indexStateChangeRequest.host() + "/";
final Request.Builder request = new Request.Builder()
.post(RequestBody.create("", okhttp3.MediaType.parse(MediaType.APPLICATION_JSON)))
.url(host + indexStateChangeRequest.indexName() + "/" + (indexStateChangeRequest.action() == IndexState.OPEN ? "_open" : "_close"));
if (Objects.nonNull(indexStateChangeRequest.username()) && Objects.nonNull(indexStateChangeRequest.password())) {
request.header("Authorization", Credentials.basic(indexStateChangeRequest.username(), indexStateChangeRequest.password()));
}
try (var response = getClient().newCall(request.build()).execute()) {
if (response.isSuccessful()) {
return indexStateChangeRequest.action();
} else {
throw new RuntimeException("Failed to open/close index " + indexStateChangeRequest.indexName() + ". Code: " + response.code() + "; message=" + response.message());
}
} catch (IOException e) {
throw new RuntimeException("Failed to open/close index" + indexStateChangeRequest.indexName(), e);
}
}

private OkHttpClient getClient() {
try {
final SSLContext ctx = SSLContext.getInstance("TLS");
final X509TrustManager trustManager = datanodeTrustManagerProvider.get();
ctx.init(null, new TrustManager[]{trustManager}, new SecureRandom());
return httpClient.newBuilder().sslSocketFactory(ctx.getSocketFactory(), trustManager).build();
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new RuntimeException(e);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.graylog.datanode.configuration.DatanodeTrustManagerProvider;
import org.graylog.storage.opensearch2.ConnectionCheckIndex;
import org.graylog.storage.opensearch2.ConnectionCheckRequest;
import org.graylog.storage.opensearch2.ConnectionCheckResponse;
import org.graylog2.security.TrustAllX509TrustManager;
Expand All @@ -42,6 +43,7 @@
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -74,19 +76,24 @@ public ConnectionCheckResponse status(ConnectionCheckRequest request) {
final List<X509Certificate> unknownCertificates = new LinkedList<>();
try {
unknownCertificates.addAll(extractUnknownCertificates(request.host()));
final List<String> indices = getAllIndicesFrom(request.host(), request.username(), request.password(), request.trustUnknownCerts());
final List<ConnectionCheckIndex> indices = getAllIndicesFrom(request.host(), request.username(), request.password(), request.trustUnknownCerts());
return ConnectionCheckResponse.success(indices, unknownCertificates);
} catch (Exception e) {
return ConnectionCheckResponse.error(e, unknownCertificates);
}
}

List<String> getAllIndicesFrom(final String host, final String username, final String password, boolean trustUnknownCerts) {
var url = (host.endsWith("/") ? host : host + "/") + "_cat/indices?h=index";
List<ConnectionCheckIndex> getAllIndicesFrom(final String host, final String username, final String password, boolean trustUnknownCerts) {
var url = (host.endsWith("/") ? host : host + "/") + "_cat/indices?h=index,status";
try (var response = getClient(trustUnknownCerts).newCall(new Request.Builder().url(url).header("Authorization", Credentials.basic(username, password)).build()).execute()) {
if (response.isSuccessful() && response.body() != null) {
// filtering all indices that start with "." as they indicate a system index - we don't want to reindex those
return new BufferedReader(new StringReader(response.body().string())).lines().filter(i -> !i.startsWith(".")).sorted().toList();
return new BufferedReader(new StringReader(response.body().string()))
.lines()
.filter(i -> !i.startsWith("."))
.map(this::parseIndexLine)
.sorted(Comparator.comparing(ConnectionCheckIndex::name, Comparator.naturalOrder()))
.toList();
} else {
String message = String.format(Locale.ROOT, "Could not read list of indices from %s. Code=%d, message=%s", host, response.code(), response.message());
throw new RuntimeException(message);
Expand All @@ -96,6 +103,11 @@ List<String> getAllIndicesFrom(final String host, final String username, final S
}
}

private ConnectionCheckIndex parseIndexLine(String line) {
final String[] parts = line.split("\\s+");
return new ConnectionCheckIndex(parts[0], parts[1].contains("close"));
}


private OkHttpClient getClient(boolean trustUnknownCerts) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected void configure() {
addSystemRestResource(ManagementController.class);
addSystemRestResource(IndicesDirectoryController.class);
addSystemRestResource(OpensearchConnectionCheckController.class);
addSystemRestResource(IndexStateController.class);
addSystemRestResource(CertificatesController.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.assertj.core.api.Assertions;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexResponse;
import org.graylog.shaded.opensearch2.org.opensearch.client.indices.CloseIndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.client.indices.CreateIndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.client.indices.CreateIndexResponse;
import org.graylog.storage.opensearch2.testing.OpenSearchInstance;
Expand Down Expand Up @@ -78,6 +79,8 @@ void testRemoteAsyncReindexing() throws ExecutionException, RetryException {
final String messageContent = ingestRandomMessage(indexName);
final String messageContent2 = ingestRandomMessage(indexName2);

closeIndex(indexName);

// flush the newly created document
openSearchInstance.client().refreshNode();

Expand All @@ -99,11 +102,14 @@ void testRemoteAsyncReindexing() throws ExecutionException, RetryException {
final String status = response.extract().body().jsonPath().get("status");
Assertions.assertThat(status).isEqualTo("FINISHED");

Assertions.assertThat(waitForMessage(indexName, messageContent)).containsEntry("message", messageContent);
Assertions.assertThat(waitForMessage(indexName2, messageContent2)).containsEntry("message", messageContent2);

}

private void closeIndex(String indexName) {
openSearchInstance.openSearchClient().execute((restHighLevelClient, requestOptions) -> restHighLevelClient.indices().close(new CloseIndexRequest(indexName), requestOptions));
}

/**
* @return name of the newly created index
*/
Expand Down
2 changes: 1 addition & 1 deletion graylog-plugin-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<web.build-dir>${project.build.directory}/web/build</web.build-dir>

<!-- Plugin versions -->
<jdeb.version>1.10</jdeb.version>
<jdeb.version>1.11</jdeb.version>
<rpm-maven.version>2.2.0</rpm-maven.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@

public record AggregatedConnectionResponse(Map<String, ConnectionCheckResponse> responses) {
@Nonnull
public List<String> indices() {
public List<ConnectionCheckIndex> indices() {
return responses.values().stream()
.filter(v -> Objects.nonNull(v.indices()))
.flatMap(v -> v.indices().stream())
.sorted(Comparator.naturalOrder())
.sorted(Comparator.comparing(ConnectionCheckIndex::name, Comparator.naturalOrder()))
.distinct()
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch2;

public record ConnectionCheckIndex(String name, boolean closed) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import java.util.stream.Collectors;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
public record ConnectionCheckResponse(List<String> indices, List<String> certificates,
public record ConnectionCheckResponse(List<ConnectionCheckIndex> indices, List<String> certificates,
String error) {
public static ConnectionCheckResponse success(List<String> indices, List<X509Certificate> certificates) {
public static ConnectionCheckResponse success(List<ConnectionCheckIndex> indices, List<X509Certificate> certificates) {
return new ConnectionCheckResponse(indices, encodeCerts(certificates), null);
}

Expand Down
Loading

0 comments on commit 10f384a

Please sign in to comment.