Skip to content

Commit

Permalink
Replace high level requests with raw JSON.
Browse files Browse the repository at this point in the history
  • Loading branch information
dblock committed Oct 30, 2023
1 parent 33a7826 commit c3eab66
Showing 1 changed file with 108 additions and 44 deletions.
152 changes: 108 additions & 44 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,30 @@
* GitHub history for details.
*/

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use aws_config::meta::region::RegionProviderChain;
use std::{convert::TryInto, env, thread, time};
use aws_config::meta::region::RegionProviderChain;
use std::{convert::TryInto, env, thread, time};

use serde_json::Value;
use serde_json::{json, Value};

use opensearch::{
http::transport::{SingleNodeConnectionPool, TransportBuilder},
OpenSearch,
};

use url::Url;
use opensearch::http::headers::HeaderMap;
use opensearch::http::request::JsonBody;
use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
use opensearch::http::{Method, Url};
use opensearch::OpenSearch;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

let url = Url::parse(&env::var("ENDPOINT").expect("Missing ENDPOINT"));
let service_name = &env::var("SERVICE").unwrap_or("es".to_string());
let conn_pool = SingleNodeConnectionPool::new(url?);
let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
let aws_config = aws_config::from_env().region(region_provider).load().await.clone();
let aws_config = aws_config::from_env()
.region(region_provider)
.load()
.await
.clone();
let transport = TransportBuilder::new(conn_pool)
.auth(aws_config.clone().try_into()?)
.service_name(service_name)
Expand All @@ -38,7 +41,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// TODO: remove when OpenSearch Serverless adds support for GET /
if service_name == "es" {
let info: Value = client.info().send().await?.json().await?;
let info: Value = client
.send::<(), ()>(Method::Get, "/", HeaderMap::new(), None, None, None)
.await?
.json()
.await?;
println!(
"{}: {}",
info["version"]["distribution"].as_str().unwrap(),
Expand All @@ -47,49 +54,106 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

let index_name = "movies";
let document_id = "1";

let index_body: JsonBody<_> = json!({
"settings": {
"index": {
"number_of_shards" : 4
}
}
})
.into();

client
.indices()
.create(opensearch::indices::IndicesCreateParts::Index(index_name))
.send()
let create_index_response = client
.send(
Method::Put,
&format!("/{index_name}"),
HeaderMap::new(),
Option::<&()>::None,
Some(index_body),
None,
)
.await?;

client
.index(opensearch::IndexParts::Index(index_name))
.body(serde_json::json!({
"title": "Moneyball",
"director": "Bennett Miller",
"year": 2011
}
))
.send()
assert_eq!(create_index_response.status_code(), 200);

let document: JsonBody<_> = json!({
"title": "Moneyball",
"director": "Bennett Miller",
"year": "2011"
})
.into();
let create_document_response = client
.send(
Method::Put,
&format!("/{index_name}/_doc/{document_id}"),
HeaderMap::new(),
Some(&[("refresh", "true")]),
Some(document),
None,
)
.await?;

assert_eq!(create_document_response.status_code(), 201);

thread::sleep(time::Duration::from_secs(5));

let response = client
.search(opensearch::SearchParts::Index(&[index_name]))
.body(serde_json::json!({
"query": {
"match": {
"director": "miller"
}
}
let q = "miller";
let query: JsonBody<_> = json!({
"size": 5,
"query": {
"multi_match": {
"query": q,
"fields": ["title^2", "director"]
}
))
.send()
}
})
.into();

let search_response = client
.send(
Method::Post,
&format!("/{index_name}/_search"),
HeaderMap::new(),
Option::<&()>::None,
Some(query),
None,
)
.await?;

let response_body = response.json::<Value>().await?;
println!("{}", serde_json::to_string_pretty(&response_body).unwrap());
assert_eq!(search_response.status_code(), 200);
let search_result = search_response.json::<Value>().await?;
println!(
"Hits: {:#?}",
search_result["hits"]["hits"].as_array().unwrap()
);

client
.indices()
.delete(opensearch::indices::IndicesDeleteParts::Index(&[
index_name,
]))
.send()
let delete_document_response = client
.send::<(), ()>(
Method::Delete,
&format!("/{index_name}/_doc/{document_id}"),
HeaderMap::new(),
None,
None,
None,
)
.await?;

assert_eq!(delete_document_response.status_code(), 200);

let delete_response = client
.send::<(), ()>(
Method::Delete,
&format!("/{index_name}"),
HeaderMap::new(),
None,
None,
None,
)
.await?;

assert_eq!(delete_response.status_code(), 200);

Ok(())
}

0 comments on commit c3eab66

Please sign in to comment.