Create a new post for creating a new database integration with data prepper #3338
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,364 @@ | ||
--- | ||
layout: post | ||
title: "Step-by-step: Creating a new database integration using Data Prepper" | ||
authors: | ||
- tylgry | ||
- dinujoh | ||
date: 2022-09-21 10:00:00 -0500 | ||
categories: | ||
- technical-post | ||
twittercard: | ||
Review comment: Please add the following meta: `meta_description: Use this step-by-step guide to set up and test a Data Prepper pipeline. Create a new database integration that streams data from MongoDB to OpenSearch using source coordination.`
||
description: "Data Prepper offers a flexible framework for database migration, supporting sources like MongoDB and DynamoDB. You can extend this capability to new databases by implementing a Data Prepper source plugin." | ||
|
||
--- | ||
|
||
Data Prepper, an open source data collector, enables you to collect, filter, enrich, and aggregate trace and log data. With Data Prepper, you can prepare your data for downstream analysis and visualization in OpenSearch. | ||
|
||
|
||
Data Prepper pipelines consist of three main components: a source, an optional set of processors, and one or more sinks. For more information, see [Data Prepper key concepts and fundamentals](https://opensearch.org/docs/latest/data-prepper/#key-concepts-and-fundamentals). The following sections outline the steps necessary for implementing a new database source integration within Data Prepper. | ||
|
||
|
||
### Understanding push-based and pull-based sources | ||
|
||
Data Prepper source plugins fall into two categories: push-based and pull-based. | ||
|
||
|
||
_Push-based sources_, such as HTTP and OpenTelemetry (OTel), scale easily across Data Prepper containers. They rely on load balancing solutions, such as Kubernetes, NGINX, or Docker Swarm, to distribute the workload across Data Prepper containers. | ||
|
||
|
||
Unlike push-based sources, pull-based sources in Data Prepper use [source coordination](https://opensearch.org/docs/latest/data-prepper/managing-data-prepper/source-coordination/) to achieve scalability and work distribution across multiple containers. Source coordination uses an external store functioning as a lease table, similar to the approach used by the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). | ||
|
||
|
||
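To make the lease table idea concrete, the following is a minimal in-memory sketch, not Data Prepper's implementation. The class name `LeaseTableSketch`, the method `tryAcquire`, and the 10-minute lease duration are all assumptions made for illustration. A real store, such as DynamoDB, would need to perform the check and the write as a single conditional update.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a lease table; not part of Data Prepper. */
final class LeaseTableSketch {
    private static final class Lease {
        final String owner;
        final Instant expiresAt;

        Lease(String owner, Instant expiresAt) {
            this.owner = owner;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Lease> leases = new HashMap<>();
    private final Duration leaseDuration;

    LeaseTableSketch(Duration leaseDuration) {
        this.leaseDuration = leaseDuration;
    }

    /** Returns true if ownerId holds the lease for partitionKey after this call. */
    synchronized boolean tryAcquire(String partitionKey, String ownerId, Instant now) {
        Lease current = leases.get(partitionKey);
        // A lease is free if nobody has taken it or if its expiry time has passed.
        boolean free = current == null || !current.expiresAt.isAfter(now);
        if (free || current.owner.equals(ownerId)) {
            // Taking a free lease and renewing an owned lease both reset the expiry.
            leases.put(partitionKey, new Lease(ownerId, now.plus(leaseDuration)));
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        LeaseTableSketch table = new LeaseTableSketch(Duration.ofMinutes(10));
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        // node-a takes the lease.
        System.out.println(table.tryAcquire("file-1", "node-a", start));
        // node-b is refused while the lease is still valid.
        System.out.println(table.tryAcquire("file-1", "node-b", start.plusSeconds(60)));
        // node-b takes over once the lease has expired, for example after node-a crashed.
        System.out.println(table.tryAcquire("file-1", "node-b", start.plusSeconds(601)));
    }
}
```

The expiry is what lets another container pick up a partition when its owner stops renewing the lease.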
|
||
|
||
### Defining work partitions for source coordination | ||
|
||
Data Prepper uses source coordination to distribute work partitions across Data Prepper containers. | ||
|
||
|
||
For new Data Prepper sources using source coordination, identifying and delineating work partitions is a fundamental first step. | ||
|
||
Data Prepper defines work partitions differently for various sources. In the S3 source, each S3 object represents a partition. For OpenSearch, an index serves as a partition. DynamoDB sources have dual partition types: S3 data files for exports and shards for stream processing. | ||
|
||
|
||
|
||
### Creating a source coordination-enabled Data Prepper plugin | ||
|
||
|
||
A source coordination plugin consists of two classes: the main plugin class and a configuration class. The configuration class specifies all required user inputs, from data endpoints to authorization details and performance tuning parameters. | ||
|
||
|
||
For a practical starting point, refer to the [sample source code](https://github.com/graytaylor0/data-prepper/blob/SourceCoordinationSampleSource/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSource.java) in the Data Prepper repository. | ||
|
||
This example demonstrates a basic configuration for a [hypothetical database source](https://github.com/graytaylor0/data-prepper/blob/SourceCoordinationSampleSource/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSourceConfig.java), requiring only `database_name`, `username`, and `password`. The plugin name and configuration class are defined in the `@DataPrepperPlugin` annotation. | ||
|
||
The `pipeline.yaml` for running this source in Data Prepper would be structured as follows: | ||
|
||
|
||
```yaml | ||
version: 2 | ||
sample-source-pipeline: | ||
  source: | ||
    sample_source: | ||
      database_name: "my-database" | ||
      username: 'my-username' | ||
      password: 'my-password' | ||
  sink: | ||
    - stdout: | ||
``` | ||
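The three options in this pipeline map to fields in the configuration class. The following is a dependency-free sketch of that mapping, assuming a hypothetical class named `SampleSourceConfigSketch` and a hypothetical `fromPipelineOptions` helper. The linked sample configuration class is the authoritative version. In Data Prepper itself, the framework populates the configuration class from the YAML, so a hand-written mapper like this one is only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a source configuration class; not the linked SampleSourceConfig. */
final class SampleSourceConfigSketch {
    private final String databaseName;
    private final String username;
    private final String password;

    private SampleSourceConfigSketch(String databaseName, String username, String password) {
        this.databaseName = databaseName;
        this.username = username;
        this.password = password;
    }

    /** Builds the configuration from the options under `sample_source` in pipeline.yaml. */
    static SampleSourceConfigSketch fromPipelineOptions(Map<String, String> options) {
        return new SampleSourceConfigSketch(
                required(options, "database_name"),
                required(options, "username"),
                required(options, "password"));
    }

    /** Fails fast when a required option is missing or empty. */
    private static String required(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required option: " + key);
        }
        return value;
    }

    String getDatabaseName() {
        return databaseName;
    }

    String getUsername() {
        return username;
    }

    String getPassword() {
        return password;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("database_name", "my-database");
        options.put("username", "my-username");
        options.put("password", "my-password");
        SampleSourceConfigSketch config = fromPipelineOptions(options);
        System.out.println("Connecting to " + config.getDatabaseName() + " as " + config.getUsername());
    }
}
```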
|
||
### Using the source coordination APIs | ||
|
||
The [source coordination interface](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java) defines the methods available for interacting with the [source coordination store](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java). | ||
|
||
These methods support create, read, update, and delete (CRUD) operations on partitions and return the next available partition through `acquireAvailablePartition(String partitionType)`. A common source coordination pattern assigns a "leader" Data Prepper container to handle partition discovery and creation. This is done by initializing a "leader partition" at startup and using `acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)` to assign partition management responsibilities. | ||
|
||
|
||
The following code snippet shows a basic source coordination workflow, using a hypothetical database where each partition represents an individual database file. See the [full code](https://github.com/graytaylor0/data-prepper/blob/6e38dead8e9beca089381519654f329b82524b9d/data-prepper-plugins/sample-source-coordination-source/src/main/java/DatabaseWorker.java#L40) on GitHub. | ||
|
||
|
||
The key components and code workflow for implementing source coordination in Data Prepper are as follows: | ||
Review comment: Either "The key code components and workflow for implementing" or "The key components and code workflow for implementing", depending on the meaning.
Reply: Probably aligns more with the second one: "The key components and code workflow for implementing".
||
|
||
Review comment: Line 68: Is there an extra parenthesis at the end of the second sentence? Also, would it be appropriate to add "for more information"?
||
1. Upon Data Prepper's startup, a leader partition is established. For more information, see the [code reference](https://github.com/graytaylor0/data-prepper/blob/6e38dead8e9beca089381519654f329b82524b9d/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSource.java#L41). The single leader partition is assigned to the Data Prepper node that successfully calls `acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)`, which makes that node responsible for identifying new database files. | ||
|
||
2. When a Data Prepper node owns the leader partition, it queries the hypothetical database and creates new partitions in the source coordination store, enabling all nodes running this source to access these database file partitions. | ||
3. A database file partition is acquired for processing. In cases where no partitions need processing, an empty `Optional` is returned. | ||
4. The database file undergoes processing, with its records written into the Data Prepper buffer as individual `Events`. Once all records have been written to the buffer, the source coordination store marks the database file partition as `COMPLETED`, ensuring it is not processed again. | ||
|
||
|
||
Review comment: Move this code snippet to follow its introductory paragraph. The code snippet should be at line 76. Then, move the overview/description (lines 77-82) after the code snippet.
Reply: Will update.
```java | ||
||
public void run() { | ||
|
||
    while (!Thread.currentThread().isInterrupted()) { | ||
        try { | ||
|
||
            // 1 - Check if this node is already the leader. If it is not, then try to acquire leadership in case the leader node has crashed | ||
            if (leaderPartition == null) { | ||
                final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE); | ||
                if (sourcePartition.isPresent()) { | ||
                    LOG.info("Running as a LEADER that will discover new database files and create partitions"); | ||
                    leaderPartition = (LeaderPartition) sourcePartition.get(); | ||
                } | ||
            } | ||
|
||
            // 2 - If this node is the leader, run discovery of new database files and create partitions | ||
            if (leaderPartition != null) { | ||
                final List<EnhancedSourcePartition<DatabaseFilePartitionProgressState>> databaseFilePartitions = discoverDatabaseFilePartitions(); | ||
                LOG.info("Discovered {} new database file partitions", databaseFilePartitions.size()); | ||
|
||
                databaseFilePartitions.forEach(databaseFilePartition -> { | ||
                    sourceCoordinator.createPartition(databaseFilePartition); | ||
                }); | ||
|
||
                LOG.info("Created {} database file partitions in the source coordination store", databaseFilePartitions.size()); | ||
            } | ||
|
||
            // 3 - Grab a database file partition, process it by writing to the buffer, and mark that database file partition as completed | ||
            final Optional<EnhancedSourcePartition> databaseFilePartition = sourceCoordinator.acquireAvailablePartition(DatabaseFilePartition.PARTITION_TYPE); | ||
|
||
            // 4 - If it's empty that means there are no more database files to process for now. If it's not empty, the database file is processed and then marked as COMPLETED in the source coordination store | ||
            if (databaseFilePartition.isPresent()) { | ||
                processDataFile(databaseFilePartition.get().getPartitionKey()); | ||
                sourceCoordinator.completePartition(databaseFilePartition.get()); | ||
            } | ||
|
||
        } catch (final Exception e) { | ||
            // Pass the exception to the logger so that the stack trace is not lost | ||
            LOG.error("Received an exception in DatabaseWorker loop, retrying", e); | ||
        } | ||
    } | ||
} | ||
``` | ||
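The snippet above depends on Data Prepper classes, so it cannot be run on its own. The following self-contained sketch simulates the same four steps with an in-memory coordinator. Everything in it is a hypothetical stand-in: `CoordinationWorkflowSketch`, `Coordinator`, `Worker`, and the partition keys are invented for illustration and do not reflect the real `EnhancedSourceCoordinator` API or a shared external store.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical simulation of the leader and worker steps; not the Data Prepper API. */
final class CoordinationWorkflowSketch {
    static final String LEADER = "LEADER";
    static final String DATABASE_FILE = "DATABASE_FILE";

    enum Status { UNASSIGNED, ASSIGNED, COMPLETED }

    /** In-memory stand-in for the source coordination store. */
    static final class Coordinator {
        private final Map<String, Status> partitions = new LinkedHashMap<>();

        /** Creates the partition only if it does not exist yet. */
        synchronized boolean createPartition(String type, String key) {
            return partitions.putIfAbsent(type + "|" + key, Status.UNASSIGNED) == null;
        }

        /** Hands out one unassigned partition of the given type, or empty if none is left. */
        synchronized Optional<String> acquireAvailablePartition(String type) {
            for (Map.Entry<String, Status> entry : partitions.entrySet()) {
                if (entry.getKey().startsWith(type + "|") && entry.getValue() == Status.UNASSIGNED) {
                    entry.setValue(Status.ASSIGNED);
                    return Optional.of(entry.getKey().substring(type.length() + 1));
                }
            }
            return Optional.empty();
        }

        synchronized void completePartition(String type, String key) {
            partitions.put(type + "|" + key, Status.COMPLETED);
        }
    }

    /** One Data Prepper node running a single pass of the worker loop. */
    static final class Worker {
        private final Coordinator coordinator;
        private boolean leader;
        final List<String> processed = new ArrayList<>();

        Worker(Coordinator coordinator) {
            this.coordinator = coordinator;
        }

        void runOnce(List<String> filesInDatabase) {
            // 1 - Try to become the leader if this node is not the leader already.
            if (!leader) {
                leader = coordinator.acquireAvailablePartition(LEADER).isPresent();
            }
            // 2 - The leader registers every discovered file; existing partitions are left alone.
            if (leader) {
                filesInDatabase.forEach(file -> coordinator.createPartition(DATABASE_FILE, file));
            }
            // 3 and 4 - Acquire one file partition, process it, and mark it as completed.
            coordinator.acquireAvailablePartition(DATABASE_FILE).ifPresent(file -> {
                processed.add(file);
                coordinator.completePartition(DATABASE_FILE, file);
            });
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        coordinator.createPartition(LEADER, "GLOBAL"); // leader partition created at startup
        Worker nodeA = new Worker(coordinator);
        Worker nodeB = new Worker(coordinator);
        List<String> files = List.of("file-1", "file-2");
        nodeA.runOnce(files);
        nodeB.runOnce(files);
        nodeA.runOnce(files);
        System.out.println("node A processed " + nodeA.processed);
        System.out.println("node B processed " + nodeB.processed);
    }
}
```

In this run, node A becomes the leader and processes `file-1`, node B processes `file-2`, and the final pass finds nothing left to acquire, so each file is handled once.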
|
||
### Running and testing Data Prepper using source coordination | ||
|
||
Before creating a new plugin, you must set up and run Data Prepper locally. The following steps guide you in configuring Data Prepper for streaming documents from MongoDB to OpenSearch using source coordination. While this example uses a single Data Prepper instance, source coordination lets you scale by running multiple instances with identical pipeline configurations and shared source coordination store settings defined in `data-prepper-config.yaml`. | ||
|
||
|
||
#### Step 1 - Set up Data Prepper for local development | ||
|
||
|
||
The [Data Prepper developer guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md) provides a complete overview for running Data Prepper in various environments. | ||
|
||
Creating a new source plugin requires cloning the Data Prepper repository and building it from source using the following commands: | ||
|
||
|
||
- Clone the Data Prepper repository | ||
|
||
``` | ||
git clone https://github.com/opensearch-project/data-prepper.git | ||
``` | ||
|
||
- Build Data Prepper from source | ||
|
||
|
||
``` | ||
./gradlew assemble | ||
``` | ||
|
||
#### Step 2 - Set up MongoDB locally | ||
|
||
|
||
First, install and configure MongoDB using the [MongoDB installation guide](https://www.mongodb.com/docs/manual/installation/). Before running MongoDB, enable [MongoDB change streams](https://www.mongodb.com/docs/manual/changeStreams/) by following the instructions in [Convert a Standalone Self-Managed mongod to a Replica Set](https://www.mongodb.com/docs/manual/tutorial/convert-standalone-to-replica-set/). | ||
|
||
|
||
Next, launch the MongoDB shell by running `mongosh`, and then create a new user and password within the shell using the following syntax. The username and password are required later in the Data Prepper `pipeline.yaml`. See [Create User Documentation](https://www.mongodb.com/docs/manual/reference/method/db.createUser/) for more information about MongoDB user creation. | ||
|
||
``` | ||
use admin | ||
db.createUser({"user": "dbuser","pwd": "admin1234","roles": []}); | ||
``` | ||
|
||
Then, create a new database named `demo`: | ||
|
||
|
||
``` | ||
use demo | ||
``` | ||
|
||
Next, create a new MongoDB collection named `demo_collection` in your `demo` database with this syntax: | ||
|
||
``` | ||
db.createCollection("demo_collection") | ||
``` | ||
|
||
Finally, add sample records to the collection using the following syntax. These records are processed during the MongoDB pipeline's export phase: | ||
|
||
``` | ||
db.demo_collection.insertOne({"key-one": "value-one"}) | ||
db.demo_collection.insertOne({"key-two": "value-two"}) | ||
db.demo_collection.insertOne({"key-three": "value-three"}) | ||
``` | ||
|
||
#### Step 3 - Set up OpenSearch locally | ||
|
||
|
||
To run OpenSearch locally, follow the steps in the [Installation quickstart](https://opensearch.org/docs/latest/getting-started/quickstart/). | ||
|
||
#### Step 4 - Create an Amazon S3 bucket | ||
|
||
Follow the steps in [Create a new S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html). You can skip this step if you have an existing bucket. This S3 bucket enables parallel processing and writing to OpenSearch across multiple Data Prepper containers in a multi-node setup, as only one node can read from MongoDB streams at a time. | ||
|
||
#### Step 5 - Get AWS credentials for DynamoDB and S3 access | ||
|
||
|
||
Set up an AWS role with the following policy permissions to enable Data Prepper to interact with the DynamoDB source coordination store and the S3 bucket from step 4. Make sure to replace `MONGODB_BUCKET`, `REGION`, and `AWS_ACCOUNT_ID` with your own values. | ||
|
||
|
||
```json | ||
{ | ||
  "Version": "2012-10-17", | ||
  "Statement": [ | ||
    { | ||
      "Sid": "s3Access", | ||
      "Effect": "Allow", | ||
      "Action": [ | ||
        "s3:PutObject" | ||
      ], | ||
      "Resource": [ "arn:aws:s3:::{{MONGODB_BUCKET}}/*" ] | ||
    }, | ||
    { | ||
      "Sid": "allowReadingFromS3Buckets", | ||
      "Effect": "Allow", | ||
      "Action": [ | ||
        "s3:GetObject", | ||
        "s3:DeleteObject", | ||
        "s3:GetBucketLocation", | ||
        "s3:ListBucket" | ||
      ], | ||
      "Resource": [ | ||
        "arn:aws:s3:::{{MONGODB_BUCKET}}", | ||
        "arn:aws:s3:::{{MONGODB_BUCKET}}/*" | ||
      ] | ||
    }, | ||
    { | ||
      "Sid": "allowListAllMyBuckets", | ||
      "Effect": "Allow", | ||
      "Action": "s3:ListAllMyBuckets", | ||
      "Resource": "arn:aws:s3:::*" | ||
    }, | ||
    { | ||
      "Sid": "ReadWriteSourceCoordinationDynamoStore", | ||
      "Effect": "Allow", | ||
      "Action": [ | ||
        "dynamodb:DescribeTimeToLive", | ||
        "dynamodb:UpdateTimeToLive", | ||
        "dynamodb:DescribeTable", | ||
        "dynamodb:CreateTable", | ||
        "dynamodb:GetItem", | ||
        "dynamodb:DeleteItem", | ||
        "dynamodb:PutItem", | ||
        "dynamodb:Query" | ||
      ], | ||
      "Resource": [ | ||
        "arn:aws:dynamodb:{{REGION}}:{{AWS_ACCOUNT_ID}}:table/DataPrepperSourceCoordinationStore", | ||
        "arn:aws:dynamodb:{{REGION}}:{{AWS_ACCOUNT_ID}}:table/DataPrepperSourceCoordinationStore/index/source-status" | ||
      ] | ||
    } | ||
  ] | ||
} | ||
``` | ||
|
||
Run the following command, then enter the `Access Key Id` and `Secret Access Key` associated with credentials that correspond to the previously defined role: | ||
|
||
|
||
``` | ||
aws configure | ||
``` | ||
|
||
|
||
Then, set the following environment variables: | ||
|
||
|
||
``` | ||
export AWS_REGION="{{REGION}}" | ||
export SOURCE_COORDINATION_PIPELINE_IDENTIFIER="test-mongodb" | ||
``` | ||
|
||
The `SOURCE_COORDINATION_PIPELINE_IDENTIFIER` must correspond to the `partition_prefix` that you will define in the `data-prepper-config.yaml` in step 6. | ||
|
||
|
||
#### Step 6 - Create the `data-prepper-config.yaml` file | ||
|
||
|
||
Configure the source coordination store for Data Prepper using the `data-prepper-config.yaml` file. Currently, this store exclusively supports DynamoDB. | ||
|
||
In the `data-prepper/release/archives/linux/build/install/opensearch-data-prepper-$VERSION-linux-x64/config/` directory, create a file named `data-prepper-config.yaml`. Insert the following content, replacing `REGION` with your desired DynamoDB table region and `ROLE_ARN_FROM_STEP_5` with the appropriate role Amazon Resource Name (ARN): | ||
Review comment: "ARN" => "Amazon Resource Name (ARN)"?
Reply: Yes
Reply: @graytaylor0 Revised
|
||
|
||
```yaml | ||
ssl: false | ||
source_coordination: | ||
  partition_prefix: "test-mongodb" | ||
  store: | ||
    dynamodb: | ||
      sts_role_arn: "{{ROLE_ARN_FROM_STEP_5}}" | ||
      table_name: "DataPrepperSourceCoordinationStore" | ||
      region: "{{REGION}}" | ||
      skip_table_creation: false | ||
``` | ||
|
||
The `skip_table_creation` parameter is set to `false`, instructing Data Prepper to create the table on startup if it is missing. For subsequent runs, you can set this flag to `true` to speed up startup. | ||
|
||
|
||
The `partition_prefix` enables soft resets of the pipeline in the source coordination store. When testing a new source plugin, incrementing this prefix (for example, `test-mongodb-1`, `test-mongodb-2`) ensures that Data Prepper ignores DynamoDB items from previous test runs. | ||
|
||
|
||
#### Step 7 - Create the `pipeline.yaml` file | ||
|
||
|
||
In the `data-prepper/release/archives/linux/build/install/opensearch-data-prepper-$VERSION-linux-x64/pipelines/` directory, create a `pipeline.yaml` file with the following content. Make sure to update `S3_BUCKET_NAME`, `S3_BUCKET_REGION`, `ROLE_ARN_FROM_STEP_5`, and your OpenSearch password: | ||
|
||
|
||
```yaml | ||
pipeline: | ||
  workers: 2 | ||
  delay: 0 | ||
  buffer: | ||
    bounded_blocking: | ||
      batch_size: 125000 | ||
      buffer_size: 1000000 | ||
  source: | ||
    mongodb: | ||
      host: "localhost" | ||
      port: 27017 | ||
      acknowledgments: true | ||
      s3_bucket: "{{S3_BUCKET_NAME}}" | ||
      s3_region: "{{S3_BUCKET_REGION}}" | ||
      s3_prefix: "mongodb-opensearch" | ||
      insecure: "true" | ||
      ssl_insecure_disable_verification: "true" | ||
      authentication: | ||
        username: "dbuser" | ||
        password: "admin1234" | ||
      collections: | ||
        - collection: "demo.demo_collection" | ||
          export: true | ||
          stream: true | ||
      aws: | ||
        sts_role_arn: "{{ROLE_ARN_FROM_STEP_5}}" | ||
  sink: | ||
    - opensearch: | ||
        hosts: [ "http://localhost:9200" ] | ||
        index: "mongodb-index" | ||
        document_id: "${getMetadata(\"primary_key\")}" | ||
        action: "${getMetadata(\"opensearch_action\")}" | ||
        document_version: "${getMetadata(\"document_version\")}" | ||
        document_version_type: "external" | ||
        # Default username | ||
        username: "admin" | ||
        # Change to your OpenSearch password if needed. For running OpenSearch with Docker Compose, this is set by the environment variable OPENSEARCH_INITIAL_ADMIN_PASSWORD | ||
        password: "OpenSearchMongoDB1#" | ||
        flush_timeout: -1 | ||
``` | ||
|
||
#### Step 8 - Run the pipeline | ||
|
||
|
||
With AWS credentials configured and both MongoDB and OpenSearch running on your local machine, you can launch the pipeline. | ||
|
||
First, navigate to the directory containing the Data Prepper binaries: | ||
|
||
``` | ||
cd data-prepper/release/archives/linux/build/install/opensearch-data-prepper-$VERSION-linux-x64 | ||
``` | ||
|
||
Then, start Data Prepper using the following command: | ||
|
||
|
||
``` | ||
bin/data-prepper | ||
``` | ||
|
||
#### Step 9 - Review the documents in OpenSearch | ||
|
||
|
||
Wait for the export to complete, which may take a minute. Once Data Prepper logs a message similar to `org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker - Received all acknowledgments for folder partition`, open `http://localhost:5601` to access OpenSearch Dashboards. | ||
|
||
Go to the **Dev Tools** application and enter `GET mongodb-index/_search` into the console editor to retrieve the MongoDB documents you created in step 2. | ||
|
||
#### Step 10 - Add sample documents to MongoDB | ||
|
||
Add sample documents to MongoDB using the following commands: | ||
|
||
``` | ||
db.demo_collection.insertOne({"key-four": "value-four"}) | ||
db.demo_collection.insertOne({"key-five": "value-five"}) | ||
db.demo_collection.insertOne({"key-six": "value-six"}) | ||
``` | ||
|
||
The MongoDB source in Data Prepper will now extract these documents from the MongoDB streams. | ||
|
||
|
||
#### Step 11 - Review the documents in OpenSearch | ||
|
||
As soon as Data Prepper generates another log, for example, `org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker - Received all acknowledgments for folder partition`, return to **Dev Tools** and run another search on the index using `GET mongodb-index/_search`. | ||
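
The search from steps 9 and 11 can also be scripted instead of run in **Dev Tools**. The following is a minimal sketch using the JDK's built-in HTTP client (Java 11 or later). The class name `SearchMongoIndex` and the helper `buildSearchRequest` are hypothetical, and the host, user name, and password are assumed to match the values in the `pipeline.yaml` from step 7. Adjust them for your environment.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper that runs GET <index>/_search against a local OpenSearch node. */
final class SearchMongoIndex {

    /** Builds a GET request for the search endpoint with HTTP basic authentication. */
    static HttpRequest buildSearchRequest(String host, String index, String user, String password) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(host + "/" + index + "/_search"))
                .header("Authorization", "Basic " + credentials)
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed values, copied from the pipeline.yaml in step 7.
        HttpRequest request = buildSearchRequest(
                "http://localhost:9200", "mongodb-index", "admin", "OpenSearchMongoDB1#");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
        System.out.println(response.body());
    }
}
```

If the pipeline is working, the response body should contain the documents inserted in steps 2 and 10.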
|
||
#### Step 12 - Clean up resources | ||
|
||
|
||
As you complete this process, make sure to perform the following cleanup tasks: delete the DynamoDB source coordination store and S3 bucket, and stop the Data Prepper, MongoDB, and OpenSearch instances. | ||
|
||
|
||
### Summary | ||
|
||
|
||
We hope this walkthrough deepens your knowledge of the Data Prepper architecture and the process of creating scalable plugins with source coordination. For any suggestions regarding new database plugins, assistance with plugin creation, or general Data Prepper questions, [create a new discussion](https://github.com/opensearch-project/data-prepper/discussions). The Data Prepper community and maintenance team are committed to supporting your efforts. | ||
|
||
|
||
|
||
|
||
|
Review comment: Update the date to: 2024-11-05