
Merge pull request #293 from Aiven-Open/issue284
Datastreams index template creation or check for existence
reta authored Aug 6, 2024
2 parents f8864d9 + d8180d3 commit 5e03cda
Showing 8 changed files with 127 additions and 23 deletions.
docs/opensearch-sink-connector-config-options.rst: 7 additions, 0 deletions
@@ -214,6 +214,13 @@ Data Stream
* Valid Values: non-empty string
* Importance: medium

``data.streams.existing.index.template.name``
If ``data.streams.existing.index.template.name`` is provided and no index template with that name exists, the index template and its data stream will be created; otherwise the existing template is used.

* Type: string
* Default: null
* Importance: medium
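
A minimal configuration sketch, illustrative only and not part of this change: the topic, endpoint, and template names are placeholders, the connector class package is assumed, and only the two data stream keys shown here come from the connector's data stream options:

    import java.util.Map;

    final class DataStreamTemplateConfigExample {
        // Placeholder values; "data.stream.enabled" and
        // "data.streams.existing.index.template.name" are the options documented above.
        static final Map<String, String> SINK_CONFIG = Map.of(
                "connector.class", "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", // package assumed
                "topics", "logs-topic",                                        // placeholder topic
                "connection.url", "http://localhost:9200",                     // placeholder endpoint
                "data.stream.enabled", "true",                                 // write to data streams
                "data.streams.existing.index.template.name", "logs-template"); // reused if present, created if absent
    }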

Authentication
^^^^^^^^^^^^^^

@@ -52,7 +52,7 @@ void setup() throws Exception {
opensearchClient = new OpensearchClient(config);
}

protected Map<String, String> getDefaultProperties() {
protected static Map<String, String> getDefaultProperties() {
return Map.of(CONNECTION_URL_CONFIG, opensearchContainer.getHttpHostAddress(), CONNECTION_USERNAME_CONFIG,
"admin", CONNECTION_PASSWORD_CONFIG, "admin");
}
@@ -97,7 +97,7 @@ Optional<Boolean> assertConnectorAndTasksRunning(final String connectorName, fin
}
}

Map<String, String> connectorProperties() {
static Map<String, String> connectorProperties(String topicName) {
final var props = new HashMap<>(getDefaultProperties());
props.put(CONNECTOR_CLASS_CONFIG, OpensearchSinkConnector.class.getName());
props.put(TOPICS_CONFIG, topicName);
@@ -111,9 +111,9 @@ Map<String, String> connectorProperties() {
return props;
}

void writeRecords(final int numRecords) {
void writeRecords(final int numRecords, String topicNameToWrite) {
for (int i = 0; i < numRecords; i++) {
connect.kafka().produce(topicName, String.valueOf(i), String.format("{\"doc_num\":%d}", i));
connect.kafka().produce(topicNameToWrite, String.valueOf(i), String.format("{\"doc_num\":%d}", i));
}
}

@@ -39,10 +39,10 @@ public OpensearchSinkConnectorIT() {

@Test
public void testConnector() throws Exception {
connect.configureConnector(CONNECTOR_NAME, connectorProperties());
connect.configureConnector(CONNECTOR_NAME, connectorProperties(TOPIC_NAME));
waitForConnectorToStart(CONNECTOR_NAME, 1);

writeRecords(10);
writeRecords(10, TOPIC_NAME);

waitForRecords(TOPIC_NAME, 10);

@@ -19,23 +19,25 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.DataStreamsStatsRequest;
import org.opensearch.client.indices.DeleteDataStreamRequest;
import org.opensearch.client.indices.PutComposableIndexTemplateRequest;
import org.opensearch.cluster.metadata.ComposableIndexTemplate;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpensearchSinkDataStreamConnectorIT extends AbstractKafkaConnectIT {

static final Logger LOGGER = LoggerFactory.getLogger(OpensearchSinkConnectorIT.class);

static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.MINUTES.toMillis(60);

static final String TOPIC_NAME = "ds-topic";

static final String TOPIC_NAME1 = "ds-topic1";

static final String DATA_STREAM_PREFIX = "os-data-stream";

static final String DATA_STREAM_PREFIX_WITH_TIMESTAMP = "os-data-stream-ts";
@@ -52,13 +54,13 @@ public OpensearchSinkDataStreamConnectorIT() {

@Test
void testConnector() throws Exception {
final var props = connectorProperties();
final var props = connectorProperties(TOPIC_NAME);
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_ENABLED, "true");
connect.configureConnector(CONNECTOR_NAME, props);

waitForConnectorToStart(CONNECTOR_NAME, 1);

writeRecords(10);
writeRecords(10, TOPIC_NAME);

waitForRecords(TOPIC_NAME, 10);

@@ -68,7 +70,7 @@ void testConnector() throws Exception {

@Test
void testConnectorWithDataStreamCustomTimestamp() throws Exception {
final var props = connectorProperties();
final var props = connectorProperties(topicName);
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_ENABLED, "true");
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_PREFIX, DATA_STREAM_PREFIX_WITH_TIMESTAMP);
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_FIELD, "custom_timestamp");
@@ -90,20 +92,73 @@ void testConnectorWithDataStreamCustomTimestamp() throws Exception {

@Test
void testConnectorWithDataStreamPrefix() throws Exception {
final var props = connectorProperties();
final var props = connectorProperties(TOPIC_NAME);
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_ENABLED, "true");
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_PREFIX, DATA_STREAM_PREFIX);
connect.configureConnector(CONNECTOR_NAME, props);

waitForConnectorToStart(CONNECTOR_NAME, 1);
writeRecords(10);
writeRecords(10, TOPIC_NAME);
waitForRecords(DATA_STREAM_WITH_PREFIX_INDEX_NAME, 10);

assertDataStream(DATA_STREAM_WITH_PREFIX_INDEX_NAME);
assertDocs(DATA_STREAM_WITH_PREFIX_INDEX_NAME,
OpensearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_FIELD_DEFAULT);
}

/*
* User provided template doesn't exist, so create one
*/
@Test
void testConnectorWithUserProvidedTemplateDoesNotExist() throws Exception {
final var props = connectorProperties(TOPIC_NAME1);
connect.kafka().createTopic(TOPIC_NAME1);
String userProvidedTemplateName = "test-template1";
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_ENABLED, "true");
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_INDEX_TEMPLATE_NAME, userProvidedTemplateName);
connect.configureConnector(CONNECTOR_NAME, props);

waitForConnectorToStart(CONNECTOR_NAME, 1);

writeRecords(10, TOPIC_NAME1);
waitForRecords(TOPIC_NAME1, 10);

// Search for data streams by the user-provided template name; exactly one should exist
final var dsStats = opensearchClient.client.indices()
.dataStreamsStats(new DataStreamsStatsRequest(userProvidedTemplateName), RequestOptions.DEFAULT);
assertEquals(1, dsStats.getDataStreamCount());
deleteTopic(TOPIC_NAME1);

// Delete datastream
DeleteDataStreamRequest deleteDataStreamRequest = new DeleteDataStreamRequest(userProvidedTemplateName);
opensearchClient.client.indices().deleteDataStream(deleteDataStreamRequest, RequestOptions.DEFAULT);
}

// A new template will not be created, as the one provided by the user already exists
@Test
void testConnectorWithUserProvidedTemplateAlreadyExists() throws Exception {
final var props = connectorProperties(TOPIC_NAME1);
connect.kafka().createTopic(TOPIC_NAME1);
String existingTemplate = "test-template2";
String dataStream = "test-data-stream_1";
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_ENABLED, "true");
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_INDEX_TEMPLATE_NAME, existingTemplate);
connect.configureConnector(CONNECTOR_NAME, props);

waitForConnectorToStart(CONNECTOR_NAME, 1);
// make sure index template exists
createDataStreamAndTemplate(dataStream, existingTemplate);

writeRecords(10, TOPIC_NAME1);
waitForRecords(TOPIC_NAME1, 10);

// Search for data streams named after the topic (the default index name); none should exist
final var dsStats = opensearchClient.client.indices()
.dataStreamsStats(new DataStreamsStatsRequest(TOPIC_NAME1), RequestOptions.DEFAULT);
assertEquals(0, dsStats.getDataStreamCount());
deleteTopic(TOPIC_NAME1);
}

void assertDataStream(final String dataStreamName) throws Exception {
final var dsStats = opensearchClient.client.indices()
.dataStreamsStats(new DataStreamsStatsRequest(dataStreamName), RequestOptions.DEFAULT);
@@ -124,4 +179,22 @@ void assertDocs(final String dataStreamIndexName, final String timestampFieldNam
}
}

void createDataStreamAndTemplate(String dataStream, String dataStreamTemplate) throws IOException {
final ComposableIndexTemplate template = new ComposableIndexTemplate(Arrays.asList(dataStream, "index-logs-*"),
null, null, 100L, null, null, new ComposableIndexTemplate.DataStreamTemplate());
final PutComposableIndexTemplateRequest request = new PutComposableIndexTemplateRequest();
request.name(dataStreamTemplate);
request.indexTemplate(template);

opensearchClient.client.indices().putIndexTemplate(request, RequestOptions.DEFAULT);
}

void deleteTopic(String topicName) {
try (final var admin = connect.kafka().createAdminClient()) {
final var result = admin.deleteTopics(List.of(topicName));
result.all().get();
} catch (final ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@@ -44,14 +44,14 @@ public OpensearchSinkUpsertConnectorIT() {

@Test
public void testConnector() throws Exception {
final var props = connectorProperties();
final var props = connectorProperties(TOPIC_NAME);
props.put(OpensearchSinkConnectorConfig.INDEX_WRITE_METHOD,
IndexWriteMethod.UPSERT.name().toLowerCase(Locale.ROOT));
props.put(OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "false");
connect.configureConnector(CONNECTOR_NAME, props);
waitForConnectorToStart(CONNECTOR_NAME, 1);

writeRecords(3);
writeRecords(3, TOPIC_NAME);

waitForRecords(TOPIC_NAME, 3);

@@ -164,11 +164,16 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
IndexWriteMethod.UPSERT.name().toLowerCase(Locale.ROOT));

public static final String DATA_STREAM_ENABLED = "data.stream.enabled";
public static final String DATA_STREAM_INDEX_TEMPLATE_NAME = "data.streams.existing.index.template.name";

public static final String DATA_STREAM_ENABLED_DOC = "Enable use of data streams. "
+ "If set to true the connector will write to data streams instead of regular indices. "
+ "Default is false.";

public static final String DATA_STREAM_EXISTING_INDEX_TEMPLATE_NAME_DOC = "If "
+ "data.streams.existing.index.template.name is provided, and if that index "
+ "template does not exist, a template will be created with that name, else no template is created.";

public static final String DATA_STREAM_PREFIX = "data.stream.prefix";

public static final String DATA_STREAM_NAME_DOC = "Generic data stream name to write into. "
@@ -298,7 +303,10 @@ private static void addDataStreamConfig(final ConfigDef configDef) {
DATA_STREAM_NAME_DOC, DATA_STREAM_GROUP_NAME, ++order, Width.LONG, "Data stream name")
.define(DATA_STREAM_TIMESTAMP_FIELD, Type.STRING, DATA_STREAM_TIMESTAMP_FIELD_DEFAULT,
new ConfigDef.NonEmptyString(), Importance.MEDIUM, DATA_STREAM_TIMESTAMP_FIELD_DOC,
DATA_STREAM_GROUP_NAME, ++order, Width.LONG, "Data stream timestamp field");
DATA_STREAM_GROUP_NAME, ++order, Width.LONG, "Data stream timestamp field")
.define(DATA_STREAM_INDEX_TEMPLATE_NAME, Type.STRING, null, Importance.MEDIUM,
DATA_STREAM_EXISTING_INDEX_TEMPLATE_NAME_DOC, DATA_STREAM_GROUP_NAME, ++order, Width.LONG,
"Data stream name");
}

public static final ConfigDef CONFIG = baseConfigDef();
@@ -373,6 +381,10 @@ public boolean dataStreamEnabled() {
return getBoolean(DATA_STREAM_ENABLED);
}

public Optional<String> dataStreamExistingIndexTemplateName() {
return Optional.ofNullable(getString(OpensearchSinkConnectorConfig.DATA_STREAM_INDEX_TEMPLATE_NAME));
}

public Optional<String> dataStreamPrefix() {
return Optional.ofNullable(getString(OpensearchSinkConnectorConfig.DATA_STREAM_PREFIX));
}
@@ -142,8 +142,20 @@ private void ensureIndexOrDataStreamExists(final String index) {
if (!indexCache.contains(index)) {
if (!client.indexOrDataStreamExists(index)) {
if (config.dataStreamEnabled()) {
LOGGER.info("Create data stream {}", index);
client.createIndexTemplateAndDataStream(index, config.dataStreamTimestampField());
if (config.dataStreamExistingIndexTemplateName().isPresent()) {
String userProvidedTemplate = config.dataStreamExistingIndexTemplateName().get();
if (!client.dataStreamIndexTemplateExists(userProvidedTemplate)) {
LOGGER.info("Creating index template {} for data stream {}", userProvidedTemplate, index);
client.createIndexTemplateAndDataStream(userProvidedTemplate,
config.dataStreamTimestampField());
} else {
LOGGER.info("Using existing index template {} for data stream {}", userProvidedTemplate,
index);
}
} else {
LOGGER.info("Create data stream {}", index);
client.createIndexTemplateAndDataStream(index, config.dataStreamTimestampField());
}
} else {
LOGGER.info("Create index {}", index);
client.createIndex(index);
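As a complementary sketch, not part of the commit: an operator can pre-create the named composable index template so the existence check above finds it and the connector reuses it rather than creating one. This mirrors createDataStreamAndTemplate from the integration test; the class name, method name, index pattern, and priority are illustrative.

    import java.io.IOException;
    import java.util.List;

    import org.opensearch.client.RequestOptions;
    import org.opensearch.client.RestHighLevelClient;
    import org.opensearch.client.indices.PutComposableIndexTemplateRequest;
    import org.opensearch.cluster.metadata.ComposableIndexTemplate;

    final class IndexTemplatePreCreation {
        // Registers a composable index template that carries a data stream definition.
        static void preCreateTemplate(final RestHighLevelClient client, final String templateName,
                final String indexPattern) throws IOException {
            final ComposableIndexTemplate template = new ComposableIndexTemplate(
                    List.of(indexPattern),           // index patterns the template applies to
                    null, null, 100L, null, null,    // no template/component overrides, priority 100
                    new ComposableIndexTemplate.DataStreamTemplate());
            final PutComposableIndexTemplateRequest request = new PutComposableIndexTemplateRequest();
            request.name(templateName);
            request.indexTemplate(template);
            client.indices().putIndexTemplate(request, RequestOptions.DEFAULT);
        }
    }

With data.streams.existing.index.template.name set to the same templateName, the connector logs that it is using the existing index template and skips template creation.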
