Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate/update schema queries #51

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/app.env
target/
.vscode/**
18 changes: 15 additions & 3 deletions examples/measurement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,31 @@ async fn main() -> Result<(), Box<dyn Error>> {

let client = influxdb2::Client::new(influx_url, org, token);

let measurements = client.list_measurements(bucket).await.unwrap();
let measurements = client.list_measurements(bucket, Some(365)).await.unwrap();
println!("measurements: {:?}", measurements);

for m in measurements.iter() {
let field_keys = client
.list_measurement_field_keys(bucket, &m)
.list_measurement_field_keys(bucket, &m, Some(365))
.await
.unwrap();
println!("field keys: {:?}", field_keys);
}

for m in measurements.iter() {
let tag_values = client.list_measurement_tag_values(bucket, &m, "host").await;
let tag_values = client
.list_measurement_tag_values(bucket, &m, "host", Some(365))
.await;
println!(
"tag values for measurement {} and tag {}: {:?}",
&m, "host", tag_values
);
}

for m in measurements.iter() {
let tag_values = client
.list_measurement_tag_keys(bucket, &m, Some(365))
.await;
println!(
"tag values for measurement {} and tag {}: {:?}",
&m, "host", tag_values
Expand Down
149 changes: 72 additions & 77 deletions src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,117 +251,112 @@ impl Client {
}

/// Returns bucket measurements
pub async fn list_measurements(&self, bucket: &str) -> Result<Vec<String>, RequestError> {
let req_url = self.url("/api/v2/query");
pub async fn list_measurements(
&self,
bucket: &str,
days_ago: Option<i64>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit late to this PR, but I'd like to note that this introduces a backwards incompatible change to anyone using one of these 3 functions.

Backwards compatibility could be maintained by restoring the original functions and adding new functions with the extra param. Alternatively (and this would be my preference) we could mention it in the release notes

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Boudewijn26 that's a valid point, I'll yank version 0.4.6 and bump minor version to v0.5 and create a release note, bumping major version seems to be unnecessary since we haven't reach v1 yet.

Also a clean up seems necessary like perhaps introducing both start and stop optional parameter so that it more closely resembles the API documentation.

I'll do this either today or tomorrow.

) -> Result<Vec<String>, RequestError> {
let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"

schema.measurements(bucket: "{bucket}") "#
));
let body = serde_json::to_string(&query).context(Serializing)?;

let response = self
.request(Method::POST, &req_url)
.header("Accepting-Encoding", "identity")
.header("Content-Type", "application/json")
.query(&[("org", &self.org)])
.body(body)
.send()
.await
.context(ReqwestProcessing)?;

match response.status() {
StatusCode::OK => {
let text = response.text().await.unwrap();
let mut reader = csv::ReaderBuilder::new()
.has_headers(true)
.comment(Some(b'#'))
.from_reader(text.as_bytes());

Ok(reader
.records()
.into_iter()
.flatten()
.map(|r| r.get(3).map(|s| s.to_owned()))
.flatten()
.collect())
}
status => {
let text = response.text().await.context(ReqwestProcessing)?;
Http { status, text }.fail()?
schema.measurements(bucket: "{bucket}"{}) "#,
match days_ago {
Some(days_ago) => {
format!(", start: -{}d", days_ago)
}
None => {
String::from("")
}
}
}
));
self.exec_schema_query(query).await
}

/// List a measurement's field keys
/// List field keys for measurement
pub async fn list_measurement_field_keys(
&self,
bucket: &str,
measurement: &str,
days_ago: Option<i64>,
) -> Result<Vec<String>, RequestError> {
let req_url = self.url("/api/v2/query");
let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"

schema.measurementFieldKeys(
bucket: "{bucket}",
measurement: "{measurement}",
)"#
));

let body = serde_json::to_string(&query).context(Serializing)?;

let response = self
.request(Method::POST, &req_url)
.header("Accepting-Encoding", "identity")
.header("Content-Type", "application/json")
.query(&[("org", &self.org)])
.body(body)
.send()
.await
.context(ReqwestProcessing)?;

match response.status() {
StatusCode::OK => {
let text = response.text().await.unwrap();
let mut reader = csv::ReaderBuilder::new()
.has_headers(true)
.comment(Some(b'#'))
.from_reader(text.as_bytes());

Ok(reader
.records()
.into_iter()
.flatten()
.map(|r| r.get(3).map(|s| s.to_owned()))
.flatten()
.collect())
}
status => {
let text = response.text().await.context(ReqwestProcessing)?;
Http { status, text }.fail()?
{}
)"#,
match days_ago {
Some(days_ago) => {
format!("start: -{}d", days_ago)
}
None => {
String::from("")
}
}
}
));
self.exec_schema_query(query).await
}

/// List keys of measurement tag
/// List all tag values for measurement tag
pub async fn list_measurement_tag_values(
&self,
bucket: &str,
measurement: &str,
tag: &str,
days_ago: Option<i64>,
) -> Result<Vec<String>, RequestError> {
let req_url = self.url("/api/v2/query");
let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"

schema.measurementTagValues(
bucket: "{bucket}",
measurement: "{measurement}",
tag: "{tag}"
)"#
tag: "{tag}",
{}
)"#,
match days_ago {
Some(days_ago) => {
format!("start: -{}d", days_ago)
}
None => {
String::from("")
}
}
));
self.exec_schema_query(query).await
}

/// List all tag keys for measurement
pub async fn list_measurement_tag_keys(
&self,
bucket: &str,
measurement: &str,
days_ago: Option<i64>,
) -> Result<Vec<String>, RequestError> {
let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"

schema.measurementTagKeys(
bucket: "{bucket}",
measurement: "{measurement}",
{}
)"#,
match days_ago {
Some(days_ago) => {
format!("start: -{}d", days_ago)
}
None => {
String::from("")
}
}
));
self.exec_schema_query(query).await
}

async fn exec_schema_query(&self, query: Query) -> Result<Vec<String>, RequestError> {
let req_url = self.url("/api/v2/query");
let body = serde_json::to_string(&query).context(Serializing)?;

let response = self
Expand Down
Loading