Skip to content

VishalGawade1/AI-FlowStream

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Streaming Generative AI Application on AWS

Overview

This repository contains the code to show how to easily incorporate generative AI into a real-time streaming pipeline. More specifically, we make use of Flink's Async I/O API to make asynchronous requests to the Amazon Bedrock API and process the incoming review stream. The processed reviews are streamed to Amazon OpenSearch and can be viewed in an OpenSearch Dashboard.

This repository is intended to get developers started experimenting with generative AI and streaming data on AWS.

Dashboard Overview

Architecture Diagram

Architecture Overview

The architecture diagram is divided into two parts: the real-time streaming pipeline and access details to the Amazon OpenSearch Dashboard. The pipeline starts with a Python script simulating a producer that sends reviews from the Large Movie Review Dataset to Amazon Kinesis. These reviews are processed through an Amazon Managed Apache Flink application, which makes asynchronous calls to Amazon Bedrock. The results are then loaded into an Amazon OpenSearch cluster for visualization.

For dashboard access, we use a bastion host in the same VPC subnet as the OpenSearch cluster. Connection to this host is secured via Amazon Systems Manager's Session Manager, which allows secure connectivity without open inbound ports, enabling local access to the dashboard through port forwarding.

Pre-requisites

Getting Started

  1. Clone the repository to your desired workspace:
git clone https://github.com/aws-samples/aws-streaming-generative-ai-application.git
  1. Move to the flink-async-bedrock directory and build the JAR file:
cd flink-async-bedrock && mvn clean package
  1. Afterwards move back to the root of the directory, and then to the cdk directory to deploy the resources in your AWS account. Note that you have configured the AWS CLI before with your credentials (for more info see here).
cd cdk && npm install & cdk deploy
  1. Take note of the output values. The output will similar to the output below:
 ✅  StreamingGenerativeAIStack

✨  Deployment time: 1414.26s

Outputs:
StreamingGenerativeAIStack.BastionHostBastionHostIdC743CBD6 = i-0970816fa778f9821
StreamingGenerativeAIStack.accessOpenSearchClusterOutput = aws ssm start-session --target i-0970816fa778f9821 --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com"]}'
StreamingGenerativeAIStack.bastionHostIdOutput = i-0970816fa778f9821
StreamingGenerativeAIStack.domainEndpoint = vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com
StreamingGenerativeAIStack.regionOutput = us-east-1
Stack ARN:
arn:aws:cloudformation:us-east-1:<AWS Account ID>:stack/StreamingGenerativeAIStack/3dec75f0-cc9e-11ee-9b16-12348a4fbf87

✨  Total time: 1418.61s
  1. Establish connection to the OpenSearch cluster:

For Linux/Mac:

Run the following command to establish connection to OpenSearch in a separate terminal window. The command can be found as output accessOpenSearchClusterOutput:

aws ssm start-session --target <BastionHostId> --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["<OpenSearchDomainHost>"]}'

For Windows:

Open a separate Windows cmd terminal.

aws ssm start-session ^
    --target <BastionHostId> ^
    --document-name AWS-StartPortForwardingSessionToRemoteHost ^
    --parameters host="<OpenSearchDomainHost>",portNumber="443",localPortNumber="8157"
  1. Create the required index in Amazon OpenSearch:

For Linux/Mac:

curl --location -k --request PUT https://localhost:8157/processed_reviews \
--header 'Content-Type: application/json' \
--data-raw '{
  "mappings": {
    "properties": {
        "reviewId": {"type": "integer"},
        "userId": {"type": "keyword"},
        "summary": {"type": "keyword"},
        "sentiment": {"type": "keyword"},
        "dateTime": {"type": "date"}}
    }
  }
}'

For Windows: (Note: Ensure you are using Powershell 7+):

$url = "https://localhost:8157/processed_reviews"
$headers = @{
    "Content-Type" = "application/json"
}
$body = @{
    "mappings" = @{
        "properties" = @{
            "reviewId" = @{ "type" = "integer" }
            "userId" = @{ "type" = "keyword" }
            "summary" = @{ "type" = "keyword" }
            "sentiment" = @{ "type" = "keyword" }
            "dateTime" = @{ "type" = "date" }
        }
    }
} | ConvertTo-Json -Depth 3

Invoke-RestMethod -Method Put -Uri $url -Headers $headers -Body $body -SkipCertificateCheck
  1. Open the OpenSearch Dashboard:
  • Open your browser and access https://localhost:8157/_dashboards
  • Open the menu and click on Dashboards Management under Management, then click on Saved Objects and import export.ndjson which can be found in the resources folder.
  1. Download the review data here.

  2. After the download is complete, extract the .tar.gz file to retrieve the folder named aclImdb 3 or similar that contains the review data. Rename the review data folder to aclImdb.

  3. Move the extracted folder inside the data/ directory within the downloaded repository.

  4. Modify the DATA_DIR path in producer/produce.py as required. Be sure to also adapt the AWS_REGION constant if you are deploying this in a region other than us-east-1.

  5. Install the required dependencies and start generating data:

cd producer 
pip install -r requirements.txt
python produce.py

Clean up

Delete the StreamingGenerativeAI-Stack in your AWS account.

cd cdk && cdk destroy

Note: You may have to delete the AWSServiceRoleForAmazonElasticsearchService separately.

Authors

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published