Skip to content

Commit

Permalink
Merge pull request #62 from axiomhq/tabular-support
Browse files Browse the repository at this point in the history
move axiom-rs to use tabular results for queries
  • Loading branch information
Licenser authored Aug 29, 2024
2 parents c36032b + 821e2a2 commit 5139218
Show file tree
Hide file tree
Showing 10 changed files with 499 additions and 436 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ futures-util = "0.3"
httpmock = "0.7"
structopt = "0.3"
tracing-subscriber = { version = "0.3", features = ["ansi", "env-filter"] }
cli-table = "0.4.9"

[features]
default = ["tokio", "default-tls"]
Expand Down
47 changes: 29 additions & 18 deletions examples/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axiom_rs::{
datasets::{ContentEncoding, ContentType},
Client,
};
use std::time::Duration;
use cli_table::{Cell as _, Style as _, Table as _};
use structopt::StructOpt;
use tokio::io::{stdin, AsyncReadExt};

Expand Down Expand Up @@ -37,13 +37,6 @@ enum Datasets {
},
/// Delete a dataset
Delete { name: String },
/// Trim a dataset
Trim {
name: String,

#[structopt(long)]
seconds: u64,
},
/// Ingest into a dataset from stdin.
Ingest {
name: String,
Expand Down Expand Up @@ -73,19 +66,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("{:?}", dataset);
}),
Datasets::Get { name } => println!("{:?}", client.datasets().get(&name).await?),
// Datasets::Info { name } => println!("{:?}", client.datasets().info(&name).await?),
Datasets::Update { name, description } => {
let dataset = client.datasets().update(&name, description).await?;
println!("{:?}", dataset);
}
Datasets::Delete { name } => client.datasets().delete(&name).await?,
Datasets::Trim { name, seconds } => println!(
"{:?}",
client
.datasets()
.trim(&name, Duration::from_secs(seconds))
.await?
),
Datasets::Ingest {
name,
content_type,
Expand All @@ -99,8 +84,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("{:?}", ingest_status);
}
Datasets::Query { apl } => {
let result = client.query(apl, None).await?;
println!("{:?}", result);
let result = client.query(&apl, None).await?;
for table in result.tables {
println!("{}:", table.name());

let rows_iter = table.iter();
let mut rows = Vec::with_capacity(rows_iter.size_hint().0);
for row in rows_iter {
let field_iter = row.iter();
let mut row_vec = Vec::with_capacity(field_iter.size_hint().0);
for field in field_iter {
row_vec.push(field.map_or_else(
|| "-".to_string(),
|v| serde_json::to_string(v).unwrap(),
));
}
rows.push(row_vec);
}

let mut fields = Vec::with_capacity(table.fields().len());
for field in table.fields() {
fields.push(field.name().to_string().cell().bold(true));
}

let t = rows.table().title(fields).bold(true);

let table_display = t.display().unwrap();
println!("{}", table_display);
}
}
},
Opt::Users(users) => match users {
Expand Down
75 changes: 7 additions & 68 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use tracing::instrument;
use crate::{
annotations,
datasets::{
self, ContentEncoding, ContentType, IngestStatus, LegacyQuery, LegacyQueryOptions,
LegacyQueryResult, Query, QueryOptions, QueryParams, QueryResult,
self, ContentEncoding, ContentType, IngestStatus, Query, QueryOptions, QueryParams,
QueryResult,
},
error::{Error, Result},
http::{self, HeaderMap},
Expand Down Expand Up @@ -107,37 +107,14 @@ impl Client {
/// Executes the given query specified using the Axiom Processing Language (APL).
/// To learn more about APL, see the APL documentation at https://www.axiom.co/docs/apl/introduction.
#[instrument(skip(self, opts))]
pub async fn query<S, O>(&self, apl: S, opts: O) -> Result<QueryResult>
pub async fn query<S, O>(&self, apl: &S, opts: O) -> Result<QueryResult>
where
S: Into<String> + FmtDebug,
S: ToString + FmtDebug + ?Sized,
O: Into<Option<QueryOptions>>,
{
let (req, query_params) = match opts.into() {
Some(opts) => {
let req = Query {
apl: apl.into(),
start_time: opts.start_time,
end_time: opts.end_time,
cursor: opts.cursor,
include_cursor: opts.include_cursor,
};

let query_params = QueryParams {
no_cache: opts.no_cache,
save: opts.save,
format: opts.format,
};

(req, query_params)
}
None => (
Query {
apl: apl.into(),
..Default::default()
},
QueryParams::default(),
),
};
let opts: QueryOptions = opts.into().unwrap_or_default();
let query_params = QueryParams::from(&opts);
let req = Query::new(apl, opts);

let query_params = serde_qs::to_string(&query_params)?;
let path = format!("/v1/datasets/_apl?{query_params}");
Expand All @@ -157,44 +134,6 @@ impl Client {
Ok(result)
}

/// Execute the given query on the dataset identified by its id.
#[instrument(skip(self, opts))]
#[deprecated(
since = "0.6.0",
note = "The legacy query will be removed in future versions, use `apl_query` instead"
)]
pub async fn query_legacy<N, O>(
&self,
dataset_name: N,
query: LegacyQuery,
opts: O,
) -> Result<LegacyQueryResult>
where
N: Into<String> + FmtDebug,
O: Into<Option<LegacyQueryOptions>>,
{
let path = format!(
"/v1/datasets/{}/query?{}",
dataset_name.into(),
&opts
.into()
.map_or_else(|| Ok(String::new()), |opts| { serde_qs::to_string(&opts) })?
);
let res = self.http_client.post(path, &query).await?;

let saved_query_id = res
.headers()
.get("X-Axiom-History-Query-Id")
.map(|s| s.to_str())
.transpose()
.map_err(|_e| Error::InvalidQueryId)?
.map(std::string::ToString::to_string);
let mut result = res.json::<LegacyQueryResult>().await?;
result.saved_query_id = saved_query_id;

Ok(result)
}

/// Ingest events into the dataset identified by its id.
/// Restrictions for field names (JSON object keys) can be reviewed here:
/// <https://www.axiom.co/docs/usage/field-restrictions>.
Expand Down
29 changes: 2 additions & 27 deletions src/datasets/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#[allow(deprecated)]
use crate::{
datasets::model::{
Dataset, DatasetCreateRequest, DatasetUpdateRequest, Info, TrimRequest, TrimResult,
},
datasets::model::{Dataset, DatasetCreateRequest, DatasetUpdateRequest, Info},
error::{Error, Result},
http,
};
use std::{
convert::{TryFrom, TryInto},
fmt::Debug as FmtDebug,
result::Result as StdResult,
convert::TryFrom, fmt::Debug as FmtDebug, result::Result as StdResult,
time::Duration as StdDuration,
};
use tracing::instrument;
Expand Down Expand Up @@ -93,27 +89,6 @@ impl<'client> Client<'client> {
self.http_client.get("/v1/datasets").await?.json().await
}

/// Trim the dataset identified by its id to a given length.
/// The max duration given will mark the oldest timestamp an event can have.
/// Older ones will be deleted from the dataset.
/// The duration can either be a [`std::time::Duration`] or a
/// [`chrono::Duration`].
#[instrument(skip(self))]
#[allow(deprecated)]
pub async fn trim<N, D>(&self, dataset_name: N, duration: D) -> Result<TrimResult>
where
N: Into<String> + FmtDebug,
D: TryInto<Duration, Error = Error> + FmtDebug,
{
let duration = duration.try_into()?;
let req = TrimRequest::new(duration.into());
self.http_client
.post(format!("/v1/datasets/{}/trim", dataset_name.into()), &req)
.await?
.json()
.await
}

/// Update a dataset.
#[instrument(skip(self))]
pub async fn update<N, D>(&self, dataset_name: N, new_description: D) -> Result<Dataset>
Expand Down
Loading

0 comments on commit 5139218

Please sign in to comment.